1// SPDX-License-Identifier: GPL-2.0-or-later
2/*
 * dlmmaster.c
4 *
5 * standalone DLM module
6 *
7 * Copyright (C) 2004 Oracle.  All rights reserved.
8 */
9
10
11#include <linux/module.h>
12#include <linux/fs.h>
13#include <linux/types.h>
14#include <linux/slab.h>
15#include <linux/highmem.h>
16#include <linux/init.h>
17#include <linux/sysctl.h>
18#include <linux/random.h>
19#include <linux/blkdev.h>
20#include <linux/socket.h>
21#include <linux/inet.h>
22#include <linux/spinlock.h>
23#include <linux/delay.h>
24
25
26#include "../cluster/heartbeat.h"
27#include "../cluster/nodemanager.h"
28#include "../cluster/tcp.h"
29
30#include "dlmapi.h"
31#include "dlmcommon.h"
32#include "dlmdomain.h"
33#include "dlmdebug.h"
34
35#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
36#include "../cluster/masklog.h"
37
38static void dlm_mle_node_down(struct dlm_ctxt *dlm,
39			      struct dlm_master_list_entry *mle,
40			      struct o2nm_node *node,
41			      int idx);
42static void dlm_mle_node_up(struct dlm_ctxt *dlm,
43			    struct dlm_master_list_entry *mle,
44			    struct o2nm_node *node,
45			    int idx);
46
47static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
48static int dlm_do_assert_master(struct dlm_ctxt *dlm,
49				struct dlm_lock_resource *res,
50				void *nodemap, u32 flags);
51static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
52
53static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
54				struct dlm_master_list_entry *mle,
55				const char *name,
56				unsigned int namelen)
57{
58	if (dlm != mle->dlm)
59		return 0;
60
61	if (namelen != mle->mnamelen ||
62	    memcmp(name, mle->mname, namelen) != 0)
63		return 0;
64
65	return 1;
66}
67
68static struct kmem_cache *dlm_lockres_cache;
69static struct kmem_cache *dlm_lockname_cache;
70static struct kmem_cache *dlm_mle_cache;
71
72static void dlm_mle_release(struct kref *kref);
73static void dlm_init_mle(struct dlm_master_list_entry *mle,
74			enum dlm_mle_type type,
75			struct dlm_ctxt *dlm,
76			struct dlm_lock_resource *res,
77			const char *name,
78			unsigned int namelen);
79static void dlm_put_mle(struct dlm_master_list_entry *mle);
80static void __dlm_put_mle(struct dlm_master_list_entry *mle);
81static int dlm_find_mle(struct dlm_ctxt *dlm,
82			struct dlm_master_list_entry **mle,
83			char *name, unsigned int namelen);
84
85static int dlm_do_master_request(struct dlm_lock_resource *res,
86				 struct dlm_master_list_entry *mle, int to);
87
88
89static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
90				     struct dlm_lock_resource *res,
91				     struct dlm_master_list_entry *mle,
92				     int *blocked);
93static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
94				    struct dlm_lock_resource *res,
95				    struct dlm_master_list_entry *mle,
96				    int blocked);
97static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
98				 struct dlm_lock_resource *res,
99				 struct dlm_master_list_entry *mle,
100				 struct dlm_master_list_entry **oldmle,
101				 const char *name, unsigned int namelen,
102				 u8 new_master, u8 master);
103
104static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
105				    struct dlm_lock_resource *res);
106static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
107				      struct dlm_lock_resource *res);
108static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
109				       struct dlm_lock_resource *res,
110				       u8 target);
111static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
112				       struct dlm_lock_resource *res);
113
114
115int dlm_is_host_down(int errno)
116{
117	switch (errno) {
118		case -EBADF:
119		case -ECONNREFUSED:
120		case -ENOTCONN:
121		case -ECONNRESET:
122		case -EPIPE:
123		case -EHOSTDOWN:
124		case -EHOSTUNREACH:
125		case -ETIMEDOUT:
126		case -ECONNABORTED:
127		case -ENETDOWN:
128		case -ENETUNREACH:
129		case -ENETRESET:
130		case -ESHUTDOWN:
131		case -ENOPROTOOPT:
132		case -EINVAL:   /* if returned from our tcp code,
133				   this means there is no socket */
134			return 1;
135	}
136	return 0;
137}
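
/*
 * A sketch of how the send paths later in this file use the helper
 * above (illustration only, not additional code in the build):
 *
 *	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
 *				 sizeof(request), to, &response);
 *	if (ret < 0) {
 *		if (!dlm_is_host_down(ret))
 *			BUG();		(not a network error: a real bug)
 *		(network error: assume node "to" is dead and carry on)
 *	}
 */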
138
139
140/*
141 * MASTER LIST FUNCTIONS
142 */
143
144
145/*
146 * regarding master list entries and heartbeat callbacks:
147 *
148 * in order to avoid sleeping and allocation that occurs in
149 * heartbeat, master list entries are simply attached to the
150 * dlm's established heartbeat callbacks.  the mle is attached
151 * when it is created, and since the dlm->spinlock is held at
152 * that time, any heartbeat event will be properly discovered
153 * by the mle.  the mle needs to be detached from the
154 * dlm->mle_hb_events list as soon as heartbeat events are no
155 * longer useful to the mle, and before the mle is freed.
156 *
157 * as a general rule, heartbeat events are no longer needed by
158 * the mle once an "answer" regarding the lock master has been
159 * received.
160 */
161static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
162					      struct dlm_master_list_entry *mle)
163{
164	assert_spin_locked(&dlm->spinlock);
165
166	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
167}
168
169
170static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
171					      struct dlm_master_list_entry *mle)
172{
173	if (!list_empty(&mle->hb_events))
174		list_del_init(&mle->hb_events);
175}
176
177
178static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
179					    struct dlm_master_list_entry *mle)
180{
181	spin_lock(&dlm->spinlock);
182	__dlm_mle_detach_hb_events(dlm, mle);
183	spin_unlock(&dlm->spinlock);
184}
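
/*
 * A minimal sketch of the attach/detach lifecycle described above
 * (the real sequence lives in dlm_get_lock_resource() below; shown
 * here only to make the rule concrete):
 *
 *	spin_lock(&dlm->spinlock);
 *	spin_lock(&dlm->master_lock);
 *	dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);	(attaches hb events)
 *	__dlm_insert_mle(dlm, mle);
 *	spin_unlock(&dlm->master_lock);
 *	spin_unlock(&dlm->spinlock);
 *
 *	... wait until the master is known ...
 *
 *	dlm_mle_detach_hb_events(dlm, mle);	(hb events no longer needed)
 *	dlm_put_mle(mle);			(may free the mle)
 */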
185
186static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
187{
188	struct dlm_ctxt *dlm;
189	dlm = mle->dlm;
190
191	assert_spin_locked(&dlm->spinlock);
192	assert_spin_locked(&dlm->master_lock);
193	mle->inuse++;
194	kref_get(&mle->mle_refs);
195}
196
197static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
198{
199	struct dlm_ctxt *dlm;
200	dlm = mle->dlm;
201
202	spin_lock(&dlm->spinlock);
203	spin_lock(&dlm->master_lock);
204	mle->inuse--;
205	__dlm_put_mle(mle);
206	spin_unlock(&dlm->master_lock);
207	spin_unlock(&dlm->spinlock);
}
210
211/* remove from list and free */
212static void __dlm_put_mle(struct dlm_master_list_entry *mle)
213{
214	struct dlm_ctxt *dlm;
215	dlm = mle->dlm;
216
217	assert_spin_locked(&dlm->spinlock);
218	assert_spin_locked(&dlm->master_lock);
219	if (!kref_read(&mle->mle_refs)) {
220		/* this may or may not crash, but who cares.
221		 * it's a BUG. */
222		mlog(ML_ERROR, "bad mle: %p\n", mle);
223		dlm_print_one_mle(mle);
224		BUG();
225	} else
226		kref_put(&mle->mle_refs, dlm_mle_release);
227}
228
229
230/* must not have any spinlocks coming in */
231static void dlm_put_mle(struct dlm_master_list_entry *mle)
232{
233	struct dlm_ctxt *dlm;
234	dlm = mle->dlm;
235
236	spin_lock(&dlm->spinlock);
237	spin_lock(&dlm->master_lock);
238	__dlm_put_mle(mle);
239	spin_unlock(&dlm->master_lock);
240	spin_unlock(&dlm->spinlock);
241}
242
243static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
244{
245	kref_get(&mle->mle_refs);
246}
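
/*
 * Reference rules for mles, restated in sketch form:
 *
 *	dlm_get_mle(mle);	plain kref, no locks required
 *	dlm_put_mle(mle);	takes dlm->spinlock and dlm->master_lock itself
 *	__dlm_put_mle(mle);	caller must already hold both locks
 *
 *	dlm_get_mle_inuse(mle);	ref plus inuse count; caller holds both locks
 *	dlm_put_mle_inuse(mle);	drops both; takes the locks itself
 */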
247
248static void dlm_init_mle(struct dlm_master_list_entry *mle,
249			enum dlm_mle_type type,
250			struct dlm_ctxt *dlm,
251			struct dlm_lock_resource *res,
252			const char *name,
253			unsigned int namelen)
254{
255	assert_spin_locked(&dlm->spinlock);
256
257	mle->dlm = dlm;
258	mle->type = type;
259	INIT_HLIST_NODE(&mle->master_hash_node);
260	INIT_LIST_HEAD(&mle->hb_events);
261	bitmap_zero(mle->maybe_map, O2NM_MAX_NODES);
262	spin_lock_init(&mle->spinlock);
263	init_waitqueue_head(&mle->wq);
264	atomic_set(&mle->woken, 0);
265	kref_init(&mle->mle_refs);
266	bitmap_zero(mle->response_map, O2NM_MAX_NODES);
267	mle->master = O2NM_MAX_NODES;
268	mle->new_master = O2NM_MAX_NODES;
269	mle->inuse = 0;
270
271	BUG_ON(mle->type != DLM_MLE_BLOCK &&
272	       mle->type != DLM_MLE_MASTER &&
273	       mle->type != DLM_MLE_MIGRATION);
274
275	if (mle->type == DLM_MLE_MASTER) {
276		BUG_ON(!res);
277		mle->mleres = res;
278		memcpy(mle->mname, res->lockname.name, res->lockname.len);
279		mle->mnamelen = res->lockname.len;
280		mle->mnamehash = res->lockname.hash;
281	} else {
282		BUG_ON(!name);
283		mle->mleres = NULL;
284		memcpy(mle->mname, name, namelen);
285		mle->mnamelen = namelen;
286		mle->mnamehash = dlm_lockid_hash(name, namelen);
287	}
288
289	atomic_inc(&dlm->mle_tot_count[mle->type]);
290	atomic_inc(&dlm->mle_cur_count[mle->type]);
291
292	/* copy off the node_map and register hb callbacks on our copy */
293	bitmap_copy(mle->node_map, dlm->domain_map, O2NM_MAX_NODES);
294	bitmap_copy(mle->vote_map, dlm->domain_map, O2NM_MAX_NODES);
295	clear_bit(dlm->node_num, mle->vote_map);
296	clear_bit(dlm->node_num, mle->node_map);
297
298	/* attach the mle to the domain node up/down events */
299	__dlm_mle_attach_hb_events(dlm, mle);
300}
301
302void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
303{
304	assert_spin_locked(&dlm->spinlock);
305	assert_spin_locked(&dlm->master_lock);
306
307	if (!hlist_unhashed(&mle->master_hash_node))
308		hlist_del_init(&mle->master_hash_node);
309}
310
311void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
312{
313	struct hlist_head *bucket;
314
315	assert_spin_locked(&dlm->master_lock);
316
317	bucket = dlm_master_hash(dlm, mle->mnamehash);
318	hlist_add_head(&mle->master_hash_node, bucket);
319}
320
321/* returns 1 if found, 0 if not */
322static int dlm_find_mle(struct dlm_ctxt *dlm,
323			struct dlm_master_list_entry **mle,
324			char *name, unsigned int namelen)
325{
326	struct dlm_master_list_entry *tmpmle;
327	struct hlist_head *bucket;
328	unsigned int hash;
329
330	assert_spin_locked(&dlm->master_lock);
331
332	hash = dlm_lockid_hash(name, namelen);
333	bucket = dlm_master_hash(dlm, hash);
334	hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
335		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
336			continue;
337		dlm_get_mle(tmpmle);
338		*mle = tmpmle;
339		return 1;
340	}
341	return 0;
342}
343
344void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
345{
346	struct dlm_master_list_entry *mle;
347
348	assert_spin_locked(&dlm->spinlock);
349
350	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
351		if (node_up)
352			dlm_mle_node_up(dlm, mle, NULL, idx);
353		else
354			dlm_mle_node_down(dlm, mle, NULL, idx);
355	}
356}
357
358static void dlm_mle_node_down(struct dlm_ctxt *dlm,
359			      struct dlm_master_list_entry *mle,
360			      struct o2nm_node *node, int idx)
361{
362	spin_lock(&mle->spinlock);
363
364	if (!test_bit(idx, mle->node_map))
365		mlog(0, "node %u already removed from nodemap!\n", idx);
366	else
367		clear_bit(idx, mle->node_map);
368
369	spin_unlock(&mle->spinlock);
370}
371
372static void dlm_mle_node_up(struct dlm_ctxt *dlm,
373			    struct dlm_master_list_entry *mle,
374			    struct o2nm_node *node, int idx)
375{
376	spin_lock(&mle->spinlock);
377
378	if (test_bit(idx, mle->node_map))
379		mlog(0, "node %u already in node map!\n", idx);
380	else
381		set_bit(idx, mle->node_map);
382
383	spin_unlock(&mle->spinlock);
384}
385
386
387int dlm_init_mle_cache(void)
388{
389	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
390					  sizeof(struct dlm_master_list_entry),
391					  0, SLAB_HWCACHE_ALIGN,
392					  NULL);
393	if (dlm_mle_cache == NULL)
394		return -ENOMEM;
395	return 0;
396}
397
398void dlm_destroy_mle_cache(void)
399{
400	kmem_cache_destroy(dlm_mle_cache);
401}
402
403static void dlm_mle_release(struct kref *kref)
404{
405	struct dlm_master_list_entry *mle;
406	struct dlm_ctxt *dlm;
407
408	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
409	dlm = mle->dlm;
410
411	assert_spin_locked(&dlm->spinlock);
412	assert_spin_locked(&dlm->master_lock);
413
414	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
415	     mle->type);
416
	/* remove from list if not already removed */
418	__dlm_unlink_mle(dlm, mle);
419
420	/* detach the mle from the domain node up/down events */
421	__dlm_mle_detach_hb_events(dlm, mle);
422
423	atomic_dec(&dlm->mle_cur_count[mle->type]);
424
425	/* NOTE: kfree under spinlock here.
426	 * if this is bad, we can move this to a freelist. */
427	kmem_cache_free(dlm_mle_cache, mle);
428}
429
430
431/*
432 * LOCK RESOURCE FUNCTIONS
433 */
434
435int dlm_init_master_caches(void)
436{
437	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
438					      sizeof(struct dlm_lock_resource),
439					      0, SLAB_HWCACHE_ALIGN, NULL);
440	if (!dlm_lockres_cache)
441		goto bail;
442
443	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
444					       DLM_LOCKID_NAME_MAX, 0,
445					       SLAB_HWCACHE_ALIGN, NULL);
446	if (!dlm_lockname_cache)
447		goto bail;
448
449	return 0;
450bail:
451	dlm_destroy_master_caches();
452	return -ENOMEM;
453}
454
455void dlm_destroy_master_caches(void)
456{
457	kmem_cache_destroy(dlm_lockname_cache);
458	dlm_lockname_cache = NULL;
459
460	kmem_cache_destroy(dlm_lockres_cache);
461	dlm_lockres_cache = NULL;
462}
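
/*
 * Expected pairing for the caches above (the real call sites are in
 * the dlm init/teardown path, e.g. dlmdomain.c; shown only to
 * illustrate the error-handling contract):
 *
 *	if (dlm_init_master_caches())
 *		return -ENOMEM;		(partial failures are cleaned up internally)
 *	...
 *	dlm_destroy_master_caches();	(safe to call with NULL caches)
 */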
463
464static void dlm_lockres_release(struct kref *kref)
465{
466	struct dlm_lock_resource *res;
467	struct dlm_ctxt *dlm;
468
469	res = container_of(kref, struct dlm_lock_resource, refs);
470	dlm = res->dlm;
471
472	/* This should not happen -- all lockres' have a name
473	 * associated with them at init time. */
474	BUG_ON(!res->lockname.name);
475
476	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
477	     res->lockname.name);
478
479	atomic_dec(&dlm->res_cur_count);
480
481	if (!hlist_unhashed(&res->hash_node) ||
482	    !list_empty(&res->granted) ||
483	    !list_empty(&res->converting) ||
484	    !list_empty(&res->blocked) ||
485	    !list_empty(&res->dirty) ||
486	    !list_empty(&res->recovering) ||
487	    !list_empty(&res->purge)) {
488		mlog(ML_ERROR,
489		     "Going to BUG for resource %.*s."
490		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
491		     res->lockname.len, res->lockname.name,
492		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
493		     !list_empty(&res->granted) ? 'G' : ' ',
494		     !list_empty(&res->converting) ? 'C' : ' ',
495		     !list_empty(&res->blocked) ? 'B' : ' ',
496		     !list_empty(&res->dirty) ? 'D' : ' ',
497		     !list_empty(&res->recovering) ? 'R' : ' ',
498		     !list_empty(&res->purge) ? 'P' : ' ');
499
500		dlm_print_one_lock_resource(res);
501	}
502
503	/* By the time we're ready to blow this guy away, we shouldn't
504	 * be on any lists. */
505	BUG_ON(!hlist_unhashed(&res->hash_node));
506	BUG_ON(!list_empty(&res->granted));
507	BUG_ON(!list_empty(&res->converting));
508	BUG_ON(!list_empty(&res->blocked));
509	BUG_ON(!list_empty(&res->dirty));
510	BUG_ON(!list_empty(&res->recovering));
511	BUG_ON(!list_empty(&res->purge));
512
513	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
514
515	kmem_cache_free(dlm_lockres_cache, res);
516}
517
518void dlm_lockres_put(struct dlm_lock_resource *res)
519{
520	kref_put(&res->refs, dlm_lockres_release);
521}
522
523static void dlm_init_lockres(struct dlm_ctxt *dlm,
524			     struct dlm_lock_resource *res,
525			     const char *name, unsigned int namelen)
526{
527	char *qname;
528
529	/* If we memset here, we lose our reference to the kmalloc'd
530	 * res->lockname.name, so be sure to init every field
531	 * correctly! */
532
533	qname = (char *) res->lockname.name;
534	memcpy(qname, name, namelen);
535
536	res->lockname.len = namelen;
537	res->lockname.hash = dlm_lockid_hash(name, namelen);
538
539	init_waitqueue_head(&res->wq);
540	spin_lock_init(&res->spinlock);
541	INIT_HLIST_NODE(&res->hash_node);
542	INIT_LIST_HEAD(&res->granted);
543	INIT_LIST_HEAD(&res->converting);
544	INIT_LIST_HEAD(&res->blocked);
545	INIT_LIST_HEAD(&res->dirty);
546	INIT_LIST_HEAD(&res->recovering);
547	INIT_LIST_HEAD(&res->purge);
548	INIT_LIST_HEAD(&res->tracking);
549	atomic_set(&res->asts_reserved, 0);
550	res->migration_pending = 0;
551	res->inflight_locks = 0;
552	res->inflight_assert_workers = 0;
553
554	res->dlm = dlm;
555
556	kref_init(&res->refs);
557
558	atomic_inc(&dlm->res_tot_count);
559	atomic_inc(&dlm->res_cur_count);
560
561	/* just for consistency */
562	spin_lock(&res->spinlock);
563	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
564	spin_unlock(&res->spinlock);
565
566	res->state = DLM_LOCK_RES_IN_PROGRESS;
567
568	res->last_used = 0;
569
570	spin_lock(&dlm->track_lock);
571	list_add_tail(&res->tracking, &dlm->tracking_list);
572	spin_unlock(&dlm->track_lock);
573
574	memset(res->lvb, 0, DLM_LVB_LEN);
575	bitmap_zero(res->refmap, O2NM_MAX_NODES);
576}
577
578struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
579				   const char *name,
580				   unsigned int namelen)
581{
582	struct dlm_lock_resource *res = NULL;
583
584	res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
585	if (!res)
586		goto error;
587
588	res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
589	if (!res->lockname.name)
590		goto error;
591
592	dlm_init_lockres(dlm, res, name, namelen);
593	return res;
594
595error:
596	if (res)
597		kmem_cache_free(dlm_lockres_cache, res);
598	return NULL;
599}
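
/*
 * Typical allocation pattern (a sketch of what dlm_get_lock_resource()
 * below does; the note about the hash taking its own reference is
 * based on __dlm_insert_lockres() in dlmdomain.c):
 *
 *	res = dlm_new_lockres(dlm, lockid, namelen);	(refcount starts at 1)
 *	if (!res)
 *		return NULL;
 *	spin_lock(&dlm->spinlock);
 *	__dlm_insert_lockres(dlm, res);			(hash holds its own ref)
 *	spin_unlock(&dlm->spinlock);
 *	...
 *	dlm_lockres_put(res);				(drop the caller's ref)
 */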
600
601void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
602				struct dlm_lock_resource *res, int bit)
603{
604	assert_spin_locked(&res->spinlock);
605
606	mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
607	     res->lockname.name, bit, __builtin_return_address(0));
608
609	set_bit(bit, res->refmap);
610}
611
612void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
613				  struct dlm_lock_resource *res, int bit)
614{
615	assert_spin_locked(&res->spinlock);
616
617	mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
618	     res->lockname.name, bit, __builtin_return_address(0));
619
620	clear_bit(bit, res->refmap);
621}
622
623static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
624				   struct dlm_lock_resource *res)
625{
626	res->inflight_locks++;
627
628	mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
629	     res->lockname.len, res->lockname.name, res->inflight_locks,
630	     __builtin_return_address(0));
631}
632
633void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
634				   struct dlm_lock_resource *res)
635{
636	assert_spin_locked(&res->spinlock);
637	__dlm_lockres_grab_inflight_ref(dlm, res);
638}
639
640void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
641				   struct dlm_lock_resource *res)
642{
643	assert_spin_locked(&res->spinlock);
644
645	BUG_ON(res->inflight_locks == 0);
646
647	res->inflight_locks--;
648
649	mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
650	     res->lockname.len, res->lockname.name, res->inflight_locks,
651	     __builtin_return_address(0));
652
653	wake_up(&res->wq);
654}
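
/*
 * The inflight count pins a lockres against purging by dlm_thread.
 * Sketch of the expected pairing (the grab happens in
 * dlm_get_lock_resource(); the matching drop is done by the lock and
 * recovery paths once the in-flight operation completes):
 *
 *	spin_lock(&res->spinlock);
 *	dlm_lockres_grab_inflight_ref(dlm, res);
 *	spin_unlock(&res->spinlock);
 *
 *	... create and attach the dlm_lock ...
 *
 *	spin_lock(&res->spinlock);
 *	dlm_lockres_drop_inflight_ref(dlm, res);	(wakes res->wq)
 *	spin_unlock(&res->spinlock);
 */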
655
656void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
657		struct dlm_lock_resource *res)
658{
659	assert_spin_locked(&res->spinlock);
660	res->inflight_assert_workers++;
661	mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
662			dlm->name, res->lockname.len, res->lockname.name,
663			res->inflight_assert_workers);
664}
665
666static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
667		struct dlm_lock_resource *res)
668{
669	assert_spin_locked(&res->spinlock);
670	BUG_ON(res->inflight_assert_workers == 0);
671	res->inflight_assert_workers--;
672	mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
673			dlm->name, res->lockname.len, res->lockname.name,
674			res->inflight_assert_workers);
675}
676
677static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
678		struct dlm_lock_resource *res)
679{
680	spin_lock(&res->spinlock);
681	__dlm_lockres_drop_inflight_worker(dlm, res);
682	spin_unlock(&res->spinlock);
683}
684
/*
 * lookup a lock resource by name.
 * the lockid is null terminated.
 *
 * the resource may already exist in the hashtable; if not, allocate
 * enough for the lockres and for the temporary structure used in
 * doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  instead, wait around for that node
 * to assert_master (or die).
 */
701struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
702					  const char *lockid,
703					  int namelen,
704					  int flags)
705{
706	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
707	struct dlm_master_list_entry *mle = NULL;
708	struct dlm_master_list_entry *alloc_mle = NULL;
709	int blocked = 0;
710	int ret, nodenum;
711	struct dlm_node_iter iter;
712	unsigned int hash;
713	int tries = 0;
714	int bit, wait_on_recovery = 0;
715
716	BUG_ON(!lockid);
717
718	hash = dlm_lockid_hash(lockid, namelen);
719
720	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
721
722lookup:
723	spin_lock(&dlm->spinlock);
724	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
725	if (tmpres) {
726		spin_unlock(&dlm->spinlock);
727		spin_lock(&tmpres->spinlock);
728
729		/*
730		 * Right after dlm spinlock was released, dlm_thread could have
731		 * purged the lockres. Check if lockres got unhashed. If so
732		 * start over.
733		 */
734		if (hlist_unhashed(&tmpres->hash_node)) {
735			spin_unlock(&tmpres->spinlock);
736			dlm_lockres_put(tmpres);
737			tmpres = NULL;
738			goto lookup;
739		}
740
741		/* Wait on the thread that is mastering the resource */
742		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
743			__dlm_wait_on_lockres(tmpres);
744			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
745			spin_unlock(&tmpres->spinlock);
746			dlm_lockres_put(tmpres);
747			tmpres = NULL;
748			goto lookup;
749		}
750
751		/* Wait on the resource purge to complete before continuing */
752		if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
753			BUG_ON(tmpres->owner == dlm->node_num);
754			__dlm_wait_on_lockres_flags(tmpres,
755						    DLM_LOCK_RES_DROPPING_REF);
756			spin_unlock(&tmpres->spinlock);
757			dlm_lockres_put(tmpres);
758			tmpres = NULL;
759			goto lookup;
760		}
761
762		/* Grab inflight ref to pin the resource */
763		dlm_lockres_grab_inflight_ref(dlm, tmpres);
764
765		spin_unlock(&tmpres->spinlock);
766		if (res) {
767			spin_lock(&dlm->track_lock);
768			if (!list_empty(&res->tracking))
769				list_del_init(&res->tracking);
770			else
771				mlog(ML_ERROR, "Resource %.*s not "
772						"on the Tracking list\n",
773						res->lockname.len,
774						res->lockname.name);
775			spin_unlock(&dlm->track_lock);
776			dlm_lockres_put(res);
777		}
778		res = tmpres;
779		goto leave;
780	}
781
782	if (!res) {
783		spin_unlock(&dlm->spinlock);
784		mlog(0, "allocating a new resource\n");
785		/* nothing found and we need to allocate one. */
786		alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
787		if (!alloc_mle)
788			goto leave;
789		res = dlm_new_lockres(dlm, lockid, namelen);
790		if (!res)
791			goto leave;
792		goto lookup;
793	}
794
795	mlog(0, "no lockres found, allocated our own: %p\n", res);
796
797	if (flags & LKM_LOCAL) {
798		/* caller knows it's safe to assume it's not mastered elsewhere
799		 * DONE!  return right away */
800		spin_lock(&res->spinlock);
801		dlm_change_lockres_owner(dlm, res, dlm->node_num);
802		__dlm_insert_lockres(dlm, res);
803		dlm_lockres_grab_inflight_ref(dlm, res);
804		spin_unlock(&res->spinlock);
805		spin_unlock(&dlm->spinlock);
806		/* lockres still marked IN_PROGRESS */
807		goto wake_waiters;
808	}
809
810	/* check master list to see if another node has started mastering it */
811	spin_lock(&dlm->master_lock);
812
813	/* if we found a block, wait for lock to be mastered by another node */
814	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
815	if (blocked) {
816		int mig;
817		if (mle->type == DLM_MLE_MASTER) {
818			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
819			BUG();
820		}
821		mig = (mle->type == DLM_MLE_MIGRATION);
822		/* if there is a migration in progress, let the migration
823		 * finish before continuing.  we can wait for the absence
824		 * of the MIGRATION mle: either the migrate finished or
825		 * one of the nodes died and the mle was cleaned up.
826		 * if there is a BLOCK here, but it already has a master
827		 * set, we are too late.  the master does not have a ref
828		 * for us in the refmap.  detach the mle and drop it.
829		 * either way, go back to the top and start over. */
830		if (mig || mle->master != O2NM_MAX_NODES) {
831			BUG_ON(mig && mle->master == dlm->node_num);
832			/* we arrived too late.  the master does not
833			 * have a ref for us. retry. */
834			mlog(0, "%s:%.*s: late on %s\n",
835			     dlm->name, namelen, lockid,
836			     mig ?  "MIGRATION" : "BLOCK");
837			spin_unlock(&dlm->master_lock);
838			spin_unlock(&dlm->spinlock);
839
840			/* master is known, detach */
841			if (!mig)
842				dlm_mle_detach_hb_events(dlm, mle);
843			dlm_put_mle(mle);
844			mle = NULL;
845			/* this is lame, but we can't wait on either
846			 * the mle or lockres waitqueue here */
847			if (mig)
848				msleep(100);
849			goto lookup;
850		}
851	} else {
852		/* go ahead and try to master lock on this node */
853		mle = alloc_mle;
854		/* make sure this does not get freed below */
855		alloc_mle = NULL;
856		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
857		set_bit(dlm->node_num, mle->maybe_map);
858		__dlm_insert_mle(dlm, mle);
859
860		/* still holding the dlm spinlock, check the recovery map
861		 * to see if there are any nodes that still need to be
862		 * considered.  these will not appear in the mle nodemap
863		 * but they might own this lockres.  wait on them. */
864		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
865		if (bit < O2NM_MAX_NODES) {
866			mlog(0, "%s: res %.*s, At least one node (%d) "
867			     "to recover before lock mastery can begin\n",
868			     dlm->name, namelen, (char *)lockid, bit);
869			wait_on_recovery = 1;
870		}
871	}
872
873	/* at this point there is either a DLM_MLE_BLOCK or a
874	 * DLM_MLE_MASTER on the master list, so it's safe to add the
875	 * lockres to the hashtable.  anyone who finds the lock will
876	 * still have to wait on the IN_PROGRESS. */
877
878	/* finally add the lockres to its hash bucket */
879	__dlm_insert_lockres(dlm, res);
880
	/* since this lockres is new it doesn't require the spinlock */
882	__dlm_lockres_grab_inflight_ref(dlm, res);
883
884	/* get an extra ref on the mle in case this is a BLOCK
885	 * if so, the creator of the BLOCK may try to put the last
886	 * ref at this time in the assert master handler, so we
887	 * need an extra one to keep from a bad ptr deref. */
888	dlm_get_mle_inuse(mle);
889	spin_unlock(&dlm->master_lock);
890	spin_unlock(&dlm->spinlock);
891
892redo_request:
893	while (wait_on_recovery) {
894		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
896		 * so we only need to clear out the recovery map once. */
897		if (dlm_is_recovery_lock(lockid, namelen)) {
898			mlog(0, "%s: Recovery map is not empty, but must "
899			     "master $RECOVERY lock now\n", dlm->name);
900			if (!dlm_pre_master_reco_lockres(dlm, res))
901				wait_on_recovery = 0;
902			else {
903				mlog(0, "%s: waiting 500ms for heartbeat state "
904				    "change\n", dlm->name);
905				msleep(500);
906			}
907			continue;
908		}
909
910		dlm_kick_recovery_thread(dlm);
911		msleep(1000);
912		dlm_wait_for_recovery(dlm);
913
914		spin_lock(&dlm->spinlock);
915		bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
916		if (bit < O2NM_MAX_NODES) {
917			mlog(0, "%s: res %.*s, At least one node (%d) "
918			     "to recover before lock mastery can begin\n",
919			     dlm->name, namelen, (char *)lockid, bit);
920			wait_on_recovery = 1;
921		} else
922			wait_on_recovery = 0;
923		spin_unlock(&dlm->spinlock);
924
925		if (wait_on_recovery)
926			dlm_wait_for_node_recovery(dlm, bit, 10000);
927	}
928
929	/* must wait for lock to be mastered elsewhere */
930	if (blocked)
931		goto wait;
932
933	ret = -EINVAL;
934	dlm_node_iter_init(mle->vote_map, &iter);
935	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
936		ret = dlm_do_master_request(res, mle, nodenum);
937		if (ret < 0)
938			mlog_errno(ret);
939		if (mle->master != O2NM_MAX_NODES) {
940			/* found a master ! */
941			if (mle->master <= nodenum)
942				break;
943			/* if our master request has not reached the master
944			 * yet, keep going until it does.  this is how the
945			 * master will know that asserts are needed back to
946			 * the lower nodes. */
947			mlog(0, "%s: res %.*s, Requests only up to %u but "
948			     "master is %u, keep going\n", dlm->name, namelen,
949			     lockid, nodenum, mle->master);
950		}
951	}
952
953wait:
954	/* keep going until the response map includes all nodes */
955	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
956	if (ret < 0) {
957		wait_on_recovery = 1;
958		mlog(0, "%s: res %.*s, Node map changed, redo the master "
959		     "request now, blocked=%d\n", dlm->name, res->lockname.len,
960		     res->lockname.name, blocked);
961		if (++tries > 20) {
962			mlog(ML_ERROR, "%s: res %.*s, Spinning on "
963			     "dlm_wait_for_lock_mastery, blocked = %d\n",
964			     dlm->name, res->lockname.len,
965			     res->lockname.name, blocked);
966			dlm_print_one_lock_resource(res);
967			dlm_print_one_mle(mle);
968			tries = 0;
969		}
970		goto redo_request;
971	}
972
973	mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
974	     res->lockname.name, res->owner);
975	/* make sure we never continue without this */
976	BUG_ON(res->owner == O2NM_MAX_NODES);
977
978	/* master is known, detach if not already detached */
979	dlm_mle_detach_hb_events(dlm, mle);
980	dlm_put_mle(mle);
981	/* put the extra ref */
982	dlm_put_mle_inuse(mle);
983
984wake_waiters:
985	spin_lock(&res->spinlock);
986	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
987	spin_unlock(&res->spinlock);
988	wake_up(&res->wq);
989
990leave:
991	/* need to free the unused mle */
992	if (alloc_mle)
993		kmem_cache_free(dlm_mle_cache, alloc_mle);
994
995	return res;
996}
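
/*
 * A sketch of the caller's view of dlm_get_lock_resource() (loosely
 * modeled on the dlmlock() path; error handling simplified):
 *
 *	res = dlm_get_lock_resource(dlm, name, namelen, flags);
 *	if (!res)
 *		(return an error to the caller; allocation failed)
 *
 *	on success the lockres comes back with an inflight reference
 *	held and a known owner (this node or a remote node), so the
 *	lock request can proceed.
 */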
997
998
999#define DLM_MASTERY_TIMEOUT_MS   5000
1000
1001static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1002				     struct dlm_lock_resource *res,
1003				     struct dlm_master_list_entry *mle,
1004				     int *blocked)
1005{
1006	u8 m;
1007	int ret, bit;
1008	int map_changed, voting_done;
1009	int assert, sleep;
1010
1011recheck:
1012	ret = 0;
1013	assert = 0;
1014
1015	/* check if another node has already become the owner */
1016	spin_lock(&res->spinlock);
1017	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1018		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1019		     res->lockname.len, res->lockname.name, res->owner);
1020		spin_unlock(&res->spinlock);
1021		/* this will cause the master to re-assert across
1022		 * the whole cluster, freeing up mles */
1023		if (res->owner != dlm->node_num) {
1024			ret = dlm_do_master_request(res, mle, res->owner);
1025			if (ret < 0) {
1026				/* give recovery a chance to run */
1027				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1028				msleep(500);
1029				goto recheck;
1030			}
1031		}
1032		ret = 0;
1033		goto leave;
1034	}
1035	spin_unlock(&res->spinlock);
1036
1037	spin_lock(&mle->spinlock);
1038	m = mle->master;
1039	map_changed = !bitmap_equal(mle->vote_map, mle->node_map,
1040				    O2NM_MAX_NODES);
1041	voting_done = bitmap_equal(mle->vote_map, mle->response_map,
1042				   O2NM_MAX_NODES);
1043
1044	/* restart if we hit any errors */
1045	if (map_changed) {
1046		int b;
1047		mlog(0, "%s: %.*s: node map changed, restarting\n",
1048		     dlm->name, res->lockname.len, res->lockname.name);
1049		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1050		b = (mle->type == DLM_MLE_BLOCK);
1051		if ((*blocked && !b) || (!*blocked && b)) {
1052			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1053			     dlm->name, res->lockname.len, res->lockname.name,
1054			     *blocked, b);
1055			*blocked = b;
1056		}
1057		spin_unlock(&mle->spinlock);
1058		if (ret < 0) {
1059			mlog_errno(ret);
1060			goto leave;
1061		}
1062		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1063		     "rechecking now\n", dlm->name, res->lockname.len,
1064		     res->lockname.name);
1065		goto recheck;
1066	} else {
1067		if (!voting_done) {
1068			mlog(0, "map not changed and voting not done "
1069			     "for %s:%.*s\n", dlm->name, res->lockname.len,
1070			     res->lockname.name);
1071		}
1072	}
1073
1074	if (m != O2NM_MAX_NODES) {
1075		/* another node has done an assert!
1076		 * all done! */
1077		sleep = 0;
1078	} else {
1079		sleep = 1;
1080		/* have all nodes responded? */
1081		if (voting_done && !*blocked) {
1082			bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
1083			if (dlm->node_num <= bit) {
1084				/* my node number is lowest.
				 * now tell other nodes that I am
1086				 * mastering this. */
1087				mle->master = dlm->node_num;
1088				/* ref was grabbed in get_lock_resource
1089				 * will be dropped in dlmlock_master */
1090				assert = 1;
1091				sleep = 0;
1092			}
1093			/* if voting is done, but we have not received
1094			 * an assert master yet, we must sleep */
1095		}
1096	}
1097
1098	spin_unlock(&mle->spinlock);
1099
1100	/* sleep if we haven't finished voting yet */
1101	if (sleep) {
1102		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1103		atomic_set(&mle->woken, 0);
1104		(void)wait_event_timeout(mle->wq,
1105					 (atomic_read(&mle->woken) == 1),
1106					 timeo);
1107		if (res->owner == O2NM_MAX_NODES) {
1108			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1109			     res->lockname.len, res->lockname.name);
1110			goto recheck;
1111		}
1112		mlog(0, "done waiting, master is %u\n", res->owner);
1113		ret = 0;
1114		goto leave;
1115	}
1116
1117	ret = 0;   /* done */
1118	if (assert) {
1119		m = dlm->node_num;
1120		mlog(0, "about to master %.*s here, this=%u\n",
1121		     res->lockname.len, res->lockname.name, m);
1122		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1123		if (ret) {
1124			/* This is a failure in the network path,
1125			 * not in the response to the assert_master
1126			 * (any nonzero response is a BUG on this node).
1127			 * Most likely a socket just got disconnected
1128			 * due to node death. */
1129			mlog_errno(ret);
1130		}
1131		/* no longer need to restart lock mastery.
1132		 * all living nodes have been contacted. */
1133		ret = 0;
1134	}
1135
1136	/* set the lockres owner */
1137	spin_lock(&res->spinlock);
1138	/* mastery reference obtained either during
1139	 * assert_master_handler or in get_lock_resource */
1140	dlm_change_lockres_owner(dlm, res, m);
1141	spin_unlock(&res->spinlock);
1142
1143leave:
1144	return ret;
1145}
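
/*
 * The sleep/wake handshake used above, in sketch form.  The waker side
 * is dlm_assert_master_handler() further down in this file:
 *
 *	waiter (here):				waker (assert handler):
 *	atomic_set(&mle->woken, 0);		mle->master = assert->node_idx;
 *	wait_event_timeout(mle->wq,		atomic_set(&mle->woken, 1);
 *		atomic_read(&mle->woken) == 1,	wake_up(&mle->wq);
 *		timeo);
 */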
1146
1147struct dlm_bitmap_diff_iter
1148{
1149	int curnode;
1150	unsigned long *orig_bm;
1151	unsigned long *cur_bm;
1152	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1153};
1154
1155enum dlm_node_state_change
1156{
1157	NODE_DOWN = -1,
1158	NODE_NO_CHANGE = 0,
1159	NODE_UP
1160};
1161
1162static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1163				      unsigned long *orig_bm,
1164				      unsigned long *cur_bm)
1165{
1166	unsigned long p1, p2;
1167	int i;
1168
1169	iter->curnode = -1;
1170	iter->orig_bm = orig_bm;
1171	iter->cur_bm = cur_bm;
1172
1173	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
1176		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1177	}
1178}
1179
1180static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1181				     enum dlm_node_state_change *state)
1182{
1183	int bit;
1184
1185	if (iter->curnode >= O2NM_MAX_NODES)
1186		return -ENOENT;
1187
1188	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1189			    iter->curnode+1);
1190	if (bit >= O2NM_MAX_NODES) {
1191		iter->curnode = O2NM_MAX_NODES;
1192		return -ENOENT;
1193	}
1194
1195	/* if it was there in the original then this node died */
1196	if (test_bit(bit, iter->orig_bm))
1197		*state = NODE_DOWN;
1198	else
1199		*state = NODE_UP;
1200
1201	iter->curnode = bit;
1202	return bit;
1203}
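
/*
 * Intended iteration pattern (this is how dlm_restart_lock_mastery()
 * below consumes the iterator):
 *
 *	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
 *	while ((node = dlm_bitmap_diff_iter_next(&bdi, &sc)) >= 0) {
 *		if (sc == NODE_UP)
 *			(node joined since vote_map was copied)
 *		else
 *			(NODE_DOWN: node died; adjust mastery state)
 *	}
 */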
1204
1205
1206static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1207				    struct dlm_lock_resource *res,
1208				    struct dlm_master_list_entry *mle,
1209				    int blocked)
1210{
1211	struct dlm_bitmap_diff_iter bdi;
1212	enum dlm_node_state_change sc;
1213	int node;
1214	int ret = 0;
1215
1216	mlog(0, "something happened such that the "
1217	     "master process may need to be restarted!\n");
1218
1219	assert_spin_locked(&mle->spinlock);
1220
1221	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1222	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1223	while (node >= 0) {
1224		if (sc == NODE_UP) {
1225			/* a node came up.  clear any old vote from
1226			 * the response map and set it in the vote map
1227			 * then restart the mastery. */
1228			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1229
1230			/* redo the master request, but only for the new node */
1231			mlog(0, "sending request to new node\n");
1232			clear_bit(node, mle->response_map);
1233			set_bit(node, mle->vote_map);
1234		} else {
1235			mlog(ML_ERROR, "node down! %d\n", node);
1236			if (blocked) {
1237				int lowest = find_first_bit(mle->maybe_map,
1238						       O2NM_MAX_NODES);
1239
1240				/* act like it was never there */
1241				clear_bit(node, mle->maybe_map);
1242
1243			       	if (node == lowest) {
1244					mlog(0, "expected master %u died"
1245					    " while this node was blocked "
1246					    "waiting on it!\n", node);
1247					lowest = find_next_bit(mle->maybe_map,
1248						       	O2NM_MAX_NODES,
1249						       	lowest+1);
1250					if (lowest < O2NM_MAX_NODES) {
1251						mlog(0, "%s:%.*s:still "
1252						     "blocked. waiting on %u "
1253						     "now\n", dlm->name,
1254						     res->lockname.len,
1255						     res->lockname.name,
1256						     lowest);
1257					} else {
1258						/* mle is an MLE_BLOCK, but
1259						 * there is now nothing left to
1260						 * block on.  we need to return
1261						 * all the way back out and try
1262						 * again with an MLE_MASTER.
1263						 * dlm_do_local_recovery_cleanup
1264						 * has already run, so the mle
1265						 * refcount is ok */
1266						mlog(0, "%s:%.*s: no "
1267						     "longer blocking. try to "
1268						     "master this here\n",
1269						     dlm->name,
1270						     res->lockname.len,
1271						     res->lockname.name);
1272						mle->type = DLM_MLE_MASTER;
1273						mle->mleres = res;
1274					}
1275				}
1276			}
1277
1278			/* now blank out everything, as if we had never
1279			 * contacted anyone */
1280			bitmap_zero(mle->maybe_map, O2NM_MAX_NODES);
1281			bitmap_zero(mle->response_map, O2NM_MAX_NODES);
1282			/* reset the vote_map to the current node_map */
1283			bitmap_copy(mle->vote_map, mle->node_map,
1284				    O2NM_MAX_NODES);
1285			/* put myself into the maybe map */
1286			if (mle->type != DLM_MLE_BLOCK)
1287				set_bit(dlm->node_num, mle->maybe_map);
1288		}
1289		ret = -EAGAIN;
1290		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1291	}
1292	return ret;
1293}
1294
1295
1296/*
1297 * DLM_MASTER_REQUEST_MSG
1298 *
1299 * returns: 0 on success,
1300 *          -errno on a network error
1301 *
1302 * on error, the caller should assume the target node is "dead"
1303 *
1304 */
1305
1306static int dlm_do_master_request(struct dlm_lock_resource *res,
1307				 struct dlm_master_list_entry *mle, int to)
1308{
1309	struct dlm_ctxt *dlm = mle->dlm;
1310	struct dlm_master_request request;
1311	int ret, response=0, resend;
1312
1313	memset(&request, 0, sizeof(request));
1314	request.node_idx = dlm->node_num;
1315
1316	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1317
1318	request.namelen = (u8)mle->mnamelen;
1319	memcpy(request.name, mle->mname, request.namelen);
1320
1321again:
1322	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1323				 sizeof(request), to, &response);
1324	if (ret < 0)  {
1325		if (ret == -ESRCH) {
1326			/* should never happen */
1327			mlog(ML_ERROR, "TCP stack not ready!\n");
1328			BUG();
1329		} else if (ret == -EINVAL) {
1330			mlog(ML_ERROR, "bad args passed to o2net!\n");
1331			BUG();
1332		} else if (ret == -ENOMEM) {
1333			mlog(ML_ERROR, "out of memory while trying to send "
1334			     "network message!  retrying\n");
1335			/* this is totally crude */
1336			msleep(50);
1337			goto again;
1338		} else if (!dlm_is_host_down(ret)) {
1339			/* not a network error. bad. */
1340			mlog_errno(ret);
			mlog(ML_ERROR, "unhandled error!\n");
1342			BUG();
1343		}
1344		/* all other errors should be network errors,
1345		 * and likely indicate node death */
1346		mlog(ML_ERROR, "link to %d went down!\n", to);
1347		goto out;
1348	}
1349
1350	ret = 0;
1351	resend = 0;
1352	spin_lock(&mle->spinlock);
1353	switch (response) {
1354		case DLM_MASTER_RESP_YES:
1355			set_bit(to, mle->response_map);
1356			mlog(0, "node %u is the master, response=YES\n", to);
1357			mlog(0, "%s:%.*s: master node %u now knows I have a "
1358			     "reference\n", dlm->name, res->lockname.len,
1359			     res->lockname.name, to);
1360			mle->master = to;
1361			break;
1362		case DLM_MASTER_RESP_NO:
1363			mlog(0, "node %u not master, response=NO\n", to);
1364			set_bit(to, mle->response_map);
1365			break;
1366		case DLM_MASTER_RESP_MAYBE:
1367			mlog(0, "node %u not master, response=MAYBE\n", to);
1368			set_bit(to, mle->response_map);
1369			set_bit(to, mle->maybe_map);
1370			break;
1371		case DLM_MASTER_RESP_ERROR:
1372			mlog(0, "node %u hit an error, resending\n", to);
1373			resend = 1;
1374			response = 0;
1375			break;
1376		default:
1377			mlog(ML_ERROR, "bad response! %u\n", response);
1378			BUG();
1379	}
1380	spin_unlock(&mle->spinlock);
1381	if (resend) {
1382		/* this is also totally crude */
1383		msleep(50);
1384		goto again;
1385	}
1386
1387out:
1388	return ret;
1389}
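
/*
 * What the requester does with each response (restating the switch
 * above):
 *
 *	YES	record the responder in response_map and remember it as
 *		the master
 *	NO	record the vote in response_map
 *	MAYBE	record the vote and set the responder in maybe_map
 *	ERROR	wait 50ms and resend the request
 */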
1390
1391/*
1392 * locks that can be taken here:
1393 * dlm->spinlock
1394 * res->spinlock
1395 * mle->spinlock
1396 * dlm->master_list
1397 *
1398 * if possible, TRIM THIS DOWN!!!
1399 */
1400int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1401			       void **ret_data)
1402{
1403	u8 response = DLM_MASTER_RESP_MAYBE;
1404	struct dlm_ctxt *dlm = data;
1405	struct dlm_lock_resource *res = NULL;
1406	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1407	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1408	char *name;
1409	unsigned int namelen, hash;
1410	int found, ret;
1411	int set_maybe;
1412	int dispatch_assert = 0;
1413	int dispatched = 0;
1414
1415	if (!dlm_grab(dlm))
1416		return DLM_MASTER_RESP_NO;
1417
1418	if (!dlm_domain_fully_joined(dlm)) {
1419		response = DLM_MASTER_RESP_NO;
1420		goto send_response;
1421	}
1422
1423	name = request->name;
1424	namelen = request->namelen;
1425	hash = dlm_lockid_hash(name, namelen);
1426
1427	if (namelen > DLM_LOCKID_NAME_MAX) {
1428		response = DLM_IVBUFLEN;
1429		goto send_response;
1430	}
1431
1432way_up_top:
1433	spin_lock(&dlm->spinlock);
1434	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1435	if (res) {
1436		spin_unlock(&dlm->spinlock);
1437
1438		/* take care of the easy cases up front */
1439		spin_lock(&res->spinlock);
1440
1441		/*
1442		 * Right after dlm spinlock was released, dlm_thread could have
1443		 * purged the lockres. Check if lockres got unhashed. If so
1444		 * start over.
1445		 */
1446		if (hlist_unhashed(&res->hash_node)) {
1447			spin_unlock(&res->spinlock);
1448			dlm_lockres_put(res);
1449			goto way_up_top;
1450		}
1451
1452		if (res->state & (DLM_LOCK_RES_RECOVERING|
1453				  DLM_LOCK_RES_MIGRATING)) {
1454			spin_unlock(&res->spinlock);
1455			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1456			     "being recovered/migrated\n");
1457			response = DLM_MASTER_RESP_ERROR;
1458			if (mle)
1459				kmem_cache_free(dlm_mle_cache, mle);
1460			goto send_response;
1461		}
1462
1463		if (res->owner == dlm->node_num) {
1464			dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1465			spin_unlock(&res->spinlock);
1466			response = DLM_MASTER_RESP_YES;
1467			if (mle)
1468				kmem_cache_free(dlm_mle_cache, mle);
1469
1470			/* this node is the owner.
1471			 * there is some extra work that needs to
1472			 * happen now.  the requesting node has
1473			 * caused all nodes up to this one to
1474			 * create mles.  this node now needs to
1475			 * go back and clean those up. */
1476			dispatch_assert = 1;
1477			goto send_response;
1478		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1479			spin_unlock(&res->spinlock);
1480			// mlog(0, "node %u is the master\n", res->owner);
1481			response = DLM_MASTER_RESP_NO;
1482			if (mle)
1483				kmem_cache_free(dlm_mle_cache, mle);
1484			goto send_response;
1485		}
1486
1487		/* ok, there is no owner.  either this node is
1488		 * being blocked, or it is actively trying to
1489		 * master this lock. */
1490		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1491			mlog(ML_ERROR, "lock with no owner should be "
1492			     "in-progress!\n");
1493			BUG();
1494		}
1495
1496		// mlog(0, "lockres is in progress...\n");
1497		spin_lock(&dlm->master_lock);
1498		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1499		if (!found) {
1500			mlog(ML_ERROR, "no mle found for this lock!\n");
1501			BUG();
1502		}
1503		set_maybe = 1;
1504		spin_lock(&tmpmle->spinlock);
1505		if (tmpmle->type == DLM_MLE_BLOCK) {
1506			// mlog(0, "this node is waiting for "
1507			// "lockres to be mastered\n");
1508			response = DLM_MASTER_RESP_NO;
1509		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1510			mlog(0, "node %u is master, but trying to migrate to "
1511			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1512			if (tmpmle->master == dlm->node_num) {
1513				mlog(ML_ERROR, "no owner on lockres, but this "
1514				     "node is trying to migrate it to %u?!\n",
1515				     tmpmle->new_master);
1516				BUG();
1517			} else {
1518				/* the real master can respond on its own */
1519				response = DLM_MASTER_RESP_NO;
1520			}
1521		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1522			set_maybe = 0;
1523			if (tmpmle->master == dlm->node_num) {
1524				response = DLM_MASTER_RESP_YES;
1525				/* this node will be the owner.
1526				 * go back and clean the mles on any
1527				 * other nodes */
1528				dispatch_assert = 1;
1529				dlm_lockres_set_refmap_bit(dlm, res,
1530							   request->node_idx);
1531			} else
1532				response = DLM_MASTER_RESP_NO;
1533		} else {
1534			// mlog(0, "this node is attempting to "
1535			// "master lockres\n");
1536			response = DLM_MASTER_RESP_MAYBE;
1537		}
1538		if (set_maybe)
1539			set_bit(request->node_idx, tmpmle->maybe_map);
1540		spin_unlock(&tmpmle->spinlock);
1541
1542		spin_unlock(&dlm->master_lock);
1543		spin_unlock(&res->spinlock);
1544
1545		/* keep the mle attached to heartbeat events */
1546		dlm_put_mle(tmpmle);
1547		if (mle)
1548			kmem_cache_free(dlm_mle_cache, mle);
1549		goto send_response;
1550	}
1551
1552	/*
1553	 * lockres doesn't exist on this node
1554	 * if there is an MLE_BLOCK, return NO
1555	 * if there is an MLE_MASTER, return MAYBE
1556	 * otherwise, add an MLE_BLOCK, return NO
1557	 */
1558	spin_lock(&dlm->master_lock);
1559	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1560	if (!found) {
1561		/* this lockid has never been seen on this node yet */
1562		// mlog(0, "no mle found\n");
1563		if (!mle) {
1564			spin_unlock(&dlm->master_lock);
1565			spin_unlock(&dlm->spinlock);
1566
1567			mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1568			if (!mle) {
1569				response = DLM_MASTER_RESP_ERROR;
1570				mlog_errno(-ENOMEM);
1571				goto send_response;
1572			}
1573			goto way_up_top;
1574		}
1575
1576		// mlog(0, "this is second time thru, already allocated, "
1577		// "add the block.\n");
1578		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1579		set_bit(request->node_idx, mle->maybe_map);
1580		__dlm_insert_mle(dlm, mle);
1581		response = DLM_MASTER_RESP_NO;
1582	} else {
1583		spin_lock(&tmpmle->spinlock);
1584		if (tmpmle->master == dlm->node_num) {
1585			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1586			BUG();
1587		}
1588		if (tmpmle->type == DLM_MLE_BLOCK)
1589			response = DLM_MASTER_RESP_NO;
1590		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1591			mlog(0, "migration mle was found (%u->%u)\n",
1592			     tmpmle->master, tmpmle->new_master);
1593			/* real master can respond on its own */
1594			response = DLM_MASTER_RESP_NO;
1595		} else
1596			response = DLM_MASTER_RESP_MAYBE;
1597		set_bit(request->node_idx, tmpmle->maybe_map);
1598		spin_unlock(&tmpmle->spinlock);
1599	}
1600	spin_unlock(&dlm->master_lock);
1601	spin_unlock(&dlm->spinlock);
1602
1603	if (found) {
1604		/* keep the mle attached to heartbeat events */
1605		dlm_put_mle(tmpmle);
1606	}
1607send_response:
1608	/*
1609	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1610	 * The reference is released by dlm_assert_master_worker() under
1611	 * the call to dlm_dispatch_assert_master().  If
1612	 * dlm_assert_master_worker() isn't called, we drop it here.
1613	 */
1614	if (dispatch_assert) {
1615		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1616			     dlm->node_num, res->lockname.len, res->lockname.name);
1617		spin_lock(&res->spinlock);
1618		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1619						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1620		if (ret < 0) {
1621			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1622			response = DLM_MASTER_RESP_ERROR;
1623			spin_unlock(&res->spinlock);
1624			dlm_lockres_put(res);
1625		} else {
1626			dispatched = 1;
1627			__dlm_lockres_grab_inflight_worker(dlm, res);
1628			spin_unlock(&res->spinlock);
1629		}
1630	} else {
1631		if (res)
1632			dlm_lockres_put(res);
1633	}
1634
1635	if (!dispatched)
1636		dlm_put(dlm);
1637	return response;
1638}
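
/*
 * Summary of the responses produced by the handler above (a condensed
 * restatement, not new protocol):
 *
 *	YES	this node owns (or is about to own) the lockres; the
 *		requester's bit is set in the refmap and an assert_master
 *		is dispatched to clean up stale mles
 *	NO	another node owns it, or this node is merely blocked on it
 *	MAYBE	this node is also in the middle of mastering it
 *	ERROR	the lockres is recovering/migrating or the dispatch failed;
 *		the requester will resend the master request
 */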
1639
1640/*
1641 * DLM_ASSERT_MASTER_MSG
1642 */
1643
1644
1645/*
1646 * NOTE: this can be used for debugging
1647 * can periodically run all locks owned by this node
1648 * and re-assert across the cluster...
1649 */
1650static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1651				struct dlm_lock_resource *res,
1652				void *nodemap, u32 flags)
1653{
1654	struct dlm_assert_master assert;
1655	int to, tmpret;
1656	struct dlm_node_iter iter;
1657	int ret = 0;
1658	int reassert;
1659	const char *lockname = res->lockname.name;
1660	unsigned int namelen = res->lockname.len;
1661
1662	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1663
1664	spin_lock(&res->spinlock);
1665	res->state |= DLM_LOCK_RES_SETREF_INPROG;
1666	spin_unlock(&res->spinlock);
1667
1668again:
1669	reassert = 0;
1670
1671	/* note that if this nodemap is empty, it returns 0 */
1672	dlm_node_iter_init(nodemap, &iter);
1673	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1674		int r = 0;
1675		struct dlm_master_list_entry *mle = NULL;
1676
1677		mlog(0, "sending assert master to %d (%.*s)\n", to,
1678		     namelen, lockname);
1679		memset(&assert, 0, sizeof(assert));
1680		assert.node_idx = dlm->node_num;
1681		assert.namelen = namelen;
1682		memcpy(assert.name, lockname, namelen);
1683		assert.flags = cpu_to_be32(flags);
1684
1685		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1686					    &assert, sizeof(assert), to, &r);
1687		if (tmpret < 0) {
1688			mlog(ML_ERROR, "Error %d when sending message %u (key "
1689			     "0x%x) to node %u\n", tmpret,
1690			     DLM_ASSERT_MASTER_MSG, dlm->key, to);
1691			if (!dlm_is_host_down(tmpret)) {
1692				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1693				BUG();
1694			}
1695			/* a node died.  finish out the rest of the nodes. */
1696			mlog(0, "link to %d went down!\n", to);
1697			/* any nonzero status return will do */
1698			ret = tmpret;
1699			r = 0;
1700		} else if (r < 0) {
			/* ok, something is horribly messed up.  kill thyself. */
			mlog(ML_ERROR, "during assert master of %.*s to %u, "
1703			     "got %d.\n", namelen, lockname, to, r);
1704			spin_lock(&dlm->spinlock);
1705			spin_lock(&dlm->master_lock);
1706			if (dlm_find_mle(dlm, &mle, (char *)lockname,
1707					 namelen)) {
1708				dlm_print_one_mle(mle);
1709				__dlm_put_mle(mle);
1710			}
1711			spin_unlock(&dlm->master_lock);
1712			spin_unlock(&dlm->spinlock);
1713			BUG();
1714		}
1715
1716		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1717		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
			mlog(ML_ERROR, "%.*s: very strange, "
			     "master MLE but no lockres on %u\n",
			     namelen, lockname, to);
1721		}
1722
1723		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
			mlog(0, "%.*s: node %u created mles on other "
1725			     "nodes and requests a re-assert\n",
1726			     namelen, lockname, to);
1727			reassert = 1;
1728		}
1729		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1730			mlog(0, "%.*s: node %u has a reference to this "
1731			     "lockres, set the bit in the refmap\n",
1732			     namelen, lockname, to);
1733			spin_lock(&res->spinlock);
1734			dlm_lockres_set_refmap_bit(dlm, res, to);
1735			spin_unlock(&res->spinlock);
1736		}
1737	}
1738
1739	if (reassert)
1740		goto again;
1741
1742	spin_lock(&res->spinlock);
1743	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1744	spin_unlock(&res->spinlock);
1745	wake_up(&res->wq);
1746
1747	return ret;
1748}
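
/*
 * One plausible way a caller prepares the nodemap argument (the
 * in-tree callers pass either mle->vote_map or a copy of the domain
 * map; this is only an illustration):
 *
 *	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 *
 *	spin_lock(&dlm->spinlock);
 *	bitmap_copy(nodemap, dlm->domain_map, O2NM_MAX_NODES);
 *	spin_unlock(&dlm->spinlock);
 *	clear_bit(dlm->node_num, nodemap);	(never assert to ourselves)
 *
 *	ret = dlm_do_assert_master(dlm, res, nodemap, 0);
 *	(a nonzero ret only means some node went down mid-send)
 */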
1749
1750/*
1751 * locks that can be taken here:
1752 * dlm->spinlock
1753 * res->spinlock
1754 * mle->spinlock
1755 * dlm->master_list
1756 *
1757 * if possible, TRIM THIS DOWN!!!
1758 */
1759int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1760			      void **ret_data)
1761{
1762	struct dlm_ctxt *dlm = data;
1763	struct dlm_master_list_entry *mle = NULL;
1764	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1765	struct dlm_lock_resource *res = NULL;
1766	char *name;
1767	unsigned int namelen, hash;
1768	u32 flags;
1769	int master_request = 0, have_lockres_ref = 0;
1770	int ret = 0;
1771
1772	if (!dlm_grab(dlm))
1773		return 0;
1774
1775	name = assert->name;
1776	namelen = assert->namelen;
1777	hash = dlm_lockid_hash(name, namelen);
1778	flags = be32_to_cpu(assert->flags);
1779
1780	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!\n");
1782		goto done;
1783	}
1784
1785	spin_lock(&dlm->spinlock);
1786
1787	if (flags)
1788		mlog(0, "assert_master with flags: %u\n", flags);
1789
1790	/* find the MLE */
1791	spin_lock(&dlm->master_lock);
1792	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1793		/* not an error, could be master just re-asserting */
1794		mlog(0, "just got an assert_master from %u, but no "
1795		     "MLE for it! (%.*s)\n", assert->node_idx,
1796		     namelen, name);
1797	} else {
1798		int bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
1799		if (bit >= O2NM_MAX_NODES) {
1800			/* not necessarily an error, though less likely.
1801			 * could be master just re-asserting. */
1802			mlog(0, "no bits set in the maybe_map, but %u "
1803			     "is asserting! (%.*s)\n", assert->node_idx,
1804			     namelen, name);
1805		} else if (bit != assert->node_idx) {
1806			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1807				mlog(0, "master %u was found, %u should "
1808				     "back off\n", assert->node_idx, bit);
1809			} else {
1810				/* with the fix for bug 569, a higher node
1811				 * number winning the mastery will respond
1812				 * YES to mastery requests, but this node
1813				 * had no way of knowing.  let it pass. */
1814				mlog(0, "%u is the lowest node, "
1815				     "%u is asserting. (%.*s)  %u must "
1816				     "have begun after %u won.\n", bit,
1817				     assert->node_idx, namelen, name, bit,
1818				     assert->node_idx);
1819			}
1820		}
1821		if (mle->type == DLM_MLE_MIGRATION) {
1822			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1823				mlog(0, "%s:%.*s: got cleanup assert"
1824				     " from %u for migration\n",
1825				     dlm->name, namelen, name,
1826				     assert->node_idx);
1827			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1828				mlog(0, "%s:%.*s: got unrelated assert"
1829				     " from %u for migration, ignoring\n",
1830				     dlm->name, namelen, name,
1831				     assert->node_idx);
1832				__dlm_put_mle(mle);
1833				spin_unlock(&dlm->master_lock);
1834				spin_unlock(&dlm->spinlock);
1835				goto done;
1836			}
1837		}
1838	}
1839	spin_unlock(&dlm->master_lock);
1840
1841	/* ok everything checks out with the MLE
1842	 * now check to see if there is a lockres */
1843	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1844	if (res) {
1845		spin_lock(&res->spinlock);
1846		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1847			mlog(ML_ERROR, "%u asserting but %.*s is "
1848			     "RECOVERING!\n", assert->node_idx, namelen, name);
1849			goto kill;
1850		}
1851		if (!mle) {
1852			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1853			    res->owner != assert->node_idx) {
1854				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1855				     "but current owner is %u! (%.*s)\n",
1856				     assert->node_idx, res->owner, namelen,
1857				     name);
1858				__dlm_print_one_lock_resource(res);
1859				BUG();
1860			}
1861		} else if (mle->type != DLM_MLE_MIGRATION) {
1862			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1863				/* owner is just re-asserting */
1864				if (res->owner == assert->node_idx) {
1865					mlog(0, "owner %u re-asserting on "
1866					     "lock %.*s\n", assert->node_idx,
1867					     namelen, name);
1868					goto ok;
1869				}
1870				mlog(ML_ERROR, "got assert_master from "
1871				     "node %u, but %u is the owner! "
1872				     "(%.*s)\n", assert->node_idx,
1873				     res->owner, namelen, name);
1874				goto kill;
1875			}
1876			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1877				mlog(ML_ERROR, "got assert from %u, but lock "
1878				     "with no owner should be "
1879				     "in-progress! (%.*s)\n",
1880				     assert->node_idx,
1881				     namelen, name);
1882				goto kill;
1883			}
1884		} else /* mle->type == DLM_MLE_MIGRATION */ {
1885			/* should only be getting an assert from new master */
1886			if (assert->node_idx != mle->new_master) {
1887				mlog(ML_ERROR, "got assert from %u, but "
1888				     "new master is %u, and old master "
1889				     "was %u (%.*s)\n",
1890				     assert->node_idx, mle->new_master,
1891				     mle->master, namelen, name);
1892				goto kill;
1893			}
1894
1895		}
1896ok:
1897		spin_unlock(&res->spinlock);
1898	}
1899
1900	// mlog(0, "woo!  got an assert_master from node %u!\n",
1901	// 	     assert->node_idx);
1902	if (mle) {
1903		int extra_ref = 0;
1904		int nn = -1;
1905		int rr, err = 0;
1906
1907		spin_lock(&mle->spinlock);
1908		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1909			extra_ref = 1;
1910		else {
1911			/* MASTER mle: if any bits set in the response map
1912			 * then the calling node needs to re-assert to clear
1913			 * up nodes that this node contacted */
			while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
						   nn + 1)) < O2NM_MAX_NODES) {
1916				if (nn != dlm->node_num && nn != assert->node_idx) {
1917					master_request = 1;
1918					break;
1919				}
1920			}
1921		}
1922		mle->master = assert->node_idx;
1923		atomic_set(&mle->woken, 1);
1924		wake_up(&mle->wq);
1925		spin_unlock(&mle->spinlock);
1926
1927		if (res) {
1928			int wake = 0;
1929			spin_lock(&res->spinlock);
1930			if (mle->type == DLM_MLE_MIGRATION) {
1931				mlog(0, "finishing off migration of lockres %.*s, "
1932			     		"from %u to %u\n",
1933			       		res->lockname.len, res->lockname.name,
1934			       		dlm->node_num, mle->new_master);
1935				res->state &= ~DLM_LOCK_RES_MIGRATING;
1936				wake = 1;
1937				dlm_change_lockres_owner(dlm, res, mle->new_master);
1938				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1939			} else {
1940				dlm_change_lockres_owner(dlm, res, mle->master);
1941			}
1942			spin_unlock(&res->spinlock);
1943			have_lockres_ref = 1;
1944			if (wake)
1945				wake_up(&res->wq);
1946		}
1947
1948		/* master is known, detach if not already detached.
1949		 * ensures that only one assert_master call will happen
1950		 * on this mle. */
1951		spin_lock(&dlm->master_lock);
1952
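		/* Sanity-check the mle refcount against the drops we are
		 * about to do: one unconditional __dlm_put_mle(), a second
		 * one if this mle carried the extra block/migration ref,
		 * plus one more still pinned while the mle is marked inuse.
		 * Fewer refs than that means the counting is broken, so log
		 * it loudly before continuing. */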
1953		rr = kref_read(&mle->mle_refs);
1954		if (mle->inuse > 0) {
1955			if (extra_ref && rr < 3)
1956				err = 1;
1957			else if (!extra_ref && rr < 2)
1958				err = 1;
1959		} else {
1960			if (extra_ref && rr < 2)
1961				err = 1;
1962			else if (!extra_ref && rr < 1)
1963				err = 1;
1964		}
1965		if (err) {
1966			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1967			     "that will mess up this node, refs=%d, extra=%d, "
1968			     "inuse=%d\n", dlm->name, namelen, name,
1969			     assert->node_idx, rr, extra_ref, mle->inuse);
1970			dlm_print_one_mle(mle);
1971		}
1972		__dlm_unlink_mle(dlm, mle);
1973		__dlm_mle_detach_hb_events(dlm, mle);
1974		__dlm_put_mle(mle);
1975		if (extra_ref) {
1976			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
1980			__dlm_put_mle(mle);
1981		}
1982		spin_unlock(&dlm->master_lock);
1983	} else if (res) {
1984		if (res->owner != assert->node_idx) {
1985			mlog(0, "assert_master from %u, but current "
1986			     "owner is %u (%.*s), no mle\n", assert->node_idx,
1987			     res->owner, namelen, name);
1988		}
1989	}
1990	spin_unlock(&dlm->spinlock);
1991
1992done:
1993	ret = 0;
1994	if (res) {
1995		spin_lock(&res->spinlock);
1996		res->state |= DLM_LOCK_RES_SETREF_INPROG;
1997		spin_unlock(&res->spinlock);
1998		*ret_data = (void *)res;
1999	}
2000	dlm_put(dlm);
2001	if (master_request) {
2002		mlog(0, "need to tell master to reassert\n");
2003		/* positive. negative would shoot down the node. */
2004		ret |= DLM_ASSERT_RESPONSE_REASSERT;
2005		if (!have_lockres_ref) {
2006			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2007			     "mle present here for %s:%.*s, but no lockres!\n",
2008			     assert->node_idx, dlm->name, namelen, name);
2009		}
2010	}
2011	if (have_lockres_ref) {
2012		/* let the master know we have a reference to the lockres */
2013		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2014		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2015		     dlm->name, namelen, name, assert->node_idx);
2016	}
2017	return ret;
2018
2019kill:
2020	/* kill the caller! */
2021	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2022	     "and killing the other node now!  This node is OK and can continue.\n");
2023	__dlm_print_one_lock_resource(res);
2024	spin_unlock(&res->spinlock);
2025	spin_lock(&dlm->master_lock);
2026	if (mle)
2027		__dlm_put_mle(mle);
2028	spin_unlock(&dlm->master_lock);
2029	spin_unlock(&dlm->spinlock);
2030	*ret_data = (void *)res;
2031	dlm_put(dlm);
2032	return -EINVAL;
2033}
2034
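/* Runs after the reply to an assert_master has been sent: clear the
 * SETREF_INPROG flag that the handler set above and drop the lockres
 * reference it stashed in ret_data. */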
2035void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2036{
2037	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2038
2039	if (ret_data) {
2040		spin_lock(&res->spinlock);
2041		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2042		spin_unlock(&res->spinlock);
2043		wake_up(&res->wq);
2044		dlm_lockres_put(res);
2045	}
2046	return;
2047}
2048
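/* Queue an assert_master for this lockres on the dlm worker thread.
 * The caller passes in a lockres reference which the worker drops when
 * it is done; GFP_ATOMIC is used since callers may be in atomic
 * context. */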
2049int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2050			       struct dlm_lock_resource *res,
2051			       int ignore_higher, u8 request_from, u32 flags)
2052{
2053	struct dlm_work_item *item;
2054	item = kzalloc(sizeof(*item), GFP_ATOMIC);
2055	if (!item)
2056		return -ENOMEM;
2057
2058
2059	/* queue up work for dlm_assert_master_worker */
2060	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2061	item->u.am.lockres = res; /* already have a ref */
2062	/* can optionally ignore node numbers higher than this node */
2063	item->u.am.ignore_higher = ignore_higher;
2064	item->u.am.request_from = request_from;
2065	item->u.am.flags = flags;
2066
2067	if (ignore_higher)
2068		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2069		     res->lockname.name);
2070
2071	spin_lock(&dlm->work_lock);
2072	list_add_tail(&item->list, &dlm->work_list);
2073	spin_unlock(&dlm->work_lock);
2074
2075	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2076	return 0;
2077}
2078
2079static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2080{
2081	struct dlm_ctxt *dlm = data;
2082	int ret = 0;
2083	struct dlm_lock_resource *res;
2084	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2085	int ignore_higher;
2086	int bit;
2087	u8 request_from;
2088	u32 flags;
2089
2090	dlm = item->dlm;
2091	res = item->u.am.lockres;
2092	ignore_higher = item->u.am.ignore_higher;
2093	request_from = item->u.am.request_from;
2094	flags = item->u.am.flags;
2095
2096	spin_lock(&dlm->spinlock);
2097	bitmap_copy(nodemap, dlm->domain_map, O2NM_MAX_NODES);
2098	spin_unlock(&dlm->spinlock);
2099
2100	clear_bit(dlm->node_num, nodemap);
2101	if (ignore_higher) {
		/* if this is just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
2105		clear_bit(request_from, nodemap);
2106		bit = dlm->node_num;
2107		while (1) {
2108			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2109					    bit+1);
2110		       	if (bit >= O2NM_MAX_NODES)
2111				break;
2112			clear_bit(bit, nodemap);
2113		}
2114	}
2115
2116	/*
2117	 * If we're migrating this lock to someone else, we are no
	 * longer allowed to assert our own mastery.  OTOH, we need to
2119	 * prevent migration from starting while we're still asserting
2120	 * our dominance.  The reserved ast delays migration.
2121	 */
2122	spin_lock(&res->spinlock);
2123	if (res->state & DLM_LOCK_RES_MIGRATING) {
2124		mlog(0, "Someone asked us to assert mastery, but we're "
2125		     "in the middle of migration.  Skipping assert, "
2126		     "the new master will handle that.\n");
2127		spin_unlock(&res->spinlock);
2128		goto put;
2129	} else
2130		__dlm_lockres_reserve_ast(res);
2131	spin_unlock(&res->spinlock);
2132
2133	/* this call now finishes out the nodemap
2134	 * even if one or more nodes die */
2135	mlog(0, "worker about to master %.*s here, this=%u\n",
2136		     res->lockname.len, res->lockname.name, dlm->node_num);
2137	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2138	if (ret < 0) {
2139		/* no need to restart, we are done */
2140		if (!dlm_is_host_down(ret))
2141			mlog_errno(ret);
2142	}
2143
2144	/* Ok, we've asserted ourselves.  Let's let migration start. */
2145	dlm_lockres_release_ast(dlm, res);
2146
2147put:
2148	dlm_lockres_drop_inflight_worker(dlm, res);
2149
2150	dlm_lockres_put(res);
2151
2152	mlog(0, "finished with dlm_assert_master_worker\n");
2153}
2154
2155/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2156 * We cannot wait for node recovery to complete to begin mastering this
2157 * lockres because this lockres is used to kick off recovery! ;-)
2158 * So, do a pre-check on all living nodes to see if any of those nodes
2159 * think that $RECOVERY is currently mastered by a dead node.  If so,
2160 * we wait a short time to allow that node to get notified by its own
2161 * heartbeat stack, then check again.  All $RECOVERY lock resources
2162 * mastered by dead nodes are purged when the heartbeat callback is
2163 * fired, so we can know for sure that it is safe to continue once
2164 * the node returns a live node or no node.  */
2165static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2166				       struct dlm_lock_resource *res)
2167{
2168	struct dlm_node_iter iter;
2169	int nodenum;
2170	int ret = 0;
2171	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2172
2173	spin_lock(&dlm->spinlock);
2174	dlm_node_iter_init(dlm->domain_map, &iter);
2175	spin_unlock(&dlm->spinlock);
2176
2177	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2178		/* do not send to self */
2179		if (nodenum == dlm->node_num)
2180			continue;
2181		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2182		if (ret < 0) {
2183			mlog_errno(ret);
2184			if (!dlm_is_host_down(ret))
2185				BUG();
2186			/* host is down, so answer for that node would be
2187			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2188			ret = 0;
2189		}
2190
2191		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2192			/* check to see if this master is in the recovery map */
2193			spin_lock(&dlm->spinlock);
2194			if (test_bit(master, dlm->recovery_map)) {
2195				mlog(ML_NOTICE, "%s: node %u has not seen "
2196				     "node %u go down yet, and thinks the "
2197				     "dead node is mastering the recovery "
2198				     "lock.  must wait.\n", dlm->name,
2199				     nodenum, master);
2200				ret = -EAGAIN;
2201			}
2202			spin_unlock(&dlm->spinlock);
2203			mlog(0, "%s: reco lock master is %u\n", dlm->name,
2204			     master);
2205			break;
2206		}
2207	}
2208	return ret;
2209}
2210
2211/*
2212 * DLM_DEREF_LOCKRES_MSG
2213 */
2214
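/* Send a DEREF message to the owner of this lockres so it can clear
 * our bit from its refmap.  A negative status from the owner means it
 * did not think we held a reference, which indicates a refcount bug. */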
2215int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2216{
2217	struct dlm_deref_lockres deref;
2218	int ret = 0, r;
2219	const char *lockname;
2220	unsigned int namelen;
2221
2222	lockname = res->lockname.name;
2223	namelen = res->lockname.len;
2224	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2225
2226	memset(&deref, 0, sizeof(deref));
2227	deref.node_idx = dlm->node_num;
2228	deref.namelen = namelen;
2229	memcpy(deref.name, lockname, namelen);
2230
2231	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2232				 &deref, sizeof(deref), res->owner, &r);
2233	if (ret < 0)
2234		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2235		     dlm->name, namelen, lockname, ret, res->owner);
2236	else if (r < 0) {
2237		/* BAD.  other node says I did not have a ref. */
2238		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2239		     dlm->name, namelen, lockname, res->owner, r);
2240		dlm_print_one_lock_resource(res);
2241		if (r == -ENOMEM)
2242			BUG();
2243	} else
2244		ret = r;
2245
2246	return ret;
2247}
2248
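/* Handle a DEREF request on the owner: if an assert_master response is
 * still in flight (SETREF_INPROG), defer the refmap clear to
 * dlm_deref_lockres_worker and return DLM_DEREF_RESPONSE_INPROG;
 * otherwise clear the sender's refmap bit right here. */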
2249int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2250			      void **ret_data)
2251{
2252	struct dlm_ctxt *dlm = data;
2253	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2254	struct dlm_lock_resource *res = NULL;
2255	char *name;
2256	unsigned int namelen;
2257	int ret = -EINVAL;
2258	u8 node;
2259	unsigned int hash;
2260	struct dlm_work_item *item;
2261	int cleared = 0;
2262	int dispatch = 0;
2263
2264	if (!dlm_grab(dlm))
2265		return 0;
2266
2267	name = deref->name;
2268	namelen = deref->namelen;
2269	node = deref->node_idx;
2270
2271	if (namelen > DLM_LOCKID_NAME_MAX) {
2272		mlog(ML_ERROR, "Invalid name length!");
2273		goto done;
2274	}
2275	if (deref->node_idx >= O2NM_MAX_NODES) {
2276		mlog(ML_ERROR, "Invalid node number: %u\n", node);
2277		goto done;
2278	}
2279
2280	hash = dlm_lockid_hash(name, namelen);
2281
2282	spin_lock(&dlm->spinlock);
2283	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2284	if (!res) {
2285		spin_unlock(&dlm->spinlock);
2286		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2287		     dlm->name, namelen, name);
2288		goto done;
2289	}
2290	spin_unlock(&dlm->spinlock);
2291
2292	spin_lock(&res->spinlock);
2293	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2294		dispatch = 1;
2295	else {
2296		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2297		if (test_bit(node, res->refmap)) {
2298			dlm_lockres_clear_refmap_bit(dlm, res, node);
2299			cleared = 1;
2300		}
2301	}
2302	spin_unlock(&res->spinlock);
2303
2304	if (!dispatch) {
2305		if (cleared)
2306			dlm_lockres_calc_usage(dlm, res);
2307		else {
2308			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2309		     	"but it is already dropped!\n", dlm->name,
2310		     	res->lockname.len, res->lockname.name, node);
2311			dlm_print_one_lock_resource(res);
2312		}
2313		ret = DLM_DEREF_RESPONSE_DONE;
2314		goto done;
2315	}
2316
2317	item = kzalloc(sizeof(*item), GFP_NOFS);
2318	if (!item) {
2319		ret = -ENOMEM;
2320		mlog_errno(ret);
2321		goto done;
2322	}
2323
2324	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2325	item->u.dl.deref_res = res;
2326	item->u.dl.deref_node = node;
2327
2328	spin_lock(&dlm->work_lock);
2329	list_add_tail(&item->list, &dlm->work_list);
2330	spin_unlock(&dlm->work_lock);
2331
2332	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2333	return DLM_DEREF_RESPONSE_INPROG;
2334
2335done:
2336	if (res)
2337		dlm_lockres_put(res);
2338	dlm_put(dlm);
2339
2340	return ret;
2341}
2342
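/* Runs on the node that originally sent the DEREF: the owner has now
 * finished clearing our reference, so complete the purge of the local
 * lockres, which must still be marked DROPPING_REF. */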
2343int dlm_deref_lockres_done_handler(struct o2net_msg *msg, u32 len, void *data,
2344			      void **ret_data)
2345{
2346	struct dlm_ctxt *dlm = data;
2347	struct dlm_deref_lockres_done *deref
2348			= (struct dlm_deref_lockres_done *)msg->buf;
2349	struct dlm_lock_resource *res = NULL;
2350	char *name;
2351	unsigned int namelen;
2352	int ret = -EINVAL;
2353	u8 node;
2354	unsigned int hash;
2355
2356	if (!dlm_grab(dlm))
2357		return 0;
2358
2359	name = deref->name;
2360	namelen = deref->namelen;
2361	node = deref->node_idx;
2362
2363	if (namelen > DLM_LOCKID_NAME_MAX) {
2364		mlog(ML_ERROR, "Invalid name length!");
2365		goto done;
2366	}
2367	if (deref->node_idx >= O2NM_MAX_NODES) {
2368		mlog(ML_ERROR, "Invalid node number: %u\n", node);
2369		goto done;
2370	}
2371
2372	hash = dlm_lockid_hash(name, namelen);
2373
2374	spin_lock(&dlm->spinlock);
2375	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2376	if (!res) {
2377		spin_unlock(&dlm->spinlock);
2378		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2379		     dlm->name, namelen, name);
2380		goto done;
2381	}
2382
2383	spin_lock(&res->spinlock);
2384	if (!(res->state & DLM_LOCK_RES_DROPPING_REF)) {
2385		spin_unlock(&res->spinlock);
2386		spin_unlock(&dlm->spinlock);
2387		mlog(ML_NOTICE, "%s:%.*s: node %u sends deref done "
2388			"but it is already derefed!\n", dlm->name,
2389			res->lockname.len, res->lockname.name, node);
2390		ret = 0;
2391		goto done;
2392	}
2393
2394	__dlm_do_purge_lockres(dlm, res);
2395	spin_unlock(&res->spinlock);
2396	wake_up(&res->wq);
2397
2398	spin_unlock(&dlm->spinlock);
2399
2400	ret = 0;
2401done:
2402	if (res)
2403		dlm_lockres_put(res);
2404	dlm_put(dlm);
2405	return ret;
2406}
2407
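/* Notify the node whose DEREF request was deferred to the worker that
 * the deref has now been processed (DLM_DEREF_LOCKRES_DONE). */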
2408static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2409		struct dlm_lock_resource *res, u8 node)
2410{
2411	struct dlm_deref_lockres_done deref;
2412	int ret = 0, r;
2413	const char *lockname;
2414	unsigned int namelen;
2415
2416	lockname = res->lockname.name;
2417	namelen = res->lockname.len;
2418	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2419
2420	memset(&deref, 0, sizeof(deref));
2421	deref.node_idx = dlm->node_num;
2422	deref.namelen = namelen;
2423	memcpy(deref.name, lockname, namelen);
2424
2425	ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2426				 &deref, sizeof(deref), node, &r);
2427	if (ret < 0) {
2428		mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF DONE "
2429				" to node %u\n", dlm->name, namelen,
2430				lockname, ret, node);
2431	} else if (r < 0) {
2432		/* ignore the error */
2433		mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2434		     dlm->name, namelen, lockname, node, r);
2435		dlm_print_one_lock_resource(res);
2436	}
2437}
2438
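/* Deferred half of the DEREF handler: wait for SETREF_INPROG to clear,
 * then drop the node's bit from the refmap and send it a DEREF_DONE. */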
2439static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2440{
2441	struct dlm_ctxt *dlm;
2442	struct dlm_lock_resource *res;
2443	u8 node;
2444	u8 cleared = 0;
2445
2446	dlm = item->dlm;
2447	res = item->u.dl.deref_res;
2448	node = item->u.dl.deref_node;
2449
2450	spin_lock(&res->spinlock);
2451	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2452	__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2453	if (test_bit(node, res->refmap)) {
2454		dlm_lockres_clear_refmap_bit(dlm, res, node);
2455		cleared = 1;
2456	}
2457	spin_unlock(&res->spinlock);
2458
2459	dlm_drop_lockres_ref_done(dlm, res, node);
2460
2461	if (cleared) {
2462		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2463		     dlm->name, res->lockname.len, res->lockname.name, node);
2464		dlm_lockres_calc_usage(dlm, res);
2465	} else {
2466		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2467		     "but it is already dropped!\n", dlm->name,
2468		     res->lockname.len, res->lockname.name, node);
2469		dlm_print_one_lock_resource(res);
2470	}
2471
2472	dlm_lockres_put(res);
2473}
2474
2475/*
 * A migratable resource is one that is:
 * 1. locally mastered, and
 * 2. has zero local locks, and
 * 3. has one or more non-local locks, or one or more references.
2480 * Returns 1 if yes, 0 if not.
2481 */
2482static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2483				      struct dlm_lock_resource *res)
2484{
2485	enum dlm_lockres_list idx;
2486	int nonlocal = 0, node_ref;
2487	struct list_head *queue;
2488	struct dlm_lock *lock;
2489	u64 cookie;
2490
2491	assert_spin_locked(&res->spinlock);
2492
2493	/* delay migration when the lockres is in MIGRATING state */
2494	if (res->state & DLM_LOCK_RES_MIGRATING)
2495		return 0;
2496
	/* delay migration when the lockres is in RECOVERING state */
2498	if (res->state & (DLM_LOCK_RES_RECOVERING|
2499			DLM_LOCK_RES_RECOVERY_WAITING))
2500		return 0;
2501
2502	if (res->owner != dlm->node_num)
2503		return 0;
2504
	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2506		queue = dlm_list_idx_to_ptr(res, idx);
2507		list_for_each_entry(lock, queue, list) {
2508			if (lock->ml.node != dlm->node_num) {
2509				nonlocal++;
2510				continue;
2511			}
2512			cookie = be64_to_cpu(lock->ml.cookie);
2513			mlog(0, "%s: Not migratable res %.*s, lock %u:%llu on "
2514			     "%s list\n", dlm->name, res->lockname.len,
2515			     res->lockname.name,
2516			     dlm_get_lock_cookie_node(cookie),
2517			     dlm_get_lock_cookie_seq(cookie),
2518			     dlm_list_in_text(idx));
2519			return 0;
2520		}
2521	}
2522
2523	if (!nonlocal) {
2524		node_ref = find_first_bit(res->refmap, O2NM_MAX_NODES);
2525		if (node_ref >= O2NM_MAX_NODES)
2526			return 0;
2527	}
2528
2529	mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2530	     res->lockname.name);
2531
2532	return 1;
2533}
2534
2535/*
2536 * DLM_MIGRATE_LOCKRES
2537 */
2538
2539
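/* Migrate mastery of a lockres to 'target': preallocate the migration
 * structures, install a migration mle, set the MIGRATING flag and flush
 * asts, send all lock state to the target, then wait for the target to
 * assert mastery before recording it as the new owner. */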
2540static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2541			       struct dlm_lock_resource *res, u8 target)
2542{
2543	struct dlm_master_list_entry *mle = NULL;
2544	struct dlm_master_list_entry *oldmle = NULL;
	struct dlm_migratable_lockres *mres = NULL;
2546	int ret = 0;
2547	const char *name;
2548	unsigned int namelen;
2549	int mle_added = 0;
2550	int wake = 0;
2551
2552	if (!dlm_grab(dlm))
2553		return -EINVAL;
2554
2555	name = res->lockname.name;
2556	namelen = res->lockname.len;
2557
2558	mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2559	     target);
2560
2561	/* preallocate up front. if this fails, abort */
2562	ret = -ENOMEM;
2563	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2564	if (!mres) {
2565		mlog_errno(ret);
2566		goto leave;
2567	}
2568
2569	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2570	if (!mle) {
2571		mlog_errno(ret);
2572		goto leave;
2573	}
2574	ret = 0;
2575
2576	/*
2577	 * clear any existing master requests and
2578	 * add the migration mle to the list
2579	 */
2580	spin_lock(&dlm->spinlock);
2581	spin_lock(&dlm->master_lock);
2582	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2583				    namelen, target, dlm->node_num);
2584	/* get an extra reference on the mle.
2585	 * otherwise the assert_master from the new
2586	 * master will destroy this.
2587	 */
2588	if (ret != -EEXIST)
2589		dlm_get_mle_inuse(mle);
2590
2591	spin_unlock(&dlm->master_lock);
2592	spin_unlock(&dlm->spinlock);
2593
2594	if (ret == -EEXIST) {
2595		mlog(0, "another process is already migrating it\n");
2596		goto fail;
2597	}
2598	mle_added = 1;
2599
2600	/*
2601	 * set the MIGRATING flag and flush asts
2602	 * if we fail after this we need to re-dirty the lockres
2603	 */
2604	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2605		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2606		     "the target went down.\n", res->lockname.len,
2607		     res->lockname.name, target);
2608		spin_lock(&res->spinlock);
2609		res->state &= ~DLM_LOCK_RES_MIGRATING;
2610		wake = 1;
2611		spin_unlock(&res->spinlock);
2612		ret = -EINVAL;
2613	}
2614
2615fail:
2616	if (ret != -EEXIST && oldmle) {
2617		/* master is known, detach if not already detached */
2618		dlm_mle_detach_hb_events(dlm, oldmle);
2619		dlm_put_mle(oldmle);
2620	}
2621
2622	if (ret < 0) {
2623		if (mle_added) {
2624			dlm_mle_detach_hb_events(dlm, mle);
2625			dlm_put_mle(mle);
2626			dlm_put_mle_inuse(mle);
2627		} else if (mle) {
2628			kmem_cache_free(dlm_mle_cache, mle);
2629			mle = NULL;
2630		}
2631		goto leave;
2632	}
2633
2634	/*
2635	 * at this point, we have a migration target, an mle
2636	 * in the master list, and the MIGRATING flag set on
2637	 * the lockres
2638	 */
2639
2640	/* now that remote nodes are spinning on the MIGRATING flag,
2641	 * ensure that all assert_master work is flushed. */
2642	flush_workqueue(dlm->dlm_worker);
2643
2644	/* notify new node and send all lock state */
2645	/* call send_one_lockres with migration flag.
2646	 * this serves as notice to the target node that a
2647	 * migration is starting. */
2648	ret = dlm_send_one_lockres(dlm, res, mres, target,
2649				   DLM_MRES_MIGRATION);
2650
2651	if (ret < 0) {
2652		mlog(0, "migration to node %u failed with %d\n",
2653		     target, ret);
2654		/* migration failed, detach and clean up mle */
2655		dlm_mle_detach_hb_events(dlm, mle);
2656		dlm_put_mle(mle);
2657		dlm_put_mle_inuse(mle);
2658		spin_lock(&res->spinlock);
2659		res->state &= ~DLM_LOCK_RES_MIGRATING;
2660		wake = 1;
2661		spin_unlock(&res->spinlock);
2662		if (dlm_is_host_down(ret))
2663			dlm_wait_for_node_death(dlm, target,
2664						DLM_NODE_DEATH_WAIT_MAX);
2665		goto leave;
2666	}
2667
2668	/* at this point, the target sends a message to all nodes,
2669	 * (using dlm_do_migrate_request).  this node is skipped since
2670	 * we had to put an mle in the list to begin the process.  this
2671	 * node now waits for target to do an assert master.  this node
2672	 * will be the last one notified, ensuring that the migration
2673	 * is complete everywhere.  if the target dies while this is
2674	 * going on, some nodes could potentially see the target as the
2675	 * master, so it is important that my recovery finds the migration
2676	 * mle and sets the master to UNKNOWN. */
2677
2678
2679	/* wait for new node to assert master */
2680	while (1) {
2681		ret = wait_event_interruptible_timeout(mle->wq,
2682					(atomic_read(&mle->woken) == 1),
2683					msecs_to_jiffies(5000));
2684
2685		if (ret >= 0) {
2686		       	if (atomic_read(&mle->woken) == 1 ||
2687			    res->owner == target)
2688				break;
2689
2690			mlog(0, "%s:%.*s: timed out during migration\n",
2691			     dlm->name, res->lockname.len, res->lockname.name);
2692			/* avoid hang during shutdown when migrating lockres
2693			 * to a node which also goes down */
2694			if (dlm_is_node_dead(dlm, target)) {
2695				mlog(0, "%s:%.*s: expected migration "
2696				     "target %u is no longer up, restarting\n",
2697				     dlm->name, res->lockname.len,
2698				     res->lockname.name, target);
2699				ret = -EINVAL;
2700				/* migration failed, detach and clean up mle */
2701				dlm_mle_detach_hb_events(dlm, mle);
2702				dlm_put_mle(mle);
2703				dlm_put_mle_inuse(mle);
2704				spin_lock(&res->spinlock);
2705				res->state &= ~DLM_LOCK_RES_MIGRATING;
2706				wake = 1;
2707				spin_unlock(&res->spinlock);
2708				goto leave;
2709			}
2710		} else
2711			mlog(0, "%s:%.*s: caught signal during migration\n",
2712			     dlm->name, res->lockname.len, res->lockname.name);
2713	}
2714
2715	/* all done, set the owner, clear the flag */
2716	spin_lock(&res->spinlock);
2717	dlm_set_lockres_owner(dlm, res, target);
2718	res->state &= ~DLM_LOCK_RES_MIGRATING;
2719	dlm_remove_nonlocal_locks(dlm, res);
2720	spin_unlock(&res->spinlock);
2721	wake_up(&res->wq);
2722
2723	/* master is known, detach if not already detached */
2724	dlm_mle_detach_hb_events(dlm, mle);
2725	dlm_put_mle_inuse(mle);
2726	ret = 0;
2727
2728	dlm_lockres_calc_usage(dlm, res);
2729
2730leave:
2731	/* re-dirty the lockres if we failed */
2732	if (ret < 0)
2733		dlm_kick_thread(dlm, res);
2734
2735	/* wake up waiters if the MIGRATING flag got set
2736	 * but migration failed */
2737	if (wake)
2738		wake_up(&res->wq);
2739
2740	if (mres)
2741		free_page((unsigned long)mres);
2742
2743	dlm_put(dlm);
2744
2745	mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2746	     name, target, ret);
2747	return ret;
2748}
2749
2750/*
2751 * Should be called only after beginning the domain leave process.
2752 * There should not be any remaining locks on nonlocal lock resources,
2753 * and there should be no local locks left on locally mastered resources.
2754 *
2755 * Called with the dlm spinlock held, may drop it to do migration, but
2756 * will re-acquire before exit.
2757 *
2758 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2759 */
2760int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2761	__must_hold(&dlm->spinlock)
2762{
2763	int ret;
2764	int lock_dropped = 0;
2765	u8 target = O2NM_MAX_NODES;
2766
2767	assert_spin_locked(&dlm->spinlock);
2768
2769	spin_lock(&res->spinlock);
2770	if (dlm_is_lockres_migratable(dlm, res))
2771		target = dlm_pick_migration_target(dlm, res);
2772	spin_unlock(&res->spinlock);
2773
2774	if (target == O2NM_MAX_NODES)
2775		goto leave;
2776
2777	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2778	spin_unlock(&dlm->spinlock);
2779	lock_dropped = 1;
2780	ret = dlm_migrate_lockres(dlm, res, target);
2781	if (ret)
2782		mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2783		     dlm->name, res->lockname.len, res->lockname.name,
2784		     target, ret);
2785	spin_lock(&dlm->spinlock);
2786leave:
2787	return lock_dropped;
2788}
2789
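/* Returns nonzero once this lock has no queued and no pending basts. */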
2790int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2791{
2792	int ret;
2793	spin_lock(&dlm->ast_lock);
2794	spin_lock(&lock->spinlock);
2795	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2796	spin_unlock(&lock->spinlock);
2797	spin_unlock(&dlm->ast_lock);
2798	return ret;
2799}
2800
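/* Migration may proceed once the MIGRATING flag is set on the lockres,
 * or once the target has dropped out of the domain (in which case the
 * caller must notice the dead target and abort). */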
2801static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2802				     struct dlm_lock_resource *res,
2803				     u8 mig_target)
2804{
2805	int can_proceed;
2806	spin_lock(&res->spinlock);
2807	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2808	spin_unlock(&res->spinlock);
2809
2810	/* target has died, so make the caller break out of the
2811	 * wait_event, but caller must recheck the domain_map */
2812	spin_lock(&dlm->spinlock);
2813	if (!test_bit(mig_target, dlm->domain_map))
2814		can_proceed = 1;
2815	spin_unlock(&dlm->spinlock);
2816	return can_proceed;
2817}
2818
2819static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2820				struct dlm_lock_resource *res)
2821{
2822	int ret;
2823	spin_lock(&res->spinlock);
2824	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2825	spin_unlock(&res->spinlock);
2826	return ret;
2827}
2828
2829
2830static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2831				       struct dlm_lock_resource *res,
2832				       u8 target)
2833{
2834	int ret = 0;
2835
2836	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2837	       res->lockname.len, res->lockname.name, dlm->node_num,
2838	       target);
2839	/* need to set MIGRATING flag on lockres.  this is done by
2840	 * ensuring that all asts have been flushed for this lockres. */
2841	spin_lock(&res->spinlock);
2842	BUG_ON(res->migration_pending);
2843	res->migration_pending = 1;
2844	/* strategy is to reserve an extra ast then release
2845	 * it below, letting the release do all of the work */
2846	__dlm_lockres_reserve_ast(res);
2847	spin_unlock(&res->spinlock);
2848
2849	/* now flush all the pending asts */
2850	dlm_kick_thread(dlm, res);
2851	/* before waiting on DIRTY, block processes which may
2852	 * try to dirty the lockres before MIGRATING is set */
2853	spin_lock(&res->spinlock);
2854	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2855	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2856	spin_unlock(&res->spinlock);
2857	/* now wait on any pending asts and the DIRTY state */
2858	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2859	dlm_lockres_release_ast(dlm, res);
2860
2861	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2862	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2863	/* if the extra ref we just put was the final one, this
	 * will pass through immediately.  otherwise, we need to wait
2865	 * for the last ast to finish. */
2866again:
2867	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2868		   dlm_migration_can_proceed(dlm, res, target),
2869		   msecs_to_jiffies(1000));
2870	if (ret < 0) {
2871		mlog(0, "woken again: migrating? %s, dead? %s\n",
2872		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2873		       test_bit(target, dlm->domain_map) ? "no":"yes");
2874	} else {
2875		mlog(0, "all is well: migrating? %s, dead? %s\n",
2876		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2877		       test_bit(target, dlm->domain_map) ? "no":"yes");
2878	}
2879	if (!dlm_migration_can_proceed(dlm, res, target)) {
2880		mlog(0, "trying again...\n");
2881		goto again;
2882	}
2883
2884	ret = 0;
2885	/* did the target go down or die? */
2886	spin_lock(&dlm->spinlock);
2887	if (!test_bit(target, dlm->domain_map)) {
2888		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2889		     target);
2890		ret = -EHOSTDOWN;
2891	}
2892	spin_unlock(&dlm->spinlock);
2893
2894	/*
2895	 * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2896	 * another try; otherwise, we are sure the MIGRATING state is there,
2897	 * drop the unneeded state which blocked threads trying to DIRTY
2898	 */
2899	spin_lock(&res->spinlock);
2900	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2901	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2902	if (!ret)
2903		BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2904	else
2905		res->migration_pending = 0;
2906	spin_unlock(&res->spinlock);
2907
2908	/*
2909	 * at this point:
2910	 *
2911	 *   o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2912	 *   o there are no pending asts on this lockres
2913	 *   o all processes trying to reserve an ast on this
2914	 *     lockres must wait for the MIGRATING flag to clear
2915	 */
2916	return ret;
2917}
2918
2919/* last step in the migration process.
2920 * original master calls this to free all of the dlm_lock
2921 * structures that used to be for other nodes. */
2922static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2923				      struct dlm_lock_resource *res)
2924{
2925	struct list_head *queue = &res->granted;
2926	int i, bit;
2927	struct dlm_lock *lock, *next;
2928
2929	assert_spin_locked(&res->spinlock);
2930
2931	BUG_ON(res->owner == dlm->node_num);
2932
2933	for (i=0; i<3; i++) {
2934		list_for_each_entry_safe(lock, next, queue, list) {
2935			if (lock->ml.node != dlm->node_num) {
2936				mlog(0, "putting lock for node %u\n",
2937				     lock->ml.node);
2938				/* be extra careful */
2939				BUG_ON(!list_empty(&lock->ast_list));
2940				BUG_ON(!list_empty(&lock->bast_list));
2941				BUG_ON(lock->ast_pending);
2942				BUG_ON(lock->bast_pending);
2943				dlm_lockres_clear_refmap_bit(dlm, res,
2944							     lock->ml.node);
2945				list_del_init(&lock->list);
2946				dlm_lock_put(lock);
2947				/* In a normal unlock, we would have added a
2948				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2949				dlm_lock_put(lock);
2950			}
2951		}
2952		queue++;
2953	}
2954	bit = 0;
2955	while (1) {
2956		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2957		if (bit >= O2NM_MAX_NODES)
2958			break;
2959		/* do not clear the local node reference, if there is a
2960		 * process holding this, let it drop the ref itself */
2961		if (bit != dlm->node_num) {
2962			mlog(0, "%s:%.*s: node %u had a ref to this "
2963			     "migrating lockres, clearing\n", dlm->name,
2964			     res->lockname.len, res->lockname.name, bit);
2965			dlm_lockres_clear_refmap_bit(dlm, res, bit);
2966		}
2967		bit++;
2968	}
2969}
2970
2971/*
2972 * Pick a node to migrate the lock resource to. This function selects a
2973 * potential target based first on the locks and then on refmap. It skips
2974 * nodes that are in the process of exiting the domain.
2975 */
2976static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2977				    struct dlm_lock_resource *res)
2978{
2979	enum dlm_lockres_list idx;
2980	struct list_head *queue;
2981	struct dlm_lock *lock;
2982	int noderef;
2983	u8 nodenum = O2NM_MAX_NODES;
2984
2985	assert_spin_locked(&dlm->spinlock);
2986	assert_spin_locked(&res->spinlock);
2987
2988	/* Go through all the locks */
2989	for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2990		queue = dlm_list_idx_to_ptr(res, idx);
2991		list_for_each_entry(lock, queue, list) {
2992			if (lock->ml.node == dlm->node_num)
2993				continue;
2994			if (test_bit(lock->ml.node, dlm->exit_domain_map))
2995				continue;
2996			nodenum = lock->ml.node;
2997			goto bail;
2998		}
2999	}
3000
3001	/* Go thru the refmap */
3002	noderef = -1;
3003	while (1) {
3004		noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
3005					noderef + 1);
3006		if (noderef >= O2NM_MAX_NODES)
3007			break;
3008		if (noderef == dlm->node_num)
3009			continue;
3010		if (test_bit(noderef, dlm->exit_domain_map))
3011			continue;
3012		nodenum = noderef;
3013		goto bail;
3014	}
3015
3016bail:
3017	return nodenum;
3018}
3019
3020/* this is called by the new master once all lockres
3021 * data has been received */
3022static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3023				  struct dlm_lock_resource *res,
3024				  u8 master, u8 new_master,
3025				  struct dlm_node_iter *iter)
3026{
3027	struct dlm_migrate_request migrate;
3028	int ret, skip, status = 0;
3029	int nodenum;
3030
3031	memset(&migrate, 0, sizeof(migrate));
3032	migrate.namelen = res->lockname.len;
3033	memcpy(migrate.name, res->lockname.name, migrate.namelen);
3034	migrate.new_master = new_master;
3035	migrate.master = master;
3036
3037	ret = 0;
3038
3039	/* send message to all nodes, except the master and myself */
3040	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3041		if (nodenum == master ||
3042		    nodenum == new_master)
3043			continue;
3044
3045		/* We could race exit domain. If exited, skip. */
3046		spin_lock(&dlm->spinlock);
3047		skip = (!test_bit(nodenum, dlm->domain_map));
3048		spin_unlock(&dlm->spinlock);
3049		if (skip) {
3050			clear_bit(nodenum, iter->node_map);
3051			continue;
3052		}
3053
3054		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3055					 &migrate, sizeof(migrate), nodenum,
3056					 &status);
3057		if (ret < 0) {
3058			mlog(ML_ERROR, "%s: res %.*s, Error %d send "
3059			     "MIGRATE_REQUEST to node %u\n", dlm->name,
3060			     migrate.namelen, migrate.name, ret, nodenum);
3061			if (!dlm_is_host_down(ret)) {
3062				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3063				BUG();
3064			}
3065			clear_bit(nodenum, iter->node_map);
3066			ret = 0;
3067		} else if (status < 0) {
3068			mlog(0, "migrate request (node %u) returned %d!\n",
3069			     nodenum, status);
3070			ret = status;
3071		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3072			/* during the migration request we short-circuited
3073			 * the mastery of the lockres.  make sure we have
3074			 * a mastery ref for nodenum */
3075			mlog(0, "%s:%.*s: need ref for node %u\n",
3076			     dlm->name, res->lockname.len, res->lockname.name,
3077			     nodenum);
3078			spin_lock(&res->spinlock);
3079			dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3080			spin_unlock(&res->spinlock);
3081		}
3082	}
3083
3084	if (ret < 0)
3085		mlog_errno(ret);
3086
3087	mlog(0, "returning ret=%d\n", ret);
3088	return ret;
3089}
3090
3091
3092/* if there is an existing mle for this lockres, we now know who the master is.
3093 * (the one who sent us *this* message) we can clear it up right away.
3094 * since the process that put the mle on the list still has a reference to it,
3095 * we can unhash it now, set the master and wake the process.  as a result,
3096 * we will have no mle in the list to start with.  now we can add an mle for
3097 * the migration and this should be the only one found for those scanning the
3098 * list.  */
3099int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3100				void **ret_data)
3101{
3102	struct dlm_ctxt *dlm = data;
3103	struct dlm_lock_resource *res = NULL;
3104	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3105	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3106	const char *name;
3107	unsigned int namelen, hash;
3108	int ret = 0;
3109
3110	if (!dlm_grab(dlm))
3111		return 0;
3112
3113	name = migrate->name;
3114	namelen = migrate->namelen;
3115	hash = dlm_lockid_hash(name, namelen);
3116
3117	/* preallocate.. if this fails, abort */
3118	mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
3119
3120	if (!mle) {
3121		ret = -ENOMEM;
3122		goto leave;
3123	}
3124
3125	/* check for pre-existing lock */
3126	spin_lock(&dlm->spinlock);
3127	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3128	if (res) {
3129		spin_lock(&res->spinlock);
3130		if (res->state & DLM_LOCK_RES_RECOVERING) {
3131			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
3134			spin_unlock(&res->spinlock);
3135			mlog(ML_ERROR, "Got a migrate request, but the "
3136			     "lockres is marked as recovering!");
3137			kmem_cache_free(dlm_mle_cache, mle);
3138			ret = -EINVAL; /* need a better solution */
3139			goto unlock;
3140		}
3141		res->state |= DLM_LOCK_RES_MIGRATING;
3142		spin_unlock(&res->spinlock);
3143	}
3144
3145	spin_lock(&dlm->master_lock);
3146	/* ignore status.  only nonzero status would BUG. */
3147	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3148				    name, namelen,
3149				    migrate->new_master,
3150				    migrate->master);
3151
3152	if (ret < 0)
3153		kmem_cache_free(dlm_mle_cache, mle);
3154
3155	spin_unlock(&dlm->master_lock);
3156unlock:
3157	spin_unlock(&dlm->spinlock);
3158
3159	if (oldmle) {
3160		/* master is known, detach if not already detached */
3161		dlm_mle_detach_hb_events(dlm, oldmle);
3162		dlm_put_mle(oldmle);
3163	}
3164
3165	if (res)
3166		dlm_lockres_put(res);
3167leave:
3168	dlm_put(dlm);
3169	return ret;
3170}
3171
3172/* must be holding dlm->spinlock and dlm->master_lock
3173 * when adding a migration mle, we can clear any other mles
3174 * in the master list because we know with certainty that
3175 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
3177 * the new migration mle.  this way we can hold with the rule
3178 * of having only one mle for a given lock name at all times. */
3179static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3180				 struct dlm_lock_resource *res,
3181				 struct dlm_master_list_entry *mle,
3182				 struct dlm_master_list_entry **oldmle,
3183				 const char *name, unsigned int namelen,
3184				 u8 new_master, u8 master)
3185{
3186	int found;
3187	int ret = 0;
3188
3189	*oldmle = NULL;
3190
3191	assert_spin_locked(&dlm->spinlock);
3192	assert_spin_locked(&dlm->master_lock);
3193
3194	/* caller is responsible for any ref taken here on oldmle */
3195	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3196	if (found) {
3197		struct dlm_master_list_entry *tmp = *oldmle;
3198		spin_lock(&tmp->spinlock);
3199		if (tmp->type == DLM_MLE_MIGRATION) {
3200			if (master == dlm->node_num) {
3201				/* ah another process raced me to it */
3202				mlog(0, "tried to migrate %.*s, but some "
3203				     "process beat me to it\n",
3204				     namelen, name);
3205				spin_unlock(&tmp->spinlock);
3206				return -EEXIST;
3207			} else {
3208				/* bad.  2 NODES are trying to migrate! */
3209				mlog(ML_ERROR, "migration error  mle: "
3210				     "master=%u new_master=%u // request: "
3211				     "master=%u new_master=%u // "
3212				     "lockres=%.*s\n",
3213				     tmp->master, tmp->new_master,
3214				     master, new_master,
3215				     namelen, name);
3216				BUG();
3217			}
3218		} else {
3219			/* this is essentially what assert_master does */
3220			tmp->master = master;
3221			atomic_set(&tmp->woken, 1);
3222			wake_up(&tmp->wq);
3223			/* remove it so that only one mle will be found */
3224			__dlm_unlink_mle(dlm, tmp);
3225			__dlm_mle_detach_hb_events(dlm, tmp);
3226			if (tmp->type == DLM_MLE_MASTER) {
3227				ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3228				mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3229						"telling master to get ref "
3230						"for cleared out mle during "
3231						"migration\n", dlm->name,
3232						namelen, name, master,
3233						new_master);
3234			}
3235		}
3236		spin_unlock(&tmp->spinlock);
3237	}
3238
3239	/* now add a migration mle to the tail of the list */
3240	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3241	mle->new_master = new_master;
3242	/* the new master will be sending an assert master for this.
3243	 * at that point we will get the refmap reference */
3244	mle->master = master;
3245	/* do this for consistency with other mle types */
3246	set_bit(new_master, mle->maybe_map);
3247	__dlm_insert_mle(dlm, mle);
3248
3249	return ret;
3250}
3251
3252/*
3253 * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3254 */
3255static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3256					struct dlm_master_list_entry *mle)
3257{
3258	struct dlm_lock_resource *res;
3259
3260	/* Find the lockres associated to the mle and set its owner to UNK */
3261	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3262				   mle->mnamehash);
3263	if (res) {
3264		spin_unlock(&dlm->master_lock);
3265
3266		/* move lockres onto recovery list */
3267		spin_lock(&res->spinlock);
3268		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3269		dlm_move_lockres_to_recovery_list(dlm, res);
3270		spin_unlock(&res->spinlock);
3271		dlm_lockres_put(res);
3272
3273		/* about to get rid of mle, detach from heartbeat */
3274		__dlm_mle_detach_hb_events(dlm, mle);
3275
3276		/* dump the mle */
3277		spin_lock(&dlm->master_lock);
3278		__dlm_put_mle(mle);
3279		spin_unlock(&dlm->master_lock);
3280	}
3281
3282	return res;
3283}
3284
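/* Unlink a migration mle that can no longer complete, detach it from
 * heartbeat events and wake anyone waiting on it; the caller drops the
 * remaining references. */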
3285static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3286				    struct dlm_master_list_entry *mle)
3287{
3288	__dlm_mle_detach_hb_events(dlm, mle);
3289
3290	spin_lock(&mle->spinlock);
3291	__dlm_unlink_mle(dlm, mle);
3292	atomic_set(&mle->woken, 1);
3293	spin_unlock(&mle->spinlock);
3294
3295	wake_up(&mle->wq);
3296}
3297
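/* A BLOCK mle tracks mastery being won by another node.  If the dead
 * node was the expected master (the first bit in maybe_map), the
 * assert_master it owed will never arrive, so drop the reference that
 * message would have balanced. */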
3298static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3299				struct dlm_master_list_entry *mle, u8 dead_node)
3300{
3301	int bit;
3302
3303	BUG_ON(mle->type != DLM_MLE_BLOCK);
3304
3305	spin_lock(&mle->spinlock);
3306	bit = find_first_bit(mle->maybe_map, O2NM_MAX_NODES);
3307	if (bit != dead_node) {
3308		mlog(0, "mle found, but dead node %u would not have been "
3309		     "master\n", dead_node);
3310		spin_unlock(&mle->spinlock);
3311	} else {
3312		/* Must drop the refcount by one since the assert_master will
3313		 * never arrive. This may result in the mle being unlinked and
3314		 * freed, but there may still be a process waiting in the
3315		 * dlmlock path which is fine. */
3316		mlog(0, "node %u was expected master\n", dead_node);
3317		atomic_set(&mle->woken, 1);
3318		spin_unlock(&mle->spinlock);
3319		wake_up(&mle->wq);
3320
3321		/* Do not need events any longer, so detach from heartbeat */
3322		__dlm_mle_detach_hb_events(dlm, mle);
3323		__dlm_put_mle(mle);
3324	}
3325}
3326
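/* Called with dlm->spinlock held when a node dies: walk every mle
 * bucket and clean up entries that were waiting on the dead node,
 * restarting the scan from the top whenever the master lock had to be
 * dropped to reset a lockres owner. */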
3327void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3328{
3329	struct dlm_master_list_entry *mle;
3330	struct dlm_lock_resource *res;
3331	struct hlist_head *bucket;
3332	struct hlist_node *tmp;
3333	unsigned int i;
3334
3335	mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3336top:
3337	assert_spin_locked(&dlm->spinlock);
3338
3339	/* clean the master list */
3340	spin_lock(&dlm->master_lock);
3341	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3342		bucket = dlm_master_hash(dlm, i);
3343		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3344			BUG_ON(mle->type != DLM_MLE_BLOCK &&
3345			       mle->type != DLM_MLE_MASTER &&
3346			       mle->type != DLM_MLE_MIGRATION);
3347
3348			/* MASTER mles are initiated locally. The waiting
3349			 * process will notice the node map change shortly.
3350			 * Let that happen as normal. */
3351			if (mle->type == DLM_MLE_MASTER)
3352				continue;
3353
3354			/* BLOCK mles are initiated by other nodes. Need to
3355			 * clean up if the dead node would have been the
3356			 * master. */
3357			if (mle->type == DLM_MLE_BLOCK) {
3358				dlm_clean_block_mle(dlm, mle, dead_node);
3359				continue;
3360			}
3361
3362			/* Everything else is a MIGRATION mle */
3363
3364			/* The rule for MIGRATION mles is that the master
3365			 * becomes UNKNOWN if *either* the original or the new
3366			 * master dies. All UNKNOWN lockres' are sent to
3367			 * whichever node becomes the recovery master. The new
3368			 * master is responsible for determining if there is
			 * still a master for this lockres, or if it needs to
3370			 * take over mastery. Either way, this node should
3371			 * expect another message to resolve this. */
3372
3373			if (mle->master != dead_node &&
3374			    mle->new_master != dead_node)
3375				continue;
3376
3377			if (mle->new_master == dead_node && mle->inuse) {
3378				mlog(ML_NOTICE, "%s: target %u died during "
						"migration from %u, the MLE is "
						"still in use, ignore it!\n",
3381						dlm->name, dead_node,
3382						mle->master);
3383				continue;
3384			}
3385
3386			/* If we have reached this point, this mle needs to be
3387			 * removed from the list and freed. */
3388			dlm_clean_migration_mle(dlm, mle);
3389
3390			mlog(0, "%s: node %u died during migration from "
3391			     "%u to %u!\n", dlm->name, dead_node, mle->master,
3392			     mle->new_master);
3393
3394			/* If we find a lockres associated with the mle, we've
3395			 * hit this rare case that messes up our lock ordering.
3396			 * If so, we need to drop the master lock so that we can
3397			 * take the lockres lock, meaning that we will have to
3398			 * restart from the head of list. */
3399			res = dlm_reset_mleres_owner(dlm, mle);
3400			if (res)
3401				/* restart */
3402				goto top;
3403
3404			/* This may be the last reference */
3405			__dlm_put_mle(mle);
3406		}
3407	}
3408	spin_unlock(&dlm->master_lock);
3409}
3410
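/* Final step on the new master: account for old_master's lingering
 * reference, broadcast the migrate request, assert mastery to the
 * remaining nodes and finally back to the old master, then take
 * ownership and clear the MIGRATING flag. */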
3411int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3412			 u8 old_master)
3413{
3414	struct dlm_node_iter iter;
3415	int ret = 0;
3416
3417	spin_lock(&dlm->spinlock);
3418	dlm_node_iter_init(dlm->domain_map, &iter);
3419	clear_bit(old_master, iter.node_map);
3420	clear_bit(dlm->node_num, iter.node_map);
3421	spin_unlock(&dlm->spinlock);
3422
3423	/* ownership of the lockres is changing.  account for the
3424	 * mastery reference here since old_master will briefly have
3425	 * a reference after the migration completes */
3426	spin_lock(&res->spinlock);
3427	dlm_lockres_set_refmap_bit(dlm, res, old_master);
3428	spin_unlock(&res->spinlock);
3429
3430	mlog(0, "now time to do a migrate request to other nodes\n");
3431	ret = dlm_do_migrate_request(dlm, res, old_master,
3432				     dlm->node_num, &iter);
3433	if (ret < 0) {
3434		mlog_errno(ret);
3435		goto leave;
3436	}
3437
3438	mlog(0, "doing assert master of %.*s to all except the original node\n",
3439	     res->lockname.len, res->lockname.name);
3440	/* this call now finishes out the nodemap
3441	 * even if one or more nodes die */
3442	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3443				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3444	if (ret < 0) {
3445		/* no longer need to retry.  all living nodes contacted. */
3446		mlog_errno(ret);
3447		ret = 0;
3448	}
3449
3450	bitmap_zero(iter.node_map, O2NM_MAX_NODES);
3451	set_bit(old_master, iter.node_map);
3452	mlog(0, "doing assert master of %.*s back to %u\n",
3453	     res->lockname.len, res->lockname.name, old_master);
3454	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3455				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3456	if (ret < 0) {
3457		mlog(0, "assert master to original master failed "
3458		     "with %d.\n", ret);
3459		/* the only nonzero status here would be because of
3460		 * a dead original node.  we're done. */
3461		ret = 0;
3462	}
3463
3464	/* all done, set the owner, clear the flag */
3465	spin_lock(&res->spinlock);
3466	dlm_set_lockres_owner(dlm, res, dlm->node_num);
3467	res->state &= ~DLM_LOCK_RES_MIGRATING;
3468	spin_unlock(&res->spinlock);
3469	/* re-dirty it on the new master */
3470	dlm_kick_thread(dlm, res);
3471	wake_up(&res->wq);
3472leave:
3473	return ret;
3474}
3475
3476/*
3477 * LOCKRES AST REFCOUNT
3478 * this is integral to migration
3479 */
3480
3481/* for future intent to call an ast, reserve one ahead of time.
3482 * this should be called only after waiting on the lockres
3483 * with dlm_wait_on_lockres, and while still holding the
3484 * spinlock after the call. */
3485void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3486{
3487	assert_spin_locked(&res->spinlock);
3488	if (res->state & DLM_LOCK_RES_MIGRATING) {
3489		__dlm_print_one_lock_resource(res);
3490	}
3491	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3492
3493	atomic_inc(&res->asts_reserved);
3494}
3495
3496/*
3497 * used to drop the reserved ast, either because it went unused,
3498 * or because the ast/bast was actually called.
3499 *
3500 * also, if there is a pending migration on this lockres,
3501 * and this was the last pending ast on the lockres,
3502 * atomically set the MIGRATING flag before we drop the lock.
3503 * this is how we ensure that migration can proceed with no
3504 * asts in progress.  note that it is ok if the state of the
3505 * queues is such that a lock should be granted in the future
3506 * or that a bast should be fired, because the new master will
3507 * shuffle the lists on this lockres as soon as it is migrated.
3508 */
3509void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3510			     struct dlm_lock_resource *res)
3511{
3512	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3513		return;
3514
3515	if (!res->migration_pending) {
3516		spin_unlock(&res->spinlock);
3517		return;
3518	}
3519
3520	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3521	res->migration_pending = 0;
3522	res->state |= DLM_LOCK_RES_MIGRATING;
3523	spin_unlock(&res->spinlock);
3524	wake_up(&res->wq);
3525	wake_up(&dlm->migration_wq);
3526}
3527
3528void dlm_force_free_mles(struct dlm_ctxt *dlm)
3529{
3530	int i;
3531	struct hlist_head *bucket;
3532	struct dlm_master_list_entry *mle;
3533	struct hlist_node *tmp;
3534
3535	/*
3536	 * We notified all other nodes that we are exiting the domain and
3537	 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3538	 * around we force free them and wake any processes that are waiting
3539	 * on the mles
3540	 */
3541	spin_lock(&dlm->spinlock);
3542	spin_lock(&dlm->master_lock);
3543
3544	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3545	BUG_ON((find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES));
3546
3547	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3548		bucket = dlm_master_hash(dlm, i);
3549		hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
3550			if (mle->type != DLM_MLE_BLOCK) {
3551				mlog(ML_ERROR, "bad mle: %p\n", mle);
3552				dlm_print_one_mle(mle);
3553			}
3554			atomic_set(&mle->woken, 1);
3555			wake_up(&mle->wq);
3556
3557			__dlm_unlink_mle(dlm, mle);
3558			__dlm_mle_detach_hb_events(dlm, mle);
3559			__dlm_put_mle(mle);
3560		}
3561	}
3562	spin_unlock(&dlm->master_lock);
3563	spin_unlock(&dlm->spinlock);
3564}
3565