/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

/*
 * A mask waiter sleeps until a lockres' l_flags, masked by mw_mask,
 * equals mw_goal; lockres_set_flags() completes mw_complete when the
 * condition is satisfied.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* linkage on l_mask_waiters */
	int			mw_status;	/* set to 0 on wakeup */
	struct completion	mw_complete;
	unsigned long		mw_mask;	/* which l_flags bits to test */
	unsigned long		mw_goal;	/* required value of masked bits */
#ifdef CONFIG_OCFS2_FS_STATS
	unsigned long long	mw_lock_start;	/* ns timestamp for lock stats */
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
 */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker =
			      ocfs2_refcount_convert_worker,
	.flags		= 0,
};

/* True for the lock types that hang off an inode: META, RW and OPEN. */
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

/* Map a lock status block back to its containing lock resource. */
static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

/*
 * Resolve the superblock for a lockres: use the type-specific
 * ->get_osb() callback when one exists, otherwise l_priv is the
 * ocfs2_super itself (see the comment above ocfs2_lock_res_ops).
 */
static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
/* Convenience wrapper that records the caller's return address. */
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/*
 * Dentry lock names embed a binary inode number after the printable
 * prefix, so they need a different format than other lock names.
 */
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, _lockres->l_name);					\
	else									\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum
				  ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	/* Fixed-width name: type char, pad, 16-digit blkno, 8-digit gen. */
	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

/* Attach a lockres to the per-super debugging list. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

/* Safe to call on a lockres that was never tracked (list_empty check). */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_num_prmode = 0;
	res->l_lock_num_prmode_failed = 0;
	res->l_lock_total_prmode = 0;
	res->l_lock_max_prmode = 0;
	res->l_lock_num_exmode = 0;
	res->l_lock_num_exmode_failed = 0;
	res->l_lock_total_exmode = 0;
	res->l_lock_max_exmode = 0;
	res->l_lock_refresh = 0;
}

/*
 * Fold one lock attempt into the per-mode statistics: attempt count,
 * cumulative and max wait time (ns since mw_lock_start), and failure
 * count when ret is nonzero.  Levels other than PR/EX are ignored.
 */
static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	unsigned long long *num, *sum;
	unsigned int *max, *failed;
	struct timespec ts = current_kernel_time();
	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;

	if (level == LKM_PRMODE) {
		num = &res->l_lock_num_prmode;
		sum = &res->l_lock_total_prmode;
		max = &res->l_lock_max_prmode;
		failed = &res->l_lock_num_prmode_failed;
	} else if (level == LKM_EXMODE) {
		num = &res->l_lock_num_exmode;
		sum = &res->l_lock_total_exmode;
		max = &res->l_lock_max_exmode;
		failed = &res->l_lock_num_exmode_failed;
	} else
		return;

	(*num)++;
	(*sum) += time;
	if (time > *max)
		*max = time;
	if (ret)
		(*failed)++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	struct timespec ts = current_kernel_time();
	mw->mw_lock_start = timespec_to_ns(&ts);
}
#else
/* Stats disabled: all hooks compile to nothing. */
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

/*
 * Shared tail of lockres initialization: record type/ops/private
 * pointer, reset all DLM levels to invalid, hook up debug tracking,
 * stats and lockdep.  Callers must have built l_name already.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* NOTE(review): OPEN locks appear intentionally exempt from
	 * lockdep tracking here — confirm before changing. */
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears
out the lock status block */ 537 memset(res, 0, sizeof(struct ocfs2_lock_res)); 538 spin_lock_init(&res->l_lock); 539 init_waitqueue_head(&res->l_event); 540 INIT_LIST_HEAD(&res->l_blocked_list); 541 INIT_LIST_HEAD(&res->l_mask_waiters); 542} 543 544void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 545 enum ocfs2_lock_type type, 546 unsigned int generation, 547 struct inode *inode) 548{ 549 struct ocfs2_lock_res_ops *ops; 550 551 switch(type) { 552 case OCFS2_LOCK_TYPE_RW: 553 ops = &ocfs2_inode_rw_lops; 554 break; 555 case OCFS2_LOCK_TYPE_META: 556 ops = &ocfs2_inode_inode_lops; 557 break; 558 case OCFS2_LOCK_TYPE_OPEN: 559 ops = &ocfs2_inode_open_lops; 560 break; 561 default: 562 mlog_bug_on_msg(1, "type: %d\n", type); 563 ops = NULL; /* thanks, gcc */ 564 break; 565 }; 566 567 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 568 generation, res->l_name); 569 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 570} 571 572static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 573{ 574 struct inode *inode = ocfs2_lock_res_inode(lockres); 575 576 return OCFS2_SB(inode->i_sb); 577} 578 579static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 580{ 581 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 582 583 return OCFS2_SB(info->dqi_gi.dqi_sb); 584} 585 586static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 587{ 588 struct ocfs2_file_private *fp = lockres->l_priv; 589 590 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 591} 592 593static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 594{ 595 __be64 inode_blkno_be; 596 597 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 598 sizeof(__be64)); 599 600 return be64_to_cpu(inode_blkno_be); 601} 602 603static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 604{ 605 struct ocfs2_dentry_lock *dl = lockres->l_priv; 606 607 return 
OCFS2_SB(dl->dl_inode->i_sb); 608} 609 610void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 611 u64 parent, struct inode *inode) 612{ 613 int len; 614 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 615 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 616 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 617 618 ocfs2_lock_res_init_once(lockres); 619 620 /* 621 * Unfortunately, the standard lock naming scheme won't work 622 * here because we have two 16 byte values to use. Instead, 623 * we'll stuff the inode number as a binary value. We still 624 * want error prints to show something without garbling the 625 * display, so drop a null byte in there before the inode 626 * number. A future version of OCFS2 will likely use all 627 * binary lock names. The stringified names have been a 628 * tremendous aid in debugging, but now that the debugfs 629 * interface exists, we can mangle things there if need be. 630 * 631 * NOTE: We also drop the standard "pad" value (the total lock 632 * name size stays the same though - the last part is all 633 * zeros due to the memset in ocfs2_lock_res_init_once() 634 */ 635 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 636 "%c%016llx", 637 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 638 (long long)parent); 639 640 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 641 642 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 643 sizeof(__be64)); 644 645 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 646 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 647 dl); 648} 649 650static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 651 struct ocfs2_super *osb) 652{ 653 /* Superblock lockres doesn't come from a slab so we call init 654 * once on it manually. 
*/ 655 ocfs2_lock_res_init_once(res); 656 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 657 0, res->l_name); 658 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 659 &ocfs2_super_lops, osb); 660} 661 662static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 663 struct ocfs2_super *osb) 664{ 665 /* Rename lockres doesn't come from a slab so we call init 666 * once on it manually. */ 667 ocfs2_lock_res_init_once(res); 668 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 669 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 670 &ocfs2_rename_lops, osb); 671} 672 673static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 674 struct ocfs2_super *osb) 675{ 676 /* nfs_sync lockres doesn't come from a slab so we call init 677 * once on it manually. */ 678 ocfs2_lock_res_init_once(res); 679 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 680 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 681 &ocfs2_nfs_sync_lops, osb); 682} 683 684static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 685 struct ocfs2_super *osb) 686{ 687 ocfs2_lock_res_init_once(res); 688 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 689 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 690 &ocfs2_orphan_scan_lops, osb); 691} 692 693void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 694 struct ocfs2_file_private *fp) 695{ 696 struct inode *inode = fp->fp_file->f_mapping->host; 697 struct ocfs2_inode_info *oi = OCFS2_I(inode); 698 699 ocfs2_lock_res_init_once(lockres); 700 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 701 inode->i_generation, lockres->l_name); 702 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 703 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 704 fp); 705 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 706} 707 708void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 709 struct 
			       ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	/* Quota info locks are keyed by quota type, not block number. */
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

/*
 * Tear down a lockres.  BUGs loudly if the lock is still blocked, has
 * pending mask waiters, or has PR/EX holders — callers must fully
 * drop the lock first.  No-op for never-initialized lockres.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

/* Account a new holder at the given level; only EX and PR are legal. */
static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

/* Drop a holder; BUGs on underflow or an unexpected level. */
static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

/*
 * Install a new l_flags value and wake every mask waiter whose
 * (mask, goal) condition is now satisfied.  Caller holds l_lock.
 */
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

/* AST handling after a successful downconvert: adopt the requested
 * level and clear BLOCKED if we now satisfy the blocking request. */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level =
		lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 */
	lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

/* First successful attach of this lockres to the DLM. */
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	/* A fresh PR/EX grant (unless created LOCAL) may be covering
	 * stale cached state, so request a refresh where required. */
	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

/*
 * Record that a BAST asked us to drop below 'level'.  Returns nonzero
 * when a new downconvert must be scheduled.  Caller holds l_lock.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here. The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	/* Bump the generation so stale clear attempts are ignored. */
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING. Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Mark a busy lockres PENDING and return the generation number the
 * caller must later pass to lockres_clear_pending().  Caller holds
 * l_lock; the lock must already be BUSY.
 */
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

