/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	unsigned long long	mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx, iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1
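
/*
 * For illustration only (a sketch, not compiled code): a locking
 * wrapper for a type with this flag set follows roughly the pattern
 * below, where do_refresh() is a hypothetical stand-in for re-reading
 * the stale data and l_flags locking is elided for brevity:
 *
 *	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 *	if (!status && (lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
 *		do_refresh(lockres);
 *		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 *	}
 *
 * (The real wrappers also serialize refreshers with the
 * OCFS2_LOCK_REFRESHING flag; see ocfs2_wait_on_refreshing_lock()
 * below.)
 */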

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, _lockres->l_name);					\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_num_prmode = 0;
	res->l_lock_num_prmode_failed = 0;
	res->l_lock_total_prmode = 0;
	res->l_lock_max_prmode = 0;
	res->l_lock_num_exmode = 0;
	res->l_lock_num_exmode_failed = 0;
	res->l_lock_total_exmode = 0;
	res->l_lock_max_exmode = 0;
	res->l_lock_refresh = 0;
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	unsigned long long *num, *sum;
	unsigned int *max, *failed;
	struct timespec ts = current_kernel_time();
	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;

	if (level == LKM_PRMODE) {
		num = &res->l_lock_num_prmode;
		sum = &res->l_lock_total_prmode;
		max = &res->l_lock_max_prmode;
		failed = &res->l_lock_num_prmode_failed;
	} else if (level == LKM_EXMODE) {
		num = &res->l_lock_num_exmode;
		sum = &res->l_lock_total_exmode;
		max = &res->l_lock_max_exmode;
		failed = &res->l_lock_num_exmode_failed;
	} else
		return;

	(*num)++;
	(*sum) += time;
	if (time > *max)
		*max = time;
	if (ret)
		(*failed)++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	struct timespec ts = current_kernel_time();

	mw->mw_lock_start = timespec_to_ns(&ts);
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
			   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_inode_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same, though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()).
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
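
/*
 * A sketch of the resulting l_name layout (assuming
 * OCFS2_DENTRY_LOCK_INO_START is 18 and that
 * ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY) is 'N'):
 *
 *	l_name[0]      = 'N'
 *	l_name[1..16]  = parent blkno as 16 hex digits (no pad value)
 *	l_name[17]     = '\0', so error prints stay readable
 *	l_name[18..25] = inode blkno as a big-endian binary u64
 *	l_name[26..]   = zeros from ocfs2_lock_res_init_once()
 */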

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}
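
/*
 * In table form (the level another node is blocking on at left, the
 * highest level we may keep while remaining compatible at right):
 *
 *	EX -> NL
 *	PR -> PR
 *	NL -> EX
 */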

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything*, however, should mark us as needing an
	 * update. */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 */
	lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */
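
/*
 * In code form, each ocfs2_dlm_lock() call site in this file follows
 * the pattern below (a sketch with error handling elided; see
 * ocfs2_lock_create() for the real thing):
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb,
 *			     dlm_flags, lockres->l_name,
 *			     OCFS2_LOCK_ID_MAX_LEN - 1);
 *	lockres_clear_pending(lockres, gen, osb);
 *
 * A stale generation turns the final clear into a no-op, which is
 * exactly what closes the window described above.
 */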

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog_entry_void();

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		mlog_exit_void();
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog_exit_void();
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= ocfs2_locking_ast,
	.lp_blocking_ast	= ocfs2_blocking_ast,
	.lp_unlock_ast		= ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
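
/*
 * A worked example of the rules above (version numbers illustrative,
 * not actual OCFS2 releases): if a domain is running protocol 1.2, a
 * node whose maximum is 1.4 may join and will run at 1.2, a node
 * whose maximum is 1.1 may not join (minor too small), and a node
 * whose maximum is 2.0 may not join (major differs).
 */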

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return ret;
}
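
/*
 * Taken together, the mask waiter helpers above are used like this
 * (a sketch; __ocfs2_cluster_lock() below is the real consumer):
 *
 *	ocfs2_init_mask_waiter(&mw);
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_wait_for_mask(&mw);
 *
 * The completion fires from lockres_set_flags() once the masked bits
 * of l_flags reach the goal (here: BUSY cleared).
 */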

static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* The lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}

static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
	mlog_exit_void();
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a metadata lock, as
	 * metadata locks don't use a generation in their lock names.
1664	 */
1665	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1666	if (ret) {
1667		mlog_errno(ret);
1668		goto bail;
1669	}
1670
1671	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1672	if (ret) {
1673		mlog_errno(ret);
1674		goto bail;
1675	}
1676
1677bail:
1678	mlog_exit(ret);
1679	return ret;
1680}
1681
1682int ocfs2_rw_lock(struct inode *inode, int write)
1683{
1684	int status, level;
1685	struct ocfs2_lock_res *lockres;
1686	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1687
1688	BUG_ON(!inode);
1689
1690	mlog_entry_void();
1691
1692	mlog(0, "inode %llu take %s RW lock\n",
1693	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1694	     write ? "EXMODE" : "PRMODE");
1695
1696	if (ocfs2_mount_local(osb)) {
1697		mlog_exit(0);
1698		return 0;
1699	}
1700
1701	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1702
1703	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1704
1705	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1706				    0);
1707	if (status < 0)
1708		mlog_errno(status);
1709
1710	mlog_exit(status);
1711	return status;
1712}
1713
1714void ocfs2_rw_unlock(struct inode *inode, int write)
1715{
1716	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1717	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1718	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1719
1720	mlog_entry_void();
1721
1722	mlog(0, "inode %llu drop %s RW lock\n",
1723	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1724	     write ? "EXMODE" : "PRMODE");
1725
1726	if (!ocfs2_mount_local(osb))
1727		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1728
1729	mlog_exit_void();
1730}
1731
1732/*
 * ocfs2_open_lock always gets a PR mode lock.
1734 */
1735int ocfs2_open_lock(struct inode *inode)
1736{
1737	int status = 0;
1738	struct ocfs2_lock_res *lockres;
1739	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1740
1741	BUG_ON(!inode);
1742
1743	mlog_entry_void();
1744
1745	mlog(0, "inode %llu take PRMODE open lock\n",
1746	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1747
1748	if (ocfs2_mount_local(osb))
1749		goto out;
1750
1751	lockres = &OCFS2_I(inode)->ip_open_lockres;
1752
1753	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1754				    DLM_LOCK_PR, 0, 0);
1755	if (status < 0)
1756		mlog_errno(status);
1757
1758out:
1759	mlog_exit(status);
1760	return status;
1761}
1762
1763int ocfs2_try_open_lock(struct inode *inode, int write)
1764{
1765	int status = 0, level;
1766	struct ocfs2_lock_res *lockres;
1767	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1768
1769	BUG_ON(!inode);
1770
1771	mlog_entry_void();
1772
1773	mlog(0, "inode %llu try to take %s open lock\n",
1774	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1775	     write ? "EXMODE" : "PRMODE");
1776
1777	if (ocfs2_mount_local(osb))
1778		goto out;
1779
1780	lockres = &OCFS2_I(inode)->ip_open_lockres;
1781
1782	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1783
1784	/*
	 * The file system may already be holding a PRMODE/EXMODE open
	 * lock. Since we pass DLM_LKF_NOQUEUE, the request won't block
	 * waiting on other nodes, and -EAGAIN will indicate to the
	 * caller that this inode is still in use.
1789	 */
1790	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1791				    level, DLM_LKF_NOQUEUE, 0);
1792
1793out:
1794	mlog_exit(status);
1795	return status;
1796}
1797
1798/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
1800 */
1801void ocfs2_open_unlock(struct inode *inode)
1802{
1803	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1804	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1805
1806	mlog_entry_void();
1807
1808	mlog(0, "inode %llu drop open lock\n",
1809	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1810
1811	if (ocfs2_mount_local(osb))
1812		goto out;
1813
	if (lockres->l_ro_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     DLM_LOCK_PR);
	if (lockres->l_ex_holders)
1818		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1819				     DLM_LOCK_EX);
1820
1821out:
1822	mlog_exit_void();
1823}
1824
1825static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1826				     int level)
1827{
1828	int ret;
1829	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1830	unsigned long flags;
1831	struct ocfs2_mask_waiter mw;
1832
1833	ocfs2_init_mask_waiter(&mw);
1834
1835retry_cancel:
1836	spin_lock_irqsave(&lockres->l_lock, flags);
1837	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1838		ret = ocfs2_prepare_cancel_convert(osb, lockres);
1839		if (ret) {
1840			spin_unlock_irqrestore(&lockres->l_lock, flags);
1841			ret = ocfs2_cancel_convert(osb, lockres);
1842			if (ret < 0) {
1843				mlog_errno(ret);
1844				goto out;
1845			}
1846			goto retry_cancel;
1847		}
1848		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1849		spin_unlock_irqrestore(&lockres->l_lock, flags);
1850
1851		ocfs2_wait_for_mask(&mw);
1852		goto retry_cancel;
1853	}
1854
1855	ret = -ERESTARTSYS;
1856	/*
1857	 * We may still have gotten the lock, in which case there's no
1858	 * point to restarting the syscall.
1859	 */
1860	if (lockres->l_level == level)
1861		ret = 0;
1862
1863	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1864	     lockres->l_flags, lockres->l_level, lockres->l_action);
1865
1866	spin_unlock_irqrestore(&lockres->l_lock, flags);
1867
1868out:
1869	return ret;
1870}
1871
1872/*
1873 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1874 * flock() calls. The locking approach this requires is sufficiently
1875 * different from all other cluster lock types that we implement a
1876 * separate path to the "low-level" dlm calls. In particular:
1877 *
 * - No optimization of lock levels is done - we take exactly what's
 *   been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 *   no-lock at unlock time. This also means flock locks never go on
 *   the blocking list.
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 *   sure to allow cancellation of a misbehaving application's flock()
1887 *   request.
1888 *
1889 * - Access to any flock lockres doesn't require concurrency, so we
1890 *   can simplify the code by requiring the caller to guarantee
1891 *   serialization of dlmglue flock calls.
1892 */
1893int ocfs2_file_lock(struct file *file, int ex, int trylock)
1894{
1895	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1896	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1897	unsigned long flags;
1898	struct ocfs2_file_private *fp = file->private_data;
1899	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1900	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1901	struct ocfs2_mask_waiter mw;
1902
1903	ocfs2_init_mask_waiter(&mw);
1904
1905	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1906	    (lockres->l_level > DLM_LOCK_NL)) {
1907		mlog(ML_ERROR,
1908		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1909		     "level: %u\n", lockres->l_name, lockres->l_flags,
1910		     lockres->l_level);
1911		return -EINVAL;
1912	}
1913
1914	spin_lock_irqsave(&lockres->l_lock, flags);
1915	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1916		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1917		spin_unlock_irqrestore(&lockres->l_lock, flags);
1918
1919		/*
1920		 * Get the lock at NLMODE to start - that way we
1921		 * can cancel the upconvert request if need be.
1922		 */
1923		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1924		if (ret < 0) {
1925			mlog_errno(ret);
1926			goto out;
1927		}
1928
1929		ret = ocfs2_wait_for_mask(&mw);
1930		if (ret) {
1931			mlog_errno(ret);
1932			goto out;
1933		}
1934		spin_lock_irqsave(&lockres->l_lock, flags);
1935	}
1936
1937	lockres->l_action = OCFS2_AST_CONVERT;
1938	lkm_flags |= DLM_LKF_CONVERT;
1939	lockres->l_requested = level;
1940	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1941
1942	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1943	spin_unlock_irqrestore(&lockres->l_lock, flags);
1944
1945	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1946			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1947	if (ret) {
1948		if (!trylock || (ret != -EAGAIN)) {
1949			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1950			ret = -EINVAL;
1951		}
1952
1953		ocfs2_recover_from_dlm_error(lockres, 1);
1954		lockres_remove_mask_waiter(lockres, &mw);
1955		goto out;
1956	}
1957
1958	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1959	if (ret == -ERESTARTSYS) {
1960		/*
		 * Userspace can deadlock itself with flock(). The
		 * local kernel's behavior is to allow the deadlock,
		 * but to abort the system call if a signal is
		 * received. We follow that example; otherwise a
		 * poorly written program could sit in the kernel
		 * until reboot.
1967		 *
1968		 * Handling this is a bit more complicated for Ocfs2
1969		 * though. We can't exit this function with an
1970		 * outstanding lock request, so a cancel convert is
1971		 * required. We intentionally overwrite 'ret' - if the
1972		 * cancel fails and the lock was granted, it's easier
1973		 * to just bubble success back up to the user.
1974		 */
1975		ret = ocfs2_flock_handle_signal(lockres, level);
1976	} else if (!ret && (level > lockres->l_level)) {
1977		/* Trylock failed asynchronously */
1978		BUG_ON(!trylock);
1979		ret = -EAGAIN;
1980	}
1981
1982out:
1983
1984	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1985	     lockres->l_name, ex, trylock, ret);
1986	return ret;
1987}
1988
1989void ocfs2_file_unlock(struct file *file)
1990{
1991	int ret;
1992	unsigned int gen;
1993	unsigned long flags;
1994	struct ocfs2_file_private *fp = file->private_data;
1995	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1996	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1997	struct ocfs2_mask_waiter mw;
1998
1999	ocfs2_init_mask_waiter(&mw);
2000
2001	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2002		return;
2003
2004	if (lockres->l_level == DLM_LOCK_NL)
2005		return;
2006
2007	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2008	     lockres->l_name, lockres->l_flags, lockres->l_level,
2009	     lockres->l_action);
2010
2011	spin_lock_irqsave(&lockres->l_lock, flags);
2012	/*
2013	 * Fake a blocking ast for the downconvert code.
2014	 */
2015	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2016	lockres->l_blocking = DLM_LOCK_EX;
2017
2018	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2019	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2020	spin_unlock_irqrestore(&lockres->l_lock, flags);
2021
2022	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2023	if (ret) {
2024		mlog_errno(ret);
2025		return;
2026	}
2027
2028	ret = ocfs2_wait_for_mask(&mw);
2029	if (ret)
2030		mlog_errno(ret);
2031}
2032
2033static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2034					struct ocfs2_lock_res *lockres)
2035{
2036	int kick = 0;
2037
2038	mlog_entry_void();
2039
	/* If we know that another node is waiting on our lock, kick
	 * the downconvert thread pre-emptively when we reach a release
	 * condition. */
2043	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch (lockres->l_blocking) {
2045		case DLM_LOCK_EX:
2046			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2047				kick = 1;
2048			break;
2049		case DLM_LOCK_PR:
2050			if (!lockres->l_ex_holders)
2051				kick = 1;
2052			break;
2053		default:
2054			BUG();
2055		}
2056	}
2057
2058	if (kick)
2059		ocfs2_wake_downconvert_thread(osb);
2060
2061	mlog_exit_void();
2062}
2063
2064#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2066#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2067
2068/* LVB only has room for 64 bits of time here so we pack it for
2069 * now. */
2070static u64 ocfs2_pack_timespec(struct timespec *spec)
2071{
2072	u64 res;
2073	u64 sec = spec->tv_sec;
2074	u32 nsec = spec->tv_nsec;
2075
2076	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2077
2078	return res;
2079}
2080
2081/* Call this with the lockres locked. I am reasonably sure we don't
2082 * need ip_lock in this function as anyone who would be changing those
2083 * values is supposed to be blocked in ocfs2_inode_lock right now. */
2084static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2085{
2086	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2087	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2088	struct ocfs2_meta_lvb *lvb;
2089
2090	mlog_entry_void();
2091
2092	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2093
2094	/*
2095	 * Invalidate the LVB of a deleted inode - this way other
2096	 * nodes are forced to go to disk and discover the new inode
2097	 * status.
2098	 */
2099	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2100		lvb->lvb_version = 0;
2101		goto out;
2102	}
2103
2104	lvb->lvb_version   = OCFS2_LVB_VERSION;
2105	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
2106	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2107	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
2108	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
2109	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2110	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2111	lvb->lvb_iatime_packed  =
2112		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2113	lvb->lvb_ictime_packed =
2114		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2115	lvb->lvb_imtime_packed =
2116		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2117	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2118	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2119	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2120
2121out:
2122	mlog_meta_lvb(0, lockres);
2123
2124	mlog_exit_void();
2125}
2126
2127static void ocfs2_unpack_timespec(struct timespec *spec,
2128				  u64 packed_time)
2129{
2130	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2131	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2132}
2133
2134static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2135{
2136	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2137	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2138	struct ocfs2_meta_lvb *lvb;
2139
2140	mlog_entry_void();
2141
2142	mlog_meta_lvb(0, lockres);
2143
2144	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2145
2146	/* We're safe here without the lockres lock... */
2147	spin_lock(&oi->ip_lock);
2148	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2149	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2150
2151	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2152	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2153	ocfs2_set_inode_flags(inode);
2154
2155	/* fast-symlinks are a special case */
2156	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2157		inode->i_blocks = 0;
2158	else
2159		inode->i_blocks = ocfs2_inode_sector_count(inode);
2160
2161	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
2162	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
2163	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2164	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
2165	ocfs2_unpack_timespec(&inode->i_atime,
2166			      be64_to_cpu(lvb->lvb_iatime_packed));
2167	ocfs2_unpack_timespec(&inode->i_mtime,
2168			      be64_to_cpu(lvb->lvb_imtime_packed));
2169	ocfs2_unpack_timespec(&inode->i_ctime,
2170			      be64_to_cpu(lvb->lvb_ictime_packed));
2171	spin_unlock(&oi->ip_lock);
2172
2173	mlog_exit_void();
2174}
2175
2176static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2177					      struct ocfs2_lock_res *lockres)
2178{
2179	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2180
2181	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2182	    && lvb->lvb_version == OCFS2_LVB_VERSION
2183	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2184		return 1;
2185	return 0;
2186}
2187
2188/* Determine whether a lock resource needs to be refreshed, and
2189 * arbitrate who gets to refresh it.
2190 *
2191 *   0 means no refresh needed.
2192 *
2193 *   > 0 means you need to refresh this and you MUST call
2194 *   ocfs2_complete_lock_res_refresh afterwards. */
2195static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2196{
2197	unsigned long flags;
2198	int status = 0;
2199
2200	mlog_entry_void();
2201
2202refresh_check:
2203	spin_lock_irqsave(&lockres->l_lock, flags);
2204	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2205		spin_unlock_irqrestore(&lockres->l_lock, flags);
2206		goto bail;
2207	}
2208
2209	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2210		spin_unlock_irqrestore(&lockres->l_lock, flags);
2211
2212		ocfs2_wait_on_refreshing_lock(lockres);
2213		goto refresh_check;
2214	}
2215
2216	/* Ok, I'll be the one to refresh this lock. */
2217	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2218	spin_unlock_irqrestore(&lockres->l_lock, flags);
2219
2220	status = 1;
2221bail:
2222	mlog_exit(status);
2223	return status;
2224}
2225
/* If status is nonzero, I'll mark the lockres as no longer refreshing,
 * but I won't clear the needs-refresh flag. */
2228static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2229						   int status)
2230{
2231	unsigned long flags;
2232	mlog_entry_void();
2233
2234	spin_lock_irqsave(&lockres->l_lock, flags);
2235	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2236	if (!status)
2237		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2238	spin_unlock_irqrestore(&lockres->l_lock, flags);
2239
2240	wake_up(&lockres->l_event);
2241
2242	mlog_exit_void();
2243}
2244
/* May or may not return a bh in *bh -- we only go to disk when the
 * LVB can't be trusted. */
2246static int ocfs2_inode_lock_update(struct inode *inode,
2247				  struct buffer_head **bh)
2248{
2249	int status = 0;
2250	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2251	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2252	struct ocfs2_dinode *fe;
2253	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2254
2255	mlog_entry_void();
2256
2257	if (ocfs2_mount_local(osb))
2258		goto bail;
2259
2260	spin_lock(&oi->ip_lock);
2261	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2262		mlog(0, "Orphaned inode %llu was deleted while we "
2263		     "were waiting on a lock. ip_flags = 0x%x\n",
2264		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2265		spin_unlock(&oi->ip_lock);
2266		status = -ENOENT;
2267		goto bail;
2268	}
2269	spin_unlock(&oi->ip_lock);
2270
2271	if (!ocfs2_should_refresh_lock_res(lockres))
2272		goto bail;
2273
2274	/* This will discard any caching information we might have had
2275	 * for the inode metadata. */
2276	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2277
2278	ocfs2_extent_map_trunc(inode, 0);
2279
2280	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2281		mlog(0, "Trusting LVB on inode %llu\n",
2282		     (unsigned long long)oi->ip_blkno);
2283		ocfs2_refresh_inode_from_lvb(inode);
2284	} else {
2285		/* Boo, we have to go to disk. */
2286		/* read bh, cast, ocfs2_refresh_inode */
2287		status = ocfs2_read_inode_block(inode, bh);
2288		if (status < 0) {
2289			mlog_errno(status);
2290			goto bail_refresh;
2291		}
2292		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2293
2294		/* This is a good chance to make sure we're not
2295		 * locking an invalid object.  ocfs2_read_inode_block()
2296		 * already checked that the inode block is sane.
2297		 *
2298		 * We bug on a stale inode here because we checked
2299		 * above whether it was wiped from disk. The wiping
2300		 * node provides a guarantee that we receive that
2301		 * message and can mark the inode before dropping any
2302		 * locks associated with it. */
2303		mlog_bug_on_msg(inode->i_generation !=
2304				le32_to_cpu(fe->i_generation),
2305				"Invalid dinode %llu disk generation: %u "
2306				"inode->i_generation: %u\n",
2307				(unsigned long long)oi->ip_blkno,
2308				le32_to_cpu(fe->i_generation),
2309				inode->i_generation);
2310		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2311				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2312				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2313				(unsigned long long)oi->ip_blkno,
2314				(unsigned long long)le64_to_cpu(fe->i_dtime),
2315				le32_to_cpu(fe->i_flags));
2316
2317		ocfs2_refresh_inode(inode, fe);
2318		ocfs2_track_lock_refresh(lockres);
2319	}
2320
2321	status = 0;
2322bail_refresh:
2323	ocfs2_complete_lock_res_refresh(lockres, status);
2324bail:
2325	mlog_exit(status);
2326	return status;
2327}
2328
2329static int ocfs2_assign_bh(struct inode *inode,
2330			   struct buffer_head **ret_bh,
2331			   struct buffer_head *passed_bh)
2332{
2333	int status;
2334
2335	if (passed_bh) {
2336		/* Ok, the update went to disk for us, use the
2337		 * returned bh. */
2338		*ret_bh = passed_bh;
2339		get_bh(*ret_bh);
2340
2341		return 0;
2342	}
2343
2344	status = ocfs2_read_inode_block(inode, ret_bh);
2345	if (status < 0)
2346		mlog_errno(status);
2347
2348	return status;
2349}
2350
2351/*
 * Returns < 0 if the callback will never be called; otherwise the
 * result of the lock will be communicated via the callback.
2354 */
2355int ocfs2_inode_lock_full_nested(struct inode *inode,
2356				 struct buffer_head **ret_bh,
2357				 int ex,
2358				 int arg_flags,
2359				 int subclass)
2360{
2361	int status, level, acquired;
2362	u32 dlm_flags;
2363	struct ocfs2_lock_res *lockres = NULL;
2364	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2365	struct buffer_head *local_bh = NULL;
2366
2367	BUG_ON(!inode);
2368
2369	mlog_entry_void();
2370
2371	mlog(0, "inode %llu, take %s META lock\n",
2372	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2373	     ex ? "EXMODE" : "PRMODE");
2374
2375	status = 0;
2376	acquired = 0;
2377	/* We'll allow faking a readonly metadata lock for
2378	 * rodevices. */
2379	if (ocfs2_is_hard_readonly(osb)) {
2380		if (ex)
2381			status = -EROFS;
2382		goto bail;
2383	}
2384
2385	if (ocfs2_mount_local(osb))
2386		goto local;
2387
2388	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2389		ocfs2_wait_for_recovery(osb);
2390
2391	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2392	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2393	dlm_flags = 0;
2394	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2395		dlm_flags |= DLM_LKF_NOQUEUE;
2396
2397	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2398				      arg_flags, subclass, _RET_IP_);
2399	if (status < 0) {
2400		if (status != -EAGAIN && status != -EIOCBRETRY)
2401			mlog_errno(status);
2402		goto bail;
2403	}
2404
2405	/* Notify the error cleanup path to drop the cluster lock. */
2406	acquired = 1;
2407
2408	/* We wait twice because a node may have died while we were in
2409	 * the lower dlm layers. The second time though, we've
2410	 * committed to owning this lock so we don't allow signals to
2411	 * abort the operation. */
2412	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2413		ocfs2_wait_for_recovery(osb);
2414
2415local:
2416	/*
2417	 * We only see this flag if we're being called from
2418	 * ocfs2_read_locked_inode(). It means we're locking an inode
2419	 * which hasn't been populated yet, so clear the refresh flag
2420	 * and let the caller handle it.
2421	 */
2422	if (inode->i_state & I_NEW) {
2423		status = 0;
2424		if (lockres)
2425			ocfs2_complete_lock_res_refresh(lockres, 0);
2426		goto bail;
2427	}
2428
2429	/* This is fun. The caller may want a bh back, or it may
2430	 * not. ocfs2_inode_lock_update definitely wants one in, but
2431	 * may or may not read one, depending on what's in the
2432	 * LVB. The result of all of this is that we've *only* gone to
2433	 * disk if we have to, so the complexity is worthwhile. */
2434	status = ocfs2_inode_lock_update(inode, &local_bh);
2435	if (status < 0) {
2436		if (status != -ENOENT)
2437			mlog_errno(status);
2438		goto bail;
2439	}
2440
2441	if (ret_bh) {
2442		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2443		if (status < 0) {
2444			mlog_errno(status);
2445			goto bail;
2446		}
2447	}
2448
2449bail:
2450	if (status < 0) {
2451		if (ret_bh && (*ret_bh)) {
2452			brelse(*ret_bh);
2453			*ret_bh = NULL;
2454		}
2455		if (acquired)
2456			ocfs2_inode_unlock(inode, ex);
2457	}
2458
2459	if (local_bh)
2460		brelse(local_bh);
2461
2462	mlog_exit(status);
2463	return status;
2464}
2465
2466/*
2467 * This is working around a lock inversion between tasks acquiring DLM
2468 * locks while holding a page lock and the downconvert thread which
 * blocks dlm lock acquisition while acquiring page locks.
2470 *
 * ** These _with_page variants are only intended to be called from aop
2472 * methods that hold page locks and return a very specific *positive* error
2473 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2474 *
 * The DLM is called such that it returns -EAGAIN if it would have
 * blocked waiting for the downconvert thread.  In that case we unlock
 * our page so the downconvert thread can make progress.  Once we've
 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
 * that called us can bubble that back up into the VFS, which will then
 * immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so
 * that the lock has a great chance of being cached on this node by the time
 * the VFS calls back to retry the aop.  This has the potential to livelock as
 * nodes ping locks back and forth, but that's a risk we're willing to take in
 * exchange for keeping the fix for the lock inversion simple.
2487 */
2488int ocfs2_inode_lock_with_page(struct inode *inode,
2489			      struct buffer_head **ret_bh,
2490			      int ex,
2491			      struct page *page)
2492{
2493	int ret;
2494
2495	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2496	if (ret == -EAGAIN) {
2497		unlock_page(page);
2498		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2499			ocfs2_inode_unlock(inode, ex);
2500		ret = AOP_TRUNCATED_PAGE;
2501	}
2502
2503	return ret;
2504}
2505
2506int ocfs2_inode_lock_atime(struct inode *inode,
2507			  struct vfsmount *vfsmnt,
2508			  int *level)
2509{
2510	int ret;
2511
2512	mlog_entry_void();
2513	ret = ocfs2_inode_lock(inode, NULL, 0);
2514	if (ret < 0) {
2515		mlog_errno(ret);
2516		return ret;
2517	}
2518
2519	/*
	 * If we should update atime, we will take the EX lock;
	 * otherwise we just keep the PR lock taken above.
2522	 */
2523	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2524		struct buffer_head *bh = NULL;
2525
2526		ocfs2_inode_unlock(inode, 0);
2527		ret = ocfs2_inode_lock(inode, &bh, 1);
2528		if (ret < 0) {
2529			mlog_errno(ret);
2530			return ret;
2531		}
2532		*level = 1;
2533		if (ocfs2_should_update_atime(inode, vfsmnt))
2534			ocfs2_update_inode_atime(inode, bh);
2535		if (bh)
2536			brelse(bh);
2537	} else
2538		*level = 0;
2539
2540	mlog_exit(ret);
2541	return ret;
2542}
2543
2544void ocfs2_inode_unlock(struct inode *inode,
2545		       int ex)
2546{
2547	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2548	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2549	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2550
2551	mlog_entry_void();
2552
2553	mlog(0, "inode %llu drop %s META lock\n",
2554	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2555	     ex ? "EXMODE" : "PRMODE");
2556
2557	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2558	    !ocfs2_mount_local(osb))
2559		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2560
2561	mlog_exit_void();
2562}
2563
2564int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2565{
2566	struct ocfs2_lock_res *lockres;
2567	struct ocfs2_orphan_scan_lvb *lvb;
2568	int status = 0;
2569
2570	if (ocfs2_is_hard_readonly(osb))
2571		return -EROFS;
2572
2573	if (ocfs2_mount_local(osb))
2574		return 0;
2575
2576	lockres = &osb->osb_orphan_scan.os_lockres;
2577	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2578	if (status < 0)
2579		return status;
2580
2581	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2582	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2583	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2584		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2585	else
2586		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2587
2588	return status;
2589}
2590
2591void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2592{
2593	struct ocfs2_lock_res *lockres;
2594	struct ocfs2_orphan_scan_lvb *lvb;
2595
2596	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2597		lockres = &osb->osb_orphan_scan.os_lockres;
2598		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2599		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2600		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2601		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2602	}
2603}
2604
2605int ocfs2_super_lock(struct ocfs2_super *osb,
2606		     int ex)
2607{
2608	int status = 0;
2609	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2610	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2611
2612	mlog_entry_void();
2613
2614	if (ocfs2_is_hard_readonly(osb))
2615		return -EROFS;
2616
2617	if (ocfs2_mount_local(osb))
2618		goto bail;
2619
2620	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2621	if (status < 0) {
2622		mlog_errno(status);
2623		goto bail;
2624	}
2625
2626	/* The super block lock path is really in the best position to
2627	 * know when resources covered by the lock need to be
2628	 * refreshed, so we do it here. Of course, making sense of
2629	 * everything is up to the caller :) */
2630	status = ocfs2_should_refresh_lock_res(lockres);
2631	if (status < 0) {
2632		mlog_errno(status);
2633		goto bail;
2634	}
2635	if (status) {
2636		status = ocfs2_refresh_slot_info(osb);
2637
2638		ocfs2_complete_lock_res_refresh(lockres, status);
2639
2640		if (status < 0)
2641			mlog_errno(status);
2642		ocfs2_track_lock_refresh(lockres);
2643	}
2644bail:
2645	mlog_exit(status);
2646	return status;
2647}
2648
2649void ocfs2_super_unlock(struct ocfs2_super *osb,
2650			int ex)
2651{
2652	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2653	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2654
2655	if (!ocfs2_mount_local(osb))
2656		ocfs2_cluster_unlock(osb, lockres, level);
2657}
2658
2659int ocfs2_rename_lock(struct ocfs2_super *osb)
2660{
2661	int status;
2662	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2663
2664	if (ocfs2_is_hard_readonly(osb))
2665		return -EROFS;
2666
2667	if (ocfs2_mount_local(osb))
2668		return 0;
2669
2670	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2671	if (status < 0)
2672		mlog_errno(status);
2673
2674	return status;
2675}
2676
2677void ocfs2_rename_unlock(struct ocfs2_super *osb)
2678{
2679	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2680
2681	if (!ocfs2_mount_local(osb))
2682		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2683}
2684
2685int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2686{
2687	int status;
2688	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2689
2690	if (ocfs2_is_hard_readonly(osb))
2691		return -EROFS;
2692
2693	if (ocfs2_mount_local(osb))
2694		return 0;
2695
2696	status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2697				    0, 0);
2698	if (status < 0)
2699		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2700
2701	return status;
2702}
2703
2704void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2705{
2706	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2707
2708	if (!ocfs2_mount_local(osb))
2709		ocfs2_cluster_unlock(osb, lockres,
2710				     ex ? LKM_EXMODE : LKM_PRMODE);
2711}
2712
2713int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2714{
2715	int ret;
2716	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2717	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2718	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2719
2720	BUG_ON(!dl);
2721
2722	if (ocfs2_is_hard_readonly(osb))
2723		return -EROFS;
2724
2725	if (ocfs2_mount_local(osb))
2726		return 0;
2727
2728	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2729	if (ret < 0)
2730		mlog_errno(ret);
2731
2732	return ret;
2733}
2734
2735void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2736{
2737	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2738	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2739	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2740
2741	if (!ocfs2_mount_local(osb))
2742		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2743}
2744
/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can outlive the mount, so
 * we can't rely on the ocfs2_super to always exist. */
2748static void ocfs2_dlm_debug_free(struct kref *kref)
2749{
2750	struct ocfs2_dlm_debug *dlm_debug;
2751
2752	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2753
2754	kfree(dlm_debug);
2755}
2756
2757void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2758{
2759	if (dlm_debug)
2760		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2761}
2762
2763static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2764{
2765	kref_get(&debug->d_refcnt);
2766}
2767
2768struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2769{
2770	struct ocfs2_dlm_debug *dlm_debug;
2771
2772	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2773	if (!dlm_debug) {
2774		mlog_errno(-ENOMEM);
2775		goto out;
2776	}
2777
2778	kref_init(&dlm_debug->d_refcnt);
2779	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2780	dlm_debug->d_locking_state = NULL;
2781out:
2782	return dlm_debug;
2783}
2784
2785/* Access to this is arbitrated for us via seq_file->sem. */
2786struct ocfs2_dlm_seq_priv {
2787	struct ocfs2_dlm_debug *p_dlm_debug;
2788	struct ocfs2_lock_res p_iter_res;
2789	struct ocfs2_lock_res p_tmp_res;
2790};
2791
2792static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2793						 struct ocfs2_dlm_seq_priv *priv)
2794{
2795	struct ocfs2_lock_res *iter, *ret = NULL;
2796	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2797
2798	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2799
2800	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2801		/* discover the head of the list */
2802		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2803			mlog(0, "End of list found, %p\n", ret);
2804			break;
2805		}
2806
2807		/* We track our "dummy" iteration lockres' by a NULL
2808		 * l_ops field. */
2809		if (iter->l_ops != NULL) {
2810			ret = iter;
2811			break;
2812		}
2813	}
2814
2815	return ret;
2816}
2817
2818static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2819{
2820	struct ocfs2_dlm_seq_priv *priv = m->private;
2821	struct ocfs2_lock_res *iter;
2822
2823	spin_lock(&ocfs2_dlm_tracking_lock);
2824	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2825	if (iter) {
2826		/* Since lockres' have the lifetime of their container
2827		 * (which can be inodes, ocfs2_supers, etc) we want to
2828		 * copy this out to a temporary lockres while still
2829		 * under the spinlock. Obviously after this we can't
2830		 * trust any pointers on the copy returned, but that's
2831		 * ok as the information we want isn't typically held
2832		 * in them. */
2833		priv->p_tmp_res = *iter;
2834		iter = &priv->p_tmp_res;
2835	}
2836	spin_unlock(&ocfs2_dlm_tracking_lock);
2837
2838	return iter;
2839}
2840
2841static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2842{
2843}
2844
2845static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2846{
2847	struct ocfs2_dlm_seq_priv *priv = m->private;
2848	struct ocfs2_lock_res *iter = v;
2849	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2850
2851	spin_lock(&ocfs2_dlm_tracking_lock);
2852	iter = ocfs2_dlm_next_res(iter, priv);
2853	list_del_init(&dummy->l_debug_list);
2854	if (iter) {
2855		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2856		priv->p_tmp_res = *iter;
2857		iter = &priv->p_tmp_res;
2858	}
2859	spin_unlock(&ocfs2_dlm_tracking_lock);
2860
2861	return iter;
2862}
2863
2864/* So that debugfs.ocfs2 can determine which format is being used */
2865#define OCFS2_DLM_DEBUG_STR_VERSION 2
2866static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2867{
2868	int i;
2869	char *lvb;
2870	struct ocfs2_lock_res *lockres = v;
2871
2872	if (!lockres)
2873		return -EINVAL;
2874
2875	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2876
2877	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2878		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2879			   lockres->l_name,
2880			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2881	else
2882		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2883
2884	seq_printf(m, "%d\t"
2885		   "0x%lx\t"
2886		   "0x%x\t"
2887		   "0x%x\t"
2888		   "%u\t"
2889		   "%u\t"
2890		   "%d\t"
2891		   "%d\t",
2892		   lockres->l_level,
2893		   lockres->l_flags,
2894		   lockres->l_action,
2895		   lockres->l_unlock_action,
2896		   lockres->l_ro_holders,
2897		   lockres->l_ex_holders,
2898		   lockres->l_requested,
2899		   lockres->l_blocking);
2900
2901	/* Dump the raw LVB */
2902	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	for (i = 0; i < DLM_LVB_LEN; i++)
2904		seq_printf(m, "0x%x\t", lvb[i]);
2905
2906#ifdef CONFIG_OCFS2_FS_STATS
2907# define lock_num_prmode(_l)		(_l)->l_lock_num_prmode
2908# define lock_num_exmode(_l)		(_l)->l_lock_num_exmode
2909# define lock_num_prmode_failed(_l)	(_l)->l_lock_num_prmode_failed
2910# define lock_num_exmode_failed(_l)	(_l)->l_lock_num_exmode_failed
2911# define lock_total_prmode(_l)		(_l)->l_lock_total_prmode
2912# define lock_total_exmode(_l)		(_l)->l_lock_total_exmode
2913# define lock_max_prmode(_l)		(_l)->l_lock_max_prmode
2914# define lock_max_exmode(_l)		(_l)->l_lock_max_exmode
2915# define lock_refresh(_l)		(_l)->l_lock_refresh
2916#else
2917# define lock_num_prmode(_l)		(0ULL)
2918# define lock_num_exmode(_l)		(0ULL)
2919# define lock_num_prmode_failed(_l)	(0)
2920# define lock_num_exmode_failed(_l)	(0)
2921# define lock_total_prmode(_l)		(0ULL)
2922# define lock_total_exmode(_l)		(0ULL)
2923# define lock_max_prmode(_l)		(0)
2924# define lock_max_exmode(_l)		(0)
2925# define lock_refresh(_l)		(0)
2926#endif
2927	/* The following seq_print was added in version 2 of this output */
2928	seq_printf(m, "%llu\t"
2929		   "%llu\t"
2930		   "%u\t"
2931		   "%u\t"
2932		   "%llu\t"
2933		   "%llu\t"
2934		   "%u\t"
2935		   "%u\t"
2936		   "%u\t",
2937		   lock_num_prmode(lockres),
2938		   lock_num_exmode(lockres),
2939		   lock_num_prmode_failed(lockres),
2940		   lock_num_exmode_failed(lockres),
2941		   lock_total_prmode(lockres),
2942		   lock_total_exmode(lockres),
2943		   lock_max_prmode(lockres),
2944		   lock_max_exmode(lockres),
2945		   lock_refresh(lockres));
2946
2947	/* End the line */
2948	seq_printf(m, "\n");
2949	return 0;
2950}
2951
2952static const struct seq_operations ocfs2_dlm_seq_ops = {
2953	.start =	ocfs2_dlm_seq_start,
2954	.stop =		ocfs2_dlm_seq_stop,
2955	.next =		ocfs2_dlm_seq_next,
2956	.show =		ocfs2_dlm_seq_show,
2957};
2958
2959static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2960{
2961	struct seq_file *seq = file->private_data;
2962	struct ocfs2_dlm_seq_priv *priv = seq->private;
2963	struct ocfs2_lock_res *res = &priv->p_iter_res;
2964
2965	ocfs2_remove_lockres_tracking(res);
2966	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2967	return seq_release_private(inode, file);
2968}
2969
2970static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2971{
2972	int ret;
2973	struct ocfs2_dlm_seq_priv *priv;
2974	struct seq_file *seq;
2975	struct ocfs2_super *osb;
2976
2977	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2978	if (!priv) {
2979		ret = -ENOMEM;
2980		mlog_errno(ret);
2981		goto out;
2982	}
2983	osb = inode->i_private;
2984	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2985	priv->p_dlm_debug = osb->osb_dlm_debug;
2986	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2987
2988	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2989	if (ret) {
2990		kfree(priv);
2991		mlog_errno(ret);
2992		goto out;
2993	}
2994
2995	seq = file->private_data;
2996	seq->private = priv;
2997
2998	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2999				   priv->p_dlm_debug);
3000
3001out:
3002	return ret;
3003}
3004
3005static const struct file_operations ocfs2_dlm_debug_fops = {
3006	.open =		ocfs2_dlm_debug_open,
3007	.release =	ocfs2_dlm_debug_release,
3008	.read =		seq_read,
3009	.llseek =	seq_lseek,
3010};
3011
3012static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3013{
3014	int ret = 0;
3015	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3016
3017	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
3018							 S_IFREG|S_IRUSR,
3019							 osb->osb_debug_root,
3020							 osb,
3021							 &ocfs2_dlm_debug_fops);
3022	if (!dlm_debug->d_locking_state) {
3023		ret = -EINVAL;
3024		mlog(ML_ERROR,
3025		     "Unable to create locking state debugfs file.\n");
3026		goto out;
3027	}
3028
3029	ocfs2_get_dlm_debug(dlm_debug);
3030out:
3031	return ret;
3032}
3033
3034static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3035{
3036	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3037
3038	if (dlm_debug) {
3039		debugfs_remove(dlm_debug->d_locking_state);
3040		ocfs2_put_dlm_debug(dlm_debug);
3041	}
3042}
3043
3044int ocfs2_dlm_init(struct ocfs2_super *osb)
3045{
3046	int status = 0;
3047	struct ocfs2_cluster_connection *conn = NULL;
3048
3049	mlog_entry_void();
3050
3051	if (ocfs2_mount_local(osb)) {
3052		osb->node_num = 0;
3053		goto local;
3054	}
3055
3056	status = ocfs2_dlm_init_debug(osb);
3057	if (status < 0) {
3058		mlog_errno(status);
3059		goto bail;
3060	}
3061
3062	/* launch downconvert thread */
3063	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
3064	if (IS_ERR(osb->dc_task)) {
3065		status = PTR_ERR(osb->dc_task);
3066		osb->dc_task = NULL;
3067		mlog_errno(status);
3068		goto bail;
3069	}
3070
3071	/* for now, uuid == domain */
3072	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3073				       osb->uuid_str,
3074				       strlen(osb->uuid_str),
3075				       &lproto, ocfs2_do_node_down, osb,
3076				       &conn);
3077	if (status) {
3078		mlog_errno(status);
3079		goto bail;
3080	}
3081
3082	status = ocfs2_cluster_this_node(&osb->node_num);
3083	if (status < 0) {
3084		mlog_errno(status);
3085		mlog(ML_ERROR,
3086		     "could not find this host's node number\n");
3087		ocfs2_cluster_disconnect(conn, 0);
3088		goto bail;
3089	}
3090
3091local:
3092	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3093	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3094	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
3095	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3096
3097	osb->cconn = conn;
3098
3099	status = 0;
3100bail:
3101	if (status < 0) {
3102		ocfs2_dlm_shutdown_debug(osb);
3103		if (osb->dc_task)
3104			kthread_stop(osb->dc_task);
3105	}
3106
3107	mlog_exit(status);
3108	return status;
3109}
3110
3111void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3112			int hangup_pending)
3113{
3114	mlog_entry_void();
3115
3116	ocfs2_drop_osb_locks(osb);
3117
3118	/*
3119	 * Now that we have dropped all locks and ocfs2_dismount_volume()
3120	 * has disabled recovery, the DLM won't be talking to us.  It's
3121	 * safe to tear things down before disconnecting the cluster.
3122	 */
3123
3124	if (osb->dc_task) {
3125		kthread_stop(osb->dc_task);
3126		osb->dc_task = NULL;
3127	}
3128
3129	ocfs2_lock_res_free(&osb->osb_super_lockres);
3130	ocfs2_lock_res_free(&osb->osb_rename_lockres);
3131	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3132	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3133
3134	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3135	osb->cconn = NULL;
3136
3137	ocfs2_dlm_shutdown_debug(osb);
3138
3139	mlog_exit_void();
3140}
3141
3142static int ocfs2_drop_lock(struct ocfs2_super *osb,
3143			   struct ocfs2_lock_res *lockres)
3144{
3145	int ret;
3146	unsigned long flags;
3147	u32 lkm_flags = 0;
3148
3149	/* We didn't get anywhere near actually using this lockres. */
3150	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3151		goto out;
3152
3153	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3154		lkm_flags |= DLM_LKF_VALBLK;
3155
3156	spin_lock_irqsave(&lockres->l_lock, flags);
3157
3158	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3159			"lockres %s, flags 0x%lx\n",
3160			lockres->l_name, lockres->l_flags);
3161
3162	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3163		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3164		     "%u, unlock_action = %u\n",
3165		     lockres->l_name, lockres->l_flags, lockres->l_action,
3166		     lockres->l_unlock_action);
3167
3168		spin_unlock_irqrestore(&lockres->l_lock, flags);
3169
3170		ocfs2_wait_on_busy_lock(lockres);
3171
3172		spin_lock_irqsave(&lockres->l_lock, flags);
3173	}
3174
3175	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3176		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3177		    lockres->l_level == DLM_LOCK_EX &&
3178		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3179			lockres->l_ops->set_lvb(lockres);
3180	}
3181
3182	if (lockres->l_flags & OCFS2_LOCK_BUSY)
3183		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3184		     lockres->l_name);
3185	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3186		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3187
3188	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3189		spin_unlock_irqrestore(&lockres->l_lock, flags);
3190		goto out;
3191	}
3192
3193	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3194
3195	/* make sure we never get here while waiting for an ast to
3196	 * fire. */
3197	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3198
3199	/* is this necessary? */
3200	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3201	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3202	spin_unlock_irqrestore(&lockres->l_lock, flags);
3203
3204	mlog(0, "lock %s\n", lockres->l_name);
3205
3206	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3207	if (ret) {
3208		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3209		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3210		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3211		BUG();
3212	}
3213	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3214	     lockres->l_name);
3215
3216	ocfs2_wait_on_busy_lock(lockres);
3217out:
3218	mlog_exit(0);
3219	return 0;
3220}
3221
3222/* Mark the lockres as being dropped. It will no longer be
3223 * queued if blocking, but we still may have to wait on it
3224 * being dequeued from the downconvert thread before we can consider
3225 * it safe to drop.
3226 *
3227 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3228void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3229{
3230	int status;
3231	struct ocfs2_mask_waiter mw;
3232	unsigned long flags;
3233
3234	ocfs2_init_mask_waiter(&mw);
3235
3236	spin_lock_irqsave(&lockres->l_lock, flags);
3237	lockres->l_flags |= OCFS2_LOCK_FREEING;
3238	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3239		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3240		spin_unlock_irqrestore(&lockres->l_lock, flags);
3241
3242		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3243
3244		status = ocfs2_wait_for_mask(&mw);
3245		if (status)
3246			mlog_errno(status);
3247
3248		spin_lock_irqsave(&lockres->l_lock, flags);
3249	}
3250	spin_unlock_irqrestore(&lockres->l_lock, flags);
3251}
3252
3253void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3254			       struct ocfs2_lock_res *lockres)
3255{
3256	int ret;
3257
3258	ocfs2_mark_lockres_freeing(lockres);
3259	ret = ocfs2_drop_lock(osb, lockres);
3260	if (ret)
3261		mlog_errno(ret);
3262}
3263
3264static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3265{
3266	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3267	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3268	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3269	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3270}
3271
3272int ocfs2_drop_inode_locks(struct inode *inode)
3273{
3274	int status, err;
3275
3276	mlog_entry_void();
3277
3278	/* No need to call ocfs2_mark_lockres_freeing here -
3279	 * ocfs2_clear_inode has done it for us. */
3280
3281	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3282			      &OCFS2_I(inode)->ip_open_lockres);
3283	if (err < 0)
3284		mlog_errno(err);
3285
3286	status = err;
3287
3288	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3289			      &OCFS2_I(inode)->ip_inode_lockres);
3290	if (err < 0)
3291		mlog_errno(err);
3292	if (err < 0 && !status)
3293		status = err;
3294
3295	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3296			      &OCFS2_I(inode)->ip_rw_lockres);
3297	if (err < 0)
3298		mlog_errno(err);
3299	if (err < 0 && !status)
3300		status = err;
3301
3302	mlog_exit(status);
3303	return status;
3304}
3305
3306static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3307					      int new_level)
3308{
3309	assert_spin_locked(&lockres->l_lock);
3310
3311	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3312
3313	if (lockres->l_level <= new_level) {
3314		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3315		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3316		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3317		     new_level, list_empty(&lockres->l_blocked_list),
3318		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
3319		     lockres->l_flags, lockres->l_ro_holders,
3320		     lockres->l_ex_holders, lockres->l_action,
3321		     lockres->l_unlock_action, lockres->l_requested,
3322		     lockres->l_blocking, lockres->l_pending_gen);
3323		BUG();
3324	}
3325
3326	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3327	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3328
3329	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3330	lockres->l_requested = new_level;
3331	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3332	return lockres_set_pending(lockres);
3333}
3334
3335static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3336				  struct ocfs2_lock_res *lockres,
3337				  int new_level,
3338				  int lvb,
3339				  unsigned int generation)
3340{
3341	int ret;
3342	u32 dlm_flags = DLM_LKF_CONVERT;
3343
3344	mlog_entry_void();
3345
3346	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3347	     lockres->l_level, new_level);
3348
3349	if (lvb)
3350		dlm_flags |= DLM_LKF_VALBLK;
3351
3352	ret = ocfs2_dlm_lock(osb->cconn,
3353			     new_level,
3354			     &lockres->l_lksb,
3355			     dlm_flags,
3356			     lockres->l_name,
3357			     OCFS2_LOCK_ID_MAX_LEN - 1);
3358	lockres_clear_pending(lockres, generation, osb);
3359	if (ret) {
3360		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3361		ocfs2_recover_from_dlm_error(lockres, 1);
3362		goto bail;
3363	}
3364
3365	ret = 0;
3366bail:
3367	mlog_exit(ret);
3368	return ret;
3369}
3370
/* Returns 1 when the caller should drop the spinlock and issue the
 * cancel via ocfs2_cancel_convert(). */
3372static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3373				        struct ocfs2_lock_res *lockres)
3374{
3375	assert_spin_locked(&lockres->l_lock);
3376
3377	mlog_entry_void();
3378
3379	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3380		/* If we're already trying to cancel a lock conversion
3381		 * then just drop the spinlock and allow the caller to
3382		 * requeue this lock. */
3383		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3384		return 0;
3385	}
3386
3387	/* were we in a convert when we got the bast fire? */
3388	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3389	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3390	/* set things up for the unlockast to know to just
3391	 * clear out the ast_action and unset busy, etc. */
3392	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3393
3394	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3395			"lock %s, invalid flags: 0x%lx\n",
3396			lockres->l_name, lockres->l_flags);
3397
3398	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3399
3400	return 1;
3401}
3402
3403static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3404				struct ocfs2_lock_res *lockres)
3405{
3406	int ret;
3407
3408	mlog_entry_void();
3409
3410	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3411			       DLM_LKF_CANCEL);
3412	if (ret) {
3413		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3414		ocfs2_recover_from_dlm_error(lockres, 0);
3415	}
3416
3417	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3418
3419	mlog_exit(ret);
3420	return ret;
3421}
3422
3423static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3424			      struct ocfs2_lock_res *lockres,
3425			      struct ocfs2_unblock_ctl *ctl)
3426{
3427	unsigned long flags;
3428	int blocking;
3429	int new_level;
3430	int level;
3431	int ret = 0;
3432	int set_lvb = 0;
3433	unsigned int gen;
3434
3435	mlog_entry_void();
3436
3437	spin_lock_irqsave(&lockres->l_lock, flags);
3438
3439recheck:
3440	/*
3441	 * Is it still blocking? If not, we have no more work to do.
3442	 */
3443	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3444		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3445		spin_unlock_irqrestore(&lockres->l_lock, flags);
3446		ret = 0;
3447		goto leave;
3448	}
3449
3450	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3451		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3452			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3453			     lockres->l_name);
3454			goto leave_requeue;
3455		}
3456
3457		ctl->requeue = 1;
3458		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3459		spin_unlock_irqrestore(&lockres->l_lock, flags);
3460		if (ret) {
3461			ret = ocfs2_cancel_convert(osb, lockres);
3462			if (ret < 0)
3463				mlog_errno(ret);
3464		}
3465		goto leave;
3466	}
3467
3468	/*
3469	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3470	 * set when the ast is received for an upconvert just before the
3471	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3472	 * on the heels of the ast, we want to delay the downconvert just
3473	 * enough to allow the up requestor to do its task. Because this
3474	 * lock is in the blocked queue, the lock will be downconverted
3475	 * as soon as the requestor is done with the lock.
3476	 */
3477	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3478		goto leave_requeue;
3479
3480	/*
3481	 * How can we block and yet be at NL?  We were trying to upconvert
3482	 * from NL and got canceled.  The code comes back here, and now
3483	 * we notice and clear BLOCKING.
3484	 */
3485	if (lockres->l_level == DLM_LOCK_NL) {
3486		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3487		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3488		lockres->l_blocking = DLM_LOCK_NL;
3489		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3490		spin_unlock_irqrestore(&lockres->l_lock, flags);
3491		goto leave;
3492	}
3493
3494	/* if we're blocking an exclusive and we have *any* holders,
3495	 * then requeue. */
3496	if ((lockres->l_blocking == DLM_LOCK_EX)
3497	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3498		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3499		     lockres->l_name, lockres->l_ex_holders,
3500		     lockres->l_ro_holders);
3501		goto leave_requeue;
3502	}
3503
3504	/* If it's a PR we're blocking, then only
3505	 * requeue if we've got any EX holders */
3506	if (lockres->l_blocking == DLM_LOCK_PR &&
3507	    lockres->l_ex_holders) {
3508		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3509		     lockres->l_name, lockres->l_ex_holders);
3510		goto leave_requeue;
3511	}
3512
3513	/*
3514	 * Can we get a lock in this state if the holder counts are
3515	 * zero? The meta data unblock code used to check this.
3516	 */
3517	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3518	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3519		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3520		     lockres->l_name);
3521		goto leave_requeue;
3522	}
3523
3524	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3525
3526	if (lockres->l_ops->check_downconvert
3527	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3528		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3529		     lockres->l_name);
3530		goto leave_requeue;
3531	}
3532
3533	/* If we get here, then we know that there are no more
3534	 * incompatible holders (and anyone asking for an incompatible
3535	 * lock is blocked). We can now downconvert the lock */
3536	if (!lockres->l_ops->downconvert_worker)
3537		goto downconvert;
3538
3539	/* Some lockres types want to do a bit of work before
3540	 * downconverting a lock. Allow that here. The worker function
3541	 * may sleep, so we save off a copy of what we're blocking as
3542	 * it may change while we're not holding the spin lock. */
3543	blocking = lockres->l_blocking;
3544	level = lockres->l_level;
3545	spin_unlock_irqrestore(&lockres->l_lock, flags);
3546
3547	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3548
3549	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3550		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3551		     lockres->l_name);
3552		goto leave;
3553	}
3554
3555	spin_lock_irqsave(&lockres->l_lock, flags);
3556	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3557		/* If this changed underneath us, then we can't drop
3558		 * it just yet. */
3559		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3560		     "Recheck\n", lockres->l_name, blocking,
3561		     lockres->l_blocking, level, lockres->l_level);
3562		goto recheck;
3563	}
3564
3565downconvert:
3566	ctl->requeue = 0;
3567
3568	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3569		if (lockres->l_level == DLM_LOCK_EX)
3570			set_lvb = 1;
3571
3572		/*
3573		 * We only set the lvb if the lock has been fully
3574		 * refreshed - otherwise we risk setting stale
		 * data. If we don't set it, there's no need to
		 * actually clear out the lvb here, as its value is
		 * still valid.
3577		 */
3578		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3579			lockres->l_ops->set_lvb(lockres);
3580	}
3581
3582	gen = ocfs2_prepare_downconvert(lockres, new_level);
3583	spin_unlock_irqrestore(&lockres->l_lock, flags);
3584	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3585				     gen);
3586
3587leave:
3588	mlog_exit(ret);
3589	return ret;
3590
3591leave_requeue:
3592	spin_unlock_irqrestore(&lockres->l_lock, flags);
3593	ctl->requeue = 1;
3594
3595	mlog_exit(0);
3596	return 0;
3597}
3598
3599static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3600				     int blocking)
3601{
3602	struct inode *inode;
3603	struct address_space *mapping;
3604
	inode = ocfs2_lock_res_inode(lockres);
3606	mapping = inode->i_mapping;
3607
3608	if (!S_ISREG(inode->i_mode))
3609		goto out;
3610
3611	/*
3612	 * We need this before the filemap_fdatawrite() so that it can
3613	 * transfer the dirty bit from the PTE to the
3614	 * page. Unfortunately this means that even for EX->PR
3615	 * downconverts, we'll lose our mappings and have to build
3616	 * them up again.
3617	 */
3618	unmap_mapping_range(mapping, 0, 0, 0);
3619
3620	if (filemap_fdatawrite(mapping)) {
3621		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3622		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3623	}
3624	sync_mapping_buffers(mapping);
3625	if (blocking == DLM_LOCK_EX) {
3626		truncate_inode_pages(mapping, 0);
3627	} else {
3628		/* We only need to wait on the I/O if we're not also
3629		 * truncating pages because truncate_inode_pages waits
3630		 * for us above. We don't truncate pages if we're
3631		 * blocking anything < EXMODE because we want to keep
3632		 * them around in that case. */
3633		filemap_fdatawait(mapping);
3634	}
3635
3636out:
3637	return UNBLOCK_CONTINUE;
3638}
3639
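/*
 * Metadata covered by an EX lock may still live only in our journal.
 * Before downconverting to a mode that lets another node read from
 * disk, that metadata must be fully checkpointed. If it isn't yet, we
 * kick off a checkpoint and return 0 so the downconvert gets requeued.
 */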
3640static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3641				 struct ocfs2_lock_res *lockres,
3642				 int new_level)
3643{
3644	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3645
3646	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3647	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3648
3649	if (checkpointed)
3650		return 1;
3651
3652	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3653	return 0;
3654}
3655
3656static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3657					int new_level)
3658{
3659	struct inode *inode = ocfs2_lock_res_inode(lockres);
3660
3661	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3662}
3663
3664static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3665{
3666	struct inode *inode = ocfs2_lock_res_inode(lockres);
3667
3668	__ocfs2_stuff_meta_lvb(inode);
3669}
3670
3671/*
3672 * Does the final reference drop on our dentry lock. Right now this
3673 * happens in the downconvert thread, but we could choose to simplify the
3674 * dlmglue API and push these off to the ocfs2_wq in the future.
3675 */
3676static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3677				     struct ocfs2_lock_res *lockres)
3678{
3679	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3680	ocfs2_dentry_lock_put(osb, dl);
3681}
3682
3683/*
3684 * d_delete() matching dentries before the lock downconvert.
3685 *
 * At this point, any process waiting to destroy the
 * dentry_lock on its final reference drop is held off by the
 * OCFS2_LOCK_QUEUED flag.
3689 *
3690 * We have two potential problems
3691 *
3692 * 1) If we do the last reference drop on our dentry_lock (via dput)
3693 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3694 *    the downconvert to finish. Instead we take an elevated
3695 *    reference and push the drop until after we've completed our
3696 *    unblock processing.
3697 *
3698 * 2) There might be another process with a final reference,
3699 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
3701 */
3702static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3703				       int blocking)
3704{
3705	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3706	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3707	struct dentry *dentry;
3708	unsigned long flags;
3709	int extra_ref = 0;
3710
3711	/*
3712	 * This node is blocking another node from getting a read
3713	 * lock. This happens when we've renamed within a
3714	 * directory. We've forced the other nodes to d_delete(), but
3715	 * we never actually dropped our lock because it's still
3716	 * valid. The downconvert code will retain a PR for this node,
3717	 * so there's no further work to do.
3718	 */
3719	if (blocking == DLM_LOCK_PR)
3720		return UNBLOCK_CONTINUE;
3721
3722	/*
3723	 * Mark this inode as potentially orphaned. The code in
3724	 * ocfs2_delete_inode() will figure out whether it actually
3725	 * needs to be freed or not.
3726	 */
3727	spin_lock(&oi->ip_lock);
3728	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3729	spin_unlock(&oi->ip_lock);
3730
3731	/*
3732	 * Yuck. We need to make sure however that the check of
3733	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
3734	 * respect to a reference decrement or the setting of that
3735	 * flag.
3736	 */
3737	spin_lock_irqsave(&lockres->l_lock, flags);
3738	spin_lock(&dentry_attach_lock);
3739	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3740	    && dl->dl_count) {
3741		dl->dl_count++;
3742		extra_ref = 1;
3743	}
3744	spin_unlock(&dentry_attach_lock);
3745	spin_unlock_irqrestore(&lockres->l_lock, flags);
3746
3747	mlog(0, "extra_ref = %d\n", extra_ref);
3748
3749	/*
3750	 * We have a process waiting on us in ocfs2_dentry_iput(),
3751	 * which means we can't have any more outstanding
3752	 * aliases. There's no need to do any more work.
3753	 */
3754	if (!extra_ref)
3755		return UNBLOCK_CONTINUE;
3756
3757	spin_lock(&dentry_attach_lock);
3758	while (1) {
3759		dentry = ocfs2_find_local_alias(dl->dl_inode,
3760						dl->dl_parent_blkno, 1);
3761		if (!dentry)
3762			break;
3763		spin_unlock(&dentry_attach_lock);
3764
3765		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3766		     dentry->d_name.name);
3767
3768		/*
3769		 * The following dcache calls may do an
3770		 * iput(). Normally we don't want that from the
3771		 * downconverting thread, but in this case it's ok
3772		 * because the requesting node already has an
3773		 * exclusive lock on the inode, so it can't be queued
3774		 * for a downconvert.
3775		 */
3776		d_delete(dentry);
3777		dput(dentry);
3778
3779		spin_lock(&dentry_attach_lock);
3780	}
3781	spin_unlock(&dentry_attach_lock);
3782
3783	/*
3784	 * If we are the last holder of this dentry lock, there is no
3785	 * reason to downconvert so skip straight to the unlock.
3786	 */
3787	if (dl->dl_count == 1)
3788		return UNBLOCK_STOP_POST;
3789
3790	return UNBLOCK_CONTINUE_POST;
3791}
3792
3793static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
3794					    int new_level)
3795{
3796	struct ocfs2_refcount_tree *tree =
3797				ocfs2_lock_res_refcount_tree(lockres);
3798
3799	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
3800}
3801
3802static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
3803					 int blocking)
3804{
3805	struct ocfs2_refcount_tree *tree =
3806				ocfs2_lock_res_refcount_tree(lockres);
3807
3808	ocfs2_metadata_cache_purge(&tree->rf_ci);
3809
3810	return UNBLOCK_CONTINUE;
3811}
3812
3813static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3814{
3815	struct ocfs2_qinfo_lvb *lvb;
3816	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3817	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3818					    oinfo->dqi_gi.dqi_type);
3819
3820	mlog_entry_void();
3821
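	/*
	 * The quota info is packed into the LVB in big-endian form so
	 * that nodes with differing host byte order agree on the
	 * layout; ocfs2_refresh_qinfo() performs the matching decode.
	 */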
3822	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3823	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3824	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3825	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3826	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3827	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3828	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3829	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3830
3831	mlog_exit_void();
3832}
3833
3834void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3835{
3836	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3837	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3838	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3839
3840	mlog_entry_void();
3841	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3842		ocfs2_cluster_unlock(osb, lockres, level);
3843	mlog_exit_void();
3844}
3845
3846static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3847{
3848	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3849					    oinfo->dqi_gi.dqi_type);
3850	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3851	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3852	struct buffer_head *bh = NULL;
3853	struct ocfs2_global_disk_dqinfo *gdinfo;
3854	int status = 0;
3855
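	/*
	 * Fast path: if the LVB is valid and carries a version we
	 * understand, take the cached values straight from it.
	 * Otherwise fall back to reading the global quota info block
	 * from disk.
	 */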
3856	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3857	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3858		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3859		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3860		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3861		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3862		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3863		oinfo->dqi_gi.dqi_free_entry =
3864					be32_to_cpu(lvb->lvb_free_entry);
3865	} else {
3866		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
3867						     oinfo->dqi_giblk, &bh);
3868		if (status) {
3869			mlog_errno(status);
3870			goto bail;
3871		}
3872		gdinfo = (struct ocfs2_global_disk_dqinfo *)
3873					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3874		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3875		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3876		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3877		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3878		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3879		oinfo->dqi_gi.dqi_free_entry =
3880					le32_to_cpu(gdinfo->dqi_free_entry);
3881		brelse(bh);
3882		ocfs2_track_lock_refresh(lockres);
3883	}
3884
3885bail:
3886	return status;
3887}
3888
/* Lock the global quota info. This function expects at least a shared
 * lock on the quota file so that we can safely refresh the quota info
 * from disk. */
3891int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3892{
3893	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3894	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3895	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3896	int status = 0;
3897
3898	mlog_entry_void();
3899
3900	/* On RO devices, locking really isn't needed... */
3901	if (ocfs2_is_hard_readonly(osb)) {
3902		if (ex)
3903			status = -EROFS;
3904		goto bail;
3905	}
3906	if (ocfs2_mount_local(osb))
3907		goto bail;
3908
3909	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3910	if (status < 0) {
3911		mlog_errno(status);
3912		goto bail;
3913	}
3914	if (!ocfs2_should_refresh_lock_res(lockres))
3915		goto bail;
3916	/* OK, we have the lock but we need to refresh the quota info */
3917	status = ocfs2_refresh_qinfo(oinfo);
3918	if (status)
3919		ocfs2_qinfo_unlock(oinfo, ex);
3920	ocfs2_complete_lock_res_refresh(lockres, status);
3921bail:
3922	mlog_exit(status);
3923	return status;
3924}
3925
3926int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
3927{
3928	int status;
3929	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3930	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

3934	if (ocfs2_is_hard_readonly(osb))
3935		return -EROFS;
3936
3937	if (ocfs2_mount_local(osb))
3938		return 0;
3939
3940	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3941	if (status < 0)
3942		mlog_errno(status);
3943
3944	return status;
3945}
3946
3947void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3948{
3949	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3950	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3951	struct ocfs2_super *osb = lockres->l_priv;
3952
3953	if (!ocfs2_mount_local(osb))
3954		ocfs2_cluster_unlock(osb, lockres, level);
3955}
3956
3957static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3958				       struct ocfs2_lock_res *lockres)
3959{
3960	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0};
3962	unsigned long flags;
3963
3964	/* Our reference to the lockres in this function can be
3965	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3966	 * flag. */
3967
3968	mlog_entry_void();
3969
3970	BUG_ON(!lockres);
3971	BUG_ON(!lockres->l_ops);
3972
3973	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
3974
	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock
	 * can still be marked with OCFS2_LOCK_FREEING after this
	 * check, but short-circuiting here still saves us some
	 * unnecessary work. */
3980	spin_lock_irqsave(&lockres->l_lock, flags);
3981	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3982		goto unqueue;
3983	spin_unlock_irqrestore(&lockres->l_lock, flags);
3984
3985	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3986	if (status < 0)
3987		mlog_errno(status);
3988
3989	spin_lock_irqsave(&lockres->l_lock, flags);
3990unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	else
		ocfs2_schedule_blocked_lock(osb, lockres);
3995
3996	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
3997	     ctl.requeue ? "yes" : "no");
3998	spin_unlock_irqrestore(&lockres->l_lock, flags);
3999
4000	if (ctl.unblock_action != UNBLOCK_CONTINUE
4001	    && lockres->l_ops->post_unlock)
4002		lockres->l_ops->post_unlock(osb, lockres);
4003
4004	mlog_exit_void();
4005}
4006
4007static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4008					struct ocfs2_lock_res *lockres)
4009{
4010	mlog_entry_void();
4011
4012	assert_spin_locked(&lockres->l_lock);
4013
4014	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4015		/* Do not schedule a lock for downconvert when it's on
4016		 * the way to destruction - any nodes wanting access
4017		 * to the resource will get it soon. */
4018		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4019		     lockres->l_name, lockres->l_flags);
4020		return;
4021	}
4022
4023	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4024
4025	spin_lock(&osb->dc_task_lock);
4026	if (list_empty(&lockres->l_blocked_list)) {
4027		list_add_tail(&lockres->l_blocked_list,
4028			      &osb->blocked_lock_list);
4029		osb->blocked_lock_count++;
4030	}
4031	spin_unlock(&osb->dc_task_lock);
4032
4033	mlog_exit_void();
4034}
4035
4036static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4037{
4038	unsigned long processed;
4039	struct ocfs2_lock_res *lockres;
4040
4041	mlog_entry_void();
4042
4043	spin_lock(&osb->dc_task_lock);
	/* grab this early so we know to try again if a state change
	 * and wake happen part-way through our work */
4046	osb->dc_work_sequence = osb->dc_wake_sequence;
4047
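	/* Snapshot the queue length so that one pass services at most
	 * the locks that were queued when it began; anything added
	 * while we run is picked up on a later pass. */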
4048	processed = osb->blocked_lock_count;
4049	while (processed) {
4050		BUG_ON(list_empty(&osb->blocked_lock_list));
4051
4052		lockres = list_entry(osb->blocked_lock_list.next,
4053				     struct ocfs2_lock_res, l_blocked_list);
4054		list_del_init(&lockres->l_blocked_list);
4055		osb->blocked_lock_count--;
4056		spin_unlock(&osb->dc_task_lock);
4057
4058		BUG_ON(!processed);
4059		processed--;
4060
4061		ocfs2_process_blocked_lock(osb, lockres);
4062
4063		spin_lock(&osb->dc_task_lock);
4064	}
4065	spin_unlock(&osb->dc_task_lock);
4066
4067	mlog_exit_void();
4068}
4069
4070static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4071{
4072	int empty = 0;
4073
4074	spin_lock(&osb->dc_task_lock);
4075	if (list_empty(&osb->blocked_lock_list))
4076		empty = 1;
4077
4078	spin_unlock(&osb->dc_task_lock);
4079	return empty;
4080}
4081
4082static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4083{
4084	int should_wake = 0;
4085
4086	spin_lock(&osb->dc_task_lock);
4087	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4088		should_wake = 1;
4089	spin_unlock(&osb->dc_task_lock);
4090
4091	return should_wake;
4092}
4093
4094static int ocfs2_downconvert_thread(void *arg)
4095{
4096	int status = 0;
4097	struct ocfs2_super *osb = arg;
4098
4099	/* only quit once we've been asked to stop and there is no more
4100	 * work available */
4101	while (!(kthread_should_stop() &&
4102		ocfs2_downconvert_thread_lists_empty(osb))) {
4103
4104		wait_event_interruptible(osb->dc_event,
4105					 ocfs2_downconvert_thread_should_wake(osb) ||
4106					 kthread_should_stop());
4107
4108		mlog(0, "downconvert_thread: awoken\n");
4109
4110		ocfs2_downconvert_thread_do_work(osb);
4111	}
4112
4113	osb->dc_task = NULL;
4114	return status;
4115}
4116
4117void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4118{
4119	spin_lock(&osb->dc_task_lock);
	/* make sure the downconvert thread gets a swipe at whatever
	 * changes the caller may have made to the lock state */
4122	osb->dc_wake_sequence++;
4123	spin_unlock(&osb->dc_task_lock);
4124	wake_up(&osb->dc_event);
4125}
4126