/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2-specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

#include <dlm/dlmapi.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "vote.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock().
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
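
/*
 * A hypothetical sketch (not a worker used by any lock type in this
 * file) of how the two types above cooperate: a ->downconvert_worker()
 * does whatever work the downconvert needs and then picks an action,
 * e.g.
 *
 *	static int example_convert_worker(struct ocfs2_lock_res *lockres,
 *					  int blocking)
 *	{
 *		example_flush_state(lockres);	(hypothetical helper)
 *		return UNBLOCK_CONTINUE_POST;
 *	}
 *
 * Returning one of the *_POST values is what makes the downconvert
 * thread fire the ->post_unlock() callback, as ocfs2_dentry_lops
 * (which defines ocfs2_dentry_post_unlock) relies on.
 */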

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);


#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine-tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert (or "vote") thread
	 * after a successful downconvert. The lockres will not be
	 * referenced after this callback is called, so it is safe to
	 * free memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2
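
/*
 * Putting the two flags together: a lock type that keeps a summary of
 * its object in the LVB would be declared roughly as below (an
 * illustrative sketch; ocfs2_inode_meta_lops that follows is the real
 * in-tree example):
 *
 *	static struct ocfs2_lock_res_ops example_lops = {
 *		.get_osb	= example_get_osb,
 *		.set_lvb	= example_set_lvb,
 *		.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 *	};
 */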

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), _func,	\
		_lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);

static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}
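
/*
 * Worked example of the name format above, assuming
 * ocfs2_lock_type_char() maps the metadata lock type to 'M' and
 * OCFS2_LOCK_ID_PAD supplies six '0' pad characters: a metadata lock
 * on block 100 (0x64) with generation 0x1234 would be named
 *
 *	M000000000000000000006400001234
 *
 * i.e. type char + pad + 16 hex digits of blkno + 8 hex digits of
 * generation, always OCFS2_LOCK_ID_MAX_LEN - 1 characters, which is
 * exactly what the BUG_ON() above enforces.
 */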

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_meta_lops;
			break;
		case OCFS2_LOCK_TYPE_DATA:
			ops = &ocfs2_inode_data_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
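
/*
 * The resulting name layout, byte by byte (with OCFS2_LOCK_TYPE_DENTRY
 * rendered as whatever character ocfs2_lock_type_char() assigns it):
 *
 *	[0]				type character
 *	[1..16]				parent blkno as %016llx text
 *	[17]				'\0' written by snprintf()
 *	[OCFS2_DENTRY_LOCK_INO_START..]	inode blkno as a raw __be64
 *
 * which is why ocfs2_get_dentry_lock_ino() above can memcpy() the
 * __be64 straight back out of l_name at the same offset.
 */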

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		lockres->l_ex_holders++;
		break;
	case LKM_PRMODE:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case LKM_PRMODE:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = LKM_EXMODE;

	if (level == LKM_EXMODE)
		new_level = LKM_NLMODE;
	else if (level == LKM_PRMODE)
		new_level = LKM_PRMODE;
	return new_level;
}
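
/*
 * In table form:
 *
 *	level we hold		highest level another node can hold
 *	LKM_EXMODE		LKM_NLMODE	(EX shares with nothing)
 *	LKM_PRMODE		LKM_PRMODE	(readers can share)
 *	LKM_NLMODE		LKM_EXMODE	(NL blocks nobody)
 */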

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct list_head *pos, *tmp;
	struct ocfs2_mask_waiter *mw;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
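
/*
 * Example of the filtering above: if we already recorded l_blocking ==
 * LKM_PRMODE (so we may keep PR) and a BAST now arrives for LKM_EXMODE,
 * the highest compatible level drops from PR to NL and a further
 * downconvert is scheduled. A duplicate EX BAST after that changes
 * nothing and returns 0.
 */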

static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}

static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags)
{
	int ret = 0;
	enum dlm_status status = DLM_NORMAL;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = dlmlock(osb->dlm,
			 level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
	}
	mlog(0, "lock %s, successful return from dlmlock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
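
/*
 * For example: if another node asked for EX (l_blocking == LKM_EXMODE)
 * we will eventually drop to NL, so no new request may jump the queue;
 * if it only asked for PR, we will drop to PR, so new PR requests may
 * still proceed here while EX requests must wait.
 */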

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}
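
/*
 * A sketch of the pattern these helpers support (the same shape
 * ocfs2_cluster_lock() below uses): queue a waiter under l_lock that
 * fires once the chosen flags reach their goal, then wait outside the
 * lock --
 *
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *	ret = ocfs2_wait_for_mask(&mw);
 *
 * i.e. "wake me when OCFS2_LOCK_BUSY clears". lockres_set_flags()
 * completes the waiter as soon as (l_flags & mw_mask) == mw_goal.
 */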

static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}

static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	unsigned long flags;
	int lkm_flags = local ? LKM_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
	/* NOTE: We don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use LKM_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}
/*
 * ocfs2_open_lock always takes a PR mode lock.
 */
int ocfs2_open_lock(struct inode *inode)
{
	int status = 0;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take PRMODE open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    LKM_PRMODE, 0, 0);
	if (status < 0)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

int ocfs2_try_open_lock(struct inode *inode, int write)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu try to take %s open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	/*
	 * The file system may already be holding a PRMODE/EXMODE open
	 * lock. Since we pass LKM_NOQUEUE, the request won't block
	 * waiting on other nodes and the -EAGAIN will indicate to the
	 * caller that this inode is still in use.
	 */
	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    level, LKM_NOQUEUE, 0);

out:
	mlog_exit(status);
	return status;
}
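
/*
 * Sketch of the intended use (assuming a caller shaped like an inode
 * teardown path): a node that wants to know whether any other node
 * still has this inode open can do
 *
 *	status = ocfs2_try_open_lock(inode, 1);
 *	if (status == -EAGAIN)
 *		(some other node still holds the open lock)
 *
 * because the EX request with LKM_NOQUEUE fails immediately rather
 * than waiting for the other nodes' PR holds to be dropped.
 */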

/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
 */
void ocfs2_open_unlock(struct inode *inode)
{
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	if (lockres->l_ro_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     LKM_PRMODE);
	if (lockres->l_ex_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     LKM_EXMODE);

out:
	mlog_exit_void();
}

int ocfs2_data_lock_full(struct inode *inode,
			 int write,
			 int arg_flags)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	/* We'll allow faking a readonly data lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
		if (write) {
			status = -EROFS;
			mlog_errno(status);
		}
		goto out;
	}

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_data_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
				    0, arg_flags);
	if (status < 0 && status != -EAGAIN)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

/* see ocfs2_meta_lock_with_page() */
int ocfs2_data_lock_with_page(struct inode *inode,
			      int write,
			      struct page *page)
{
	int ret;

	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		if (ocfs2_data_lock(inode, write) == 0)
			ocfs2_data_unlock(inode, write);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the vote thread pre-emptively when we reach a release
	 * condition. */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch(lockres->l_blocking) {
		case LKM_EXMODE:
			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
				kick = 1;
			break;
		case LKM_PRMODE:
			if (!lockres->l_ex_holders)
				kick = 1;
			break;
		default:
			BUG();
		}
	}

	if (kick)
		ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}

void ocfs2_data_unlock(struct inode *inode,
		       int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
	    !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)

/* LVB only has room for 64 bits of time here so we pack it for
 * now. */
static u64 ocfs2_pack_timespec(struct timespec *spec)
{
	u64 res;
	u64 sec = spec->tv_sec;
	u32 nsec = spec->tv_nsec;

	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);

	return res;
}
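
/*
 * Worked example: with OCFS2_SEC_SHIFT == 30, a timespec of
 * { .tv_sec = 5, .tv_nsec = 7 } packs to (5 << 30) | 7 == 0x140000007.
 * ocfs2_unpack_timespec() below just reverses the shift and mask.
 * Nanoseconds always survive the round trip since 10^9 - 1 < 2^30,
 * so the mask never truncates a valid tv_nsec.
 */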

/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}

static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}

static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks = ocfs2_inode_sector_count(inode);

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}

static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
					      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	if (lvb->lvb_version == OCFS2_LVB_VERSION
	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
		return 1;
	return 0;
}

/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}
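
/*
 * The protocol in practice (the same shape as the callers in this
 * file, e.g. ocfs2_meta_lock_update() and ocfs2_super_lock(), with
 * example_read_and_refresh() standing in for the real refresh work):
 *
 *	status = ocfs2_should_refresh_lock_res(lockres);
 *	if (status) {
 *		status = example_read_and_refresh();
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 *
 * Only the winner of the arbitration sees a nonzero return, and it
 * must call ocfs2_complete_lock_res_refresh() even on error so that
 * waiters stuck in ocfs2_wait_on_refreshing_lock() can make progress.
 */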

/* If status is nonzero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/* may or may not return a bh if it went to disk. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	if (ocfs2_mount_local(osb))
		goto bail;

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	if (ocfs2_mount_local(osb))
		goto local;

	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

local:
	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		if (lockres)
			ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

bail:
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}
1831/*
1832 * This is working around a lock inversion between tasks acquiring DLM locks
1833 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1834 * while acquiring page locks.
1835 *
1836 * ** These _with_page variantes are only intended to be called from aop
1837 * methods that hold page locks and return a very specific *positive* error
1838 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1839 *
1840 * The DLM is called such that it returns -EAGAIN if it would have blocked
1841 * waiting for the vote thread.  In that case we unlock our page so the vote
1842 * thread can make progress.  Once we've done this we have to return
1843 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1844 * into the VFS who will then immediately retry the aop call.
1845 *
1846 * We do a blocking lock and immediate unlock before returning, though, so that
1847 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop. This has the potential to livelock as
 * nodes ping locks back and forth, but that's a risk we're willing to
 * take in order to simply avoid the lock inversion.
1851 */
1852int ocfs2_meta_lock_with_page(struct inode *inode,
1853			      struct buffer_head **ret_bh,
1854			      int ex,
1855			      struct page *page)
1856{
1857	int ret;
1858
1859	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
1860	if (ret == -EAGAIN) {
1861		unlock_page(page);
1862		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
1863			ocfs2_meta_unlock(inode, ex);
1864		ret = AOP_TRUNCATED_PAGE;
1865	}
1866
1867	return ret;
1868}
1869
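/*
 * Take a PR meta lock on behalf of a file operation, upgrading to EX
 * only when atime actually needs updating. On success, *level records
 * the mode we ended up holding (1 == EX, 0 == PR) so the caller knows
 * what to pass to ocfs2_meta_unlock().
 */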
1870int ocfs2_meta_lock_atime(struct inode *inode,
1871			  struct vfsmount *vfsmnt,
1872			  int *level)
1873{
1874	int ret;
1875
1876	mlog_entry_void();
1877	ret = ocfs2_meta_lock(inode, NULL, 0);
1878	if (ret < 0) {
1879		mlog_errno(ret);
1880		return ret;
1881	}
1882
	/*
	 * If we should update atime, we will get an EX lock,
	 * otherwise we keep the PR lock taken above.
	 */
1887	if (ocfs2_should_update_atime(inode, vfsmnt)) {
1888		struct buffer_head *bh = NULL;
1889
1890		ocfs2_meta_unlock(inode, 0);
1891		ret = ocfs2_meta_lock(inode, &bh, 1);
1892		if (ret < 0) {
1893			mlog_errno(ret);
1894			return ret;
1895		}
1896		*level = 1;
1897		if (ocfs2_should_update_atime(inode, vfsmnt))
1898			ocfs2_update_inode_atime(inode, bh);
1899		if (bh)
1900			brelse(bh);
1901	} else
1902		*level = 0;
1903
1904	mlog_exit(ret);
1905	return ret;
1906}
1907
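/*
 * Drop a meta lock taken via ocfs2_meta_lock_full() or one of its
 * wrappers. Hard readonly and local mounts never took a cluster lock,
 * so there is nothing to drop for them.
 */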
1908void ocfs2_meta_unlock(struct inode *inode,
1909		       int ex)
1910{
1911	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1912	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1913	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1914
1915	mlog_entry_void();
1916
1917	mlog(0, "inode %llu drop %s META lock\n",
1918	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1919	     ex ? "EXMODE" : "PRMODE");
1920
1921	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1922	    !ocfs2_mount_local(osb))
1923		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1924
1925	mlog_exit_void();
1926}
1927
1928int ocfs2_super_lock(struct ocfs2_super *osb,
1929		     int ex)
1930{
1931	int status = 0;
1932	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1933	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1934	struct buffer_head *bh;
1935	struct ocfs2_slot_info *si = osb->slot_info;
1936
1937	mlog_entry_void();
1938
1939	if (ocfs2_is_hard_readonly(osb))
1940		return -EROFS;
1941
1942	if (ocfs2_mount_local(osb))
1943		goto bail;
1944
1945	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1946	if (status < 0) {
1947		mlog_errno(status);
1948		goto bail;
1949	}
1950
1951	/* The super block lock path is really in the best position to
1952	 * know when resources covered by the lock need to be
1953	 * refreshed, so we do it here. Of course, making sense of
1954	 * everything is up to the caller :) */
1955	status = ocfs2_should_refresh_lock_res(lockres);
1956	if (status < 0) {
1957		mlog_errno(status);
1958		goto bail;
1959	}
1960	if (status) {
1961		bh = si->si_bh;
1962		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1963					  si->si_inode);
1964		if (status == 0)
1965			ocfs2_update_slot_info(si);
1966
1967		ocfs2_complete_lock_res_refresh(lockres, status);
1968
1969		if (status < 0)
1970			mlog_errno(status);
1971	}
1972bail:
1973	mlog_exit(status);
1974	return status;
1975}
1976
1977void ocfs2_super_unlock(struct ocfs2_super *osb,
1978			int ex)
1979{
1980	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1981	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1982
1983	if (!ocfs2_mount_local(osb))
1984		ocfs2_cluster_unlock(osb, lockres, level);
1985}
1986
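/*
 * The rename lock is a filesystem-global resource and is only ever
 * taken in exclusive mode - it exists to serialize renames across the
 * cluster rather than to protect any cached state.
 */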
1987int ocfs2_rename_lock(struct ocfs2_super *osb)
1988{
1989	int status;
1990	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1991
1992	if (ocfs2_is_hard_readonly(osb))
1993		return -EROFS;
1994
1995	if (ocfs2_mount_local(osb))
1996		return 0;
1997
1998	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1999	if (status < 0)
2000		mlog_errno(status);
2001
2002	return status;
2003}
2004
2005void ocfs2_rename_unlock(struct ocfs2_super *osb)
2006{
2007	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2008
2009	if (!ocfs2_mount_local(osb))
2010		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
2011}
2012
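/*
 * Dentry locks cover the dcache entries attached to an inode. Holding
 * one at PR keeps our cached dentries valid; another node taking the
 * lock at EX forces the d_delete() work in
 * ocfs2_dentry_convert_worker() below.
 */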
2013int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2014{
2015	int ret;
2016	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2017	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2018	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2019
2020	BUG_ON(!dl);
2021
2022	if (ocfs2_is_hard_readonly(osb))
2023		return -EROFS;
2024
2025	if (ocfs2_mount_local(osb))
2026		return 0;
2027
2028	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2029	if (ret < 0)
2030		mlog_errno(ret);
2031
2032	return ret;
2033}
2034
2035void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2036{
2037	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2038	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2039	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2040
2041	if (!ocfs2_mount_local(osb))
2042		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2043}
2044
/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can live on past an unmount, so
 * we can't rely on the ocfs2_super to always exist. */
2048static void ocfs2_dlm_debug_free(struct kref *kref)
2049{
2050	struct ocfs2_dlm_debug *dlm_debug;
2051
2052	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2053
2054	kfree(dlm_debug);
2055}
2056
2057void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2058{
2059	if (dlm_debug)
2060		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2061}
2062
2063static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2064{
2065	kref_get(&debug->d_refcnt);
2066}
2067
2068struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2069{
2070	struct ocfs2_dlm_debug *dlm_debug;
2071
2072	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2073	if (!dlm_debug) {
2074		mlog_errno(-ENOMEM);
2075		goto out;
2076	}
2077
2078	kref_init(&dlm_debug->d_refcnt);
2079	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2080	dlm_debug->d_locking_state = NULL;
2081out:
2082	return dlm_debug;
2083}
2084
2085/* Access to this is arbitrated for us via seq_file->sem. */
2086struct ocfs2_dlm_seq_priv {
2087	struct ocfs2_dlm_debug *p_dlm_debug;
2088	struct ocfs2_lock_res p_iter_res;
2089	struct ocfs2_lock_res p_tmp_res;
2090};
2091
2092static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2093						 struct ocfs2_dlm_seq_priv *priv)
2094{
2095	struct ocfs2_lock_res *iter, *ret = NULL;
2096	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2097
2098	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2099
2100	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2101		/* discover the head of the list */
2102		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2103			mlog(0, "End of list found, %p\n", ret);
2104			break;
2105		}
2106
2107		/* We track our "dummy" iteration lockres' by a NULL
2108		 * l_ops field. */
2109		if (iter->l_ops != NULL) {
2110			ret = iter;
2111			break;
2112		}
2113	}
2114
2115	return ret;
2116}
2117
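/*
 * The seq_file iterator keeps its position with the dummy lockres
 * embedded in the private data (p_iter_res): it sits in the global
 * tracking list between calls and is recognizable by its NULL l_ops
 * field, so readers of the list skip over it.
 */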
2118static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2119{
2120	struct ocfs2_dlm_seq_priv *priv = m->private;
2121	struct ocfs2_lock_res *iter;
2122
2123	spin_lock(&ocfs2_dlm_tracking_lock);
2124	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2125	if (iter) {
2126		/* Since lockres' have the lifetime of their container
2127		 * (which can be inodes, ocfs2_supers, etc) we want to
2128		 * copy this out to a temporary lockres while still
2129		 * under the spinlock. Obviously after this we can't
2130		 * trust any pointers on the copy returned, but that's
2131		 * ok as the information we want isn't typically held
2132		 * in them. */
2133		priv->p_tmp_res = *iter;
2134		iter = &priv->p_tmp_res;
2135	}
2136	spin_unlock(&ocfs2_dlm_tracking_lock);
2137
2138	return iter;
2139}
2140
2141static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2142{
2143}
2144
2145static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2146{
2147	struct ocfs2_dlm_seq_priv *priv = m->private;
2148	struct ocfs2_lock_res *iter = v;
2149	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2150
2151	spin_lock(&ocfs2_dlm_tracking_lock);
2152	iter = ocfs2_dlm_next_res(iter, priv);
2153	list_del_init(&dummy->l_debug_list);
2154	if (iter) {
2155		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2156		priv->p_tmp_res = *iter;
2157		iter = &priv->p_tmp_res;
2158	}
2159	spin_unlock(&ocfs2_dlm_tracking_lock);
2160
2161	return iter;
2162}
2163
2164/* So that debugfs.ocfs2 can determine which format is being used */
2165#define OCFS2_DLM_DEBUG_STR_VERSION 1
2166static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2167{
2168	int i;
2169	char *lvb;
2170	struct ocfs2_lock_res *lockres = v;
2171
2172	if (!lockres)
2173		return -EINVAL;
2174
2175	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2176
2177	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2178		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2179			   lockres->l_name,
2180			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2181	else
2182		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2183
2184	seq_printf(m, "%d\t"
2185		   "0x%lx\t"
2186		   "0x%x\t"
2187		   "0x%x\t"
2188		   "%u\t"
2189		   "%u\t"
2190		   "%d\t"
2191		   "%d\t",
2192		   lockres->l_level,
2193		   lockres->l_flags,
2194		   lockres->l_action,
2195		   lockres->l_unlock_action,
2196		   lockres->l_ro_holders,
2197		   lockres->l_ex_holders,
2198		   lockres->l_requested,
2199		   lockres->l_blocking);
2200
2201	/* Dump the raw LVB */
2202	lvb = lockres->l_lksb.lvb;
	for (i = 0; i < DLM_LVB_LEN; i++)
2204		seq_printf(m, "0x%x\t", lvb[i]);
2205
2206	/* End the line */
2207	seq_printf(m, "\n");
2208	return 0;
2209}
2210
2211static struct seq_operations ocfs2_dlm_seq_ops = {
2212	.start =	ocfs2_dlm_seq_start,
2213	.stop =		ocfs2_dlm_seq_stop,
2214	.next =		ocfs2_dlm_seq_next,
2215	.show =		ocfs2_dlm_seq_show,
2216};
2217
2218static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2219{
2220	struct seq_file *seq = (struct seq_file *) file->private_data;
2221	struct ocfs2_dlm_seq_priv *priv = seq->private;
2222	struct ocfs2_lock_res *res = &priv->p_iter_res;
2223
2224	ocfs2_remove_lockres_tracking(res);
2225	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2226	return seq_release_private(inode, file);
2227}
2228
2229static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2230{
2231	int ret;
2232	struct ocfs2_dlm_seq_priv *priv;
2233	struct seq_file *seq;
2234	struct ocfs2_super *osb;
2235
2236	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2237	if (!priv) {
2238		ret = -ENOMEM;
2239		mlog_errno(ret);
2240		goto out;
2241	}
2242	osb = inode->i_private;
2243	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2244	priv->p_dlm_debug = osb->osb_dlm_debug;
2245	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2246
2247	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2248	if (ret) {
2249		kfree(priv);
2250		mlog_errno(ret);
2251		goto out;
2252	}
2253
2254	seq = (struct seq_file *) file->private_data;
2255	seq->private = priv;
2256
2257	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2258				   priv->p_dlm_debug);
2259
2260out:
2261	return ret;
2262}
2263
2264static const struct file_operations ocfs2_dlm_debug_fops = {
2265	.open =		ocfs2_dlm_debug_open,
2266	.release =	ocfs2_dlm_debug_release,
2267	.read =		seq_read,
2268	.llseek =	seq_lseek,
2269};
2270
2271static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2272{
2273	int ret = 0;
2274	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2275
2276	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2277							 S_IFREG|S_IRUSR,
2278							 osb->osb_debug_root,
2279							 osb,
2280							 &ocfs2_dlm_debug_fops);
2281	if (!dlm_debug->d_locking_state) {
2282		ret = -EINVAL;
2283		mlog(ML_ERROR,
2284		     "Unable to create locking state debugfs file.\n");
2285		goto out;
2286	}
2287
2288	ocfs2_get_dlm_debug(dlm_debug);
2289out:
2290	return ret;
2291}
2292
2293static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2294{
2295	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2296
2297	if (dlm_debug) {
2298		debugfs_remove(dlm_debug->d_locking_state);
2299		ocfs2_put_dlm_debug(dlm_debug);
2300	}
2301}
2302
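/*
 * Bring up everything dlmglue needs at mount time. Local mounts skip
 * straight to initializing the osb lock resources; cluster mounts also
 * set up the debugfs file, launch the vote thread and register a dlm
 * domain named by the filesystem uuid.
 */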
2303int ocfs2_dlm_init(struct ocfs2_super *osb)
2304{
2305	int status = 0;
2306	u32 dlm_key;
2307	struct dlm_ctxt *dlm = NULL;
2308
2309	mlog_entry_void();
2310
2311	if (ocfs2_mount_local(osb))
2312		goto local;
2313
2314	status = ocfs2_dlm_init_debug(osb);
2315	if (status < 0) {
2316		mlog_errno(status);
2317		goto bail;
2318	}
2319
2320	/* launch vote thread */
2321	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2322	if (IS_ERR(osb->vote_task)) {
2323		status = PTR_ERR(osb->vote_task);
2324		osb->vote_task = NULL;
2325		mlog_errno(status);
2326		goto bail;
2327	}
2328
	/* Used by the dlm code to make message headers unique; each
	 * node in this domain must agree on this. */
2331	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2332
2333	/* for now, uuid == domain */
2334	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2335	if (IS_ERR(dlm)) {
2336		status = PTR_ERR(dlm);
2337		mlog_errno(status);
2338		goto bail;
2339	}
2340
2341	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2342
2343local:
2344	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2345	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2346
2347	osb->dlm = dlm;
2348
2349	status = 0;
2350bail:
2351	if (status < 0) {
2352		ocfs2_dlm_shutdown_debug(osb);
2353		if (osb->vote_task)
2354			kthread_stop(osb->vote_task);
2355	}
2356
2357	mlog_exit(status);
2358	return status;
2359}
2360
2361void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2362{
2363	mlog_entry_void();
2364
2365	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2366
2367	ocfs2_drop_osb_locks(osb);
2368
2369	if (osb->vote_task) {
2370		kthread_stop(osb->vote_task);
2371		osb->vote_task = NULL;
2372	}
2373
2374	ocfs2_lock_res_free(&osb->osb_super_lockres);
2375	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2376
2377	dlm_unregister_domain(osb->dlm);
2378	osb->dlm = NULL;
2379
2380	ocfs2_dlm_shutdown_debug(osb);
2381
2382	mlog_exit_void();
2383}
2384
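/*
 * Fired by the dlm when an unlock or cancel request completes. We
 * figure out which of the two was in flight from l_unlock_action and
 * reset the lockres state to match.
 */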
2385static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2386{
2387	struct ocfs2_lock_res *lockres = opaque;
2388	unsigned long flags;
2389
2390	mlog_entry_void();
2391
2392	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2393	     lockres->l_unlock_action);
2394
2395	spin_lock_irqsave(&lockres->l_lock, flags);
2396	/* We tried to cancel a convert request, but it was already
2397	 * granted. All we want to do here is clear our unlock
2398	 * state. The wake_up call done at the bottom is redundant
2399	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2400	 * hurt anything anyway */
2401	if (status == DLM_CANCELGRANT &&
2402	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2403		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2404
2405		/* We don't clear the busy flag in this case as it
2406		 * should have been cleared by the ast which the dlm
2407		 * has called. */
2408		goto complete_unlock;
2409	}
2410
2411	if (status != DLM_NORMAL) {
2412		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2413		     "unlock_action %d\n", status, lockres->l_name,
2414		     lockres->l_unlock_action);
2415		spin_unlock_irqrestore(&lockres->l_lock, flags);
2416		return;
2417	}
2418
2419	switch(lockres->l_unlock_action) {
2420	case OCFS2_UNLOCK_CANCEL_CONVERT:
2421		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2422		lockres->l_action = OCFS2_AST_INVALID;
2423		break;
2424	case OCFS2_UNLOCK_DROP_LOCK:
2425		lockres->l_level = LKM_IVMODE;
2426		break;
2427	default:
2428		BUG();
2429	}
2430
2431	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2432complete_unlock:
2433	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2434	spin_unlock_irqrestore(&lockres->l_lock, flags);
2435
2436	wake_up(&lockres->l_event);
2437
2438	mlog_exit_void();
2439}
2440
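/*
 * Tear down a lockres for good. The lock must already be marked
 * OCFS2_LOCK_FREEING; we wait out any operation still in flight, push
 * a final LVB update for lock types that carry one, and then issue the
 * dlmunlock.
 */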
2441static int ocfs2_drop_lock(struct ocfs2_super *osb,
2442			   struct ocfs2_lock_res *lockres)
2443{
2444	enum dlm_status status;
2445	unsigned long flags;
2446	int lkm_flags = 0;
2447
2448	/* We didn't get anywhere near actually using this lockres. */
2449	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2450		goto out;
2451
2452	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2453		lkm_flags |= LKM_VALBLK;
2454
2455	spin_lock_irqsave(&lockres->l_lock, flags);
2456
2457	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2458			"lockres %s, flags 0x%lx\n",
2459			lockres->l_name, lockres->l_flags);
2460
2461	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2462		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2463		     "%u, unlock_action = %u\n",
2464		     lockres->l_name, lockres->l_flags, lockres->l_action,
2465		     lockres->l_unlock_action);
2466
2467		spin_unlock_irqrestore(&lockres->l_lock, flags);
2468
2469		ocfs2_wait_on_busy_lock(lockres);
2470
2471		spin_lock_irqsave(&lockres->l_lock, flags);
2472	}
2473
2474	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2475		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2476		    lockres->l_level == LKM_EXMODE &&
2477		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2478			lockres->l_ops->set_lvb(lockres);
2479	}
2480
2481	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2482		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2483		     lockres->l_name);
2484	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2485		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2486
2487	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2488		spin_unlock_irqrestore(&lockres->l_lock, flags);
2489		goto out;
2490	}
2491
2492	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2493
2494	/* make sure we never get here while waiting for an ast to
2495	 * fire. */
2496	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2497
2498	/* is this necessary? */
2499	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2500	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2501	spin_unlock_irqrestore(&lockres->l_lock, flags);
2502
2503	mlog(0, "lock %s\n", lockres->l_name);
2504
2505	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2506			   ocfs2_unlock_ast, lockres);
2507	if (status != DLM_NORMAL) {
2508		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2509		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2510		dlm_print_one_lock(lockres->l_lksb.lockid);
2511		BUG();
2512	}
	mlog(0, "lock %s, successful return from dlmunlock\n",
2514	     lockres->l_name);
2515
2516	ocfs2_wait_on_busy_lock(lockres);
2517out:
2518	mlog_exit(0);
2519	return 0;
2520}
2521
2522/* Mark the lockres as being dropped. It will no longer be
2523 * queued if blocking, but we still may have to wait on it
2524 * being dequeued from the vote thread before we can consider
2525 * it safe to drop.
2526 *
2527 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2528void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2529{
2530	int status;
2531	struct ocfs2_mask_waiter mw;
2532	unsigned long flags;
2533
2534	ocfs2_init_mask_waiter(&mw);
2535
2536	spin_lock_irqsave(&lockres->l_lock, flags);
2537	lockres->l_flags |= OCFS2_LOCK_FREEING;
2538	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2539		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2540		spin_unlock_irqrestore(&lockres->l_lock, flags);
2541
2542		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2543
2544		status = ocfs2_wait_for_mask(&mw);
2545		if (status)
2546			mlog_errno(status);
2547
2548		spin_lock_irqsave(&lockres->l_lock, flags);
2549	}
2550	spin_unlock_irqrestore(&lockres->l_lock, flags);
2551}
2552
2553void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2554			       struct ocfs2_lock_res *lockres)
2555{
2556	int ret;
2557
2558	ocfs2_mark_lockres_freeing(lockres);
2559	ret = ocfs2_drop_lock(osb, lockres);
2560	if (ret)
2561		mlog_errno(ret);
2562}
2563
2564static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2565{
2566	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2567	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2568}
2569
2570int ocfs2_drop_inode_locks(struct inode *inode)
2571{
2572	int status, err;
2573
2574	mlog_entry_void();
2575
2576	/* No need to call ocfs2_mark_lockres_freeing here -
2577	 * ocfs2_clear_inode has done it for us. */
2578
2579	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2580			      &OCFS2_I(inode)->ip_open_lockres);
2581	if (err < 0)
2582		mlog_errno(err);
2583
2584	status = err;
2585
2586	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2587			      &OCFS2_I(inode)->ip_data_lockres);
2588	if (err < 0)
2589		mlog_errno(err);
2590	if (err < 0 && !status)
2591		status = err;
2592
2593	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2594			      &OCFS2_I(inode)->ip_meta_lockres);
2595	if (err < 0)
2596		mlog_errno(err);
2597	if (err < 0 && !status)
2598		status = err;
2599
2600	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2601			      &OCFS2_I(inode)->ip_rw_lockres);
2602	if (err < 0)
2603		mlog_errno(err);
2604	if (err < 0 && !status)
2605		status = err;
2606
2607	mlog_exit(status);
2608	return status;
2609}
2610
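/*
 * Stage a downconvert while still under the lockres spinlock. The
 * actual LKM_CONVERT request is sent afterwards, without the spinlock,
 * by ocfs2_downconvert_lock().
 */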
2611static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2612				      int new_level)
2613{
2614	assert_spin_locked(&lockres->l_lock);
2615
2616	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2617
2618	if (lockres->l_level <= new_level) {
2619		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2620		     lockres->l_level, new_level);
2621		BUG();
2622	}
2623
2624	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2625	     lockres->l_name, new_level, lockres->l_blocking);
2626
2627	lockres->l_action = OCFS2_AST_DOWNCONVERT;
2628	lockres->l_requested = new_level;
2629	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2630}
2631
2632static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2633				  struct ocfs2_lock_res *lockres,
2634				  int new_level,
2635				  int lvb)
2636{
2637	int ret, dlm_flags = LKM_CONVERT;
2638	enum dlm_status status;
2639
2640	mlog_entry_void();
2641
2642	if (lvb)
2643		dlm_flags |= LKM_VALBLK;
2644
2645	status = dlmlock(osb->dlm,
2646			 new_level,
2647			 &lockres->l_lksb,
2648			 dlm_flags,
2649			 lockres->l_name,
2650			 OCFS2_LOCK_ID_MAX_LEN - 1,
2651			 ocfs2_locking_ast,
2652			 lockres,
2653			 ocfs2_blocking_ast);
2654	if (status != DLM_NORMAL) {
2655		ocfs2_log_dlm_error("dlmlock", status, lockres);
2656		ret = -EINVAL;
2657		ocfs2_recover_from_dlm_error(lockres, 1);
2658		goto bail;
2659	}
2660
2661	ret = 0;
2662bail:
2663	mlog_exit(ret);
2664	return ret;
2665}
2666
2667/* returns 1 when the caller should unlock and call dlmunlock */
2668static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2669				        struct ocfs2_lock_res *lockres)
2670{
2671	assert_spin_locked(&lockres->l_lock);
2672
2673	mlog_entry_void();
2674	mlog(0, "lock %s\n", lockres->l_name);
2675
2676	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2677		/* If we're already trying to cancel a lock conversion
2678		 * then just drop the spinlock and allow the caller to
2679		 * requeue this lock. */
2680
2681		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2682		return 0;
2683	}
2684
	/* were we in a convert when the bast fired? */
2686	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2687	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
2688	/* set things up for the unlockast to know to just
2689	 * clear out the ast_action and unset busy, etc. */
2690	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2691
2692	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2693			"lock %s, invalid flags: 0x%lx\n",
2694			lockres->l_name, lockres->l_flags);
2695
2696	return 1;
2697}
2698
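/* Ask the dlm to cancel a convert in progress. The result - including
 * the cancel-arrived-too-late case - comes back via ocfs2_unlock_ast(). */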
2699static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2700				struct ocfs2_lock_res *lockres)
2701{
2702	int ret;
2703	enum dlm_status status;
2704
2705	mlog_entry_void();
2706	mlog(0, "lock %s\n", lockres->l_name);
2707
2708	ret = 0;
2709	status = dlmunlock(osb->dlm,
2710			   &lockres->l_lksb,
2711			   LKM_CANCEL,
2712			   ocfs2_unlock_ast,
2713			   lockres);
2714	if (status != DLM_NORMAL) {
2715		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2716		ret = -EINVAL;
2717		ocfs2_recover_from_dlm_error(lockres, 0);
2718	}
2719
2720	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2721
2722	mlog_exit(ret);
2723	return ret;
2724}
2725
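/*
 * The heart of the downconvert path: decide whether a blocked lockres
 * can be downconverted right now. We cancel any convert already in
 * flight, bail out (asking to be requeued) while holders or a pending
 * refresh stand in the way, give the lock type's worker a chance to
 * flush state, and finally issue the downconvert itself.
 */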
2726static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2727			      struct ocfs2_lock_res *lockres,
2728			      struct ocfs2_unblock_ctl *ctl)
2729{
2730	unsigned long flags;
2731	int blocking;
2732	int new_level;
2733	int ret = 0;
2734	int set_lvb = 0;
2735
2736	mlog_entry_void();
2737
2738	spin_lock_irqsave(&lockres->l_lock, flags);
2739
2740	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2741
2742recheck:
2743	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2744		ctl->requeue = 1;
2745		ret = ocfs2_prepare_cancel_convert(osb, lockres);
2746		spin_unlock_irqrestore(&lockres->l_lock, flags);
2747		if (ret) {
2748			ret = ocfs2_cancel_convert(osb, lockres);
2749			if (ret < 0)
2750				mlog_errno(ret);
2751		}
2752		goto leave;
2753	}
2754
2755	/* if we're blocking an exclusive and we have *any* holders,
2756	 * then requeue. */
2757	if ((lockres->l_blocking == LKM_EXMODE)
2758	    && (lockres->l_ex_holders || lockres->l_ro_holders))
2759		goto leave_requeue;
2760
2761	/* If it's a PR we're blocking, then only
2762	 * requeue if we've got any EX holders */
2763	if (lockres->l_blocking == LKM_PRMODE &&
2764	    lockres->l_ex_holders)
2765		goto leave_requeue;
2766
2767	/*
2768	 * Can we get a lock in this state if the holder counts are
	 * zero? The metadata unblock code used to check this.
2770	 */
2771	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2772	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2773		goto leave_requeue;
2774
2775	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2776
2777	if (lockres->l_ops->check_downconvert
2778	    && !lockres->l_ops->check_downconvert(lockres, new_level))
2779		goto leave_requeue;
2780
2781	/* If we get here, then we know that there are no more
2782	 * incompatible holders (and anyone asking for an incompatible
2783	 * lock is blocked). We can now downconvert the lock */
2784	if (!lockres->l_ops->downconvert_worker)
2785		goto downconvert;
2786
2787	/* Some lockres types want to do a bit of work before
2788	 * downconverting a lock. Allow that here. The worker function
2789	 * may sleep, so we save off a copy of what we're blocking as
2790	 * it may change while we're not holding the spin lock. */
2791	blocking = lockres->l_blocking;
2792	spin_unlock_irqrestore(&lockres->l_lock, flags);
2793
2794	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2795
2796	if (ctl->unblock_action == UNBLOCK_STOP_POST)
2797		goto leave;
2798
2799	spin_lock_irqsave(&lockres->l_lock, flags);
2800	if (blocking != lockres->l_blocking) {
2801		/* If this changed underneath us, then we can't drop
2802		 * it just yet. */
2803		goto recheck;
2804	}
2805
2806downconvert:
2807	ctl->requeue = 0;
2808
2809	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2810		if (lockres->l_level == LKM_EXMODE)
2811			set_lvb = 1;
2812
		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. If we aren't setting it, there's no need to
		 * actually clear out the lvb here as its value is
		 * still valid.
		 */
2819		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2820			lockres->l_ops->set_lvb(lockres);
2821	}
2822
2823	ocfs2_prepare_downconvert(lockres, new_level);
2824	spin_unlock_irqrestore(&lockres->l_lock, flags);
2825	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2826leave:
2827	mlog_exit(ret);
2828	return ret;
2829
2830leave_requeue:
2831	spin_unlock_irqrestore(&lockres->l_lock, flags);
2832	ctl->requeue = 1;
2833
2834	mlog_exit(0);
2835	return 0;
2836}
2837
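/*
 * Downconvert worker for data locks: write back this node's dirty
 * pages so other nodes see current data and, when we're blocking an
 * EX request, drop our page cache for the inode entirely.
 */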
2838static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2839				     int blocking)
2840{
2841	struct inode *inode;
2842	struct address_space *mapping;
2843
	inode = ocfs2_lock_res_inode(lockres);
2845	mapping = inode->i_mapping;
2846
2847	/*
2848	 * We need this before the filemap_fdatawrite() so that it can
2849	 * transfer the dirty bit from the PTE to the
2850	 * page. Unfortunately this means that even for EX->PR
2851	 * downconverts, we'll lose our mappings and have to build
2852	 * them up again.
2853	 */
2854	unmap_mapping_range(mapping, 0, 0, 0);
2855
2856	if (filemap_fdatawrite(mapping)) {
2857		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2858		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2859	}
2860	sync_mapping_buffers(mapping);
2861	if (blocking == LKM_EXMODE) {
2862		truncate_inode_pages(mapping, 0);
2863	} else {
2864		/* We only need to wait on the I/O if we're not also
2865		 * truncating pages because truncate_inode_pages waits
2866		 * for us above. We don't truncate pages if we're
2867		 * blocking anything < EXMODE because we want to keep
2868		 * them around in that case. */
2869		filemap_fdatawait(mapping);
2870	}
2871
2872	return UNBLOCK_CONTINUE;
2873}
2874
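/*
 * A meta lock may only be downconverted once the journal has
 * checkpointed the inode's metadata. If it hasn't been, kick off a
 * checkpoint and return 0 so the downconvert gets requeued.
 */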
2875static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2876					int new_level)
2877{
2878	struct inode *inode = ocfs2_lock_res_inode(lockres);
2879	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2880
2881	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2882	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2883
2884	if (checkpointed)
2885		return 1;
2886
2887	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2888	return 0;
2889}
2890
2891static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
2892{
2893	struct inode *inode = ocfs2_lock_res_inode(lockres);
2894
2895	__ocfs2_stuff_meta_lvb(inode);
2896}
2897
2898/*
2899 * Does the final reference drop on our dentry lock. Right now this
2900 * happens in the vote thread, but we could choose to simplify the
2901 * dlmglue API and push these off to the ocfs2_wq in the future.
2902 */
2903static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2904				     struct ocfs2_lock_res *lockres)
2905{
2906	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2907	ocfs2_dentry_lock_put(osb, dl);
2908}
2909
2910/*
2911 * d_delete() matching dentries before the lock downconvert.
2912 *
2913 * At this point, any process waiting to destroy the
2914 * dentry_lock due to last ref count is stopped by the
2915 * OCFS2_LOCK_QUEUED flag.
2916 *
2917 * We have two potential problems
2918 *
2919 * 1) If we do the last reference drop on our dentry_lock (via dput)
2920 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2921 *    the downconvert to finish. Instead we take an elevated
2922 *    reference and push the drop until after we've completed our
2923 *    unblock processing.
2924 *
2925 * 2) There might be another process with a final reference,
2926 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
2928 */
2929static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2930				       int blocking)
2931{
2932	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2933	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2934	struct dentry *dentry;
2935	unsigned long flags;
2936	int extra_ref = 0;
2937
2938	/*
2939	 * This node is blocking another node from getting a read
2940	 * lock. This happens when we've renamed within a
2941	 * directory. We've forced the other nodes to d_delete(), but
2942	 * we never actually dropped our lock because it's still
2943	 * valid. The downconvert code will retain a PR for this node,
2944	 * so there's no further work to do.
2945	 */
2946	if (blocking == LKM_PRMODE)
2947		return UNBLOCK_CONTINUE;
2948
2949	/*
2950	 * Mark this inode as potentially orphaned. The code in
2951	 * ocfs2_delete_inode() will figure out whether it actually
2952	 * needs to be freed or not.
2953	 */
2954	spin_lock(&oi->ip_lock);
2955	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2956	spin_unlock(&oi->ip_lock);
2957
2958	/*
	 * Yuck. We need to make sure, however, that the check of
	 * OCFS2_LOCK_FREEING and the taking of the extra reference
	 * are atomic with respect to a reference decrement or the
	 * setting of that flag.
2963	 */
2964	spin_lock_irqsave(&lockres->l_lock, flags);
2965	spin_lock(&dentry_attach_lock);
2966	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2967	    && dl->dl_count) {
2968		dl->dl_count++;
2969		extra_ref = 1;
2970	}
2971	spin_unlock(&dentry_attach_lock);
2972	spin_unlock_irqrestore(&lockres->l_lock, flags);
2973
2974	mlog(0, "extra_ref = %d\n", extra_ref);
2975
2976	/*
2977	 * We have a process waiting on us in ocfs2_dentry_iput(),
2978	 * which means we can't have any more outstanding
2979	 * aliases. There's no need to do any more work.
2980	 */
2981	if (!extra_ref)
2982		return UNBLOCK_CONTINUE;
2983
2984	spin_lock(&dentry_attach_lock);
2985	while (1) {
2986		dentry = ocfs2_find_local_alias(dl->dl_inode,
2987						dl->dl_parent_blkno, 1);
2988		if (!dentry)
2989			break;
2990		spin_unlock(&dentry_attach_lock);
2991
2992		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2993		     dentry->d_name.name);
2994
2995		/*
2996		 * The following dcache calls may do an
2997		 * iput(). Normally we don't want that from the
2998		 * downconverting thread, but in this case it's ok
2999		 * because the requesting node already has an
3000		 * exclusive lock on the inode, so it can't be queued
3001		 * for a downconvert.
3002		 */
3003		d_delete(dentry);
3004		dput(dentry);
3005
3006		spin_lock(&dentry_attach_lock);
3007	}
3008	spin_unlock(&dentry_attach_lock);
3009
3010	/*
3011	 * If we are the last holder of this dentry lock, there is no
3012	 * reason to downconvert so skip straight to the unlock.
3013	 */
3014	if (dl->dl_count == 1)
3015		return UNBLOCK_STOP_POST;
3016
3017	return UNBLOCK_CONTINUE_POST;
3018}
3019
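/*
 * Called by the vote thread for each lockres it pulls off the blocked
 * list: run the unblock logic, then either clear OCFS2_LOCK_QUEUED or
 * put the lockres back on the queue for another pass.
 */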
3020void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3021				struct ocfs2_lock_res *lockres)
3022{
3023	int status;
3024	struct ocfs2_unblock_ctl ctl = {0, 0,};
3025	unsigned long flags;
3026
3027	/* Our reference to the lockres in this function can be
3028	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3029	 * flag. */
3030
3031	mlog_entry_void();
3032
3033	BUG_ON(!lockres);
3034	BUG_ON(!lockres->l_ops);
3035
3036	mlog(0, "lockres %s blocked.\n", lockres->l_name);
3037
	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here still saves us some unnecessary
	 * work. */
3043	spin_lock_irqsave(&lockres->l_lock, flags);
3044	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3045		goto unqueue;
3046	spin_unlock_irqrestore(&lockres->l_lock, flags);
3047
3048	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3049	if (status < 0)
3050		mlog_errno(status);
3051
3052	spin_lock_irqsave(&lockres->l_lock, flags);
3053unqueue:
3054	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3055		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3056	} else
3057		ocfs2_schedule_blocked_lock(osb, lockres);
3058
3059	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3060	     ctl.requeue ? "yes" : "no");
3061	spin_unlock_irqrestore(&lockres->l_lock, flags);
3062
3063	if (ctl.unblock_action != UNBLOCK_CONTINUE
3064	    && lockres->l_ops->post_unlock)
3065		lockres->l_ops->post_unlock(osb, lockres);
3066
3067	mlog_exit_void();
3068}
3069
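/*
 * Queue a lockres for the vote thread to downconvert later. Called
 * with the lockres spinlock held.
 */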
3070static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3071					struct ocfs2_lock_res *lockres)
3072{
3073	mlog_entry_void();
3074
3075	assert_spin_locked(&lockres->l_lock);
3076
3077	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3078		/* Do not schedule a lock for downconvert when it's on
3079		 * the way to destruction - any nodes wanting access
3080		 * to the resource will get it soon. */
3081		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3082		     lockres->l_name, lockres->l_flags);
3083		return;
3084	}
3085
3086	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3087
3088	spin_lock(&osb->vote_task_lock);
3089	if (list_empty(&lockres->l_blocked_list)) {
3090		list_add_tail(&lockres->l_blocked_list,
3091			      &osb->blocked_lock_list);
3092		osb->blocked_lock_count++;
3093	}
3094	spin_unlock(&osb->vote_task_lock);
3095
3096	mlog_exit_void();
3097}
3098