/* Blocking AST: another node wants a lock incompatible with 'level'. */
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip
the bast for locks which don't enable caching - 1067 * they'll be dropped at the earliest possible time anyway. 1068 */ 1069 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1070 return; 1071 1072 spin_lock_irqsave(&lockres->l_lock, flags); 1073 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1074 if (needs_downconvert) 1075 ocfs2_schedule_blocked_lock(osb, lockres); 1076 spin_unlock_irqrestore(&lockres->l_lock, flags); 1077 1078 wake_up(&lockres->l_event); 1079 1080 ocfs2_wake_downconvert_thread(osb); 1081} 1082 1083static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1084{ 1085 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1086 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1087 unsigned long flags; 1088 int status; 1089 1090 spin_lock_irqsave(&lockres->l_lock, flags); 1091 1092 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1093 1094 if (status == -EAGAIN) { 1095 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1096 goto out; 1097 } 1098 1099 if (status) { 1100 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1101 lockres->l_name, status); 1102 spin_unlock_irqrestore(&lockres->l_lock, flags); 1103 return; 1104 } 1105 1106 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1107 "level %d => %d\n", lockres->l_name, lockres->l_action, 1108 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1109 1110 switch(lockres->l_action) { 1111 case OCFS2_AST_ATTACH: 1112 ocfs2_generic_handle_attach_action(lockres); 1113 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1114 break; 1115 case OCFS2_AST_CONVERT: 1116 ocfs2_generic_handle_convert_action(lockres); 1117 break; 1118 case OCFS2_AST_DOWNCONVERT: 1119 ocfs2_generic_handle_downconvert_action(lockres); 1120 break; 1121 default: 1122 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1123 "flags 0x%lx, unlock: %u\n", 1124 lockres->l_name, lockres->l_action, lockres->l_flags, 1125 lockres->l_unlock_action); 1126 BUG(); 1127 } 
1128out: 1129 /* set it to something invalid so if we get called again we 1130 * can catch it. */ 1131 lockres->l_action = OCFS2_AST_INVALID; 1132 1133 /* Did we try to cancel this lock? Clear that state */ 1134 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1135 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1136 1137 /* 1138 * We may have beaten the locking functions here. We certainly 1139 * know that dlm_lock() has been called :-) 1140 * Because we can't have two lock calls in flight at once, we 1141 * can use lockres->l_pending_gen. 1142 */ 1143 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1144 1145 wake_up(&lockres->l_event); 1146 spin_unlock_irqrestore(&lockres->l_lock, flags); 1147} 1148 1149static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1150{ 1151 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1152 unsigned long flags; 1153 1154 mlog_entry_void(); 1155 1156 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1157 lockres->l_name, lockres->l_unlock_action); 1158 1159 spin_lock_irqsave(&lockres->l_lock, flags); 1160 if (error) { 1161 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1162 "unlock_action %d\n", error, lockres->l_name, 1163 lockres->l_unlock_action); 1164 spin_unlock_irqrestore(&lockres->l_lock, flags); 1165 mlog_exit_void(); 1166 return; 1167 } 1168 1169 switch(lockres->l_unlock_action) { 1170 case OCFS2_UNLOCK_CANCEL_CONVERT: 1171 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1172 lockres->l_action = OCFS2_AST_INVALID; 1173 /* Downconvert thread may have requeued this lock, we 1174 * need to wake it. 
*/ 1175 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1176 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1177 break; 1178 case OCFS2_UNLOCK_DROP_LOCK: 1179 lockres->l_level = DLM_LOCK_IV; 1180 break; 1181 default: 1182 BUG(); 1183 } 1184 1185 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1186 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1187 wake_up(&lockres->l_event); 1188 spin_unlock_irqrestore(&lockres->l_lock, flags); 1189 1190 mlog_exit_void(); 1191} 1192 1193/* 1194 * This is the filesystem locking protocol. It provides the lock handling 1195 * hooks for the underlying DLM. It has a maximum version number. 1196 * The version number allows interoperability with systems running at 1197 * the same major number and an equal or smaller minor number. 1198 * 1199 * Whenever the filesystem does new things with locks (adds or removes a 1200 * lock, orders them differently, does different things underneath a lock), 1201 * the version must be changed. The protocol is negotiated when joining 1202 * the dlm domain. A node may join the domain if its major version is 1203 * identical to all other nodes and its minor version is greater than 1204 * or equal to all other nodes. When its minor version is greater than 1205 * the other nodes, it will run at the minor version specified by the 1206 * other nodes. 1207 * 1208 * If a locking change is made that will not be compatible with older 1209 * versions, the major number must be increased and the minor version set 1210 * to zero. If a change merely adds a behavior that can be disabled when 1211 * speaking to older versions, the minor version must be increased. If a 1212 * change adds a fully backwards compatible change (eg, LVB changes that 1213 * are just ignored by older versions), the version does not need to be 1214 * updated. 
1215 */ 1216static struct ocfs2_locking_protocol lproto = { 1217 .lp_max_version = { 1218 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1219 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1220 }, 1221 .lp_lock_ast = ocfs2_locking_ast, 1222 .lp_blocking_ast = ocfs2_blocking_ast, 1223 .lp_unlock_ast = ocfs2_unlock_ast, 1224}; 1225 1226void ocfs2_set_locking_protocol(void) 1227{ 1228 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1229} 1230 1231static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1232 int convert) 1233{ 1234 unsigned long flags; 1235 1236 mlog_entry_void(); 1237 spin_lock_irqsave(&lockres->l_lock, flags); 1238 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1239 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1240 if (convert) 1241 lockres->l_action = OCFS2_AST_INVALID; 1242 else 1243 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1244 spin_unlock_irqrestore(&lockres->l_lock, flags); 1245 1246 wake_up(&lockres->l_event); 1247 mlog_exit_void(); 1248} 1249 1250/* Note: If we detect another process working on the lock (i.e., 1251 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1252 * to do the right thing in that case. 
1253 */ 1254static int ocfs2_lock_create(struct ocfs2_super *osb, 1255 struct ocfs2_lock_res *lockres, 1256 int level, 1257 u32 dlm_flags) 1258{ 1259 int ret = 0; 1260 unsigned long flags; 1261 unsigned int gen; 1262 1263 mlog_entry_void(); 1264 1265 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1266 dlm_flags); 1267 1268 spin_lock_irqsave(&lockres->l_lock, flags); 1269 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1270 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1271 spin_unlock_irqrestore(&lockres->l_lock, flags); 1272 goto bail; 1273 } 1274 1275 lockres->l_action = OCFS2_AST_ATTACH; 1276 lockres->l_requested = level; 1277 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1278 gen = lockres_set_pending(lockres); 1279 spin_unlock_irqrestore(&lockres->l_lock, flags); 1280 1281 ret = ocfs2_dlm_lock(osb->cconn, 1282 level, 1283 &lockres->l_lksb, 1284 dlm_flags, 1285 lockres->l_name, 1286 OCFS2_LOCK_ID_MAX_LEN - 1); 1287 lockres_clear_pending(lockres, gen, osb); 1288 if (ret) { 1289 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1290 ocfs2_recover_from_dlm_error(lockres, 1); 1291 } 1292 1293 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1294 1295bail: 1296 mlog_exit(ret); 1297 return ret; 1298} 1299 1300static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1301 int flag) 1302{ 1303 unsigned long flags; 1304 int ret; 1305 1306 spin_lock_irqsave(&lockres->l_lock, flags); 1307 ret = lockres->l_flags & flag; 1308 spin_unlock_irqrestore(&lockres->l_lock, flags); 1309 1310 return ret; 1311} 1312 1313static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1314 1315{ 1316 wait_event(lockres->l_event, 1317 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1318} 1319 1320static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1321 1322{ 1323 wait_event(lockres->l_event, 1324 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1325} 1326 1327/* predict what lock level 
we'll be dropping down to on behalf 1328 * of another node, and return true if the currently wanted 1329 * level will be compatible with it. */ 1330static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1331 int wanted) 1332{ 1333 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1334 1335 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1336} 1337 1338static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1339{ 1340 INIT_LIST_HEAD(&mw->mw_item); 1341 init_completion(&mw->mw_complete); 1342 ocfs2_init_start_time(mw); 1343} 1344 1345static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1346{ 1347 wait_for_completion(&mw->mw_complete); 1348 /* Re-arm the completion in case we want to wait on it again */ 1349 INIT_COMPLETION(mw->mw_complete); 1350 return mw->mw_status; 1351} 1352 1353static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1354 struct ocfs2_mask_waiter *mw, 1355 unsigned long mask, 1356 unsigned long goal) 1357{ 1358 BUG_ON(!list_empty(&mw->mw_item)); 1359 1360 assert_spin_locked(&lockres->l_lock); 1361 1362 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1363 mw->mw_mask = mask; 1364 mw->mw_goal = goal; 1365} 1366 1367/* returns 0 if the mw that was removed was already satisfied, -EBUSY 1368 * if the mask still hadn't reached its goal */ 1369static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1370 struct ocfs2_mask_waiter *mw) 1371{ 1372 unsigned long flags; 1373 int ret = 0; 1374 1375 spin_lock_irqsave(&lockres->l_lock, flags); 1376 if (!list_empty(&mw->mw_item)) { 1377 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1378 ret = -EBUSY; 1379 1380 list_del_init(&mw->mw_item); 1381 init_completion(&mw->mw_complete); 1382 } 1383 spin_unlock_irqrestore(&lockres->l_lock, flags); 1384 1385 return ret; 1386 1387} 1388 1389static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1390 struct ocfs2_lock_res *lockres) 1391{ 1392 int 
ret; 1393 1394 ret = wait_for_completion_interruptible(&mw->mw_complete); 1395 if (ret) 1396 lockres_remove_mask_waiter(lockres, mw); 1397 else 1398 ret = mw->mw_status; 1399 /* Re-arm the completion in case we want to wait on it again */ 1400 INIT_COMPLETION(mw->mw_complete); 1401 return ret; 1402} 1403 1404static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1405 struct ocfs2_lock_res *lockres, 1406 int level, 1407 u32 lkm_flags, 1408 int arg_flags, 1409 int l_subclass, 1410 unsigned long caller_ip) 1411{ 1412 struct ocfs2_mask_waiter mw; 1413 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1414 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1415 unsigned long flags; 1416 unsigned int gen; 1417 int noqueue_attempted = 0; 1418 1419 mlog_entry_void(); 1420 1421 ocfs2_init_mask_waiter(&mw); 1422 1423 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1424 lkm_flags |= DLM_LKF_VALBLK; 1425 1426again: 1427 wait = 0; 1428 1429 spin_lock_irqsave(&lockres->l_lock, flags); 1430 1431 if (catch_signals && signal_pending(current)) { 1432 ret = -ERESTARTSYS; 1433 goto unlock; 1434 } 1435 1436 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1437 "Cluster lock called on freeing lockres %s! flags " 1438 "0x%lx\n", lockres->l_name, lockres->l_flags); 1439 1440 /* We only compare against the currently granted level 1441 * here. If the lock is blocked waiting on a downconvert, 1442 * we'll get caught below. */ 1443 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1444 level > lockres->l_level) { 1445 /* is someone sitting in dlm_lock? If so, wait on 1446 * them. */ 1447 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1448 wait = 1; 1449 goto unlock; 1450 } 1451 1452 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1453 /* 1454 * We've upconverted. If the lock now has a level we can 1455 * work with, we take it. If, however, the lock is not at the 1456 * required level, we go thru the full cycle. 
One way this could 1457 * happen is if a process requesting an upconvert to PR is 1458 * closely followed by another requesting upconvert to an EX. 1459 * If the process requesting EX lands here, we want it to 1460 * continue attempting to upconvert and let the process 1461 * requesting PR take the lock. 1462 * If multiple processes request upconvert to PR, the first one 1463 * here will take the lock. The others will have to go thru the 1464 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1465 * downconvert request. 1466 */ 1467 if (level <= lockres->l_level) 1468 goto update_holders; 1469 } 1470 1471 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1472 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1473 /* is the lock is currently blocked on behalf of 1474 * another node */ 1475 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1476 wait = 1; 1477 goto unlock; 1478 } 1479 1480 if (level > lockres->l_level) { 1481 if (noqueue_attempted > 0) { 1482 ret = -EAGAIN; 1483 goto unlock; 1484 } 1485 if (lkm_flags & DLM_LKF_NOQUEUE) 1486 noqueue_attempted = 1; 1487 1488 if (lockres->l_action != OCFS2_AST_INVALID) 1489 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1490 lockres->l_name, lockres->l_action); 1491 1492 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1493 lockres->l_action = OCFS2_AST_ATTACH; 1494 lkm_flags &= ~DLM_LKF_CONVERT; 1495 } else { 1496 lockres->l_action = OCFS2_AST_CONVERT; 1497 lkm_flags |= DLM_LKF_CONVERT; 1498 } 1499 1500 lockres->l_requested = level; 1501 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1502 gen = lockres_set_pending(lockres); 1503 spin_unlock_irqrestore(&lockres->l_lock, flags); 1504 1505 BUG_ON(level == DLM_LOCK_IV); 1506 BUG_ON(level == DLM_LOCK_NL); 1507 1508 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1509 lockres->l_name, lockres->l_level, level); 1510 1511 /* call dlm_lock to upgrade lock now */ 1512 ret = ocfs2_dlm_lock(osb->cconn, 1513 level, 1514 &lockres->l_lksb, 1515 
lkm_flags, 1516 lockres->l_name, 1517 OCFS2_LOCK_ID_MAX_LEN - 1); 1518 lockres_clear_pending(lockres, gen, osb); 1519 if (ret) { 1520 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1521 (ret != -EAGAIN)) { 1522 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1523 ret, lockres); 1524 } 1525 ocfs2_recover_from_dlm_error(lockres, 1); 1526 goto out; 1527 } 1528 1529 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1530 lockres->l_name); 1531 1532 /* At this point we've gone inside the dlm and need to 1533 * complete our work regardless. */ 1534 catch_signals = 0; 1535 1536 /* wait for busy to clear and carry on */ 1537 goto again; 1538 } 1539 1540update_holders: 1541 /* Ok, if we get here then we're good to go. */ 1542 ocfs2_inc_holders(lockres, level); 1543 1544 ret = 0; 1545unlock: 1546 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1547 1548 spin_unlock_irqrestore(&lockres->l_lock, flags); 1549out: 1550 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1551 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1552 wait = 0; 1553 if (lockres_remove_mask_waiter(lockres, &mw)) 1554 ret = -EAGAIN; 1555 else 1556 goto again; 1557 } 1558 if (wait) { 1559 ret = ocfs2_wait_for_mask(&mw); 1560 if (ret == 0) 1561 goto again; 1562 mlog_errno(ret); 1563 } 1564 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1565 1566#ifdef CONFIG_DEBUG_LOCK_ALLOC 1567 if (!ret && lockres->l_lockdep_map.key != NULL) { 1568 if (level == DLM_LOCK_PR) 1569 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1570 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1571 caller_ip); 1572 else 1573 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1574 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1575 caller_ip); 1576 } 1577#endif 1578 mlog_exit(ret); 1579 return ret; 1580} 1581 1582static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1583 struct ocfs2_lock_res *lockres, 1584 int level, 1585 u32 lkm_flags, 1586 int arg_flags) 1587{ 1588 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, 
arg_flags, 1589 0, _RET_IP_); 1590} 1591 1592 1593static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1594 struct ocfs2_lock_res *lockres, 1595 int level, 1596 unsigned long caller_ip) 1597{ 1598 unsigned long flags; 1599 1600 mlog_entry_void(); 1601 spin_lock_irqsave(&lockres->l_lock, flags); 1602 ocfs2_dec_holders(lockres, level); 1603 ocfs2_downconvert_on_unlock(osb, lockres); 1604 spin_unlock_irqrestore(&lockres->l_lock, flags); 1605#ifdef CONFIG_DEBUG_LOCK_ALLOC 1606 if (lockres->l_lockdep_map.key != NULL) 1607 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1608#endif 1609 mlog_exit_void(); 1610} 1611 1612static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1613 struct ocfs2_lock_res *lockres, 1614 int ex, 1615 int local) 1616{ 1617 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1618 unsigned long flags; 1619 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1620 1621 spin_lock_irqsave(&lockres->l_lock, flags); 1622 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1623 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1624 spin_unlock_irqrestore(&lockres->l_lock, flags); 1625 1626 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1627} 1628 1629/* Grants us an EX lock on the data and metadata resources, skipping 1630 * the normal cluster directory lookup. Use this ONLY on newly created 1631 * inodes which other nodes can't possibly see, and which haven't been 1632 * hashed in the inode hash yet. This can give us a good performance 1633 * increase as it'll skip the network broadcast normally associated 1634 * with creating a new lock resource. 
*/ 1635int ocfs2_create_new_inode_locks(struct inode *inode) 1636{ 1637 int ret; 1638 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1639 1640 BUG_ON(!inode); 1641 BUG_ON(!ocfs2_inode_is_new(inode)); 1642 1643 mlog_entry_void(); 1644 1645 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1646 1647 /* NOTE: That we don't increment any of the holder counts, nor 1648 * do we add anything to a journal handle. Since this is 1649 * supposed to be a new inode which the cluster doesn't know 1650 * about yet, there is no need to. As far as the LVB handling 1651 * is concerned, this is basically like acquiring an EX lock 1652 * on a resource which has an invalid one -- we'll set it 1653 * valid when we release the EX. */ 1654 1655 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1656 if (ret) { 1657 mlog_errno(ret); 1658 goto bail; 1659 } 1660 1661 /* 1662 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1663 * don't use a generation in their lock names. 1664 */ 1665 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1666 if (ret) { 1667 mlog_errno(ret); 1668 goto bail; 1669 } 1670 1671 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1672 if (ret) { 1673 mlog_errno(ret); 1674 goto bail; 1675 } 1676 1677bail: 1678 mlog_exit(ret); 1679 return ret; 1680} 1681 1682int ocfs2_rw_lock(struct inode *inode, int write) 1683{ 1684 int status, level; 1685 struct ocfs2_lock_res *lockres; 1686 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1687 1688 BUG_ON(!inode); 1689 1690 mlog_entry_void(); 1691 1692 mlog(0, "inode %llu take %s RW lock\n", 1693 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1694 write ? "EXMODE" : "PRMODE"); 1695 1696 if (ocfs2_mount_local(osb)) { 1697 mlog_exit(0); 1698 return 0; 1699 } 1700 1701 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1702 1703 level = write ? 
DLM_LOCK_EX : DLM_LOCK_PR; 1704 1705 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1706 0); 1707 if (status < 0) 1708 mlog_errno(status); 1709 1710 mlog_exit(status); 1711 return status; 1712} 1713 1714void ocfs2_rw_unlock(struct inode *inode, int write) 1715{ 1716 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1717 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1718 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1719 1720 mlog_entry_void(); 1721 1722 mlog(0, "inode %llu drop %s RW lock\n", 1723 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1724 write ? "EXMODE" : "PRMODE"); 1725 1726 if (!ocfs2_mount_local(osb)) 1727 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1728 1729 mlog_exit_void(); 1730} 1731 1732/* 1733 * ocfs2_open_lock always get PR mode lock. 1734 */ 1735int ocfs2_open_lock(struct inode *inode) 1736{ 1737 int status = 0; 1738 struct ocfs2_lock_res *lockres; 1739 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1740 1741 BUG_ON(!inode); 1742 1743 mlog_entry_void(); 1744 1745 mlog(0, "inode %llu take PRMODE open lock\n", 1746 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1747 1748 if (ocfs2_mount_local(osb)) 1749 goto out; 1750 1751 lockres = &OCFS2_I(inode)->ip_open_lockres; 1752 1753 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1754 DLM_LOCK_PR, 0, 0); 1755 if (status < 0) 1756 mlog_errno(status); 1757 1758out: 1759 mlog_exit(status); 1760 return status; 1761} 1762 1763int ocfs2_try_open_lock(struct inode *inode, int write) 1764{ 1765 int status = 0, level; 1766 struct ocfs2_lock_res *lockres; 1767 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1768 1769 BUG_ON(!inode); 1770 1771 mlog_entry_void(); 1772 1773 mlog(0, "inode %llu try to take %s open lock\n", 1774 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1775 write ? "EXMODE" : "PRMODE"); 1776 1777 if (ocfs2_mount_local(osb)) 1778 goto out; 1779 1780 lockres = &OCFS2_I(inode)->ip_open_lockres; 1781 1782 level = write ? 
DLM_LOCK_EX : DLM_LOCK_PR; 1783 1784 /* 1785 * The file system may already holding a PRMODE/EXMODE open lock. 1786 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1787 * other nodes and the -EAGAIN will indicate to the caller that 1788 * this inode is still in use. 1789 */ 1790 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1791 level, DLM_LKF_NOQUEUE, 0); 1792 1793out: 1794 mlog_exit(status); 1795 return status; 1796} 1797 1798/* 1799 * ocfs2_open_unlock unlock PR and EX mode open locks. 1800 */ 1801void ocfs2_open_unlock(struct inode *inode) 1802{ 1803 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1804 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1805 1806 mlog_entry_void(); 1807 1808 mlog(0, "inode %llu drop open lock\n", 1809 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1810 1811 if (ocfs2_mount_local(osb)) 1812 goto out; 1813 1814 if(lockres->l_ro_holders) 1815 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1816 DLM_LOCK_PR); 1817 if(lockres->l_ex_holders) 1818 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1819 DLM_LOCK_EX); 1820 1821out: 1822 mlog_exit_void(); 1823} 1824 1825static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1826 int level) 1827{ 1828 int ret; 1829 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1830 unsigned long flags; 1831 struct ocfs2_mask_waiter mw; 1832 1833 ocfs2_init_mask_waiter(&mw); 1834 1835retry_cancel: 1836 spin_lock_irqsave(&lockres->l_lock, flags); 1837 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1838 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1839 if (ret) { 1840 spin_unlock_irqrestore(&lockres->l_lock, flags); 1841 ret = ocfs2_cancel_convert(osb, lockres); 1842 if (ret < 0) { 1843 mlog_errno(ret); 1844 goto out; 1845 } 1846 goto retry_cancel; 1847 } 1848 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1849 spin_unlock_irqrestore(&lockres->l_lock, flags); 1850 1851 ocfs2_wait_for_mask(&mw); 1852 goto 
retry_cancel; 1853 } 1854 1855 ret = -ERESTARTSYS; 1856 /* 1857 * We may still have gotten the lock, in which case there's no 1858 * point to restarting the syscall. 1859 */ 1860 if (lockres->l_level == level) 1861 ret = 0; 1862 1863 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1864 lockres->l_flags, lockres->l_level, lockres->l_action); 1865 1866 spin_unlock_irqrestore(&lockres->l_lock, flags); 1867 1868out: 1869 return ret; 1870} 1871 1872/* 1873 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1874 * flock() calls. The locking approach this requires is sufficiently 1875 * different from all other cluster lock types that we implement a 1876 * separate path to the "low-level" dlm calls. In particular: 1877 * 1878 * - No optimization of lock levels is done - we take at exactly 1879 * what's been requested. 1880 * 1881 * - No lock caching is employed. We immediately downconvert to 1882 * no-lock at unlock time. This also means flock locks never go on 1883 * the blocking list). 1884 * 1885 * - Since userspace can trivially deadlock itself with flock, we make 1886 * sure to allow cancellation of a misbehaving applications flock() 1887 * request. 1888 * 1889 * - Access to any flock lockres doesn't require concurrency, so we 1890 * can simplify the code by requiring the caller to guarantee 1891 * serialization of dlmglue flock calls. 1892 */ 1893int ocfs2_file_lock(struct file *file, int ex, int trylock) 1894{ 1895 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1896 unsigned int lkm_flags = trylock ? 
DLM_LKF_NOQUEUE : 0; 1897 unsigned long flags; 1898 struct ocfs2_file_private *fp = file->private_data; 1899 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1900 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1901 struct ocfs2_mask_waiter mw; 1902 1903 ocfs2_init_mask_waiter(&mw); 1904 1905 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1906 (lockres->l_level > DLM_LOCK_NL)) { 1907 mlog(ML_ERROR, 1908 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1909 "level: %u\n", lockres->l_name, lockres->l_flags, 1910 lockres->l_level); 1911 return -EINVAL; 1912 } 1913 1914 spin_lock_irqsave(&lockres->l_lock, flags); 1915 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1916 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1917 spin_unlock_irqrestore(&lockres->l_lock, flags); 1918 1919 /* 1920 * Get the lock at NLMODE to start - that way we 1921 * can cancel the upconvert request if need be. 1922 */ 1923 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1924 if (ret < 0) { 1925 mlog_errno(ret); 1926 goto out; 1927 } 1928 1929 ret = ocfs2_wait_for_mask(&mw); 1930 if (ret) { 1931 mlog_errno(ret); 1932 goto out; 1933 } 1934 spin_lock_irqsave(&lockres->l_lock, flags); 1935 } 1936 1937 lockres->l_action = OCFS2_AST_CONVERT; 1938 lkm_flags |= DLM_LKF_CONVERT; 1939 lockres->l_requested = level; 1940 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1941 1942 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1943 spin_unlock_irqrestore(&lockres->l_lock, flags); 1944 1945 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1946 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 1947 if (ret) { 1948 if (!trylock || (ret != -EAGAIN)) { 1949 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1950 ret = -EINVAL; 1951 } 1952 1953 ocfs2_recover_from_dlm_error(lockres, 1); 1954 lockres_remove_mask_waiter(lockres, &mw); 1955 goto out; 1956 } 1957 1958 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1959 if (ret == -ERESTARTSYS) 
{ 1960 /* 1961 * Userspace can cause deadlock itself with 1962 * flock(). Current behavior locally is to allow the 1963 * deadlock, but abort the system call if a signal is 1964 * received. We follow this example, otherwise a 1965 * poorly written program could sit in kernel until 1966 * reboot. 1967 * 1968 * Handling this is a bit more complicated for Ocfs2 1969 * though. We can't exit this function with an 1970 * outstanding lock request, so a cancel convert is 1971 * required. We intentionally overwrite 'ret' - if the 1972 * cancel fails and the lock was granted, it's easier 1973 * to just bubble success back up to the user. 1974 */ 1975 ret = ocfs2_flock_handle_signal(lockres, level); 1976 } else if (!ret && (level > lockres->l_level)) { 1977 /* Trylock failed asynchronously */ 1978 BUG_ON(!trylock); 1979 ret = -EAGAIN; 1980 } 1981 1982out: 1983 1984 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1985 lockres->l_name, ex, trylock, ret); 1986 return ret; 1987} 1988 1989void ocfs2_file_unlock(struct file *file) 1990{ 1991 int ret; 1992 unsigned int gen; 1993 unsigned long flags; 1994 struct ocfs2_file_private *fp = file->private_data; 1995 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1996 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1997 struct ocfs2_mask_waiter mw; 1998 1999 ocfs2_init_mask_waiter(&mw); 2000 2001 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2002 return; 2003 2004 if (lockres->l_level == DLM_LOCK_NL) 2005 return; 2006 2007 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2008 lockres->l_name, lockres->l_flags, lockres->l_level, 2009 lockres->l_action); 2010 2011 spin_lock_irqsave(&lockres->l_lock, flags); 2012 /* 2013 * Fake a blocking ast for the downconvert code. 
2014 */ 2015 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2016 lockres->l_blocking = DLM_LOCK_EX; 2017 2018 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2019 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2020 spin_unlock_irqrestore(&lockres->l_lock, flags); 2021 2022 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2023 if (ret) { 2024 mlog_errno(ret); 2025 return; 2026 } 2027 2028 ret = ocfs2_wait_for_mask(&mw); 2029 if (ret) 2030 mlog_errno(ret); 2031} 2032 2033static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2034 struct ocfs2_lock_res *lockres) 2035{ 2036 int kick = 0; 2037 2038 mlog_entry_void(); 2039 2040 /* If we know that another node is waiting on our lock, kick 2041 * the downconvert thread * pre-emptively when we reach a release 2042 * condition. */ 2043 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2044 switch(lockres->l_blocking) { 2045 case DLM_LOCK_EX: 2046 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2047 kick = 1; 2048 break; 2049 case DLM_LOCK_PR: 2050 if (!lockres->l_ex_holders) 2051 kick = 1; 2052 break; 2053 default: 2054 BUG(); 2055 } 2056 } 2057 2058 if (kick) 2059 ocfs2_wake_downconvert_thread(osb); 2060 2061 mlog_exit_void(); 2062} 2063 2064#define OCFS2_SEC_BITS 34 2065#define OCFS2_SEC_SHIFT (64 - 34) 2066#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2067 2068/* LVB only has room for 64 bits of time here so we pack it for 2069 * now. */ 2070static u64 ocfs2_pack_timespec(struct timespec *spec) 2071{ 2072 u64 res; 2073 u64 sec = spec->tv_sec; 2074 u32 nsec = spec->tv_nsec; 2075 2076 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2077 2078 return res; 2079} 2080 2081/* Call this with the lockres locked. I am reasonably sure we don't 2082 * need ip_lock in this function as anyone who would be changing those 2083 * values is supposed to be blocked in ocfs2_inode_lock right now. 
 */
