// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

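/*
 * Module-wide state: ls_count tracks how many lockspaces exist (updated
 * under ls_lock), lslist/lslist_lock hold the list of all lockspaces,
 * and scand_task is the shared dlm_scand kthread.
 */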
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;


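/*
 * Sysfs "control" attribute: dlm_controld writes 0 to stop a lockspace
 * and 1 to start it as part of recovery/membership changes.
 */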
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

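/*
 * Sysfs "event_done" attribute: dlm_controld writes the result of the
 * join/leave group operation here, which wakes the waiter in do_uevent().
 */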
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
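/*
 * These attributes appear under /sys/kernel/dlm/<lockspace-name>/ once
 * the per-lockspace kobject is added to the "dlm" kset below.
 */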

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

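/*
 * do_uevent() drives the userspace side of joining or leaving the
 * lockspace group: it emits an ONLINE/OFFLINE uevent for dlm_controld
 * and then sleeps until dlm_event_store() reports the result.
 */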
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

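/*
 * dlm_scand is a single kthread shared by all lockspaces.  It wakes up
 * every ci_scan_secs seconds and calls dlm_scan_rsbs() on any lockspace
 * whose scan interval has expired, freeing unused rsbs from the toss
 * lists.  If recovery is in progress, the scan is deferred by pushing
 * ls_scan_time forward one second.
 */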
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

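/*
 * The dlm_find_lockspace_*() helpers look a lockspace up by global id,
 * local handle or character device minor, taking a reference on
 * ls_count under lslist_lock.  Callers drop it with dlm_put_lockspace();
 * the final put wakes remove_lockspace(), which waits for ls_count to
 * reach zero before unlinking the lockspace from lslist.
 */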
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto midcomms_fail;
	}

	return 0;

 midcomms_fail:
	dlm_midcomms_stop();
 fail:
	return error;
}

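/*
 * new_lockspace() does the real work of creating a lockspace: validate
 * the arguments, reuse an existing lockspace with the same name if one
 * exists (unless DLM_LSFL_NEWEXCL), otherwise allocate and initialize a
 * new dlm_ls, start dlm_recoverd, register the sysfs kobject, ask
 * dlm_controld to join the lockspace group via a uevent, and finally
 * wait for the first recovery to complete.
 */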
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));

	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and we
	 * do not hit out of bounds issues. However, the sending side on 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

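/*
 * __dlm_new_lockspace() serializes lockspace creation with ls_lock and
 * starts the shared midcomms and dlm_scand threads when the first
 * lockspace is created, tearing them back down again if that creation
 * fails and no lockspaces remain.
 */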
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

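/*
 * dlm_new_lockspace() is the entry point for kernel users such as
 * cluster filesystems; DLM_LSFL_FS is added so lock callbacks are
 * delivered through the dlm callback machinery started above.  A
 * minimal call (illustrative only, with hypothetical names) might be:
 *
 *	dlm_lockspace_t *ls;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("example_ls", "example_cluster",
 *				  DLM_LSFL_NEWEXCL, 32, &example_ops,
 *				  example_arg, &ops_result, &ls);
 *
 * where lvblen (32 here) must be a multiple of 8, and ops_result
 * reports whether recovery callbacks are supported.
 */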
int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

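/*
 * The lkb_idr_* callbacks below are used with idr_for_each() by
 * lockspace_busy() and release_lockspace() to decide whether any (or
 * any local) lkbs remain, and to free them during final teardown.
 */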
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

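/*
 * For example (illustrative only), a caller that has already dropped all
 * of its locks might use dlm_release_lockspace(ls, 0), which returns
 * -EBUSY if any lkbs remain, while force 2 skips that busy check.
 */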
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

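/*
 * dlm_stop_lockspaces() is used when the userland control daemon goes
 * away: every lockspace that is still running is stopped, since no
 * recovery can be driven without dlm_controld.
 */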
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
