1/******************************************************************************
2*******************************************************************************
3**
4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
6**
7**  This copyrighted material is made available to anyone wishing to use,
8**  modify, copy, or redistribute it subject to the terms and conditions
9**  of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "midcomms.h"
19#include "rcom.h"
20#include "recover.h"
21#include "dir.h"
22#include "config.h"
23#include "memory.h"
24#include "lock.h"
25#include "util.h"
26
27
28static int rcom_response(struct dlm_ls *ls)
29{
30	return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
31}
32
33static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
34		       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
35{
36	struct dlm_rcom *rc;
37	struct dlm_mhandle *mh;
38	char *mb;
39	int mb_len = sizeof(struct dlm_rcom) + len;
40
41	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
42	if (!mh) {
43		log_print("create_rcom to %d type %d len %d ENOBUFS",
44			  to_nodeid, type, len);
45		return -ENOBUFS;
46	}
47	memset(mb, 0, mb_len);
48
49	rc = (struct dlm_rcom *) mb;
50
51	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
52	rc->rc_header.h_lockspace = ls->ls_global_id;
53	rc->rc_header.h_nodeid = dlm_our_nodeid();
54	rc->rc_header.h_length = mb_len;
55	rc->rc_header.h_cmd = DLM_RCOM;
56
57	rc->rc_type = type;
58
59	spin_lock(&ls->ls_recover_lock);
60	rc->rc_seq = ls->ls_recover_seq;
61	spin_unlock(&ls->ls_recover_lock);
62
63	*mh_ret = mh;
64	*rc_ret = rc;
65	return 0;
66}
67
68static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
69		      struct dlm_rcom *rc)
70{
71	dlm_rcom_out(rc);
72	dlm_lowcomms_commit_buffer(mh);
73}
74
75/* When replying to a status request, a node also sends back its
76   configuration values.  The requesting node then checks that the remote
77   node is configured the same way as itself. */
78
79static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
80{
81	rf->rf_lvblen = ls->ls_lvblen;
82	rf->rf_lsflags = ls->ls_exflags;
83}
84
85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
86{
87	struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
88
89	if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
90		log_error(ls, "version mismatch: %x nodeid %d: %x",
91			  DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
92			  rc->rc_header.h_version);
93		return -EINVAL;
94	}
95
96	if (rf->rf_lvblen != ls->ls_lvblen ||
97	    rf->rf_lsflags != ls->ls_exflags) {
98		log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
99			  ls->ls_lvblen, ls->ls_exflags,
100			  nodeid, rf->rf_lvblen, rf->rf_lsflags);
101		return -EINVAL;
102	}
103	return 0;
104}
105
106static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
107{
108	spin_lock(&ls->ls_rcom_spin);
109	*new_seq = ++ls->ls_rcom_seq;
110	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
111	spin_unlock(&ls->ls_rcom_spin);
112}
113
114static void disallow_sync_reply(struct dlm_ls *ls)
115{
116	spin_lock(&ls->ls_rcom_spin);
117	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
118	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
119	spin_unlock(&ls->ls_rcom_spin);
120}
121
122int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
123{
124	struct dlm_rcom *rc;
125	struct dlm_mhandle *mh;
126	int error = 0;
127
128	ls->ls_recover_nodeid = nodeid;
129
130	if (nodeid == dlm_our_nodeid()) {
131		rc = (struct dlm_rcom *) ls->ls_recover_buf;
132		rc->rc_result = dlm_recover_status(ls);
133		goto out;
134	}
135
136	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
137	if (error)
138		goto out;
139
140	allow_sync_reply(ls, &rc->rc_id);
141	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
142
143	send_rcom(ls, mh, rc);
144
145	error = dlm_wait_function(ls, &rcom_response);
146	disallow_sync_reply(ls);
147	if (error)
148		goto out;
149
150	rc = (struct dlm_rcom *) ls->ls_recover_buf;
151
152	if (rc->rc_result == -ESRCH) {
153		/* we pretend the remote lockspace exists with 0 status */
154		log_debug(ls, "remote node %d not ready", nodeid);
155		rc->rc_result = 0;
156	} else
157		error = check_config(ls, rc, nodeid);
158	/* the caller looks at rc_result for the remote recovery status */
159 out:
160	return error;
161}
162
163static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
164{
165	struct dlm_rcom *rc;
166	struct dlm_mhandle *mh;
167	int error, nodeid = rc_in->rc_header.h_nodeid;
168
169	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
170			    sizeof(struct rcom_config), &rc, &mh);
171	if (error)
172		return;
173	rc->rc_id = rc_in->rc_id;
174	rc->rc_seq_reply = rc_in->rc_seq;
175	rc->rc_result = dlm_recover_status(ls);
176	make_config(ls, (struct rcom_config *) rc->rc_buf);
177
178	send_rcom(ls, mh, rc);
179}
180
181static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
182{
183	spin_lock(&ls->ls_rcom_spin);
184	if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
185	    rc_in->rc_id != ls->ls_rcom_seq) {
186		log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
187			  rc_in->rc_type, rc_in->rc_header.h_nodeid,
188			  (unsigned long long)rc_in->rc_id,
189			  (unsigned long long)ls->ls_rcom_seq);
190		goto out;
191	}
192	memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
193	set_bit(LSFL_RCOM_READY, &ls->ls_flags);
194	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
195	wake_up(&ls->ls_wait_general);
196 out:
197	spin_unlock(&ls->ls_rcom_spin);
198}
199
200static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
201{
202	receive_sync_reply(ls, rc_in);
203}
204
205int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
206{
207	struct dlm_rcom *rc;
208	struct dlm_mhandle *mh;
209	int error = 0, len = sizeof(struct dlm_rcom);
210
211	ls->ls_recover_nodeid = nodeid;
212
213	if (nodeid == dlm_our_nodeid()) {
214		dlm_copy_master_names(ls, last_name, last_len,
215		                      ls->ls_recover_buf + len,
216		                      dlm_config.ci_buffer_size - len, nodeid);
217		goto out;
218	}
219
220	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
221	if (error)
222		goto out;
223	memcpy(rc->rc_buf, last_name, last_len);
224
225	allow_sync_reply(ls, &rc->rc_id);
226	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
227
228	send_rcom(ls, mh, rc);
229
230	error = dlm_wait_function(ls, &rcom_response);
231	disallow_sync_reply(ls);
232 out:
233	return error;
234}
235
236static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
237{
238	struct dlm_rcom *rc;
239	struct dlm_mhandle *mh;
240	int error, inlen, outlen, nodeid;
241
242	nodeid = rc_in->rc_header.h_nodeid;
243	inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
244	outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
245
246	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
247	if (error)
248		return;
249	rc->rc_id = rc_in->rc_id;
250	rc->rc_seq_reply = rc_in->rc_seq;
251
252	dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
253			      nodeid);
254	send_rcom(ls, mh, rc);
255}
256
257static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
258{
259	receive_sync_reply(ls, rc_in);
260}
261
262int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
263{
264	struct dlm_rcom *rc;
265	struct dlm_mhandle *mh;
266	struct dlm_ls *ls = r->res_ls;
267	int error;
268
269	error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
270			    &rc, &mh);
271	if (error)
272		goto out;
273	memcpy(rc->rc_buf, r->res_name, r->res_length);
274	rc->rc_id = (unsigned long) r;
275
276	send_rcom(ls, mh, rc);
277 out:
278	return error;
279}
280
281static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
282{
283	struct dlm_rcom *rc;
284	struct dlm_mhandle *mh;
285	int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
286	int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
287
288	error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
289	if (error)
290		return;
291
292	error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
293	if (error)
294		ret_nodeid = error;
295	rc->rc_result = ret_nodeid;
296	rc->rc_id = rc_in->rc_id;
297	rc->rc_seq_reply = rc_in->rc_seq;
298
299	send_rcom(ls, mh, rc);
300}
301
302static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
303{
304	dlm_recover_master_reply(ls, rc_in);
305}
306
307static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
308			   struct rcom_lock *rl)
309{
310	memset(rl, 0, sizeof(*rl));
311
312	rl->rl_ownpid = lkb->lkb_ownpid;
313	rl->rl_lkid = lkb->lkb_id;
314	rl->rl_exflags = lkb->lkb_exflags;
315	rl->rl_flags = lkb->lkb_flags;
316	rl->rl_lvbseq = lkb->lkb_lvbseq;
317	rl->rl_rqmode = lkb->lkb_rqmode;
318	rl->rl_grmode = lkb->lkb_grmode;
319	rl->rl_status = lkb->lkb_status;
320	rl->rl_wait_type = lkb->lkb_wait_type;
321
322	if (lkb->lkb_bastaddr)
323		rl->rl_asts |= AST_BAST;
324	if (lkb->lkb_astaddr)
325		rl->rl_asts |= AST_COMP;
326
327	rl->rl_namelen = r->res_length;
328	memcpy(rl->rl_name, r->res_name, r->res_length);
329
330
331	if (lkb->lkb_lvbptr)
332		memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
333}
334
335int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
336{
337	struct dlm_ls *ls = r->res_ls;
338	struct dlm_rcom *rc;
339	struct dlm_mhandle *mh;
340	struct rcom_lock *rl;
341	int error, len = sizeof(struct rcom_lock);
342
343	if (lkb->lkb_lvbptr)
344		len += ls->ls_lvblen;
345
346	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
347	if (error)
348		goto out;
349
350	rl = (struct rcom_lock *) rc->rc_buf;
351	pack_rcom_lock(r, lkb, rl);
352	rc->rc_id = (unsigned long) r;
353
354	send_rcom(ls, mh, rc);
355 out:
356	return error;
357}
358
359static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
360{
361	struct dlm_rcom *rc;
362	struct dlm_mhandle *mh;
363	int error, nodeid = rc_in->rc_header.h_nodeid;
364
365	dlm_recover_master_copy(ls, rc_in);
366
367	error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
368			    sizeof(struct rcom_lock), &rc, &mh);
369	if (error)
370		return;
371
372	/* We send back the same rcom_lock struct we received, but
373	   dlm_recover_master_copy() has filled in rl_remid and rl_result */
374
375	memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
376	rc->rc_id = rc_in->rc_id;
377	rc->rc_seq_reply = rc_in->rc_seq;
378
379	send_rcom(ls, mh, rc);
380}
381
382static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{
384	dlm_recover_process_copy(ls, rc_in);
385}
386
387static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
388{
389	struct dlm_rcom *rc;
390	struct rcom_config *rf;
391	struct dlm_mhandle *mh;
392	char *mb;
393	int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
394
395	mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb);
396	if (!mh)
397		return -ENOBUFS;
398	memset(mb, 0, mb_len);
399
400	rc = (struct dlm_rcom *) mb;
401
402	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
403	rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
404	rc->rc_header.h_nodeid = dlm_our_nodeid();
405	rc->rc_header.h_length = mb_len;
406	rc->rc_header.h_cmd = DLM_RCOM;
407
408	rc->rc_type = DLM_RCOM_STATUS_REPLY;
409	rc->rc_id = rc_in->rc_id;
410	rc->rc_seq_reply = rc_in->rc_seq;
411	rc->rc_result = -ESRCH;
412
413	rf = (struct rcom_config *) rc->rc_buf;
414	rf->rf_lvblen = -1;
415
416	dlm_rcom_out(rc);
417	dlm_lowcomms_commit_buffer(mh);
418
419	return 0;
420}
421
422static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
423{
424	uint64_t seq;
425	int rv = 0;
426
427	switch (rc->rc_type) {
428	case DLM_RCOM_STATUS_REPLY:
429	case DLM_RCOM_NAMES_REPLY:
430	case DLM_RCOM_LOOKUP_REPLY:
431	case DLM_RCOM_LOCK_REPLY:
432		spin_lock(&ls->ls_recover_lock);
433		seq = ls->ls_recover_seq;
434		spin_unlock(&ls->ls_recover_lock);
435		if (rc->rc_seq_reply != seq) {
436			log_debug(ls, "ignoring old reply %x from %d "
437				      "seq_reply %llx expect %llx",
438				      rc->rc_type, rc->rc_header.h_nodeid,
439				      (unsigned long long)rc->rc_seq_reply,
440				      (unsigned long long)seq);
441			rv = 1;
442		}
443	}
444	return rv;
445}
446
447/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
448   recovery-only comms are sent through here. */
449
450void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
451{
452	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
453	struct dlm_ls *ls;
454
455	dlm_rcom_in(rc);
456
457	/* If the lockspace doesn't exist then still send a status message
458	   back; it's possible that it just doesn't have its global_id yet. */
459
460	ls = dlm_find_lockspace_global(hd->h_lockspace);
461	if (!ls) {
462		log_print("lockspace %x from %d type %x not found",
463			  hd->h_lockspace, nodeid, rc->rc_type);
464		if (rc->rc_type == DLM_RCOM_STATUS)
465			send_ls_not_ready(nodeid, rc);
466		return;
467	}
468
469	if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
470		log_debug(ls, "ignoring recovery message %x from %d",
471			  rc->rc_type, nodeid);
472		goto out;
473	}
474
475	if (is_old_reply(ls, rc))
476		goto out;
477
478	if (nodeid != rc->rc_header.h_nodeid) {
479		log_error(ls, "bad rcom nodeid %d from %d",
480			  rc->rc_header.h_nodeid, nodeid);
481		goto out;
482	}
483
484	switch (rc->rc_type) {
485	case DLM_RCOM_STATUS:
486		receive_rcom_status(ls, rc);
487		break;
488
489	case DLM_RCOM_NAMES:
490		receive_rcom_names(ls, rc);
491		break;
492
493	case DLM_RCOM_LOOKUP:
494		receive_rcom_lookup(ls, rc);
495		break;
496
497	case DLM_RCOM_LOCK:
498		receive_rcom_lock(ls, rc);
499		break;
500
501	case DLM_RCOM_STATUS_REPLY:
502		receive_rcom_status_reply(ls, rc);
503		break;
504
505	case DLM_RCOM_NAMES_REPLY:
506		receive_rcom_names_reply(ls, rc);
507		break;
508
509	case DLM_RCOM_LOOKUP_REPLY:
510		receive_rcom_lookup_reply(ls, rc);
511		break;
512
513	case DLM_RCOM_LOCK_REPLY:
514		receive_rcom_lock_reply(ls, rc);
515		break;
516
517	default:
518		DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
519	}
520 out:
521	dlm_put_lockspace(ls);
522}
523