/*
 * Fill the inode metadata LVB from the in-core inode so other nodes
 * can refresh without hitting the disk.  A deleted inode gets an
 * invalid LVB (version 0) to force readers back to disk.
 */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* All multi-byte fields are stored big-endian in the LVB. */
	lvb->lvb_version = OCFS2_LVB_VERSION;
	lvb->lvb_isize = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr = cpu_to_be32(oi->ip_attr);
	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}

/* Inverse of ocfs2_pack_timespec(): split a packed 64-bit value back
 * into seconds and nanoseconds. */
static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}

/*
 * Refresh the in-core inode from a trusted metadata LVB, avoiding a
 * disk read.  Counterpart of __ocfs2_stuff_meta_lvb().
 */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks = ocfs2_inode_sector_count(inode);

	inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}

/*
 * An LVB is only usable if the DLM says it is valid, it carries the
 * version we understand, and its generation matches this inode (guards
 * against inode slot reuse).
 */
static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
					      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
	    && lvb->lvb_version == OCFS2_LVB_VERSION
	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
		return 1;
	return 0;
}

/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 * 0 means no refresh needed.
 *
 * > 0 means you need to refresh this and you MUST call
 * ocfs2_complete_lock_res_refresh afterwards.
 */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	/* Nothing stale to refresh - fast path out. */
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	/* Someone else is refreshing; wait for them to finish and then
	 * re-check - they may have failed, leaving NEEDS_REFRESH set. */
	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}

