1/******************************************************************************
2*******************************************************************************
3**
4**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5**
6**  This copyrighted material is made available to anyone wishing to use,
7**  modify, copy, or redistribute it subject to the terms and conditions
8**  of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15   dlm_lock()
16   dlm_unlock()
17
18   request_lock(ls, lkb)
19   convert_lock(ls, lkb)
20   unlock_lock(ls, lkb)
21   cancel_lock(ls, lkb)
22
23   _request_lock(r, lkb)
24   _convert_lock(r, lkb)
25   _unlock_lock(r, lkb)
26   _cancel_lock(r, lkb)
27
28   do_request(r, lkb)
29   do_convert(r, lkb)
30   do_unlock(r, lkb)
31   do_cancel(r, lkb)
32
33   Stage 1 (lock, unlock) is mainly about checking input args and
34   splitting into one of the four main operations:
35
36       dlm_lock          = request_lock
37       dlm_lock+CONVERT  = convert_lock
38       dlm_unlock        = unlock_lock
39       dlm_unlock+CANCEL = cancel_lock
40
41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42   provided to the next stage.
43
44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48   given rsb and lkb and queues callbacks.
49
50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
51   function being executed on the remote node.  The connecting send/receive
52   calls on local (L) and remote (R) nodes:
53
54   L: send_xxxx()              ->  R: receive_xxxx()
55                                   R: do_xxxx()
56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57*/
58#include <linux/types.h>
59#include "dlm_internal.h"
60#include <linux/dlm_device.h>
61#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
73#include "user.h"
74#include "config.h"
75
76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86				    struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms);
88static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
89
90/*
91 * Lock compatibilty matrix - thanks Steve
92 * UN = Unlocked state. Not really a state, used as a flag
93 * PD = Padding. Used to make the matrix a nice power of two in size
94 * Other states are the same as the VMS DLM.
95 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
96 */
97
98static const int __dlm_compat_matrix[8][8] = {
99      /* UN NL CR CW PR PW EX PD */
100        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
101        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
102        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
103        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
104        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
105        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
106        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
107        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
108};
109
110/*
111 * This defines the direction of transfer of LVB data.
112 * Granted mode is the row; requested mode is the column.
113 * Usage: matrix[grmode+1][rqmode+1]
114 * 1 = LVB is returned to the caller
115 * 0 = LVB is written to the resource
116 * -1 = nothing happens to the LVB
117 */
118
119const int dlm_lvb_operations[8][8] = {
120        /* UN   NL  CR  CW  PR  PW  EX  PD*/
121        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
122        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
123        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
124        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
125        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
126        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
127        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
128        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
129};
130
131#define modes_compat(gr, rq) \
132	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
133
134int dlm_modes_compat(int mode1, int mode2)
135{
136	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
137}
138
139/*
140 * Compatibility matrix for conversions with QUECVT set.
141 * Granted mode is the row; requested mode is the column.
142 * Usage: matrix[grmode+1][rqmode+1]
143 */
144
145static const int __quecvt_compat_matrix[8][8] = {
146      /* UN NL CR CW PR PW EX PD */
147        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
148        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
149        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
150        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
151        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
152        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
153        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
154        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
155};
156
157void dlm_print_lkb(struct dlm_lkb *lkb)
158{
159	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
164}
165
166void dlm_print_rsb(struct dlm_rsb *r)
167{
168	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169	       r->res_nodeid, r->res_flags, r->res_first_lkid,
170	       r->res_recover_locks_count, r->res_name);
171}
172
173void dlm_dump_rsb(struct dlm_rsb *r)
174{
175	struct dlm_lkb *lkb;
176
177	dlm_print_rsb(r);
178
179	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181	printk(KERN_ERR "rsb lookup list\n");
182	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183		dlm_print_lkb(lkb);
184	printk(KERN_ERR "rsb grant queue:\n");
185	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186		dlm_print_lkb(lkb);
187	printk(KERN_ERR "rsb convert queue:\n");
188	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189		dlm_print_lkb(lkb);
190	printk(KERN_ERR "rsb wait queue:\n");
191	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192		dlm_print_lkb(lkb);
193}
194
195/* Threads cannot use the lockspace while it's being recovered */
196
197static inline void lock_recovery(struct dlm_ls *ls)
198{
199	down_read(&ls->ls_in_recovery);
200}
201
202static inline void unlock_recovery(struct dlm_ls *ls)
203{
204	up_read(&ls->ls_in_recovery);
205}
206
207static inline int lock_recovery_try(struct dlm_ls *ls)
208{
209	return down_read_trylock(&ls->ls_in_recovery);
210}
211
212static inline int can_be_queued(struct dlm_lkb *lkb)
213{
214	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
215}
216
217static inline int force_blocking_asts(struct dlm_lkb *lkb)
218{
219	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
220}
221
222static inline int is_demoted(struct dlm_lkb *lkb)
223{
224	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
225}
226
227static inline int is_altmode(struct dlm_lkb *lkb)
228{
229	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
230}
231
232static inline int is_granted(struct dlm_lkb *lkb)
233{
234	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235}
236
237static inline int is_remote(struct dlm_rsb *r)
238{
239	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240	return !!r->res_nodeid;
241}
242
243static inline int is_process_copy(struct dlm_lkb *lkb)
244{
245	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
246}
247
248static inline int is_master_copy(struct dlm_lkb *lkb)
249{
250	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
253}
254
255static inline int middle_conversion(struct dlm_lkb *lkb)
256{
257	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
259		return 1;
260	return 0;
261}
262
263static inline int down_conversion(struct dlm_lkb *lkb)
264{
265	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
266}
267
268static inline int is_overlap_unlock(struct dlm_lkb *lkb)
269{
270	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
271}
272
273static inline int is_overlap_cancel(struct dlm_lkb *lkb)
274{
275	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
276}
277
278static inline int is_overlap(struct dlm_lkb *lkb)
279{
280	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281				  DLM_IFL_OVERLAP_CANCEL));
282}
283
284static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
285{
286	if (is_master_copy(lkb))
287		return;
288
289	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
290
291	lkb->lkb_lksb->sb_status = rv;
292	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
293
294	dlm_add_ast(lkb, AST_COMP);
295}
296
297static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298{
299	queue_cast(r, lkb,
300		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301}
302
303static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
304{
305	if (is_master_copy(lkb))
306		send_bast(r, lkb, rqmode);
307	else {
308		lkb->lkb_bastmode = rqmode;
309		dlm_add_ast(lkb, AST_BAST);
310	}
311}
312
313/*
314 * Basic operations on rsb's and lkb's
315 */
316
317static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
318{
319	struct dlm_rsb *r;
320
321	r = allocate_rsb(ls, len);
322	if (!r)
323		return NULL;
324
325	r->res_ls = ls;
326	r->res_length = len;
327	memcpy(r->res_name, name, len);
328	mutex_init(&r->res_mutex);
329
330	INIT_LIST_HEAD(&r->res_lookup);
331	INIT_LIST_HEAD(&r->res_grantqueue);
332	INIT_LIST_HEAD(&r->res_convertqueue);
333	INIT_LIST_HEAD(&r->res_waitqueue);
334	INIT_LIST_HEAD(&r->res_root_list);
335	INIT_LIST_HEAD(&r->res_recover_list);
336
337	return r;
338}
339
340static int search_rsb_list(struct list_head *head, char *name, int len,
341			   unsigned int flags, struct dlm_rsb **r_ret)
342{
343	struct dlm_rsb *r;
344	int error = 0;
345
346	list_for_each_entry(r, head, res_hashchain) {
347		if (len == r->res_length && !memcmp(name, r->res_name, len))
348			goto found;
349	}
350	return -EBADR;
351
352 found:
353	if (r->res_nodeid && (flags & R_MASTER))
354		error = -ENOTBLK;
355	*r_ret = r;
356	return error;
357}
358
359static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
360		       unsigned int flags, struct dlm_rsb **r_ret)
361{
362	struct dlm_rsb *r;
363	int error;
364
365	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
366	if (!error) {
367		kref_get(&r->res_ref);
368		goto out;
369	}
370	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
371	if (error)
372		goto out;
373
374	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
375
376	if (dlm_no_directory(ls))
377		goto out;
378
379	if (r->res_nodeid == -1) {
380		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
381		r->res_first_lkid = 0;
382	} else if (r->res_nodeid > 0) {
383		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
384		r->res_first_lkid = 0;
385	} else {
386		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
387		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
388	}
389 out:
390	*r_ret = r;
391	return error;
392}
393
394static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
395		      unsigned int flags, struct dlm_rsb **r_ret)
396{
397	int error;
398	write_lock(&ls->ls_rsbtbl[b].lock);
399	error = _search_rsb(ls, name, len, b, flags, r_ret);
400	write_unlock(&ls->ls_rsbtbl[b].lock);
401	return error;
402}
403
404/*
405 * Find rsb in rsbtbl and potentially create/add one
406 *
407 * Delaying the release of rsb's has a similar benefit to applications keeping
408 * NL locks on an rsb, but without the guarantee that the cached master value
409 * will still be valid when the rsb is reused.  Apps aren't always smart enough
410 * to keep NL locks on an rsb that they may lock again shortly; this can lead
411 * to excessive master lookups and removals if we don't delay the release.
412 *
413 * Searching for an rsb means looking through both the normal list and toss
414 * list.  When found on the toss list the rsb is moved to the normal list with
415 * ref count of 1; when found on normal list the ref count is incremented.
416 */
417
418static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
419		    unsigned int flags, struct dlm_rsb **r_ret)
420{
421	struct dlm_rsb *r, *tmp;
422	uint32_t hash, bucket;
423	int error = 0;
424
425	if (dlm_no_directory(ls))
426		flags |= R_CREATE;
427
428	hash = jhash(name, namelen, 0);
429	bucket = hash & (ls->ls_rsbtbl_size - 1);
430
431	error = search_rsb(ls, name, namelen, bucket, flags, &r);
432	if (!error)
433		goto out;
434
435	if (error == -EBADR && !(flags & R_CREATE))
436		goto out;
437
438	/* the rsb was found but wasn't a master copy */
439	if (error == -ENOTBLK)
440		goto out;
441
442	error = -ENOMEM;
443	r = create_rsb(ls, name, namelen);
444	if (!r)
445		goto out;
446
447	r->res_hash = hash;
448	r->res_bucket = bucket;
449	r->res_nodeid = -1;
450	kref_init(&r->res_ref);
451
452	/* With no directory, the master can be set immediately */
453	if (dlm_no_directory(ls)) {
454		int nodeid = dlm_dir_nodeid(r);
455		if (nodeid == dlm_our_nodeid())
456			nodeid = 0;
457		r->res_nodeid = nodeid;
458	}
459
460	write_lock(&ls->ls_rsbtbl[bucket].lock);
461	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
462	if (!error) {
463		write_unlock(&ls->ls_rsbtbl[bucket].lock);
464		free_rsb(r);
465		r = tmp;
466		goto out;
467	}
468	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
469	write_unlock(&ls->ls_rsbtbl[bucket].lock);
470	error = 0;
471 out:
472	*r_ret = r;
473	return error;
474}
475
476int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
477		 unsigned int flags, struct dlm_rsb **r_ret)
478{
479	return find_rsb(ls, name, namelen, flags, r_ret);
480}
481
482/* This is only called to add a reference when the code already holds
483   a valid reference to the rsb, so there's no need for locking. */
484
485static inline void hold_rsb(struct dlm_rsb *r)
486{
487	kref_get(&r->res_ref);
488}
489
490void dlm_hold_rsb(struct dlm_rsb *r)
491{
492	hold_rsb(r);
493}
494
495static void toss_rsb(struct kref *kref)
496{
497	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
498	struct dlm_ls *ls = r->res_ls;
499
500	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
501	kref_init(&r->res_ref);
502	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
503	r->res_toss_time = jiffies;
504	if (r->res_lvbptr) {
505		free_lvb(r->res_lvbptr);
506		r->res_lvbptr = NULL;
507	}
508}
509
510/* When all references to the rsb are gone it's transfered to
511   the tossed list for later disposal. */
512
513static void put_rsb(struct dlm_rsb *r)
514{
515	struct dlm_ls *ls = r->res_ls;
516	uint32_t bucket = r->res_bucket;
517
518	write_lock(&ls->ls_rsbtbl[bucket].lock);
519	kref_put(&r->res_ref, toss_rsb);
520	write_unlock(&ls->ls_rsbtbl[bucket].lock);
521}
522
523void dlm_put_rsb(struct dlm_rsb *r)
524{
525	put_rsb(r);
526}
527
528/* See comment for unhold_lkb */
529
530static void unhold_rsb(struct dlm_rsb *r)
531{
532	int rv;
533	rv = kref_put(&r->res_ref, toss_rsb);
534	DLM_ASSERT(!rv, dlm_dump_rsb(r););
535}
536
537static void kill_rsb(struct kref *kref)
538{
539	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
540
541	/* All work is done after the return from kref_put() so we
542	   can release the write_lock before the remove and free. */
543
544	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
545	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
546	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
547	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
548	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
549	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
550}
551
552/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553   The rsb must exist as long as any lkb's for it do. */
554
555static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
556{
557	hold_rsb(r);
558	lkb->lkb_resource = r;
559}
560
561static void detach_lkb(struct dlm_lkb *lkb)
562{
563	if (lkb->lkb_resource) {
564		put_rsb(lkb->lkb_resource);
565		lkb->lkb_resource = NULL;
566	}
567}
568
569static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
570{
571	struct dlm_lkb *lkb, *tmp;
572	uint32_t lkid = 0;
573	uint16_t bucket;
574
575	lkb = allocate_lkb(ls);
576	if (!lkb)
577		return -ENOMEM;
578
579	lkb->lkb_nodeid = -1;
580	lkb->lkb_grmode = DLM_LOCK_IV;
581	kref_init(&lkb->lkb_ref);
582	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
584
585	get_random_bytes(&bucket, sizeof(bucket));
586	bucket &= (ls->ls_lkbtbl_size - 1);
587
588	write_lock(&ls->ls_lkbtbl[bucket].lock);
589
590	/* counter can roll over so we must verify lkid is not in use */
591
592	while (lkid == 0) {
593		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
594
595		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
596				    lkb_idtbl_list) {
597			if (tmp->lkb_id != lkid)
598				continue;
599			lkid = 0;
600			break;
601		}
602	}
603
604	lkb->lkb_id = lkid;
605	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
606	write_unlock(&ls->ls_lkbtbl[bucket].lock);
607
608	*lkb_ret = lkb;
609	return 0;
610}
611
612static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
613{
614	struct dlm_lkb *lkb;
615	uint16_t bucket = (lkid >> 16);
616
617	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618		if (lkb->lkb_id == lkid)
619			return lkb;
620	}
621	return NULL;
622}
623
624static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
625{
626	struct dlm_lkb *lkb;
627	uint16_t bucket = (lkid >> 16);
628
629	if (bucket >= ls->ls_lkbtbl_size)
630		return -EBADSLT;
631
632	read_lock(&ls->ls_lkbtbl[bucket].lock);
633	lkb = __find_lkb(ls, lkid);
634	if (lkb)
635		kref_get(&lkb->lkb_ref);
636	read_unlock(&ls->ls_lkbtbl[bucket].lock);
637
638	*lkb_ret = lkb;
639	return lkb ? 0 : -ENOENT;
640}
641
642static void kill_lkb(struct kref *kref)
643{
644	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
645
646	/* All work is done after the return from kref_put() so we
647	   can release the write_lock before the detach_lkb */
648
649	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
650}
651
652/* __put_lkb() is used when an lkb may not have an rsb attached to
653   it so we need to provide the lockspace explicitly */
654
655static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
656{
657	uint16_t bucket = (lkb->lkb_id >> 16);
658
659	write_lock(&ls->ls_lkbtbl[bucket].lock);
660	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661		list_del(&lkb->lkb_idtbl_list);
662		write_unlock(&ls->ls_lkbtbl[bucket].lock);
663
664		detach_lkb(lkb);
665
666		/* for local/process lkbs, lvbptr points to caller's lksb */
667		if (lkb->lkb_lvbptr && is_master_copy(lkb))
668			free_lvb(lkb->lkb_lvbptr);
669		free_lkb(lkb);
670		return 1;
671	} else {
672		write_unlock(&ls->ls_lkbtbl[bucket].lock);
673		return 0;
674	}
675}
676
677int dlm_put_lkb(struct dlm_lkb *lkb)
678{
679	struct dlm_ls *ls;
680
681	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
683
684	ls = lkb->lkb_resource->res_ls;
685	return __put_lkb(ls, lkb);
686}
687
688/* This is only called to add a reference when the code already holds
689   a valid reference to the lkb, so there's no need for locking. */
690
691static inline void hold_lkb(struct dlm_lkb *lkb)
692{
693	kref_get(&lkb->lkb_ref);
694}
695
696/* This is called when we need to remove a reference and are certain
697   it's not the last ref.  e.g. del_lkb is always called between a
698   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699   put_lkb would work fine, but would involve unnecessary locking */
700
701static inline void unhold_lkb(struct dlm_lkb *lkb)
702{
703	int rv;
704	rv = kref_put(&lkb->lkb_ref, kill_lkb);
705	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
706}
707
708static void lkb_add_ordered(struct list_head *new, struct list_head *head,
709			    int mode)
710{
711	struct dlm_lkb *lkb = NULL;
712
713	list_for_each_entry(lkb, head, lkb_statequeue)
714		if (lkb->lkb_rqmode < mode)
715			break;
716
717	if (!lkb)
718		list_add_tail(new, head);
719	else
720		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
721}
722
723/* add/remove lkb to rsb's grant/convert/wait queue */
724
725static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
726{
727	kref_get(&lkb->lkb_ref);
728
729	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
730
731	lkb->lkb_status = status;
732
733	switch (status) {
734	case DLM_LKSTS_WAITING:
735		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
737		else
738			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
739		break;
740	case DLM_LKSTS_GRANTED:
741		/* convention says granted locks kept in order of grmode */
742		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
743				lkb->lkb_grmode);
744		break;
745	case DLM_LKSTS_CONVERT:
746		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
748		else
749			list_add_tail(&lkb->lkb_statequeue,
750				      &r->res_convertqueue);
751		break;
752	default:
753		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
754	}
755}
756
757static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
758{
759	lkb->lkb_status = 0;
760	list_del(&lkb->lkb_statequeue);
761	unhold_lkb(lkb);
762}
763
764static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
765{
766	hold_lkb(lkb);
767	del_lkb(r, lkb);
768	add_lkb(r, lkb, sts);
769	unhold_lkb(lkb);
770}
771
772static int msg_reply_type(int mstype)
773{
774	switch (mstype) {
775	case DLM_MSG_REQUEST:
776		return DLM_MSG_REQUEST_REPLY;
777	case DLM_MSG_CONVERT:
778		return DLM_MSG_CONVERT_REPLY;
779	case DLM_MSG_UNLOCK:
780		return DLM_MSG_UNLOCK_REPLY;
781	case DLM_MSG_CANCEL:
782		return DLM_MSG_CANCEL_REPLY;
783	case DLM_MSG_LOOKUP:
784		return DLM_MSG_LOOKUP_REPLY;
785	}
786	return -1;
787}
788
789/* add/remove lkb from global waiters list of lkb's waiting for
790   a reply from a remote node */
791
792static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
793{
794	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795	int error = 0;
796
797	mutex_lock(&ls->ls_waiters_mutex);
798
799	if (is_overlap_unlock(lkb) ||
800	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801		error = -EINVAL;
802		goto out;
803	}
804
805	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806		switch (mstype) {
807		case DLM_MSG_UNLOCK:
808			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809			break;
810		case DLM_MSG_CANCEL:
811			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812			break;
813		default:
814			error = -EBUSY;
815			goto out;
816		}
817		lkb->lkb_wait_count++;
818		hold_lkb(lkb);
819
820		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
822			  lkb->lkb_wait_count, lkb->lkb_flags);
823		goto out;
824	}
825
826	DLM_ASSERT(!lkb->lkb_wait_count,
827		   dlm_print_lkb(lkb);
828		   printk("wait_count %d\n", lkb->lkb_wait_count););
829
830	lkb->lkb_wait_count++;
831	lkb->lkb_wait_type = mstype;
832	hold_lkb(lkb);
833	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
834 out:
835	if (error)
836		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
838			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
839	mutex_unlock(&ls->ls_waiters_mutex);
840	return error;
841}
842
843/* We clear the RESEND flag because we might be taking an lkb off the waiters
844   list as part of process_requestqueue (e.g. a lookup that has an optimized
845   request reply on the requestqueue) between dlm_recover_waiters_pre() which
846   set RESEND and dlm_recover_waiters_post() */
847
848static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
849{
850	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851	int overlap_done = 0;
852
853	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
855		overlap_done = 1;
856		goto out_del;
857	}
858
859	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861		overlap_done = 1;
862		goto out_del;
863	}
864
865	/* N.B. type of reply may not always correspond to type of original
866	   msg due to lookup->request optimization, verify others? */
867
868	if (lkb->lkb_wait_type) {
869		lkb->lkb_wait_type = 0;
870		goto out_del;
871	}
872
873	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875	return -1;
876
877 out_del:
878
879	if (overlap_done && lkb->lkb_wait_type) {
880		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
881			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
882		lkb->lkb_wait_count--;
883		lkb->lkb_wait_type = 0;
884	}
885
886	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
887
888	lkb->lkb_flags &= ~DLM_IFL_RESEND;
889	lkb->lkb_wait_count--;
890	if (!lkb->lkb_wait_count)
891		list_del_init(&lkb->lkb_wait_reply);
892	unhold_lkb(lkb);
893	return 0;
894}
895
896static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
897{
898	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
899	int error;
900
901	mutex_lock(&ls->ls_waiters_mutex);
902	error = _remove_from_waiters(lkb, mstype);
903	mutex_unlock(&ls->ls_waiters_mutex);
904	return error;
905}
906
907/* Handles situations where we might be processing a "fake" or "stub" reply in
908   which we can't try to take waiters_mutex again. */
909
910static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
911{
912	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
913	int error;
914
915	if (ms != &ls->ls_stub_ms)
916		mutex_lock(&ls->ls_waiters_mutex);
917	error = _remove_from_waiters(lkb, ms->m_type);
918	if (ms != &ls->ls_stub_ms)
919		mutex_unlock(&ls->ls_waiters_mutex);
920	return error;
921}
922
923static void dir_remove(struct dlm_rsb *r)
924{
925	int to_nodeid;
926
927	if (dlm_no_directory(r->res_ls))
928		return;
929
930	to_nodeid = dlm_dir_nodeid(r);
931	if (to_nodeid != dlm_our_nodeid())
932		send_remove(r);
933	else
934		dlm_dir_remove_entry(r->res_ls, to_nodeid,
935				     r->res_name, r->res_length);
936}
937
938
939static int shrink_bucket(struct dlm_ls *ls, int b)
940{
941	struct dlm_rsb *r;
942	int count = 0, found;
943
944	for (;;) {
945		found = 0;
946		write_lock(&ls->ls_rsbtbl[b].lock);
947		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
948					    res_hashchain) {
949			if (!time_after_eq(jiffies, r->res_toss_time +
950					   dlm_config.ci_toss_secs * HZ))
951				continue;
952			found = 1;
953			break;
954		}
955
956		if (!found) {
957			write_unlock(&ls->ls_rsbtbl[b].lock);
958			break;
959		}
960
961		if (kref_put(&r->res_ref, kill_rsb)) {
962			list_del(&r->res_hashchain);
963			write_unlock(&ls->ls_rsbtbl[b].lock);
964
965			if (is_master(r))
966				dir_remove(r);
967			free_rsb(r);
968			count++;
969		} else {
970			write_unlock(&ls->ls_rsbtbl[b].lock);
971			log_error(ls, "tossed rsb in use %s", r->res_name);
972		}
973	}
974
975	return count;
976}
977
978void dlm_scan_rsbs(struct dlm_ls *ls)
979{
980	int i;
981
982	if (dlm_locking_stopped(ls))
983		return;
984
985	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
986		shrink_bucket(ls, i);
987		cond_resched();
988	}
989}
990
991/* lkb is master or local copy */
992
993static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
994{
995	int b, len = r->res_ls->ls_lvblen;
996
997	/* b=1 lvb returned to caller
998	   b=0 lvb written to rsb or invalidated
999	   b=-1 do nothing */
1000
1001	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1002
1003	if (b == 1) {
1004		if (!lkb->lkb_lvbptr)
1005			return;
1006
1007		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1008			return;
1009
1010		if (!r->res_lvbptr)
1011			return;
1012
1013		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1014		lkb->lkb_lvbseq = r->res_lvbseq;
1015
1016	} else if (b == 0) {
1017		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1018			rsb_set_flag(r, RSB_VALNOTVALID);
1019			return;
1020		}
1021
1022		if (!lkb->lkb_lvbptr)
1023			return;
1024
1025		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1026			return;
1027
1028		if (!r->res_lvbptr)
1029			r->res_lvbptr = allocate_lvb(r->res_ls);
1030
1031		if (!r->res_lvbptr)
1032			return;
1033
1034		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1035		r->res_lvbseq++;
1036		lkb->lkb_lvbseq = r->res_lvbseq;
1037		rsb_clear_flag(r, RSB_VALNOTVALID);
1038	}
1039
1040	if (rsb_flag(r, RSB_VALNOTVALID))
1041		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1042}
1043
1044static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1045{
1046	if (lkb->lkb_grmode < DLM_LOCK_PW)
1047		return;
1048
1049	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1050		rsb_set_flag(r, RSB_VALNOTVALID);
1051		return;
1052	}
1053
1054	if (!lkb->lkb_lvbptr)
1055		return;
1056
1057	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1058		return;
1059
1060	if (!r->res_lvbptr)
1061		r->res_lvbptr = allocate_lvb(r->res_ls);
1062
1063	if (!r->res_lvbptr)
1064		return;
1065
1066	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1067	r->res_lvbseq++;
1068	rsb_clear_flag(r, RSB_VALNOTVALID);
1069}
1070
1071/* lkb is process copy (pc) */
1072
1073static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1074			    struct dlm_message *ms)
1075{
1076	int b;
1077
1078	if (!lkb->lkb_lvbptr)
1079		return;
1080
1081	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1082		return;
1083
1084	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1085	if (b == 1) {
1086		int len = receive_extralen(ms);
1087		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1088		lkb->lkb_lvbseq = ms->m_lvbseq;
1089	}
1090}
1091
1092/* Manipulate lkb's on rsb's convert/granted/waiting queues
1093   remove_lock -- used for unlock, removes lkb from granted
1094   revert_lock -- used for cancel, moves lkb from convert to granted
1095   grant_lock  -- used for request and convert, adds lkb to granted or
1096                  moves lkb from convert or waiting to granted
1097
1098   Each of these is used for master or local copy lkb's.  There is
1099   also a _pc() variation used to make the corresponding change on
1100   a process copy (pc) lkb. */
1101
1102static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1103{
1104	del_lkb(r, lkb);
1105	lkb->lkb_grmode = DLM_LOCK_IV;
1106	/* this unhold undoes the original ref from create_lkb()
1107	   so this leads to the lkb being freed */
1108	unhold_lkb(lkb);
1109}
1110
1111static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1112{
1113	set_lvb_unlock(r, lkb);
1114	_remove_lock(r, lkb);
1115}
1116
1117static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1118{
1119	_remove_lock(r, lkb);
1120}
1121
1122/* returns: 0 did nothing
1123	    1 moved lock to granted
1124	   -1 removed lock */
1125
1126static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1127{
1128	int rv = 0;
1129
1130	lkb->lkb_rqmode = DLM_LOCK_IV;
1131
1132	switch (lkb->lkb_status) {
1133	case DLM_LKSTS_GRANTED:
1134		break;
1135	case DLM_LKSTS_CONVERT:
1136		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1137		rv = 1;
1138		break;
1139	case DLM_LKSTS_WAITING:
1140		del_lkb(r, lkb);
1141		lkb->lkb_grmode = DLM_LOCK_IV;
1142		/* this unhold undoes the original ref from create_lkb()
1143		   so this leads to the lkb being freed */
1144		unhold_lkb(lkb);
1145		rv = -1;
1146		break;
1147	default:
1148		log_print("invalid status for revert %d", lkb->lkb_status);
1149	}
1150	return rv;
1151}
1152
1153static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1154{
1155	return revert_lock(r, lkb);
1156}
1157
1158static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1159{
1160	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1161		lkb->lkb_grmode = lkb->lkb_rqmode;
1162		if (lkb->lkb_status)
1163			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1164		else
1165			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1166	}
1167
1168	lkb->lkb_rqmode = DLM_LOCK_IV;
1169}
1170
1171static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1172{
1173	set_lvb_lock(r, lkb);
1174	_grant_lock(r, lkb);
1175	lkb->lkb_highbast = 0;
1176}
1177
1178static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1179			  struct dlm_message *ms)
1180{
1181	set_lvb_lock_pc(r, lkb, ms);
1182	_grant_lock(r, lkb);
1183}
1184
1185/* called by grant_pending_locks() which means an async grant message must
1186   be sent to the requesting node in addition to granting the lock if the
1187   lkb belongs to a remote node. */
1188
1189static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1190{
1191	grant_lock(r, lkb);
1192	if (is_master_copy(lkb))
1193		send_grant(r, lkb);
1194	else
1195		queue_cast(r, lkb, 0);
1196}
1197
1198/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1199   change the granted/requested modes.  We're munging things accordingly in
1200   the process copy.
1201   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1202   conversion deadlock
1203   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1204   compatible with other granted locks */
1205
1206static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1207{
1208	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1209		log_print("munge_demoted %x invalid reply type %d",
1210			  lkb->lkb_id, ms->m_type);
1211		return;
1212	}
1213
1214	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1215		log_print("munge_demoted %x invalid modes gr %d rq %d",
1216			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1217		return;
1218	}
1219
1220	lkb->lkb_grmode = DLM_LOCK_NL;
1221}
1222
1223static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1224{
1225	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1226	    ms->m_type != DLM_MSG_GRANT) {
1227		log_print("munge_altmode %x invalid reply type %d",
1228			  lkb->lkb_id, ms->m_type);
1229		return;
1230	}
1231
1232	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1233		lkb->lkb_rqmode = DLM_LOCK_PR;
1234	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1235		lkb->lkb_rqmode = DLM_LOCK_CW;
1236	else {
1237		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1238		dlm_print_lkb(lkb);
1239	}
1240}
1241
1242static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1243{
1244	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1245					   lkb_statequeue);
1246	if (lkb->lkb_id == first->lkb_id)
1247		return 1;
1248
1249	return 0;
1250}
1251
1252/* Check if the given lkb conflicts with another lkb on the queue. */
1253
1254static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1255{
1256	struct dlm_lkb *this;
1257
1258	list_for_each_entry(this, head, lkb_statequeue) {
1259		if (this == lkb)
1260			continue;
1261		if (!modes_compat(this, lkb))
1262			return 1;
1263	}
1264	return 0;
1265}
1266
1267/*
1268 * "A conversion deadlock arises with a pair of lock requests in the converting
1269 * queue for one resource.  The granted mode of each lock blocks the requested
1270 * mode of the other lock."
1271 *
1272 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1273 * convert queue from being granted, then demote lkb (set grmode to NL).
1274 * This second form requires that we check for conv-deadlk even when
1275 * now == 0 in _can_be_granted().
1276 *
1277 * Example:
1278 * Granted Queue: empty
1279 * Convert Queue: NL->EX (first lock)
1280 *                PR->EX (second lock)
1281 *
1282 * The first lock can't be granted because of the granted mode of the second
1283 * lock and the second lock can't be granted because it's not first in the
1284 * list.  We demote the granted mode of the second lock (the lkb passed to this
1285 * function).
1286 *
1287 * After the resolution, the "grant pending" function needs to go back and try
1288 * to grant locks on the convert queue again since the first lock can now be
1289 * granted.
1290 */
1291
1292static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1293{
1294	struct dlm_lkb *this, *first = NULL, *self = NULL;
1295
1296	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1297		if (!first)
1298			first = this;
1299		if (this == lkb) {
1300			self = lkb;
1301			continue;
1302		}
1303
1304		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1305			return 1;
1306	}
1307
1308	/* if lkb is on the convert queue and is preventing the first
1309	   from being granted, then there's deadlock and we demote lkb.
1310	   multiple converting locks may need to do this before the first
1311	   converting lock can be granted. */
1312
1313	if (self && self != first) {
1314		if (!modes_compat(lkb, first) &&
1315		    !queue_conflict(&rsb->res_grantqueue, first))
1316			return 1;
1317	}
1318
1319	return 0;
1320}
1321
1322/*
1323 * Return 1 if the lock can be granted, 0 otherwise.
1324 * Also detect and resolve conversion deadlocks.
1325 *
1326 * lkb is the lock to be granted
1327 *
1328 * now is 1 if the function is being called in the context of the
1329 * immediate request, it is 0 if called later, after the lock has been
1330 * queued.
1331 *
1332 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1333 */
1334
1335static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1336{
1337	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1338
1339	/*
1340	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1341	 * a new request for a NL mode lock being blocked.
1342	 *
1343	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1344	 * request, then it would be granted.  In essence, the use of this flag
1345	 * tells the Lock Manager to expedite theis request by not considering
1346	 * what may be in the CONVERTING or WAITING queues...  As of this
1347	 * writing, the EXPEDITE flag can be used only with new requests for NL
1348	 * mode locks.  This flag is not valid for conversion requests.
1349	 *
1350	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1351	 * conversion or used with a non-NL requested mode.  We also know an
1352	 * EXPEDITE request is always granted immediately, so now must always
1353	 * be 1.  The full condition to grant an expedite request: (now &&
1354	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1355	 * therefore be shortened to just checking the flag.
1356	 */
1357
1358	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1359		return 1;
1360
1361	/*
1362	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1363	 * added to the remaining conditions.
1364	 */
1365
1366	if (queue_conflict(&r->res_grantqueue, lkb))
1367		goto out;
1368
1369	/*
1370	 * 6-3: By default, a conversion request is immediately granted if the
1371	 * requested mode is compatible with the modes of all other granted
1372	 * locks
1373	 */
1374
1375	if (queue_conflict(&r->res_convertqueue, lkb))
1376		goto out;
1377
1378	/*
1379	 * 6-5: But the default algorithm for deciding whether to grant or
1380	 * queue conversion requests does not by itself guarantee that such
1381	 * requests are serviced on a "first come first serve" basis.  This, in
1382	 * turn, can lead to a phenomenon known as "indefinate postponement".
1383	 *
1384	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1385	 * the system service employed to request a lock conversion.  This flag
1386	 * forces certain conversion requests to be queued, even if they are
1387	 * compatible with the granted modes of other locks on the same
1388	 * resource.  Thus, the use of this flag results in conversion requests
1389	 * being ordered on a "first come first servce" basis.
1390	 *
1391	 * DCT: This condition is all about new conversions being able to occur
1392	 * "in place" while the lock remains on the granted queue (assuming
1393	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1394	 * doesn't _have_ to go onto the convert queue where it's processed in
1395	 * order.  The "now" variable is necessary to distinguish converts
1396	 * being received and processed for the first time now, because once a
1397	 * convert is moved to the conversion queue the condition below applies
1398	 * requiring fifo granting.
1399	 */
1400
1401	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1402		return 1;
1403
1404	/*
1405	 * The NOORDER flag is set to avoid the standard vms rules on grant
1406	 * order.
1407	 */
1408
1409	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1410		return 1;
1411
1412	/*
1413	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1414	 * granted until all other conversion requests ahead of it are granted
1415	 * and/or canceled.
1416	 */
1417
1418	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1419		return 1;
1420
1421	/*
1422	 * 6-4: By default, a new request is immediately granted only if all
1423	 * three of the following conditions are satisfied when the request is
1424	 * issued:
1425	 * - The queue of ungranted conversion requests for the resource is
1426	 *   empty.
1427	 * - The queue of ungranted new requests for the resource is empty.
1428	 * - The mode of the new request is compatible with the most
1429	 *   restrictive mode of all granted locks on the resource.
1430	 */
1431
1432	if (now && !conv && list_empty(&r->res_convertqueue) &&
1433	    list_empty(&r->res_waitqueue))
1434		return 1;
1435
1436	/*
1437	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1438	 * it cannot be granted until the queue of ungranted conversion
1439	 * requests is empty, all ungranted new requests ahead of it are
1440	 * granted and/or canceled, and it is compatible with the granted mode
1441	 * of the most restrictive lock granted on the resource.
1442	 */
1443
1444	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1445	    first_in_list(lkb, &r->res_waitqueue))
1446		return 1;
1447
1448 out:
1449	/*
1450	 * The following, enabled by CONVDEADLK, departs from VMS.
1451	 */
1452
1453	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1454	    conversion_deadlock_detect(r, lkb)) {
1455		lkb->lkb_grmode = DLM_LOCK_NL;
1456		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1457	}
1458
1459	return 0;
1460}
1461
1462/*
1463 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1464 * simple way to provide a big optimization to applications that can use them.
1465 */
1466
1467static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1468{
1469	uint32_t flags = lkb->lkb_exflags;
1470	int rv;
1471	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1472
1473	rv = _can_be_granted(r, lkb, now);
1474	if (rv)
1475		goto out;
1476
1477	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1478		goto out;
1479
1480	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1481		alt = DLM_LOCK_PR;
1482	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1483		alt = DLM_LOCK_CW;
1484
1485	if (alt) {
1486		lkb->lkb_rqmode = alt;
1487		rv = _can_be_granted(r, lkb, now);
1488		if (rv)
1489			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1490		else
1491			lkb->lkb_rqmode = rqmode;
1492	}
1493 out:
1494	return rv;
1495}
1496
1497static int grant_pending_convert(struct dlm_rsb *r, int high)
1498{
1499	struct dlm_lkb *lkb, *s;
1500	int hi, demoted, quit, grant_restart, demote_restart;
1501
1502	quit = 0;
1503 restart:
1504	grant_restart = 0;
1505	demote_restart = 0;
1506	hi = DLM_LOCK_IV;
1507
1508	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1509		demoted = is_demoted(lkb);
1510		if (can_be_granted(r, lkb, 0)) {
1511			grant_lock_pending(r, lkb);
1512			grant_restart = 1;
1513		} else {
1514			hi = max_t(int, lkb->lkb_rqmode, hi);
1515			if (!demoted && is_demoted(lkb))
1516				demote_restart = 1;
1517		}
1518	}
1519
1520	if (grant_restart)
1521		goto restart;
1522	if (demote_restart && !quit) {
1523		quit = 1;
1524		goto restart;
1525	}
1526
1527	return max_t(int, high, hi);
1528}
1529
1530static int grant_pending_wait(struct dlm_rsb *r, int high)
1531{
1532	struct dlm_lkb *lkb, *s;
1533
1534	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1535		if (can_be_granted(r, lkb, 0))
1536			grant_lock_pending(r, lkb);
1537                else
1538			high = max_t(int, lkb->lkb_rqmode, high);
1539	}
1540
1541	return high;
1542}
1543
1544static void grant_pending_locks(struct dlm_rsb *r)
1545{
1546	struct dlm_lkb *lkb, *s;
1547	int high = DLM_LOCK_IV;
1548
1549	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1550
1551	high = grant_pending_convert(r, high);
1552	high = grant_pending_wait(r, high);
1553
1554	if (high == DLM_LOCK_IV)
1555		return;
1556
1557
1558	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1559		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1560		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1561			queue_bast(r, lkb, high);
1562			lkb->lkb_highbast = high;
1563		}
1564	}
1565}
1566
1567static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1568			    struct dlm_lkb *lkb)
1569{
1570	struct dlm_lkb *gr;
1571
1572	list_for_each_entry(gr, head, lkb_statequeue) {
1573		if (gr->lkb_bastaddr &&
1574		    gr->lkb_highbast < lkb->lkb_rqmode &&
1575		    !modes_compat(gr, lkb)) {
1576			queue_bast(r, gr, lkb->lkb_rqmode);
1577			gr->lkb_highbast = lkb->lkb_rqmode;
1578		}
1579	}
1580}
1581
1582static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1583{
1584	send_bast_queue(r, &r->res_grantqueue, lkb);
1585}
1586
1587static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1588{
1589	send_bast_queue(r, &r->res_grantqueue, lkb);
1590	send_bast_queue(r, &r->res_convertqueue, lkb);
1591}
1592
1593/* set_master(r, lkb) -- set the master nodeid of a resource
1594
1595   The purpose of this function is to set the nodeid field in the given
1596   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1597   known, it can just be copied to the lkb and the function will return
1598   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1599   before it can be copied to the lkb.
1600
1601   When the rsb nodeid is being looked up remotely, the initial lkb
1602   causing the lookup is kept on the ls_waiters list waiting for the
1603   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1604   on the rsb's res_lookup list until the master is verified.
1605
1606   Return values:
1607   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1608   1: the rsb master is not available and the lkb has been placed on
1609      a wait queue
1610*/
1611
1612static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1613{
1614	struct dlm_ls *ls = r->res_ls;
1615	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1616
1617	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1618		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1619		r->res_first_lkid = lkb->lkb_id;
1620		lkb->lkb_nodeid = r->res_nodeid;
1621		return 0;
1622	}
1623
1624	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1625		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1626		return 1;
1627	}
1628
1629	if (r->res_nodeid == 0) {
1630		lkb->lkb_nodeid = 0;
1631		return 0;
1632	}
1633
1634	if (r->res_nodeid > 0) {
1635		lkb->lkb_nodeid = r->res_nodeid;
1636		return 0;
1637	}
1638
1639	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1640
1641	dir_nodeid = dlm_dir_nodeid(r);
1642
1643	if (dir_nodeid != our_nodeid) {
1644		r->res_first_lkid = lkb->lkb_id;
1645		send_lookup(r, lkb);
1646		return 1;
1647	}
1648
1649	for (;;) {
1650		/* It's possible for dlm_scand to remove an old rsb for
1651		   this same resource from the toss list, us to create
1652		   a new one, look up the master locally, and find it
1653		   already exists just before dlm_scand does the
1654		   dir_remove() on the previous rsb. */
1655
1656		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1657				       r->res_length, &ret_nodeid);
1658		if (!error)
1659			break;
1660		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1661		schedule();
1662	}
1663
1664	if (ret_nodeid == our_nodeid) {
1665		r->res_first_lkid = 0;
1666		r->res_nodeid = 0;
1667		lkb->lkb_nodeid = 0;
1668	} else {
1669		r->res_first_lkid = lkb->lkb_id;
1670		r->res_nodeid = ret_nodeid;
1671		lkb->lkb_nodeid = ret_nodeid;
1672	}
1673	return 0;
1674}
1675
1676static void process_lookup_list(struct dlm_rsb *r)
1677{
1678	struct dlm_lkb *lkb, *safe;
1679
1680	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1681		list_del_init(&lkb->lkb_rsb_lookup);
1682		_request_lock(r, lkb);
1683		schedule();
1684	}
1685}
1686
1687/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1688
1689static void confirm_master(struct dlm_rsb *r, int error)
1690{
1691	struct dlm_lkb *lkb;
1692
1693	if (!r->res_first_lkid)
1694		return;
1695
1696	switch (error) {
1697	case 0:
1698	case -EINPROGRESS:
1699		r->res_first_lkid = 0;
1700		process_lookup_list(r);
1701		break;
1702
1703	case -EAGAIN:
1704		/* the remote master didn't queue our NOQUEUE request;
1705		   make a waiting lkb the first_lkid */
1706
1707		r->res_first_lkid = 0;
1708
1709		if (!list_empty(&r->res_lookup)) {
1710			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1711					 lkb_rsb_lookup);
1712			list_del_init(&lkb->lkb_rsb_lookup);
1713			r->res_first_lkid = lkb->lkb_id;
1714			_request_lock(r, lkb);
1715		} else
1716			r->res_nodeid = -1;
1717		break;
1718
1719	default:
1720		log_error(r->res_ls, "confirm_master unknown error %d", error);
1721	}
1722}
1723
1724static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1725			 int namelen, uint32_t parent_lkid, void *ast,
1726			 void *astarg, void *bast, struct dlm_args *args)
1727{
1728	int rv = -EINVAL;
1729
1730	/* check for invalid arg usage */
1731
1732	if (mode < 0 || mode > DLM_LOCK_EX)
1733		goto out;
1734
1735	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1736		goto out;
1737
1738	if (flags & DLM_LKF_CANCEL)
1739		goto out;
1740
1741	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1742		goto out;
1743
1744	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1745		goto out;
1746
1747	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1748		goto out;
1749
1750	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1751		goto out;
1752
1753	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1754		goto out;
1755
1756	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1757		goto out;
1758
1759	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1760		goto out;
1761
1762	if (!ast || !lksb)
1763		goto out;
1764
1765	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1766		goto out;
1767
1768	/* parent/child locks not yet supported */
1769	if (parent_lkid)
1770		goto out;
1771
1772	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1773		goto out;
1774
1775	/* these args will be copied to the lkb in validate_lock_args,
1776	   it cannot be done now because when converting locks, fields in
1777	   an active lkb cannot be modified before locking the rsb */
1778
1779	args->flags = flags;
1780	args->astaddr = ast;
1781	args->astparam = (long) astarg;
1782	args->bastaddr = bast;
1783	args->mode = mode;
1784	args->lksb = lksb;
1785	rv = 0;
1786 out:
1787	return rv;
1788}
1789
1790static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1791{
1792	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1793 		      DLM_LKF_FORCEUNLOCK))
1794		return -EINVAL;
1795
1796	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1797		return -EINVAL;
1798
1799	args->flags = flags;
1800	args->astparam = (long) astarg;
1801	return 0;
1802}
1803
1804static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1805			      struct dlm_args *args)
1806{
1807	int rv = -EINVAL;
1808
1809	if (args->flags & DLM_LKF_CONVERT) {
1810		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1811			goto out;
1812
1813		if (args->flags & DLM_LKF_QUECVT &&
1814		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1815			goto out;
1816
1817		rv = -EBUSY;
1818		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1819			goto out;
1820
1821		if (lkb->lkb_wait_type)
1822			goto out;
1823
1824		if (is_overlap(lkb))
1825			goto out;
1826	}
1827
1828	lkb->lkb_exflags = args->flags;
1829	lkb->lkb_sbflags = 0;
1830	lkb->lkb_astaddr = args->astaddr;
1831	lkb->lkb_astparam = args->astparam;
1832	lkb->lkb_bastaddr = args->bastaddr;
1833	lkb->lkb_rqmode = args->mode;
1834	lkb->lkb_lksb = args->lksb;
1835	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1836	lkb->lkb_ownpid = (int) current->pid;
1837	rv = 0;
1838 out:
1839	return rv;
1840}
1841
1842/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1843   for success */
1844
1845/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1846   because there may be a lookup in progress and it's valid to do
1847   cancel/unlockf on it */
1848
1849static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1850{
1851	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1852	int rv = -EINVAL;
1853
1854	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1855		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1856		dlm_print_lkb(lkb);
1857		goto out;
1858	}
1859
1860	/* an lkb may still exist even though the lock is EOL'ed due to a
1861	   cancel, unlock or failed noqueue request; an app can't use these
1862	   locks; return same error as if the lkid had not been found at all */
1863
1864	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1865		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1866		rv = -ENOENT;
1867		goto out;
1868	}
1869
1870	/* an lkb may be waiting for an rsb lookup to complete where the
1871	   lookup was initiated by another lock */
1872
1873	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1874		if (!list_empty(&lkb->lkb_rsb_lookup)) {
1875			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1876			list_del_init(&lkb->lkb_rsb_lookup);
1877			queue_cast(lkb->lkb_resource, lkb,
1878				   args->flags & DLM_LKF_CANCEL ?
1879				   -DLM_ECANCEL : -DLM_EUNLOCK);
1880			unhold_lkb(lkb); /* undoes create_lkb() */
1881			rv = -EBUSY;
1882			goto out;
1883		}
1884	}
1885
1886	/* cancel not allowed with another cancel/unlock in progress */
1887
1888	if (args->flags & DLM_LKF_CANCEL) {
1889		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1890			goto out;
1891
1892		if (is_overlap(lkb))
1893			goto out;
1894
1895		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1896			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1897			rv = -EBUSY;
1898			goto out;
1899		}
1900
1901		switch (lkb->lkb_wait_type) {
1902		case DLM_MSG_LOOKUP:
1903		case DLM_MSG_REQUEST:
1904			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1905			rv = -EBUSY;
1906			goto out;
1907		case DLM_MSG_UNLOCK:
1908		case DLM_MSG_CANCEL:
1909			goto out;
1910		}
1911		/* add_to_waiters() will set OVERLAP_CANCEL */
1912		goto out_ok;
1913	}
1914
1915	/* do we need to allow a force-unlock if there's a normal unlock
1916	   already in progress?  in what conditions could the normal unlock
1917	   fail such that we'd want to send a force-unlock to be sure? */
1918
1919	if (args->flags & DLM_LKF_FORCEUNLOCK) {
1920		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1921			goto out;
1922
1923		if (is_overlap_unlock(lkb))
1924			goto out;
1925
1926		if (lkb->lkb_flags & DLM_IFL_RESEND) {
1927			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1928			rv = -EBUSY;
1929			goto out;
1930		}
1931
1932		switch (lkb->lkb_wait_type) {
1933		case DLM_MSG_LOOKUP:
1934		case DLM_MSG_REQUEST:
1935			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1936			rv = -EBUSY;
1937			goto out;
1938		case DLM_MSG_UNLOCK:
1939			goto out;
1940		}
1941		/* add_to_waiters() will set OVERLAP_UNLOCK */
1942		goto out_ok;
1943	}
1944
1945	/* normal unlock not allowed if there's any op in progress */
1946	rv = -EBUSY;
1947	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1948		goto out;
1949
1950 out_ok:
1951	/* an overlapping op shouldn't blow away exflags from other op */
1952	lkb->lkb_exflags |= args->flags;
1953	lkb->lkb_sbflags = 0;
1954	lkb->lkb_astparam = args->astparam;
1955	rv = 0;
1956 out:
1957	if (rv)
1958		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1959			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1960			  args->flags, lkb->lkb_wait_type,
1961			  lkb->lkb_resource->res_name);
1962	return rv;
1963}
1964
1965/*
1966 * Four stage 4 varieties:
1967 * do_request(), do_convert(), do_unlock(), do_cancel()
1968 * These are called on the master node for the given lock and
1969 * from the central locking logic.
1970 */
1971
1972static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1973{
1974	int error = 0;
1975
1976	if (can_be_granted(r, lkb, 1)) {
1977		grant_lock(r, lkb);
1978		queue_cast(r, lkb, 0);
1979		goto out;
1980	}
1981
1982	if (can_be_queued(lkb)) {
1983		error = -EINPROGRESS;
1984		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1985		send_blocking_asts(r, lkb);
1986		goto out;
1987	}
1988
1989	error = -EAGAIN;
1990	if (force_blocking_asts(lkb))
1991		send_blocking_asts_all(r, lkb);
1992	queue_cast(r, lkb, -EAGAIN);
1993
1994 out:
1995	return error;
1996}
1997
1998static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1999{
2000	int error = 0;
2001
2002	/* changing an existing lock may allow others to be granted */
2003
2004	if (can_be_granted(r, lkb, 1)) {
2005		grant_lock(r, lkb);
2006		queue_cast(r, lkb, 0);
2007		grant_pending_locks(r);
2008		goto out;
2009	}
2010
2011	/* is_demoted() means the can_be_granted() above set the grmode
2012	   to NL, and left us on the granted queue.  This auto-demotion
2013	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2014	   now grantable.  We have to try to grant other converting locks
2015	   before we try again to grant this one. */
2016
2017	if (is_demoted(lkb)) {
2018		grant_pending_convert(r, DLM_LOCK_IV);
2019		if (_can_be_granted(r, lkb, 1)) {
2020			grant_lock(r, lkb);
2021			queue_cast(r, lkb, 0);
2022			grant_pending_locks(r);
2023			goto out;
2024		}
2025		/* else fall through and move to convert queue */
2026	}
2027
2028	if (can_be_queued(lkb)) {
2029		error = -EINPROGRESS;
2030		del_lkb(r, lkb);
2031		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2032		send_blocking_asts(r, lkb);
2033		goto out;
2034	}
2035
2036	error = -EAGAIN;
2037	if (force_blocking_asts(lkb))
2038		send_blocking_asts_all(r, lkb);
2039	queue_cast(r, lkb, -EAGAIN);
2040
2041 out:
2042	return error;
2043}
2044
2045static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2046{
2047	remove_lock(r, lkb);
2048	queue_cast(r, lkb, -DLM_EUNLOCK);
2049	grant_pending_locks(r);
2050	return -DLM_EUNLOCK;
2051}
2052
2053/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2054
2055static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056{
2057	int error;
2058
2059	error = revert_lock(r, lkb);
2060	if (error) {
2061		queue_cast(r, lkb, -DLM_ECANCEL);
2062		grant_pending_locks(r);
2063		return -DLM_ECANCEL;
2064	}
2065	return 0;
2066}
2067
2068/*
2069 * Four stage 3 varieties:
2070 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2071 */
2072
2073/* add a new lkb to a possibly new rsb, called by requesting process */
2074
2075static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076{
2077	int error;
2078
2079	/* set_master: sets lkb nodeid from r */
2080
2081	error = set_master(r, lkb);
2082	if (error < 0)
2083		goto out;
2084	if (error) {
2085		error = 0;
2086		goto out;
2087	}
2088
2089	if (is_remote(r))
2090		/* receive_request() calls do_request() on remote node */
2091		error = send_request(r, lkb);
2092	else
2093		error = do_request(r, lkb);
2094 out:
2095	return error;
2096}
2097
2098/* change some property of an existing lkb, e.g. mode */
2099
2100static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101{
2102	int error;
2103
2104	if (is_remote(r))
2105		/* receive_convert() calls do_convert() on remote node */
2106		error = send_convert(r, lkb);
2107	else
2108		error = do_convert(r, lkb);
2109
2110	return error;
2111}
2112
2113/* remove an existing lkb from the granted queue */
2114
2115static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2116{
2117	int error;
2118
2119	if (is_remote(r))
2120		/* receive_unlock() calls do_unlock() on remote node */
2121		error = send_unlock(r, lkb);
2122	else
2123		error = do_unlock(r, lkb);
2124
2125	return error;
2126}
2127
2128/* remove an existing lkb from the convert or wait queue */
2129
2130static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2131{
2132	int error;
2133
2134	if (is_remote(r))
2135		/* receive_cancel() calls do_cancel() on remote node */
2136		error = send_cancel(r, lkb);
2137	else
2138		error = do_cancel(r, lkb);
2139
2140	return error;
2141}
2142
2143/*
2144 * Four stage 2 varieties:
2145 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2146 */
2147
2148static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2149			int len, struct dlm_args *args)
2150{
2151	struct dlm_rsb *r;
2152	int error;
2153
2154	error = validate_lock_args(ls, lkb, args);
2155	if (error)
2156		goto out;
2157
2158	error = find_rsb(ls, name, len, R_CREATE, &r);
2159	if (error)
2160		goto out;
2161
2162	lock_rsb(r);
2163
2164	attach_lkb(r, lkb);
2165	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2166
2167	error = _request_lock(r, lkb);
2168
2169	unlock_rsb(r);
2170	put_rsb(r);
2171
2172 out:
2173	return error;
2174}
2175
2176static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2177			struct dlm_args *args)
2178{
2179	struct dlm_rsb *r;
2180	int error;
2181
2182	r = lkb->lkb_resource;
2183
2184	hold_rsb(r);
2185	lock_rsb(r);
2186
2187	error = validate_lock_args(ls, lkb, args);
2188	if (error)
2189		goto out;
2190
2191	error = _convert_lock(r, lkb);
2192 out:
2193	unlock_rsb(r);
2194	put_rsb(r);
2195	return error;
2196}
2197
2198static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2199		       struct dlm_args *args)
2200{
2201	struct dlm_rsb *r;
2202	int error;
2203
2204	r = lkb->lkb_resource;
2205
2206	hold_rsb(r);
2207	lock_rsb(r);
2208
2209	error = validate_unlock_args(lkb, args);
2210	if (error)
2211		goto out;
2212
2213	error = _unlock_lock(r, lkb);
2214 out:
2215	unlock_rsb(r);
2216	put_rsb(r);
2217	return error;
2218}
2219
2220static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2221		       struct dlm_args *args)
2222{
2223	struct dlm_rsb *r;
2224	int error;
2225
2226	r = lkb->lkb_resource;
2227
2228	hold_rsb(r);
2229	lock_rsb(r);
2230
2231	error = validate_unlock_args(lkb, args);
2232	if (error)
2233		goto out;
2234
2235	error = _cancel_lock(r, lkb);
2236 out:
2237	unlock_rsb(r);
2238	put_rsb(r);
2239	return error;
2240}
2241
2242/*
2243 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2244 */
2245
2246int dlm_lock(dlm_lockspace_t *lockspace,
2247	     int mode,
2248	     struct dlm_lksb *lksb,
2249	     uint32_t flags,
2250	     void *name,
2251	     unsigned int namelen,
2252	     uint32_t parent_lkid,
2253	     void (*ast) (void *astarg),
2254	     void *astarg,
2255	     void (*bast) (void *astarg, int mode))
2256{
2257	struct dlm_ls *ls;
2258	struct dlm_lkb *lkb;
2259	struct dlm_args args;
2260	int error, convert = flags & DLM_LKF_CONVERT;
2261
2262	ls = dlm_find_lockspace_local(lockspace);
2263	if (!ls)
2264		return -EINVAL;
2265
2266	lock_recovery(ls);
2267
2268	if (convert)
2269		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2270	else
2271		error = create_lkb(ls, &lkb);
2272
2273	if (error)
2274		goto out;
2275
2276	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2277			      astarg, bast, &args);
2278	if (error)
2279		goto out_put;
2280
2281	if (convert)
2282		error = convert_lock(ls, lkb, &args);
2283	else
2284		error = request_lock(ls, lkb, name, namelen, &args);
2285
2286	if (error == -EINPROGRESS)
2287		error = 0;
2288 out_put:
2289	if (convert || error)
2290		__put_lkb(ls, lkb);
2291	if (error == -EAGAIN)
2292		error = 0;
2293 out:
2294	unlock_recovery(ls);
2295	dlm_put_lockspace(ls);
2296	return error;
2297}
2298
2299int dlm_unlock(dlm_lockspace_t *lockspace,
2300	       uint32_t lkid,
2301	       uint32_t flags,
2302	       struct dlm_lksb *lksb,
2303	       void *astarg)
2304{
2305	struct dlm_ls *ls;
2306	struct dlm_lkb *lkb;
2307	struct dlm_args args;
2308	int error;
2309
2310	ls = dlm_find_lockspace_local(lockspace);
2311	if (!ls)
2312		return -EINVAL;
2313
2314	lock_recovery(ls);
2315
2316	error = find_lkb(ls, lkid, &lkb);
2317	if (error)
2318		goto out;
2319
2320	error = set_unlock_args(flags, astarg, &args);
2321	if (error)
2322		goto out_put;
2323
2324	if (flags & DLM_LKF_CANCEL)
2325		error = cancel_lock(ls, lkb, &args);
2326	else
2327		error = unlock_lock(ls, lkb, &args);
2328
2329	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2330		error = 0;
2331	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2332		error = 0;
2333 out_put:
2334	dlm_put_lkb(lkb);
2335 out:
2336	unlock_recovery(ls);
2337	dlm_put_lockspace(ls);
2338	return error;
2339}
2340
2341/*
2342 * send/receive routines for remote operations and replies
2343 *
2344 * send_args
2345 * send_common
2346 * send_request			receive_request
2347 * send_convert			receive_convert
2348 * send_unlock			receive_unlock
2349 * send_cancel			receive_cancel
2350 * send_grant			receive_grant
2351 * send_bast			receive_bast
2352 * send_lookup			receive_lookup
2353 * send_remove			receive_remove
2354 *
2355 * 				send_common_reply
2356 * receive_request_reply	send_request_reply
2357 * receive_convert_reply	send_convert_reply
2358 * receive_unlock_reply		send_unlock_reply
2359 * receive_cancel_reply		send_cancel_reply
2360 * receive_lookup_reply		send_lookup_reply
2361 */
2362
2363static int _create_message(struct dlm_ls *ls, int mb_len,
2364			   int to_nodeid, int mstype,
2365			   struct dlm_message **ms_ret,
2366			   struct dlm_mhandle **mh_ret)
2367{
2368	struct dlm_message *ms;
2369	struct dlm_mhandle *mh;
2370	char *mb;
2371
2372	/* get_buffer gives us a message handle (mh) that we need to
2373	   pass into lowcomms_commit and a message buffer (mb) that we
2374	   write our data into */
2375
2376	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2377	if (!mh)
2378		return -ENOBUFS;
2379
2380	memset(mb, 0, mb_len);
2381
2382	ms = (struct dlm_message *) mb;
2383
2384	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2385	ms->m_header.h_lockspace = ls->ls_global_id;
2386	ms->m_header.h_nodeid = dlm_our_nodeid();
2387	ms->m_header.h_length = mb_len;
2388	ms->m_header.h_cmd = DLM_MSG;
2389
2390	ms->m_type = mstype;
2391
2392	*mh_ret = mh;
2393	*ms_ret = ms;
2394	return 0;
2395}
2396
2397static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2398			  int to_nodeid, int mstype,
2399			  struct dlm_message **ms_ret,
2400			  struct dlm_mhandle **mh_ret)
2401{
2402	int mb_len = sizeof(struct dlm_message);
2403
2404	switch (mstype) {
2405	case DLM_MSG_REQUEST:
2406	case DLM_MSG_LOOKUP:
2407	case DLM_MSG_REMOVE:
2408		mb_len += r->res_length;
2409		break;
2410	case DLM_MSG_CONVERT:
2411	case DLM_MSG_UNLOCK:
2412	case DLM_MSG_REQUEST_REPLY:
2413	case DLM_MSG_CONVERT_REPLY:
2414	case DLM_MSG_GRANT:
2415		if (lkb && lkb->lkb_lvbptr)
2416			mb_len += r->res_ls->ls_lvblen;
2417		break;
2418	}
2419
2420	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2421			       ms_ret, mh_ret);
2422}
2423
2424/* further lowcomms enhancements or alternate implementations may make
2425   the return value from this function useful at some point */
2426
2427static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2428{
2429	dlm_message_out(ms);
2430	dlm_lowcomms_commit_buffer(mh);
2431	return 0;
2432}
2433
2434static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2435		      struct dlm_message *ms)
2436{
2437	ms->m_nodeid   = lkb->lkb_nodeid;
2438	ms->m_pid      = lkb->lkb_ownpid;
2439	ms->m_lkid     = lkb->lkb_id;
2440	ms->m_remid    = lkb->lkb_remid;
2441	ms->m_exflags  = lkb->lkb_exflags;
2442	ms->m_sbflags  = lkb->lkb_sbflags;
2443	ms->m_flags    = lkb->lkb_flags;
2444	ms->m_lvbseq   = lkb->lkb_lvbseq;
2445	ms->m_status   = lkb->lkb_status;
2446	ms->m_grmode   = lkb->lkb_grmode;
2447	ms->m_rqmode   = lkb->lkb_rqmode;
2448	ms->m_hash     = r->res_hash;
2449
2450	/* m_result and m_bastmode are set from function args,
2451	   not from lkb fields */
2452
2453	if (lkb->lkb_bastaddr)
2454		ms->m_asts |= AST_BAST;
2455	if (lkb->lkb_astaddr)
2456		ms->m_asts |= AST_COMP;
2457
2458	/* compare with switch in create_message; send_remove() doesn't
2459	   use send_args() */
2460
2461	switch (ms->m_type) {
2462	case DLM_MSG_REQUEST:
2463	case DLM_MSG_LOOKUP:
2464		memcpy(ms->m_extra, r->res_name, r->res_length);
2465		break;
2466	case DLM_MSG_CONVERT:
2467	case DLM_MSG_UNLOCK:
2468	case DLM_MSG_REQUEST_REPLY:
2469	case DLM_MSG_CONVERT_REPLY:
2470	case DLM_MSG_GRANT:
2471		if (!lkb->lkb_lvbptr)
2472			break;
2473		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2474		break;
2475	}
2476}
2477
2478static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2479{
2480	struct dlm_message *ms;
2481	struct dlm_mhandle *mh;
2482	int to_nodeid, error;
2483
2484	error = add_to_waiters(lkb, mstype);
2485	if (error)
2486		return error;
2487
2488	to_nodeid = r->res_nodeid;
2489
2490	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2491	if (error)
2492		goto fail;
2493
2494	send_args(r, lkb, ms);
2495
2496	error = send_message(mh, ms);
2497	if (error)
2498		goto fail;
2499	return 0;
2500
2501 fail:
2502	remove_from_waiters(lkb, msg_reply_type(mstype));
2503	return error;
2504}
2505
2506static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2507{
2508	return send_common(r, lkb, DLM_MSG_REQUEST);
2509}
2510
2511static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2512{
2513	int error;
2514
2515	error = send_common(r, lkb, DLM_MSG_CONVERT);
2516
2517	/* down conversions go without a reply from the master */
2518	if (!error && down_conversion(lkb)) {
2519		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2520		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2521		r->res_ls->ls_stub_ms.m_result = 0;
2522		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2523		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2524	}
2525
2526	return error;
2527}
2528
2529
2530static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2531{
2532	return send_common(r, lkb, DLM_MSG_UNLOCK);
2533}
2534
2535static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2536{
2537	return send_common(r, lkb, DLM_MSG_CANCEL);
2538}
2539
2540static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2541{
2542	struct dlm_message *ms;
2543	struct dlm_mhandle *mh;
2544	int to_nodeid, error;
2545
2546	to_nodeid = lkb->lkb_nodeid;
2547
2548	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2549	if (error)
2550		goto out;
2551
2552	send_args(r, lkb, ms);
2553
2554	ms->m_result = 0;
2555
2556	error = send_message(mh, ms);
2557 out:
2558	return error;
2559}
2560
2561static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2562{
2563	struct dlm_message *ms;
2564	struct dlm_mhandle *mh;
2565	int to_nodeid, error;
2566
2567	to_nodeid = lkb->lkb_nodeid;
2568
2569	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2570	if (error)
2571		goto out;
2572
2573	send_args(r, lkb, ms);
2574
2575	ms->m_bastmode = mode;
2576
2577	error = send_message(mh, ms);
2578 out:
2579	return error;
2580}
2581
2582static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2583{
2584	struct dlm_message *ms;
2585	struct dlm_mhandle *mh;
2586	int to_nodeid, error;
2587
2588	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2589	if (error)
2590		return error;
2591
2592	to_nodeid = dlm_dir_nodeid(r);
2593
2594	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2595	if (error)
2596		goto fail;
2597
2598	send_args(r, lkb, ms);
2599
2600	error = send_message(mh, ms);
2601	if (error)
2602		goto fail;
2603	return 0;
2604
2605 fail:
2606	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2607	return error;
2608}
2609
2610static int send_remove(struct dlm_rsb *r)
2611{
2612	struct dlm_message *ms;
2613	struct dlm_mhandle *mh;
2614	int to_nodeid, error;
2615
2616	to_nodeid = dlm_dir_nodeid(r);
2617
2618	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2619	if (error)
2620		goto out;
2621
2622	memcpy(ms->m_extra, r->res_name, r->res_length);
2623	ms->m_hash = r->res_hash;
2624
2625	error = send_message(mh, ms);
2626 out:
2627	return error;
2628}
2629
2630static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2631			     int mstype, int rv)
2632{
2633	struct dlm_message *ms;
2634	struct dlm_mhandle *mh;
2635	int to_nodeid, error;
2636
2637	to_nodeid = lkb->lkb_nodeid;
2638
2639	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2640	if (error)
2641		goto out;
2642
2643	send_args(r, lkb, ms);
2644
2645	ms->m_result = rv;
2646
2647	error = send_message(mh, ms);
2648 out:
2649	return error;
2650}
2651
2652static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2653{
2654	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2655}
2656
2657static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2658{
2659	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2660}
2661
2662static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2663{
2664	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2665}
2666
2667static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2668{
2669	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2670}
2671
2672static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2673			     int ret_nodeid, int rv)
2674{
2675	struct dlm_rsb *r = &ls->ls_stub_rsb;
2676	struct dlm_message *ms;
2677	struct dlm_mhandle *mh;
2678	int error, nodeid = ms_in->m_header.h_nodeid;
2679
2680	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2681	if (error)
2682		goto out;
2683
2684	ms->m_lkid = ms_in->m_lkid;
2685	ms->m_result = rv;
2686	ms->m_nodeid = ret_nodeid;
2687
2688	error = send_message(mh, ms);
2689 out:
2690	return error;
2691}
2692
2693/* which args we save from a received message depends heavily on the type
2694   of message, unlike the send side where we can safely send everything about
2695   the lkb for any type of message */
2696
2697static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2698{
2699	lkb->lkb_exflags = ms->m_exflags;
2700	lkb->lkb_sbflags = ms->m_sbflags;
2701	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2702		         (ms->m_flags & 0x0000FFFF);
2703}
2704
2705static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2706{
2707	lkb->lkb_sbflags = ms->m_sbflags;
2708	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2709		         (ms->m_flags & 0x0000FFFF);
2710}
2711
2712static int receive_extralen(struct dlm_message *ms)
2713{
2714	return (ms->m_header.h_length - sizeof(struct dlm_message));
2715}
2716
2717static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2718		       struct dlm_message *ms)
2719{
2720	int len;
2721
2722	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2723		if (!lkb->lkb_lvbptr)
2724			lkb->lkb_lvbptr = allocate_lvb(ls);
2725		if (!lkb->lkb_lvbptr)
2726			return -ENOMEM;
2727		len = receive_extralen(ms);
2728		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2729	}
2730	return 0;
2731}
2732
2733static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2734				struct dlm_message *ms)
2735{
2736	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2737	lkb->lkb_ownpid = ms->m_pid;
2738	lkb->lkb_remid = ms->m_lkid;
2739	lkb->lkb_grmode = DLM_LOCK_IV;
2740	lkb->lkb_rqmode = ms->m_rqmode;
2741	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2742	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2743
2744	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2745
2746	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2747		/* lkb was just created so there won't be an lvb yet */
2748		lkb->lkb_lvbptr = allocate_lvb(ls);
2749		if (!lkb->lkb_lvbptr)
2750			return -ENOMEM;
2751	}
2752
2753	return 0;
2754}
2755
2756static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2757				struct dlm_message *ms)
2758{
2759	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2760		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2761			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2762			  lkb->lkb_id, lkb->lkb_remid);
2763		return -EINVAL;
2764	}
2765
2766	if (!is_master_copy(lkb))
2767		return -EINVAL;
2768
2769	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2770		return -EBUSY;
2771
2772	if (receive_lvb(ls, lkb, ms))
2773		return -ENOMEM;
2774
2775	lkb->lkb_rqmode = ms->m_rqmode;
2776	lkb->lkb_lvbseq = ms->m_lvbseq;
2777
2778	return 0;
2779}
2780
2781static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2782			       struct dlm_message *ms)
2783{
2784	if (!is_master_copy(lkb))
2785		return -EINVAL;
2786	if (receive_lvb(ls, lkb, ms))
2787		return -ENOMEM;
2788	return 0;
2789}
2790
2791/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2792   uses to send a reply and that the remote end uses to process the reply. */
2793
2794static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2795{
2796	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2797	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2798	lkb->lkb_remid = ms->m_lkid;
2799}
2800
2801static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2802{
2803	struct dlm_lkb *lkb;
2804	struct dlm_rsb *r;
2805	int error, namelen;
2806
2807	error = create_lkb(ls, &lkb);
2808	if (error)
2809		goto fail;
2810
2811	receive_flags(lkb, ms);
2812	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2813	error = receive_request_args(ls, lkb, ms);
2814	if (error) {
2815		__put_lkb(ls, lkb);
2816		goto fail;
2817	}
2818
2819	namelen = receive_extralen(ms);
2820
2821	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2822	if (error) {
2823		__put_lkb(ls, lkb);
2824		goto fail;
2825	}
2826
2827	lock_rsb(r);
2828
2829	attach_lkb(r, lkb);
2830	error = do_request(r, lkb);
2831	send_request_reply(r, lkb, error);
2832
2833	unlock_rsb(r);
2834	put_rsb(r);
2835
2836	if (error == -EINPROGRESS)
2837		error = 0;
2838	if (error)
2839		dlm_put_lkb(lkb);
2840	return;
2841
2842 fail:
2843	setup_stub_lkb(ls, ms);
2844	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2845}
2846
2847static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2848{
2849	struct dlm_lkb *lkb;
2850	struct dlm_rsb *r;
2851	int error, reply = 1;
2852
2853	error = find_lkb(ls, ms->m_remid, &lkb);
2854	if (error)
2855		goto fail;
2856
2857	r = lkb->lkb_resource;
2858
2859	hold_rsb(r);
2860	lock_rsb(r);
2861
2862	receive_flags(lkb, ms);
2863	error = receive_convert_args(ls, lkb, ms);
2864	if (error)
2865		goto out;
2866	reply = !down_conversion(lkb);
2867
2868	error = do_convert(r, lkb);
2869 out:
2870	if (reply)
2871		send_convert_reply(r, lkb, error);
2872
2873	unlock_rsb(r);
2874	put_rsb(r);
2875	dlm_put_lkb(lkb);
2876	return;
2877
2878 fail:
2879	setup_stub_lkb(ls, ms);
2880	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2881}
2882
2883static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2884{
2885	struct dlm_lkb *lkb;
2886	struct dlm_rsb *r;
2887	int error;
2888
2889	error = find_lkb(ls, ms->m_remid, &lkb);
2890	if (error)
2891		goto fail;
2892
2893	r = lkb->lkb_resource;
2894
2895	hold_rsb(r);
2896	lock_rsb(r);
2897
2898	receive_flags(lkb, ms);
2899	error = receive_unlock_args(ls, lkb, ms);
2900	if (error)
2901		goto out;
2902
2903	error = do_unlock(r, lkb);
2904 out:
2905	send_unlock_reply(r, lkb, error);
2906
2907	unlock_rsb(r);
2908	put_rsb(r);
2909	dlm_put_lkb(lkb);
2910	return;
2911
2912 fail:
2913	setup_stub_lkb(ls, ms);
2914	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2915}
2916
2917static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2918{
2919	struct dlm_lkb *lkb;
2920	struct dlm_rsb *r;
2921	int error;
2922
2923	error = find_lkb(ls, ms->m_remid, &lkb);
2924	if (error)
2925		goto fail;
2926
2927	receive_flags(lkb, ms);
2928
2929	r = lkb->lkb_resource;
2930
2931	hold_rsb(r);
2932	lock_rsb(r);
2933
2934	error = do_cancel(r, lkb);
2935	send_cancel_reply(r, lkb, error);
2936
2937	unlock_rsb(r);
2938	put_rsb(r);
2939	dlm_put_lkb(lkb);
2940	return;
2941
2942 fail:
2943	setup_stub_lkb(ls, ms);
2944	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2945}
2946
2947static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2948{
2949	struct dlm_lkb *lkb;
2950	struct dlm_rsb *r;
2951	int error;
2952
2953	error = find_lkb(ls, ms->m_remid, &lkb);
2954	if (error) {
2955		log_error(ls, "receive_grant no lkb");
2956		return;
2957	}
2958	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2959
2960	r = lkb->lkb_resource;
2961
2962	hold_rsb(r);
2963	lock_rsb(r);
2964
2965	receive_flags_reply(lkb, ms);
2966	if (is_altmode(lkb))
2967		munge_altmode(lkb, ms);
2968	grant_lock_pc(r, lkb, ms);
2969	queue_cast(r, lkb, 0);
2970
2971	unlock_rsb(r);
2972	put_rsb(r);
2973	dlm_put_lkb(lkb);
2974}
2975
2976static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2977{
2978	struct dlm_lkb *lkb;
2979	struct dlm_rsb *r;
2980	int error;
2981
2982	error = find_lkb(ls, ms->m_remid, &lkb);
2983	if (error) {
2984		log_error(ls, "receive_bast no lkb");
2985		return;
2986	}
2987	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2988
2989	r = lkb->lkb_resource;
2990
2991	hold_rsb(r);
2992	lock_rsb(r);
2993
2994	queue_bast(r, lkb, ms->m_bastmode);
2995
2996	unlock_rsb(r);
2997	put_rsb(r);
2998	dlm_put_lkb(lkb);
2999}
3000
3001static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3002{
3003	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3004
3005	from_nodeid = ms->m_header.h_nodeid;
3006	our_nodeid = dlm_our_nodeid();
3007
3008	len = receive_extralen(ms);
3009
3010	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3011	if (dir_nodeid != our_nodeid) {
3012		log_error(ls, "lookup dir_nodeid %d from %d",
3013			  dir_nodeid, from_nodeid);
3014		error = -EINVAL;
3015		ret_nodeid = -1;
3016		goto out;
3017	}
3018
3019	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3020
3021	/* Optimization: we're master so treat lookup as a request */
3022	if (!error && ret_nodeid == our_nodeid) {
3023		receive_request(ls, ms);
3024		return;
3025	}
3026 out:
3027	send_lookup_reply(ls, ms, ret_nodeid, error);
3028}
3029
3030static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3031{
3032	int len, dir_nodeid, from_nodeid;
3033
3034	from_nodeid = ms->m_header.h_nodeid;
3035
3036	len = receive_extralen(ms);
3037
3038	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3039	if (dir_nodeid != dlm_our_nodeid()) {
3040		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3041			  dir_nodeid, from_nodeid);
3042		return;
3043	}
3044
3045	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3046}
3047
3048static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3049{
3050	do_purge(ls, ms->m_nodeid, ms->m_pid);
3051}
3052
3053static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3054{
3055	struct dlm_lkb *lkb;
3056	struct dlm_rsb *r;
3057	int error, mstype, result;
3058
3059	error = find_lkb(ls, ms->m_remid, &lkb);
3060	if (error) {
3061		log_error(ls, "receive_request_reply no lkb");
3062		return;
3063	}
3064	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3065
3066	r = lkb->lkb_resource;
3067	hold_rsb(r);
3068	lock_rsb(r);
3069
3070	mstype = lkb->lkb_wait_type;
3071	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3072	if (error)
3073		goto out;
3074
3075	/* Optimization: the dir node was also the master, so it took our
3076	   lookup as a request and sent request reply instead of lookup reply */
3077	if (mstype == DLM_MSG_LOOKUP) {
3078		r->res_nodeid = ms->m_header.h_nodeid;
3079		lkb->lkb_nodeid = r->res_nodeid;
3080	}
3081
3082	/* this is the value returned from do_request() on the master */
3083	result = ms->m_result;
3084
3085	switch (result) {
3086	case -EAGAIN:
3087		/* request would block (be queued) on remote master */
3088		queue_cast(r, lkb, -EAGAIN);
3089		confirm_master(r, -EAGAIN);
3090		unhold_lkb(lkb); /* undoes create_lkb() */
3091		break;
3092
3093	case -EINPROGRESS:
3094	case 0:
3095		/* request was queued or granted on remote master */
3096		receive_flags_reply(lkb, ms);
3097		lkb->lkb_remid = ms->m_lkid;
3098		if (is_altmode(lkb))
3099			munge_altmode(lkb, ms);
3100		if (result)
3101			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3102		else {
3103			grant_lock_pc(r, lkb, ms);
3104			queue_cast(r, lkb, 0);
3105		}
3106		confirm_master(r, result);
3107		break;
3108
3109	case -EBADR:
3110	case -ENOTBLK:
3111		/* find_rsb failed to find rsb or rsb wasn't master */
3112		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3113			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3114		r->res_nodeid = -1;
3115		lkb->lkb_nodeid = -1;
3116
3117		if (is_overlap(lkb)) {
3118			/* we'll ignore error in cancel/unlock reply */
3119			queue_cast_overlap(r, lkb);
3120			unhold_lkb(lkb); /* undoes create_lkb() */
3121		} else
3122			_request_lock(r, lkb);
3123		break;
3124
3125	default:
3126		log_error(ls, "receive_request_reply %x error %d",
3127			  lkb->lkb_id, result);
3128	}
3129
3130	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3131		log_debug(ls, "receive_request_reply %x result %d unlock",
3132			  lkb->lkb_id, result);
3133		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3134		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3135		send_unlock(r, lkb);
3136	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3137		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3138		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3139		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3140		send_cancel(r, lkb);
3141	} else {
3142		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3143		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3144	}
3145 out:
3146	unlock_rsb(r);
3147	put_rsb(r);
3148	dlm_put_lkb(lkb);
3149}
3150
3151static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3152				    struct dlm_message *ms)
3153{
3154	/* this is the value returned from do_convert() on the master */
3155	switch (ms->m_result) {
3156	case -EAGAIN:
3157		/* convert would block (be queued) on remote master */
3158		queue_cast(r, lkb, -EAGAIN);
3159		break;
3160
3161	case -EINPROGRESS:
3162		/* convert was queued on remote master */
3163		receive_flags_reply(lkb, ms);
3164		if (is_demoted(lkb))
3165			munge_demoted(lkb, ms);
3166		del_lkb(r, lkb);
3167		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3168		break;
3169
3170	case 0:
3171		/* convert was granted on remote master */
3172		receive_flags_reply(lkb, ms);
3173		if (is_demoted(lkb))
3174			munge_demoted(lkb, ms);
3175		grant_lock_pc(r, lkb, ms);
3176		queue_cast(r, lkb, 0);
3177		break;
3178
3179	default:
3180		log_error(r->res_ls, "receive_convert_reply %x error %d",
3181			  lkb->lkb_id, ms->m_result);
3182	}
3183}
3184
3185static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3186{
3187	struct dlm_rsb *r = lkb->lkb_resource;
3188	int error;
3189
3190	hold_rsb(r);
3191	lock_rsb(r);
3192
3193	/* stub reply can happen with waiters_mutex held */
3194	error = remove_from_waiters_ms(lkb, ms);
3195	if (error)
3196		goto out;
3197
3198	__receive_convert_reply(r, lkb, ms);
3199 out:
3200	unlock_rsb(r);
3201	put_rsb(r);
3202}
3203
3204static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3205{
3206	struct dlm_lkb *lkb;
3207	int error;
3208
3209	error = find_lkb(ls, ms->m_remid, &lkb);
3210	if (error) {
3211		log_error(ls, "receive_convert_reply no lkb");
3212		return;
3213	}
3214	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3215
3216	_receive_convert_reply(lkb, ms);
3217	dlm_put_lkb(lkb);
3218}
3219
3220static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3221{
3222	struct dlm_rsb *r = lkb->lkb_resource;
3223	int error;
3224
3225	hold_rsb(r);
3226	lock_rsb(r);
3227
3228	/* stub reply can happen with waiters_mutex held */
3229	error = remove_from_waiters_ms(lkb, ms);
3230	if (error)
3231		goto out;
3232
3233	/* this is the value returned from do_unlock() on the master */
3234
3235	switch (ms->m_result) {
3236	case -DLM_EUNLOCK:
3237		receive_flags_reply(lkb, ms);
3238		remove_lock_pc(r, lkb);
3239		queue_cast(r, lkb, -DLM_EUNLOCK);
3240		break;
3241	case -ENOENT:
3242		break;
3243	default:
3244		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3245			  lkb->lkb_id, ms->m_result);
3246	}
3247 out:
3248	unlock_rsb(r);
3249	put_rsb(r);
3250}
3251
3252static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3253{
3254	struct dlm_lkb *lkb;
3255	int error;
3256
3257	error = find_lkb(ls, ms->m_remid, &lkb);
3258	if (error) {
3259		log_error(ls, "receive_unlock_reply no lkb");
3260		return;
3261	}
3262	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3263
3264	_receive_unlock_reply(lkb, ms);
3265	dlm_put_lkb(lkb);
3266}
3267
3268static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3269{
3270	struct dlm_rsb *r = lkb->lkb_resource;
3271	int error;
3272
3273	hold_rsb(r);
3274	lock_rsb(r);
3275
3276	/* stub reply can happen with waiters_mutex held */
3277	error = remove_from_waiters_ms(lkb, ms);
3278	if (error)
3279		goto out;
3280
3281	/* this is the value returned from do_cancel() on the master */
3282
3283	switch (ms->m_result) {
3284	case -DLM_ECANCEL:
3285		receive_flags_reply(lkb, ms);
3286		revert_lock_pc(r, lkb);
3287		if (ms->m_result)
3288			queue_cast(r, lkb, -DLM_ECANCEL);
3289		break;
3290	case 0:
3291		break;
3292	default:
3293		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3294			  lkb->lkb_id, ms->m_result);
3295	}
3296 out:
3297	unlock_rsb(r);
3298	put_rsb(r);
3299}
3300
3301static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3302{
3303	struct dlm_lkb *lkb;
3304	int error;
3305
3306	error = find_lkb(ls, ms->m_remid, &lkb);
3307	if (error) {
3308		log_error(ls, "receive_cancel_reply no lkb");
3309		return;
3310	}
3311	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3312
3313	_receive_cancel_reply(lkb, ms);
3314	dlm_put_lkb(lkb);
3315}
3316
3317static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3318{
3319	struct dlm_lkb *lkb;
3320	struct dlm_rsb *r;
3321	int error, ret_nodeid;
3322
3323	error = find_lkb(ls, ms->m_lkid, &lkb);
3324	if (error) {
3325		log_error(ls, "receive_lookup_reply no lkb");
3326		return;
3327	}
3328
3329
3330	r = lkb->lkb_resource;
3331	hold_rsb(r);
3332	lock_rsb(r);
3333
3334	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3335	if (error)
3336		goto out;
3337
3338	ret_nodeid = ms->m_nodeid;
3339	if (ret_nodeid == dlm_our_nodeid()) {
3340		r->res_nodeid = 0;
3341		ret_nodeid = 0;
3342		r->res_first_lkid = 0;
3343	} else {
3344		/* set_master() will copy res_nodeid to lkb_nodeid */
3345		r->res_nodeid = ret_nodeid;
3346	}
3347
3348	if (is_overlap(lkb)) {
3349		log_debug(ls, "receive_lookup_reply %x unlock %x",
3350			  lkb->lkb_id, lkb->lkb_flags);
3351		queue_cast_overlap(r, lkb);
3352		unhold_lkb(lkb); /* undoes create_lkb() */
3353		goto out_list;
3354	}
3355
3356	_request_lock(r, lkb);
3357
3358 out_list:
3359	if (!ret_nodeid)
3360		process_lookup_list(r);
3361 out:
3362	unlock_rsb(r);
3363	put_rsb(r);
3364	dlm_put_lkb(lkb);
3365}
3366
3367int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3368{
3369	struct dlm_message *ms = (struct dlm_message *) hd;
3370	struct dlm_ls *ls;
3371	int error = 0;
3372
3373	if (!recovery)
3374		dlm_message_in(ms);
3375
3376	ls = dlm_find_lockspace_global(hd->h_lockspace);
3377	if (!ls) {
3378		log_print("drop message %d from %d for unknown lockspace %d",
3379			  ms->m_type, nodeid, hd->h_lockspace);
3380		return -EINVAL;
3381	}
3382
3383	/* recovery may have just ended leaving a bunch of backed-up requests
3384	   in the requestqueue; wait while dlm_recoverd clears them */
3385
3386	if (!recovery)
3387		dlm_wait_requestqueue(ls);
3388
3389	/* recovery may have just started while there were a bunch of
3390	   in-flight requests -- save them in requestqueue to be processed
3391	   after recovery.  we can't let dlm_recvd block on the recovery
3392	   lock.  if dlm_recoverd is calling this function to clear the
3393	   requestqueue, it needs to be interrupted (-EINTR) if another
3394	   recovery operation is starting. */
3395
3396	while (1) {
3397		if (dlm_locking_stopped(ls)) {
3398			if (recovery) {
3399				error = -EINTR;
3400				goto out;
3401			}
3402			error = dlm_add_requestqueue(ls, nodeid, hd);
3403			if (error == -EAGAIN)
3404				continue;
3405			else {
3406				error = -EINTR;
3407				goto out;
3408			}
3409		}
3410
3411		if (lock_recovery_try(ls))
3412			break;
3413		schedule();
3414	}
3415
3416	switch (ms->m_type) {
3417
3418	/* messages sent to a master node */
3419
3420	case DLM_MSG_REQUEST:
3421		receive_request(ls, ms);
3422		break;
3423
3424	case DLM_MSG_CONVERT:
3425		receive_convert(ls, ms);
3426		break;
3427
3428	case DLM_MSG_UNLOCK:
3429		receive_unlock(ls, ms);
3430		break;
3431
3432	case DLM_MSG_CANCEL:
3433		receive_cancel(ls, ms);
3434		break;
3435
3436	/* messages sent from a master node (replies to above) */
3437
3438	case DLM_MSG_REQUEST_REPLY:
3439		receive_request_reply(ls, ms);
3440		break;
3441
3442	case DLM_MSG_CONVERT_REPLY:
3443		receive_convert_reply(ls, ms);
3444		break;
3445
3446	case DLM_MSG_UNLOCK_REPLY:
3447		receive_unlock_reply(ls, ms);
3448		break;
3449
3450	case DLM_MSG_CANCEL_REPLY:
3451		receive_cancel_reply(ls, ms);
3452		break;
3453
3454	/* messages sent from a master node (only two types of async msg) */
3455
3456	case DLM_MSG_GRANT:
3457		receive_grant(ls, ms);
3458		break;
3459
3460	case DLM_MSG_BAST:
3461		receive_bast(ls, ms);
3462		break;
3463
3464	/* messages sent to a dir node */
3465
3466	case DLM_MSG_LOOKUP:
3467		receive_lookup(ls, ms);
3468		break;
3469
3470	case DLM_MSG_REMOVE:
3471		receive_remove(ls, ms);
3472		break;
3473
3474	/* messages sent from a dir node (remove has no reply) */
3475
3476	case DLM_MSG_LOOKUP_REPLY:
3477		receive_lookup_reply(ls, ms);
3478		break;
3479
3480	/* other messages */
3481
3482	case DLM_MSG_PURGE:
3483		receive_purge(ls, ms);
3484		break;
3485
3486	default:
3487		log_error(ls, "unknown message type %d", ms->m_type);
3488	}
3489
3490	unlock_recovery(ls);
3491 out:
3492	dlm_put_lockspace(ls);
3493	dlm_astd_wake();
3494	return error;
3495}
3496
3497
3498/*
3499 * Recovery related
3500 */
3501
3502static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3503{
3504	if (middle_conversion(lkb)) {
3505		hold_lkb(lkb);
3506		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3507		ls->ls_stub_ms.m_result = -EINPROGRESS;
3508		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3509		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3510
3511		/* Same special case as in receive_rcom_lock_args() */
3512		lkb->lkb_grmode = DLM_LOCK_IV;
3513		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3514		unhold_lkb(lkb);
3515
3516	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3517		lkb->lkb_flags |= DLM_IFL_RESEND;
3518	}
3519
3520	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3521	   conversions are async; there's no reply from the remote master */
3522}
3523
3524/* A waiting lkb needs recovery if the master node has failed, or
3525   the master node is changing (only when no directory is used) */
3526
3527static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3528{
3529	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3530		return 1;
3531
3532	if (!dlm_no_directory(ls))
3533		return 0;
3534
3535	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3536		return 1;
3537
3538	return 0;
3539}
3540
3541/* Recovery for locks that are waiting for replies from nodes that are now
3542   gone.  We can just complete unlocks and cancels by faking a reply from the
3543   dead node.  Requests and up-conversions we flag to be resent after
3544   recovery.  Down-conversions can just be completed with a fake reply like
3545   unlocks.  Conversions between PR and CW need special attention. */
3546
3547void dlm_recover_waiters_pre(struct dlm_ls *ls)
3548{
3549	struct dlm_lkb *lkb, *safe;
3550
3551	mutex_lock(&ls->ls_waiters_mutex);
3552
3553	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3554		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3555			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3556
3557		/* all outstanding lookups, regardless of destination  will be
3558		   resent after recovery is done */
3559
3560		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3561			lkb->lkb_flags |= DLM_IFL_RESEND;
3562			continue;
3563		}
3564
3565		if (!waiter_needs_recovery(ls, lkb))
3566			continue;
3567
3568		switch (lkb->lkb_wait_type) {
3569
3570		case DLM_MSG_REQUEST:
3571			lkb->lkb_flags |= DLM_IFL_RESEND;
3572			break;
3573
3574		case DLM_MSG_CONVERT:
3575			recover_convert_waiter(ls, lkb);
3576			break;
3577
3578		case DLM_MSG_UNLOCK:
3579			hold_lkb(lkb);
3580			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3581			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3582			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3583			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3584			dlm_put_lkb(lkb);
3585			break;
3586
3587		case DLM_MSG_CANCEL:
3588			hold_lkb(lkb);
3589			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3590			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3591			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3592			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3593			dlm_put_lkb(lkb);
3594			break;
3595
3596		default:
3597			log_error(ls, "invalid lkb wait_type %d",
3598				  lkb->lkb_wait_type);
3599		}
3600		schedule();
3601	}
3602	mutex_unlock(&ls->ls_waiters_mutex);
3603}
3604
3605static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3606{
3607	struct dlm_lkb *lkb;
3608	int found = 0;
3609
3610	mutex_lock(&ls->ls_waiters_mutex);
3611	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3612		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3613			hold_lkb(lkb);
3614			found = 1;
3615			break;
3616		}
3617	}
3618	mutex_unlock(&ls->ls_waiters_mutex);
3619
3620	if (!found)
3621		lkb = NULL;
3622	return lkb;
3623}
3624
3625/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3626   master or dir-node for r.  Processing the lkb may result in it being placed
3627   back on waiters. */
3628
3629/* We do this after normal locking has been enabled and any saved messages
3630   (in requestqueue) have been processed.  We should be confident that at
3631   this point we won't get or process a reply to any of these waiting
3632   operations.  But, new ops may be coming in on the rsbs/locks here from
3633   userspace or remotely. */
3634
3635/* there may have been an overlap unlock/cancel prior to recovery or after
3636   recovery.  if before, the lkb may still have a pos wait_count; if after, the
3637   overlap flag would just have been set and nothing new sent.  we can be
3638   confident here than any replies to either the initial op or overlap ops
3639   prior to recovery have been received. */
3640
3641int dlm_recover_waiters_post(struct dlm_ls *ls)
3642{
3643	struct dlm_lkb *lkb;
3644	struct dlm_rsb *r;
3645	int error = 0, mstype, err, oc, ou;
3646
3647	while (1) {
3648		if (dlm_locking_stopped(ls)) {
3649			log_debug(ls, "recover_waiters_post aborted");
3650			error = -EINTR;
3651			break;
3652		}
3653
3654		lkb = find_resend_waiter(ls);
3655		if (!lkb)
3656			break;
3657
3658		r = lkb->lkb_resource;
3659		hold_rsb(r);
3660		lock_rsb(r);
3661
3662		mstype = lkb->lkb_wait_type;
3663		oc = is_overlap_cancel(lkb);
3664		ou = is_overlap_unlock(lkb);
3665		err = 0;
3666
3667		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3668			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3669
3670		/* At this point we assume that we won't get a reply to any
3671		   previous op or overlap op on this lock.  First, do a big
3672		   remove_from_waiters() for all previous ops. */
3673
3674		lkb->lkb_flags &= ~DLM_IFL_RESEND;
3675		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3676		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3677		lkb->lkb_wait_type = 0;
3678		lkb->lkb_wait_count = 0;
3679		mutex_lock(&ls->ls_waiters_mutex);
3680		list_del_init(&lkb->lkb_wait_reply);
3681		mutex_unlock(&ls->ls_waiters_mutex);
3682		unhold_lkb(lkb); /* for waiters list */
3683
3684		if (oc || ou) {
3685			/* do an unlock or cancel instead of resending */
3686			switch (mstype) {
3687			case DLM_MSG_LOOKUP:
3688			case DLM_MSG_REQUEST:
3689				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3690							-DLM_ECANCEL);
3691				unhold_lkb(lkb); /* undoes create_lkb() */
3692				break;
3693			case DLM_MSG_CONVERT:
3694				if (oc) {
3695					queue_cast(r, lkb, -DLM_ECANCEL);
3696				} else {
3697					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3698					_unlock_lock(r, lkb);
3699				}
3700				break;
3701			default:
3702				err = 1;
3703			}
3704		} else {
3705			switch (mstype) {
3706			case DLM_MSG_LOOKUP:
3707			case DLM_MSG_REQUEST:
3708				_request_lock(r, lkb);
3709				if (is_master(r))
3710					confirm_master(r, 0);
3711				break;
3712			case DLM_MSG_CONVERT:
3713				_convert_lock(r, lkb);
3714				break;
3715			default:
3716				err = 1;
3717			}
3718		}
3719
3720		if (err)
3721			log_error(ls, "recover_waiters_post %x %d %x %d %d",
3722			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3723		unlock_rsb(r);
3724		put_rsb(r);
3725		dlm_put_lkb(lkb);
3726	}
3727
3728	return error;
3729}
3730
3731static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3732			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3733{
3734	struct dlm_ls *ls = r->res_ls;
3735	struct dlm_lkb *lkb, *safe;
3736
3737	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3738		if (test(ls, lkb)) {
3739			rsb_set_flag(r, RSB_LOCKS_PURGED);
3740			del_lkb(r, lkb);
3741			/* this put should free the lkb */
3742			if (!dlm_put_lkb(lkb))
3743				log_error(ls, "purged lkb not released");
3744		}
3745	}
3746}
3747
3748static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3749{
3750	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3751}
3752
3753static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3754{
3755	return is_master_copy(lkb);
3756}
3757
3758static void purge_dead_locks(struct dlm_rsb *r)
3759{
3760	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3761	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3762	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3763}
3764
3765void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3766{
3767	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3768	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3769	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3770}
3771
3772/* Get rid of locks held by nodes that are gone. */
3773
3774int dlm_purge_locks(struct dlm_ls *ls)
3775{
3776	struct dlm_rsb *r;
3777
3778	log_debug(ls, "dlm_purge_locks");
3779
3780	down_write(&ls->ls_root_sem);
3781	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3782		hold_rsb(r);
3783		lock_rsb(r);
3784		if (is_master(r))
3785			purge_dead_locks(r);
3786		unlock_rsb(r);
3787		unhold_rsb(r);
3788
3789		schedule();
3790	}
3791	up_write(&ls->ls_root_sem);
3792
3793	return 0;
3794}
3795
3796static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3797{
3798	struct dlm_rsb *r, *r_ret = NULL;
3799
3800	read_lock(&ls->ls_rsbtbl[bucket].lock);
3801	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3802		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3803			continue;
3804		hold_rsb(r);
3805		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3806		r_ret = r;
3807		break;
3808	}
3809	read_unlock(&ls->ls_rsbtbl[bucket].lock);
3810	return r_ret;
3811}
3812
3813void dlm_grant_after_purge(struct dlm_ls *ls)
3814{
3815	struct dlm_rsb *r;
3816	int bucket = 0;
3817
3818	while (1) {
3819		r = find_purged_rsb(ls, bucket);
3820		if (!r) {
3821			if (bucket == ls->ls_rsbtbl_size - 1)
3822				break;
3823			bucket++;
3824			continue;
3825		}
3826		lock_rsb(r);
3827		if (is_master(r)) {
3828			grant_pending_locks(r);
3829			confirm_master(r, 0);
3830		}
3831		unlock_rsb(r);
3832		put_rsb(r);
3833		schedule();
3834	}
3835}
3836
3837static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3838					 uint32_t remid)
3839{
3840	struct dlm_lkb *lkb;
3841
3842	list_for_each_entry(lkb, head, lkb_statequeue) {
3843		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3844			return lkb;
3845	}
3846	return NULL;
3847}
3848
3849static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3850				    uint32_t remid)
3851{
3852	struct dlm_lkb *lkb;
3853
3854	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3855	if (lkb)
3856		return lkb;
3857	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3858	if (lkb)
3859		return lkb;
3860	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3861	if (lkb)
3862		return lkb;
3863	return NULL;
3864}
3865
3866static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3867				  struct dlm_rsb *r, struct dlm_rcom *rc)
3868{
3869	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3870	int lvblen;
3871
3872	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3873	lkb->lkb_ownpid = rl->rl_ownpid;
3874	lkb->lkb_remid = rl->rl_lkid;
3875	lkb->lkb_exflags = rl->rl_exflags;
3876	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3877	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3878	lkb->lkb_lvbseq = rl->rl_lvbseq;
3879	lkb->lkb_rqmode = rl->rl_rqmode;
3880	lkb->lkb_grmode = rl->rl_grmode;
3881	/* don't set lkb_status because add_lkb wants to itself */
3882
3883	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3884	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3885
3886	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3887		lkb->lkb_lvbptr = allocate_lvb(ls);
3888		if (!lkb->lkb_lvbptr)
3889			return -ENOMEM;
3890		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3891			 sizeof(struct rcom_lock);
3892		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3893	}
3894
3895	/* Conversions between PR and CW (middle modes) need special handling.
3896	   The real granted mode of these converting locks cannot be determined
3897	   until all locks have been rebuilt on the rsb (recover_conversion) */
3898
3899	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3900		rl->rl_status = DLM_LKSTS_CONVERT;
3901		lkb->lkb_grmode = DLM_LOCK_IV;
3902		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3903	}
3904
3905	return 0;
3906}
3907
3908/* This lkb may have been recovered in a previous aborted recovery so we need
3909   to check if the rsb already has an lkb with the given remote nodeid/lkid.
3910   If so we just send back a standard reply.  If not, we create a new lkb with
3911   the given values and send back our lkid.  We send back our lkid by sending
3912   back the rcom_lock struct we got but with the remid field filled in. */
3913
3914int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3915{
3916	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3917	struct dlm_rsb *r;
3918	struct dlm_lkb *lkb;
3919	int error;
3920
3921	if (rl->rl_parent_lkid) {
3922		error = -EOPNOTSUPP;
3923		goto out;
3924	}
3925
3926	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3927	if (error)
3928		goto out;
3929
3930	lock_rsb(r);
3931
3932	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3933	if (lkb) {
3934		error = -EEXIST;
3935		goto out_remid;
3936	}
3937
3938	error = create_lkb(ls, &lkb);
3939	if (error)
3940		goto out_unlock;
3941
3942	error = receive_rcom_lock_args(ls, lkb, r, rc);
3943	if (error) {
3944		__put_lkb(ls, lkb);
3945		goto out_unlock;
3946	}
3947
3948	attach_lkb(r, lkb);
3949	add_lkb(r, lkb, rl->rl_status);
3950	error = 0;
3951
3952 out_remid:
3953	/* this is the new value returned to the lock holder for
3954	   saving in its process-copy lkb */
3955	rl->rl_remid = lkb->lkb_id;
3956
3957 out_unlock:
3958	unlock_rsb(r);
3959	put_rsb(r);
3960 out:
3961	if (error)
3962		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3963	rl->rl_result = error;
3964	return error;
3965}
3966
3967int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3968{
3969	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3970	struct dlm_rsb *r;
3971	struct dlm_lkb *lkb;
3972	int error;
3973
3974	error = find_lkb(ls, rl->rl_lkid, &lkb);
3975	if (error) {
3976		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3977		return error;
3978	}
3979
3980	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3981
3982	error = rl->rl_result;
3983
3984	r = lkb->lkb_resource;
3985	hold_rsb(r);
3986	lock_rsb(r);
3987
3988	switch (error) {
3989	case -EBADR:
3990		/* There's a chance the new master received our lock before
3991		   dlm_recover_master_reply(), this wouldn't happen if we did
3992		   a barrier between recover_masters and recover_locks. */
3993		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3994			  (unsigned long)r, r->res_name);
3995		dlm_send_rcom_lock(r, lkb);
3996		goto out;
3997	case -EEXIST:
3998		log_debug(ls, "master copy exists %x", lkb->lkb_id);
3999		/* fall through */
4000	case 0:
4001		lkb->lkb_remid = rl->rl_remid;
4002		break;
4003	default:
4004		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4005			  error, lkb->lkb_id);
4006	}
4007
4008	/* an ack for dlm_recover_locks() which waits for replies from
4009	   all the locks it sends to new masters */
4010	dlm_recovered_lock(r);
4011 out:
4012	unlock_rsb(r);
4013	put_rsb(r);
4014	dlm_put_lkb(lkb);
4015
4016	return 0;
4017}
4018
4019int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4020		     int mode, uint32_t flags, void *name, unsigned int namelen,
4021		     uint32_t parent_lkid)
4022{
4023	struct dlm_lkb *lkb;
4024	struct dlm_args args;
4025	int error;
4026
4027	lock_recovery(ls);
4028
4029	error = create_lkb(ls, &lkb);
4030	if (error) {
4031		kfree(ua);
4032		goto out;
4033	}
4034
4035	if (flags & DLM_LKF_VALBLK) {
4036		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4037		if (!ua->lksb.sb_lvbptr) {
4038			kfree(ua);
4039			__put_lkb(ls, lkb);
4040			error = -ENOMEM;
4041			goto out;
4042		}
4043	}
4044
4045	/* After ua is attached to lkb it will be freed by free_lkb().
4046	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4047	   lock and that lkb_astparam is the dlm_user_args structure. */
4048
4049	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4050			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4051	lkb->lkb_flags |= DLM_IFL_USER;
4052	ua->old_mode = DLM_LOCK_IV;
4053
4054	if (error) {
4055		__put_lkb(ls, lkb);
4056		goto out;
4057	}
4058
4059	error = request_lock(ls, lkb, name, namelen, &args);
4060
4061	switch (error) {
4062	case 0:
4063		break;
4064	case -EINPROGRESS:
4065		error = 0;
4066		break;
4067	case -EAGAIN:
4068		error = 0;
4069		/* fall through */
4070	default:
4071		__put_lkb(ls, lkb);
4072		goto out;
4073	}
4074
4075	/* add this new lkb to the per-process list of locks */
4076	spin_lock(&ua->proc->locks_spin);
4077	hold_lkb(lkb);
4078	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4079	spin_unlock(&ua->proc->locks_spin);
4080 out:
4081	unlock_recovery(ls);
4082	return error;
4083}
4084
4085int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4086		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4087{
4088	struct dlm_lkb *lkb;
4089	struct dlm_args args;
4090	struct dlm_user_args *ua;
4091	int error;
4092
4093	lock_recovery(ls);
4094
4095	error = find_lkb(ls, lkid, &lkb);
4096	if (error)
4097		goto out;
4098
4099	/* user can change the params on its lock when it converts it, or
4100	   add an lvb that didn't exist before */
4101
4102	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4103
4104	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4105		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4106		if (!ua->lksb.sb_lvbptr) {
4107			error = -ENOMEM;
4108			goto out_put;
4109		}
4110	}
4111	if (lvb_in && ua->lksb.sb_lvbptr)
4112		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4113
4114	ua->castparam = ua_tmp->castparam;
4115	ua->castaddr = ua_tmp->castaddr;
4116	ua->bastparam = ua_tmp->bastparam;
4117	ua->bastaddr = ua_tmp->bastaddr;
4118	ua->user_lksb = ua_tmp->user_lksb;
4119	ua->old_mode = lkb->lkb_grmode;
4120
4121	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4122			      ua, DLM_FAKE_USER_AST, &args);
4123	if (error)
4124		goto out_put;
4125
4126	error = convert_lock(ls, lkb, &args);
4127
4128	if (error == -EINPROGRESS || error == -EAGAIN)
4129		error = 0;
4130 out_put:
4131	dlm_put_lkb(lkb);
4132 out:
4133	unlock_recovery(ls);
4134	kfree(ua_tmp);
4135	return error;
4136}
4137
4138int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4139		    uint32_t flags, uint32_t lkid, char *lvb_in)
4140{
4141	struct dlm_lkb *lkb;
4142	struct dlm_args args;
4143	struct dlm_user_args *ua;
4144	int error;
4145
4146	lock_recovery(ls);
4147
4148	error = find_lkb(ls, lkid, &lkb);
4149	if (error)
4150		goto out;
4151
4152	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4153
4154	if (lvb_in && ua->lksb.sb_lvbptr)
4155		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4156	ua->castparam = ua_tmp->castparam;
4157	ua->user_lksb = ua_tmp->user_lksb;
4158
4159	error = set_unlock_args(flags, ua, &args);
4160	if (error)
4161		goto out_put;
4162
4163	error = unlock_lock(ls, lkb, &args);
4164
4165	if (error == -DLM_EUNLOCK)
4166		error = 0;
4167	/* from validate_unlock_args() */
4168	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4169		error = 0;
4170	if (error)
4171		goto out_put;
4172
4173	spin_lock(&ua->proc->locks_spin);
4174	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4175	if (!list_empty(&lkb->lkb_ownqueue))
4176		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4177	spin_unlock(&ua->proc->locks_spin);
4178 out_put:
4179	dlm_put_lkb(lkb);
4180 out:
4181	unlock_recovery(ls);
4182	kfree(ua_tmp);
4183	return error;
4184}
4185
4186int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4187		    uint32_t flags, uint32_t lkid)
4188{
4189	struct dlm_lkb *lkb;
4190	struct dlm_args args;
4191	struct dlm_user_args *ua;
4192	int error;
4193
4194	lock_recovery(ls);
4195
4196	error = find_lkb(ls, lkid, &lkb);
4197	if (error)
4198		goto out;
4199
4200	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4201	ua->castparam = ua_tmp->castparam;
4202	ua->user_lksb = ua_tmp->user_lksb;
4203
4204	error = set_unlock_args(flags, ua, &args);
4205	if (error)
4206		goto out_put;
4207
4208	error = cancel_lock(ls, lkb, &args);
4209
4210	if (error == -DLM_ECANCEL)
4211		error = 0;
4212	/* from validate_unlock_args() */
4213	if (error == -EBUSY)
4214		error = 0;
4215 out_put:
4216	dlm_put_lkb(lkb);
4217 out:
4218	unlock_recovery(ls);
4219	kfree(ua_tmp);
4220	return error;
4221}
4222
4223/* lkb's that are removed from the waiters list by revert are just left on the
4224   orphans list with the granted orphan locks, to be freed by purge */
4225
4226static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4227{
4228	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4229	struct dlm_args args;
4230	int error;
4231
4232	hold_lkb(lkb);
4233	mutex_lock(&ls->ls_orphans_mutex);
4234	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4235	mutex_unlock(&ls->ls_orphans_mutex);
4236
4237	set_unlock_args(0, ua, &args);
4238
4239	error = cancel_lock(ls, lkb, &args);
4240	if (error == -DLM_ECANCEL)
4241		error = 0;
4242	return error;
4243}
4244
4245/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4246   Regardless of what rsb queue the lock is on, it's removed and freed. */
4247
4248static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4249{
4250	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4251	struct dlm_args args;
4252	int error;
4253
4254	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4255
4256	error = unlock_lock(ls, lkb, &args);
4257	if (error == -DLM_EUNLOCK)
4258		error = 0;
4259	return error;
4260}
4261
4262/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4263   (which does lock_rsb) due to deadlock with receiving a message that does
4264   lock_rsb followed by dlm_user_add_ast() */
4265
4266static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4267				     struct dlm_user_proc *proc)
4268{
4269	struct dlm_lkb *lkb = NULL;
4270
4271	mutex_lock(&ls->ls_clear_proc_locks);
4272	if (list_empty(&proc->locks))
4273		goto out;
4274
4275	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4276	list_del_init(&lkb->lkb_ownqueue);
4277
4278	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4279		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4280	else
4281		lkb->lkb_flags |= DLM_IFL_DEAD;
4282 out:
4283	mutex_unlock(&ls->ls_clear_proc_locks);
4284	return lkb;
4285}
4286
4287/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4288   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4289   which we clear here. */
4290
4291
4292void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4293{
4294	struct dlm_lkb *lkb, *safe;
4295
4296	lock_recovery(ls);
4297
4298	while (1) {
4299		lkb = del_proc_lock(ls, proc);
4300		if (!lkb)
4301			break;
4302		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4303			orphan_proc_lock(ls, lkb);
4304		else
4305			unlock_proc_lock(ls, lkb);
4306
4307		/* this removes the reference for the proc->locks list
4308		   added by dlm_user_request, it may result in the lkb
4309		   being freed */
4310
4311		dlm_put_lkb(lkb);
4312	}
4313
4314	mutex_lock(&ls->ls_clear_proc_locks);
4315
4316	/* in-progress unlocks */
4317	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4318		list_del_init(&lkb->lkb_ownqueue);
4319		lkb->lkb_flags |= DLM_IFL_DEAD;
4320		dlm_put_lkb(lkb);
4321	}
4322
4323	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4324		list_del(&lkb->lkb_astqueue);
4325		dlm_put_lkb(lkb);
4326	}
4327
4328	mutex_unlock(&ls->ls_clear_proc_locks);
4329	unlock_recovery(ls);
4330}
4331
4332static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4333{
4334	struct dlm_lkb *lkb, *safe;
4335
4336	while (1) {
4337		lkb = NULL;
4338		spin_lock(&proc->locks_spin);
4339		if (!list_empty(&proc->locks)) {
4340			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4341					 lkb_ownqueue);
4342			list_del_init(&lkb->lkb_ownqueue);
4343		}
4344		spin_unlock(&proc->locks_spin);
4345
4346		if (!lkb)
4347			break;
4348
4349		lkb->lkb_flags |= DLM_IFL_DEAD;
4350		unlock_proc_lock(ls, lkb);
4351		dlm_put_lkb(lkb); /* ref from proc->locks list */
4352	}
4353
4354	spin_lock(&proc->locks_spin);
4355	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4356		list_del_init(&lkb->lkb_ownqueue);
4357		lkb->lkb_flags |= DLM_IFL_DEAD;
4358		dlm_put_lkb(lkb);
4359	}
4360	spin_unlock(&proc->locks_spin);
4361
4362	spin_lock(&proc->asts_spin);
4363	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4364		list_del(&lkb->lkb_astqueue);
4365		dlm_put_lkb(lkb);
4366	}
4367	spin_unlock(&proc->asts_spin);
4368}
4369
4370/* pid of 0 means purge all orphans */
4371
4372static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4373{
4374	struct dlm_lkb *lkb, *safe;
4375
4376	mutex_lock(&ls->ls_orphans_mutex);
4377	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4378		if (pid && lkb->lkb_ownpid != pid)
4379			continue;
4380		unlock_proc_lock(ls, lkb);
4381		list_del_init(&lkb->lkb_ownqueue);
4382		dlm_put_lkb(lkb);
4383	}
4384	mutex_unlock(&ls->ls_orphans_mutex);
4385}
4386
4387static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4388{
4389	struct dlm_message *ms;
4390	struct dlm_mhandle *mh;
4391	int error;
4392
4393	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4394				DLM_MSG_PURGE, &ms, &mh);
4395	if (error)
4396		return error;
4397	ms->m_nodeid = nodeid;
4398	ms->m_pid = pid;
4399
4400	return send_message(mh, ms);
4401}
4402
4403int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4404		   int nodeid, int pid)
4405{
4406	int error = 0;
4407
4408	if (nodeid != dlm_our_nodeid()) {
4409		error = send_purge(ls, nodeid, pid);
4410	} else {
4411		lock_recovery(ls);
4412		if (pid == current->pid)
4413			purge_proc_locks(ls, proc);
4414		else
4415			do_purge(ls, nodeid, pid);
4416		unlock_recovery(ls);
4417	}
4418	return error;
4419}
4420