/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag.  That way a
 * later caller of ocfs2_should_refresh_lock_res() retries the
 * refresh after a failure. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Wake anyone blocked in ocfs2_wait_on_refreshing_lock(). */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/* may or may not return a bh if it went to disk.
*/ 2246static int ocfs2_inode_lock_update(struct inode *inode, 2247 struct buffer_head **bh) 2248{ 2249 int status = 0; 2250 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2251 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2252 struct ocfs2_dinode *fe; 2253 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2254 2255 mlog_entry_void(); 2256 2257 if (ocfs2_mount_local(osb)) 2258 goto bail; 2259 2260 spin_lock(&oi->ip_lock); 2261 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2262 mlog(0, "Orphaned inode %llu was deleted while we " 2263 "were waiting on a lock. ip_flags = 0x%x\n", 2264 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2265 spin_unlock(&oi->ip_lock); 2266 status = -ENOENT; 2267 goto bail; 2268 } 2269 spin_unlock(&oi->ip_lock); 2270 2271 if (!ocfs2_should_refresh_lock_res(lockres)) 2272 goto bail; 2273 2274 /* This will discard any caching information we might have had 2275 * for the inode metadata. */ 2276 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2277 2278 ocfs2_extent_map_trunc(inode, 0); 2279 2280 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2281 mlog(0, "Trusting LVB on inode %llu\n", 2282 (unsigned long long)oi->ip_blkno); 2283 ocfs2_refresh_inode_from_lvb(inode); 2284 } else { 2285 /* Boo, we have to go to disk. */ 2286 /* read bh, cast, ocfs2_refresh_inode */ 2287 status = ocfs2_read_inode_block(inode, bh); 2288 if (status < 0) { 2289 mlog_errno(status); 2290 goto bail_refresh; 2291 } 2292 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2293 2294 /* This is a good chance to make sure we're not 2295 * locking an invalid object. ocfs2_read_inode_block() 2296 * already checked that the inode block is sane. 2297 * 2298 * We bug on a stale inode here because we checked 2299 * above whether it was wiped from disk. The wiping 2300 * node provides a guarantee that we receive that 2301 * message and can mark the inode before dropping any 2302 * locks associated with it. 
*/ 2303 mlog_bug_on_msg(inode->i_generation != 2304 le32_to_cpu(fe->i_generation), 2305 "Invalid dinode %llu disk generation: %u " 2306 "inode->i_generation: %u\n", 2307 (unsigned long long)oi->ip_blkno, 2308 le32_to_cpu(fe->i_generation), 2309 inode->i_generation); 2310 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2311 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2312 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2313 (unsigned long long)oi->ip_blkno, 2314 (unsigned long long)le64_to_cpu(fe->i_dtime), 2315 le32_to_cpu(fe->i_flags)); 2316 2317 ocfs2_refresh_inode(inode, fe); 2318 ocfs2_track_lock_refresh(lockres); 2319 } 2320 2321 status = 0; 2322bail_refresh: 2323 ocfs2_complete_lock_res_refresh(lockres, status); 2324bail: 2325 mlog_exit(status); 2326 return status; 2327} 2328 2329static int ocfs2_assign_bh(struct inode *inode, 2330 struct buffer_head **ret_bh, 2331 struct buffer_head *passed_bh) 2332{ 2333 int status; 2334 2335 if (passed_bh) { 2336 /* Ok, the update went to disk for us, use the 2337 * returned bh. */ 2338 *ret_bh = passed_bh; 2339 get_bh(*ret_bh); 2340 2341 return 0; 2342 } 2343 2344 status = ocfs2_read_inode_block(inode, ret_bh); 2345 if (status < 0) 2346 mlog_errno(status); 2347 2348 return status; 2349} 2350 2351/* 2352 * returns < 0 error if the callback will never be called, otherwise 2353 * the result of the lock will be communicated via the callback. 2354 */ 2355int ocfs2_inode_lock_full_nested(struct inode *inode, 2356 struct buffer_head **ret_bh, 2357 int ex, 2358 int arg_flags, 2359 int subclass) 2360{ 2361 int status, level, acquired; 2362 u32 dlm_flags; 2363 struct ocfs2_lock_res *lockres = NULL; 2364 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2365 struct buffer_head *local_bh = NULL; 2366 2367 BUG_ON(!inode); 2368 2369 mlog_entry_void(); 2370 2371 mlog(0, "inode %llu, take %s META lock\n", 2372 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2373 ex ? 
"EXMODE" : "PRMODE"); 2374 2375 status = 0; 2376 acquired = 0; 2377 /* We'll allow faking a readonly metadata lock for 2378 * rodevices. */ 2379 if (ocfs2_is_hard_readonly(osb)) { 2380 if (ex) 2381 status = -EROFS; 2382 goto bail; 2383 } 2384 2385 if (ocfs2_mount_local(osb)) 2386 goto local; 2387 2388 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2389 ocfs2_wait_for_recovery(osb); 2390 2391 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2392 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2393 dlm_flags = 0; 2394 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2395 dlm_flags |= DLM_LKF_NOQUEUE; 2396 2397 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2398 arg_flags, subclass, _RET_IP_); 2399 if (status < 0) { 2400 if (status != -EAGAIN && status != -EIOCBRETRY) 2401 mlog_errno(status); 2402 goto bail; 2403 } 2404 2405 /* Notify the error cleanup path to drop the cluster lock. */ 2406 acquired = 1; 2407 2408 /* We wait twice because a node may have died while we were in 2409 * the lower dlm layers. The second time though, we've 2410 * committed to owning this lock so we don't allow signals to 2411 * abort the operation. */ 2412 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2413 ocfs2_wait_for_recovery(osb); 2414 2415local: 2416 /* 2417 * We only see this flag if we're being called from 2418 * ocfs2_read_locked_inode(). It means we're locking an inode 2419 * which hasn't been populated yet, so clear the refresh flag 2420 * and let the caller handle it. 2421 */ 2422 if (inode->i_state & I_NEW) { 2423 status = 0; 2424 if (lockres) 2425 ocfs2_complete_lock_res_refresh(lockres, 0); 2426 goto bail; 2427 } 2428 2429 /* This is fun. The caller may want a bh back, or it may 2430 * not. ocfs2_inode_lock_update definitely wants one in, but 2431 * may or may not read one, depending on what's in the 2432 * LVB. The result of all of this is that we've *only* gone to 2433 * disk if we have to, so the complexity is worthwhile. 
*/ 2434 status = ocfs2_inode_lock_update(inode, &local_bh); 2435 if (status < 0) { 2436 if (status != -ENOENT) 2437 mlog_errno(status); 2438 goto bail; 2439 } 2440 2441 if (ret_bh) { 2442 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2443 if (status < 0) { 2444 mlog_errno(status); 2445 goto bail; 2446 } 2447 } 2448 2449bail: 2450 if (status < 0) { 2451 if (ret_bh && (*ret_bh)) { 2452 brelse(*ret_bh); 2453 *ret_bh = NULL; 2454 } 2455 if (acquired) 2456 ocfs2_inode_unlock(inode, ex); 2457 } 2458 2459 if (local_bh) 2460 brelse(local_bh); 2461 2462 mlog_exit(status); 2463 return status; 2464} 2465 2466/* 2467 * This is working around a lock inversion between tasks acquiring DLM 2468 * locks while holding a page lock and the downconvert thread which 2469 * blocks dlm lock acquiry while acquiring page locks. 2470 * 2471 * ** These _with_page variantes are only intended to be called from aop 2472 * methods that hold page locks and return a very specific *positive* error 2473 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2474 * 2475 * The DLM is called such that it returns -EAGAIN if it would have 2476 * blocked waiting for the downconvert thread. In that case we unlock 2477 * our page so the downconvert thread can make progress. Once we've 2478 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2479 * that called us can bubble that back up into the VFS who will then 2480 * immediately retry the aop call. 2481 * 2482 * We do a blocking lock and immediate unlock before returning, though, so that 2483 * the lock has a great chance of being cached on this node by the time the VFS 2484 * calls back to retry the aop. This has a potential to livelock as nodes 2485 * ping locks back and forth, but that's a risk we're willing to take to avoid 2486 * the lock inversion simply. 
2487 */ 2488int ocfs2_inode_lock_with_page(struct inode *inode, 2489 struct buffer_head **ret_bh, 2490 int ex, 2491 struct page *page) 2492{ 2493 int ret; 2494 2495 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2496 if (ret == -EAGAIN) { 2497 unlock_page(page); 2498 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2499 ocfs2_inode_unlock(inode, ex); 2500 ret = AOP_TRUNCATED_PAGE; 2501 } 2502 2503 return ret; 2504} 2505 2506int ocfs2_inode_lock_atime(struct inode *inode, 2507 struct vfsmount *vfsmnt, 2508 int *level) 2509{ 2510 int ret; 2511 2512 mlog_entry_void(); 2513 ret = ocfs2_inode_lock(inode, NULL, 0); 2514 if (ret < 0) { 2515 mlog_errno(ret); 2516 return ret; 2517 } 2518 2519 /* 2520 * If we should update atime, we will get EX lock, 2521 * otherwise we just get PR lock. 2522 */ 2523 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2524 struct buffer_head *bh = NULL; 2525 2526 ocfs2_inode_unlock(inode, 0); 2527 ret = ocfs2_inode_lock(inode, &bh, 1); 2528 if (ret < 0) { 2529 mlog_errno(ret); 2530 return ret; 2531 } 2532 *level = 1; 2533 if (ocfs2_should_update_atime(inode, vfsmnt)) 2534 ocfs2_update_inode_atime(inode, bh); 2535 if (bh) 2536 brelse(bh); 2537 } else 2538 *level = 0; 2539 2540 mlog_exit(ret); 2541 return ret; 2542} 2543 2544void ocfs2_inode_unlock(struct inode *inode, 2545 int ex) 2546{ 2547 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2548 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2549 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2550 2551 mlog_entry_void(); 2552 2553 mlog(0, "inode %llu drop %s META lock\n", 2554 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2555 ex ? 
"EXMODE" : "PRMODE"); 2556 2557 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2558 !ocfs2_mount_local(osb)) 2559 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2560 2561 mlog_exit_void(); 2562} 2563 2564int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2565{ 2566 struct ocfs2_lock_res *lockres; 2567 struct ocfs2_orphan_scan_lvb *lvb; 2568 int status = 0; 2569 2570 if (ocfs2_is_hard_readonly(osb)) 2571 return -EROFS; 2572 2573 if (ocfs2_mount_local(osb)) 2574 return 0; 2575 2576 lockres = &osb->osb_orphan_scan.os_lockres; 2577 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2578 if (status < 0) 2579 return status; 2580 2581 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2582 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2583 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2584 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2585 else 2586 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2587 2588 return status; 2589} 2590 2591void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2592{ 2593 struct ocfs2_lock_res *lockres; 2594 struct ocfs2_orphan_scan_lvb *lvb; 2595 2596 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2597 lockres = &osb->osb_orphan_scan.os_lockres; 2598 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2599 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2600 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2601 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2602 } 2603} 2604 2605int ocfs2_super_lock(struct ocfs2_super *osb, 2606 int ex) 2607{ 2608 int status = 0; 2609 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2610 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2611 2612 mlog_entry_void(); 2613 2614 if (ocfs2_is_hard_readonly(osb)) 2615 return -EROFS; 2616 2617 if (ocfs2_mount_local(osb)) 2618 goto bail; 2619 2620 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2621 if (status < 0) { 2622 mlog_errno(status); 2623 goto bail; 2624 } 2625 2626 /* The super block lock path is really in the best position to 2627 * know when resources covered by the lock need to be 2628 * refreshed, so we do it here. Of course, making sense of 2629 * everything is up to the caller :) */ 2630 status = ocfs2_should_refresh_lock_res(lockres); 2631 if (status < 0) { 2632 mlog_errno(status); 2633 goto bail; 2634 } 2635 if (status) { 2636 status = ocfs2_refresh_slot_info(osb); 2637 2638 ocfs2_complete_lock_res_refresh(lockres, status); 2639 2640 if (status < 0) 2641 mlog_errno(status); 2642 ocfs2_track_lock_refresh(lockres); 2643 } 2644bail: 2645 mlog_exit(status); 2646 return status; 2647} 2648 2649void ocfs2_super_unlock(struct ocfs2_super *osb, 2650 int ex) 2651{ 2652 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2653 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2654 2655 if (!ocfs2_mount_local(osb)) 2656 ocfs2_cluster_unlock(osb, lockres, level); 2657} 2658 2659int ocfs2_rename_lock(struct ocfs2_super *osb) 2660{ 2661 int status; 2662 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2663 2664 if (ocfs2_is_hard_readonly(osb)) 2665 return -EROFS; 2666 2667 if (ocfs2_mount_local(osb)) 2668 return 0; 2669 2670 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2671 if (status < 0) 2672 mlog_errno(status); 2673 2674 return status; 2675} 2676 2677void ocfs2_rename_unlock(struct ocfs2_super *osb) 2678{ 2679 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2680 2681 if (!ocfs2_mount_local(osb)) 2682 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2683} 2684 2685int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2686{ 2687 int status; 2688 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2689 2690 if (ocfs2_is_hard_readonly(osb)) 2691 return -EROFS; 2692 2693 if (ocfs2_mount_local(osb)) 2694 return 0; 2695 2696 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2697 0, 0); 2698 if (status < 0) 2699 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2700 2701 return status; 2702} 2703 2704void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2705{ 2706 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2707 2708 if (!ocfs2_mount_local(osb)) 2709 ocfs2_cluster_unlock(osb, lockres, 2710 ex ? LKM_EXMODE : LKM_PRMODE); 2711} 2712 2713int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2714{ 2715 int ret; 2716 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2717 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2718 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2719 2720 BUG_ON(!dl); 2721 2722 if (ocfs2_is_hard_readonly(osb)) 2723 return -EROFS; 2724 2725 if (ocfs2_mount_local(osb)) 2726 return 0; 2727 2728 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2729 if (ret < 0) 2730 mlog_errno(ret); 2731 2732 return ret; 2733} 2734 2735void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2736{ 2737 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2738 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2739 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2740 2741 if (!ocfs2_mount_local(osb)) 2742 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2743} 2744 2745/* Reference counting of the dlm debug structure. We want this because 2746 * open references on the debug inodes can live on after a mount, so 2747 * we can't rely on the ocfs2_super to always exist. */ 2748static void ocfs2_dlm_debug_free(struct kref *kref) 2749{ 2750 struct ocfs2_dlm_debug *dlm_debug; 2751 2752 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2753 2754 kfree(dlm_debug); 2755} 2756 2757void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2758{ 2759 if (dlm_debug) 2760 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2761} 2762 2763static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2764{ 2765 kref_get(&debug->d_refcnt); 2766} 2767 2768struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2769{ 2770 struct ocfs2_dlm_debug *dlm_debug; 2771 2772 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2773 if (!dlm_debug) { 2774 mlog_errno(-ENOMEM); 2775 goto out; 2776 } 2777 2778 kref_init(&dlm_debug->d_refcnt); 2779 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2780 dlm_debug->d_locking_state = NULL; 2781out: 2782 return dlm_debug; 2783} 2784 2785/* Access to this is arbitrated for us via seq_file->sem. 
 */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* ref held for iteration */
	struct ocfs2_lock_res p_iter_res;	/* dummy cursor on the list */
	struct ocfs2_lock_res p_tmp_res;	/* snapshot handed to show() */
};

/*
 * Walk forward from 'start' on the tracking list to the next real
 * lockres, skipping dummy iteration entries.  Returns NULL at end of
 * list.  Caller must hold ocfs2_dlm_tracking_lock.
 */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}

/* seq_file ->start: find the first real lockres after our cursor. */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}

/* seq_file ->stop: nothing to release; cursor cleanup is in next(). */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}

/* seq_file ->next: advance the dummy cursor past the element just
 * shown and snapshot the next real lockres under the tracking lock. */
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}

/* So that debugfs.ocfs2 can determine which format is being used */
#define OCFS2_DLM_DEBUG_STR_VERSION 2
/* seq_file ->show: emit one tab-separated record per lockres. */
static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
{
	int i;
	char *lvb;
	struct ocfs2_lock_res *lockres = v;

	if (!lockres)
		return -EINVAL;

	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);

	/* Dentry lock names embed a binary inode number; print the
	 * printable prefix then the inode number in hex. */
	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
			   lockres->l_name,
			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
	else
		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);

	seq_printf(m, "%d\t"
		   "0x%lx\t"
		   "0x%x\t"
		   "0x%x\t"
		   "%u\t"
		   "%u\t"
		   "%d\t"
		   "%d\t",
		   lockres->l_level,
		   lockres->l_flags,
		   lockres->l_action,
		   lockres->l_unlock_action,
		   lockres->l_ro_holders,
		   lockres->l_ex_holders,
		   lockres->l_requested,
		   lockres->l_blocking);

	/* Dump the raw LVB */
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	for(i = 0; i < DLM_LVB_LEN; i++)
		seq_printf(m, "0x%x\t", lvb[i]);

#ifdef CONFIG_OCFS2_FS_STATS
# define lock_num_prmode(_l)		(_l)->l_lock_num_prmode
# define lock_num_exmode(_l)		(_l)->l_lock_num_exmode
# define lock_num_prmode_failed(_l)	(_l)->l_lock_num_prmode_failed
# define lock_num_exmode_failed(_l)	(_l)->l_lock_num_exmode_failed
# define lock_total_prmode(_l)		(_l)->l_lock_total_prmode
# define lock_total_exmode(_l)		(_l)->l_lock_total_exmode
# define lock_max_prmode(_l)		(_l)->l_lock_max_prmode
# define lock_max_exmode(_l)		(_l)->l_lock_max_exmode
# define lock_refresh(_l)		(_l)->l_lock_refresh
#else
# define lock_num_prmode(_l)		(0ULL)
# define lock_num_exmode(_l)		(0ULL)
# define lock_num_prmode_failed(_l)	(0)
# define lock_num_exmode_failed(_l)	(0)
# define lock_total_prmode(_l)		(0ULL)
# define lock_total_exmode(_l)		(0ULL)
# define lock_max_prmode(_l)		(0)
# define lock_max_exmode(_l)		(0)
# define lock_refresh(_l)		(0)
#endif
	/* The following seq_print was added in version 2 of this output */
	seq_printf(m, "%llu\t"
		   "%llu\t"
		   "%u\t"
		   "%u\t"
		   "%llu\t"
		   "%llu\t"
		   "%u\t"
		   "%u\t"
		   "%u\t",
		   lock_num_prmode(lockres),
		   lock_num_exmode(lockres),
		   lock_num_prmode_failed(lockres),
		   lock_num_exmode_failed(lockres),
		   lock_total_prmode(lockres),
		   lock_total_exmode(lockres),
		   lock_max_prmode(lockres),
		   lock_max_exmode(lockres),
		   lock_refresh(lockres));

	/* End the line */
	seq_printf(m, "\n");
	return 0;
}

static const struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};

/* ->release for the locking_state debugfs file: unhook the iteration
 * cursor and drop our debug reference. */
static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
{
	struct seq_file *seq = file->private_data;
	struct ocfs2_dlm_seq_priv *priv = seq->private;
	struct ocfs2_lock_res *res = &priv->p_iter_res;

	ocfs2_remove_lockres_tracking(res);
	ocfs2_put_dlm_debug(priv->p_dlm_debug);
	return seq_release_private(inode, file);
}

/* ->open for the locking_state debugfs file: set up seq_file private
 * state and insert our dummy cursor onto the tracking list. */
static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
{
	int ret;
	struct ocfs2_dlm_seq_priv *priv;
	struct seq_file *seq;
	struct ocfs2_super *osb;

	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
	if (!priv) {
		ret = -ENOMEM;
		mlog_errno(ret);
		goto out;
	}
	osb = inode->i_private;
	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
	priv->p_dlm_debug = osb->osb_dlm_debug;
	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);

	ret = seq_open(file, &ocfs2_dlm_seq_ops);
	if (ret) {
		/* NOTE(review): seq_open failure path frees priv but does
		 * not drop the debug ref taken above - looks like a leak;
		 * confirm against upstream. */
		kfree(priv);
		mlog_errno(ret);
		goto out;
	}

	seq = file->private_data;
	seq->private = priv;

	ocfs2_add_lockres_tracking(&priv->p_iter_res,
				   priv->p_dlm_debug);

out:
	return ret;
}

static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};

/* Create the "locking_state" debugfs file and take a debug ref for it. */
static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
{
	int ret = 0;
	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;

	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
							 S_IFREG|S_IRUSR,
							 osb->osb_debug_root,
							 osb,
							 &ocfs2_dlm_debug_fops);
	if (!dlm_debug->d_locking_state) {
		ret = -EINVAL;
		mlog(ML_ERROR,
		     "Unable to create locking state debugfs file.\n");
		goto out;
	}

	ocfs2_get_dlm_debug(dlm_debug);
out:
	return ret;
}

/* Tear down the debugfs file and drop the ref taken at init. */
static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
{
	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;

	if (dlm_debug) {
		debugfs_remove(dlm_debug->d_locking_state);
		ocfs2_put_dlm_debug(dlm_debug);
	}
}

/*
 * Bring up cluster locking for a mount: debugfs, the downconvert
 * thread, the cluster connection, and the global (osb-level) lock
 * resources.  Local mounts skip everything but the lock resource
 * initialization.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status = 0;
	struct ocfs2_cluster_connection *conn = NULL;

	mlog_entry_void();

	if (ocfs2_mount_local(osb)) {
		osb->node_num = 0;
		goto local;
	}

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch downconvert thread */
	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
	if (IS_ERR(osb->dc_task)) {
		status = PTR_ERR(osb->dc_task);
		osb->dc_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* for now, uuid == domain */
	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
				       osb->uuid_str,
				       strlen(osb->uuid_str),
				       &lproto, ocfs2_do_node_down, osb,
				       &conn);
	if (status) {
		mlog_errno(status);
		goto bail;
	}

	status = ocfs2_cluster_this_node(&osb->node_num);
	if (status < 0) {
		mlog_errno(status);
		mlog(ML_ERROR,
		     "could not find this host's node number\n");
		ocfs2_cluster_disconnect(conn, 0);
		goto bail;
	}

local:
	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);

	osb->cconn = conn;

	status = 0;
bail:
	/* Unwind debugfs and the downconvert thread on any failure. */
	if (status < 0) {
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->dc_task)
			kthread_stop(osb->dc_task);
	}

	mlog_exit(status);
	return status;
}

/*
 * Reverse of ocfs2_dlm_init(): drop the osb locks, stop the
 * downconvert thread, free lock resources and disconnect from the
 * cluster.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
			int hangup_pending)
{
	mlog_entry_void();

	ocfs2_drop_osb_locks(osb);

	/*
	 * Now that we have dropped all locks and ocfs2_dismount_volume()
	 * has disabled recovery, the DLM won't be talking to us. It's
	 * safe to tear things down before disconnecting the cluster.
	 */

	if (osb->dc_task) {
		kthread_stop(osb->dc_task);
		osb->dc_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);
	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);

	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
	osb->cconn = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}

/*
 * Fully release a lockres back to the DLM.  Waits out any in-flight
 * conversion, pushes a final LVB if we hold EX, then issues the
 * unlock and waits for its completion AST.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres)
{
	int ret;
	unsigned long flags;
	u32 lkm_flags = 0;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/* Only push an LVB if ours is authoritative: attached, held EX,
	 * and not itself pending a refresh. */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
		    lockres->l_level == DLM_LOCK_EX &&
		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3188 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3189 spin_unlock_irqrestore(&lockres->l_lock, flags); 3190 goto out; 3191 } 3192 3193 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3194 3195 /* make sure we never get here while waiting for an ast to 3196 * fire. */ 3197 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3198 3199 /* is this necessary? */ 3200 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3201 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3202 spin_unlock_irqrestore(&lockres->l_lock, flags); 3203 3204 mlog(0, "lock %s\n", lockres->l_name); 3205 3206 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3207 if (ret) { 3208 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3209 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3210 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3211 BUG(); 3212 } 3213 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3214 lockres->l_name); 3215 3216 ocfs2_wait_on_busy_lock(lockres); 3217out: 3218 mlog_exit(0); 3219 return 0; 3220} 3221 3222/* Mark the lockres as being dropped. It will no longer be 3223 * queued if blocking, but we still may have to wait on it 3224 * being dequeued from the downconvert thread before we can consider 3225 * it safe to drop. 3226 * 3227 * You can *not* attempt to call cluster_lock on this lockres anymore. 
 */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Wait until the downconvert thread has dequeued this lockres
	 * (i.e. OCFS2_LOCK_QUEUED clears) before declaring it freeable. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Convenience wrapper: mark a lockres freeing and then drop it.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int ret;

	ocfs2_mark_lockres_freeing(lockres);
	ret = ocfs2_drop_lock(osb, lockres);
	if (ret)
		mlog_errno(ret);
}

/* Drop all four global (per-osb) lock resources. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
}

/*
 * Drop the open, inode and rw lockres's of an inode.  All three drops
 * are attempted even if an earlier one fails; the first error seen is
 * what gets returned.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_open_lockres);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_inode_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}

/*
 * Record a downconvert in progress on the lockres: sanity-check that we
 * really are moving to a lower level, mark the lockres busy and return
 * the pending generation for later lockres_clear_pending().
 * Caller must hold lockres->l_lock.
 */
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	if (lockres->l_level <= new_level) {
		/* Not a downconvert at all - dump full state and die. */
		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
		     new_level, list_empty(&lockres->l_blocked_list),
		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
		     lockres->l_flags, lockres->l_ro_holders,
		     lockres->l_ex_holders, lockres->l_action,
		     lockres->l_unlock_action, lockres->l_requested,
		     lockres->l_blocking, lockres->l_pending_gen);
		BUG();
	}

	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	return lockres_set_pending(lockres);
}

/*
 * Issue the actual DLM convert for a downconvert prepared by
 * ocfs2_prepare_downconvert().  @lvb says whether to ship the LVB along
 * (DLM_LKF_VALBLK); @generation pairs with lockres_set_pending().
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation)
{
	int ret;
	u32 dlm_flags = DLM_LKF_CONVERT;

	mlog_entry_void();

	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
	     lockres->l_level, new_level);

	if (lvb)
		dlm_flags |= DLM_LKF_VALBLK;

	ret = ocfs2_dlm_lock(osb->cconn,
			     new_level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	lockres_clear_pending(lockres, generation, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}

/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */
		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);

	return 1;
}

/*
 * Ask the DLM to cancel an in-flight conversion (DLM_LKF_CANCEL).
 * Must follow a successful ocfs2_prepare_cancel_convert().
 */
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;

	mlog_entry_void();

	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
			       DLM_LKF_CANCEL);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);

	mlog_exit(ret);
	return ret;
}

/*
 * Core downconvert state machine, called from the downconvert thread
 * for each blocked lockres.  Decides whether to downconvert now, cancel
 * a conversion, or requeue for later (ctl->requeue).
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int level;
	int ret = 0;
	int set_lvb = 0;
	unsigned int gen;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

recheck:
	/*
	 * Is it still blocking? If not, we have no more work to do.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = 0;
		goto leave;
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* A pending lock call hasn't reached the DLM yet -
		 * nothing can be canceled; try again later. */
		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
			     lockres->l_name);
			goto leave_requeue;
		}

		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/*
	 * This prevents livelocks.
	 * OCFS2_LOCK_UPCONVERT_FINISHING flag is
	 * set when the ast is received for an upconvert just before the
	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
	 * on the heels of the ast, we want to delay the downconvert just
	 * enough to allow the up requestor to do its task. Because this
	 * lock is in the blocked queue, the lock will be downconverted
	 * as soon as the requestor is done with the lock.
	 */
	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
		goto leave_requeue;

	/*
	 * How can we block and yet be at NL? We were trying to upconvert
	 * from NL and got canceled. The code comes back here, and now
	 * we notice and clear BLOCKING.
	 */
	if (lockres->l_level == DLM_LOCK_NL) {
		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
		     lockres->l_name, lockres->l_ex_holders,
		     lockres->l_ro_holders);
		goto leave_requeue;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
		     lockres->l_name, lockres->l_ex_holders);
		goto leave_requeue;
	}

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Lock-type hook may veto the downconvert (e.g. journal not yet
	 * checkpointed); requeue and retry later in that case. */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	level = lockres->l_level;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
		     lockres->l_name);
		goto leave;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
		     "Recheck\n", lockres->l_name, blocking,
		     lockres->l_blocking, level, lockres->l_level);
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);

leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}

/*
 * Downconvert worker for inode data locks: flush and (for EX requests)
 * invalidate the inode's page cache before the level is dropped.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	/* Only regular files carry cached data pages worth syncing. */
	if (!S_ISREG(inode->i_mode))
		goto out;

	/*
	 * We need this before the filemap_fdatawrite() so that it can
	 * transfer the dirty bit from the PTE to the
	 * page. Unfortunately this means that even for EX->PR
	 * downconverts, we'll lose our mappings and have to build
	 * them up again.
	 */
	unmap_mapping_range(mapping, 0, 0, 0);

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == DLM_LOCK_EX) {
		truncate_inode_pages(mapping, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

out:
	return UNBLOCK_CONTINUE;
}

/*
 * Common checkpoint test used by meta and refcount downconvert checks.
 * Returns 1 if the caching info is fully checkpointed (safe to
 * downconvert); otherwise kicks off a checkpoint and returns 0 so the
 * caller requeues.
 */
static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
				 struct ocfs2_lock_res *lockres,
				 int new_level)
{
	int checkpointed = ocfs2_ci_fully_checkpointed(ci);

	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);

	if (checkpointed)
		return 1;

	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
	return 0;
}

/* check_downconvert hook for inode metadata locks. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
}

/* set_lvb hook for inode metadata locks: stuff inode state into the LVB. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	__ocfs2_stuff_meta_lvb(inode);
}

/*
 * Does the final reference drop on our dentry lock. Right now this
 * happens in the downconvert thread, but we could choose to simplify the
 * dlmglue API and push these off to the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	ocfs2_dentry_lock_put(osb, dl);
}

/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* Walk and d_delete() every local alias of the inode; the
	 * attach lock is dropped around each dcache call since those
	 * may sleep. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
3786 */ 3787 if (dl->dl_count == 1) 3788 return UNBLOCK_STOP_POST; 3789 3790 return UNBLOCK_CONTINUE_POST; 3791} 3792 3793static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 3794 int new_level) 3795{ 3796 struct ocfs2_refcount_tree *tree = 3797 ocfs2_lock_res_refcount_tree(lockres); 3798 3799 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 3800} 3801 3802static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 3803 int blocking) 3804{ 3805 struct ocfs2_refcount_tree *tree = 3806 ocfs2_lock_res_refcount_tree(lockres); 3807 3808 ocfs2_metadata_cache_purge(&tree->rf_ci); 3809 3810 return UNBLOCK_CONTINUE; 3811} 3812 3813static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 3814{ 3815 struct ocfs2_qinfo_lvb *lvb; 3816 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 3817 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3818 oinfo->dqi_gi.dqi_type); 3819 3820 mlog_entry_void(); 3821 3822 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3823 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 3824 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 3825 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 3826 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 3827 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 3828 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 3829 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 3830 3831 mlog_exit_void(); 3832} 3833 3834void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3835{ 3836 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3837 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3838 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 3839 3840 mlog_entry_void(); 3841 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3842 ocfs2_cluster_unlock(osb, lockres, level); 3843 mlog_exit_void(); 3844} 3845 3846static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3847{ 3848 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3849 oinfo->dqi_gi.dqi_type); 3850 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3851 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3852 struct buffer_head *bh = NULL; 3853 struct ocfs2_global_disk_dqinfo *gdinfo; 3854 int status = 0; 3855 3856 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 3857 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3858 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3859 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3860 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3861 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3862 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3863 oinfo->dqi_gi.dqi_free_entry = 3864 be32_to_cpu(lvb->lvb_free_entry); 3865 } else { 3866 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 3867 oinfo->dqi_giblk, &bh); 3868 if (status) { 3869 mlog_errno(status); 3870 goto bail; 3871 } 3872 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3873 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3874 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3875 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3876 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3877 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3878 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3879 oinfo->dqi_gi.dqi_free_entry = 3880 le32_to_cpu(gdinfo->dqi_free_entry); 3881 brelse(bh); 3882 ocfs2_track_lock_refresh(lockres); 3883 } 3884 3885bail: 3886 return status; 3887} 3888 3889/* Lock quota info, this function expects at least shared lock on the quota file 3890 * so that we can safely refresh quota info from disk. 
 */
int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	int status = 0;

	mlog_entry_void();

	/* On RO devices, locking really isn't needed... */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}
	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;
	/* OK, we have the lock but we need to refresh the quota info */
	status = ocfs2_refresh_qinfo(oinfo);
	if (status)
		ocfs2_qinfo_unlock(oinfo, ex);
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

/*
 * Take the cluster lock protecting a refcount tree.  @ex selects
 * EX vs. PR; hard-RO mounts refuse EX and local mounts skip the DLM.
 */
int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int status;
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;


	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/* Release a refcount tree cluster lock taken by ocfs2_refcount_lock(). */
void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
	struct ocfs2_super *osb = lockres->l_priv;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

/*
 * Process one lockres pulled off the blocked list: run the unblock
 * state machine, then either clear OCFS2_LOCK_QUEUED or requeue it,
 * and fire the post_unlock callback when the worker asked for it.
 */
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}

/*
 * Queue a blocked lockres for the downconvert thread.
 * Caller must hold lockres->l_lock.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock(&osb->dc_task_lock);
	/* list_empty() guards against double-queueing the same lockres. */
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->dc_task_lock);

	mlog_exit_void();
}

/*
 * Drain the blocked-lock list once.  Only the entries present at entry
 * are processed (bounded by the initial count) so newly queued locks
 * wait for the next pass.
 */
static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
	unsigned long processed;
	struct ocfs2_lock_res *lockres;

	mlog_entry_void();

	spin_lock(&osb->dc_task_lock);
	/* grab this early so we know to try again if a state change and
	 * wake happens part-way through our work  */
	osb->dc_work_sequence = osb->dc_wake_sequence;

	processed = osb->blocked_lock_count;
	while (processed) {
		BUG_ON(list_empty(&osb->blocked_lock_list));

		lockres = list_entry(osb->blocked_lock_list.next,
				     struct ocfs2_lock_res, l_blocked_list);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		/* Drop the list lock while processing - the lockres is
		 * pinned by its OCFS2_LOCK_QUEUED flag. */
		spin_unlock(&osb->dc_task_lock);

		BUG_ON(!processed);
		processed--;

		ocfs2_process_blocked_lock(osb, lockres);

		spin_lock(&osb->dc_task_lock);
	}
	spin_unlock(&osb->dc_task_lock);

	mlog_exit_void();
}

/* Returns 1 when there are no blocked locks awaiting processing. */
static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
	int empty = 0;

	spin_lock(&osb->dc_task_lock);
	if (list_empty(&osb->blocked_lock_list))
		empty = 1;

	spin_unlock(&osb->dc_task_lock);
	return empty;
}

/* Returns 1 when a wakeup arrived after the last work pass began. */
static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
	int should_wake = 0;

	spin_lock(&osb->dc_task_lock);
	if (osb->dc_work_sequence != osb->dc_wake_sequence)
		should_wake = 1;
	spin_unlock(&osb->dc_task_lock);

	return should_wake;
}

/*
 * Main loop of the "ocfs2dc" kthread: sleep until woken (or asked to
 * stop), then process the blocked-lock list.
 */
static int ocfs2_downconvert_thread(void *arg)
{
	int status = 0;
	struct ocfs2_super *osb = arg;

	/* only quit once we've been asked to stop and there is no more
	 * work available */
	while (!(kthread_should_stop() &&
		ocfs2_downconvert_thread_lists_empty(osb))) {

		wait_event_interruptible(osb->dc_event,
					 ocfs2_downconvert_thread_should_wake(osb) ||
					 kthread_should_stop());

		mlog(0, "downconvert_thread: awoken\n");

		ocfs2_downconvert_thread_do_work(osb);
	}

	osb->dc_task = NULL;
	return status;
}

/*
 * Kick the downconvert thread: bump the wake sequence so a concurrent
 * work pass knows to run again, then wake the waitqueue.
 */
void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
	spin_lock(&osb->dc_task_lock);
	/* make sure the voting thread gets a swipe at whatever changes
	 * the caller may have made to the voting state */
	osb->dc_wake_sequence++;
	spin_unlock(&osb->dc_task_lock);
	wake_up(&osb->dc_event);
}