1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. 5** 6** This copyrighted material is made available to anyone wishing to use, 7** modify, copy, or redistribute it subject to the terms and conditions 8** of the GNU General Public License v.2. 9** 10******************************************************************************* 11******************************************************************************/ 12 13/* Central locking logic has four stages: 14 15 dlm_lock() 16 dlm_unlock() 17 18 request_lock(ls, lkb) 19 convert_lock(ls, lkb) 20 unlock_lock(ls, lkb) 21 cancel_lock(ls, lkb) 22 23 _request_lock(r, lkb) 24 _convert_lock(r, lkb) 25 _unlock_lock(r, lkb) 26 _cancel_lock(r, lkb) 27 28 do_request(r, lkb) 29 do_convert(r, lkb) 30 do_unlock(r, lkb) 31 do_cancel(r, lkb) 32 33 Stage 1 (lock, unlock) is mainly about checking input args and 34 splitting into one of the four main operations: 35 36 dlm_lock = request_lock 37 dlm_lock+CONVERT = convert_lock 38 dlm_unlock = unlock_lock 39 dlm_unlock+CANCEL = cancel_lock 40 41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is 42 provided to the next stage. 43 44 Stage 3, _xxxx_lock(), determines if the operation is local or remote. 45 When remote, it calls send_xxxx(), when local it calls do_xxxx(). 46 47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the 48 given rsb and lkb and queues callbacks. 49 50 For remote operations, send_xxxx() results in the corresponding do_xxxx() 51 function being executed on the remote node. The connecting send/receive 52 calls on local (L) and remote (R) nodes: 53 54 L: send_xxxx() -> R: receive_xxxx() 55 R: do_xxxx() 56 L: receive_xxxx_reply() <- R: send_xxxx_reply() 57*/ 58#include <linux/types.h> 59#include "dlm_internal.h" 60#include <linux/dlm_device.h> 61#include "memory.h" 62#include "lowcomms.h" 63#include "requestqueue.h" 64#include "util.h" 65#include "dir.h" 66#include "member.h" 67#include "lockspace.h" 68#include "ast.h" 69#include "lock.h" 70#include "rcom.h" 71#include "recover.h" 72#include "lvb_table.h" 73#include "user.h" 74#include "config.h" 75 76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb); 78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb); 79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb); 80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb); 81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); 82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 83static int send_remove(struct dlm_rsb *r); 84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 86 struct dlm_message *ms); 87static int receive_extralen(struct dlm_message *ms); 88static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 89 90/* 91 * Lock compatibilty matrix - thanks Steve 92 * UN = Unlocked state. Not really a state, used as a flag 93 * PD = Padding. Used to make the matrix a nice power of two in size 94 * Other states are the same as the VMS DLM. 
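 * An illustrative reading of the table (using the DLM_LOCK_* mode values
 * from linux/dlm.h): a granted PR lock blocks a PW request,
 *   __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PW + 1] == 0
 * while CR and PW are compatible,
 *   __dlm_compat_matrix[DLM_LOCK_CR + 1][DLM_LOCK_PW + 1] == 1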
95 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) 96 */ 97 98static const int __dlm_compat_matrix[8][8] = { 99 /* UN NL CR CW PR PW EX PD */ 100 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */ 101 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */ 102 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */ 103 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */ 104 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */ 105 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */ 106 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */ 107 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 108}; 109 110/* 111 * This defines the direction of transfer of LVB data. 112 * Granted mode is the row; requested mode is the column. 113 * Usage: matrix[grmode+1][rqmode+1] 114 * 1 = LVB is returned to the caller 115 * 0 = LVB is written to the resource 116 * -1 = nothing happens to the LVB 117 */ 118 119const int dlm_lvb_operations[8][8] = { 120 /* UN NL CR CW PR PW EX PD*/ 121 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ 122 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */ 123 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */ 124 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */ 125 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */ 126 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */ 127 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */ 128 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */ 129}; 130 131#define modes_compat(gr, rq) \ 132 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 133 134int dlm_modes_compat(int mode1, int mode2) 135{ 136 return __dlm_compat_matrix[mode1 + 1][mode2 + 1]; 137} 138 139/* 140 * Compatibility matrix for conversions with QUECVT set. 141 * Granted mode is the row; requested mode is the column. 142 * Usage: matrix[grmode+1][rqmode+1] 143 */ 144 145static const int __quecvt_compat_matrix[8][8] = { 146 /* UN NL CR CW PR PW EX PD */ 147 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */ 148 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */ 149 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */ 150 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */ 151 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 152 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 153 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 154 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 155}; 156 157void dlm_print_lkb(struct dlm_lkb *lkb) 158{ 159 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 160 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n", 161 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 162 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 163 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type); 164} 165 166void dlm_print_rsb(struct dlm_rsb *r) 167{ 168 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", 169 r->res_nodeid, r->res_flags, r->res_first_lkid, 170 r->res_recover_locks_count, r->res_name); 171} 172 173void dlm_dump_rsb(struct dlm_rsb *r) 174{ 175 struct dlm_lkb *lkb; 176 177 dlm_print_rsb(r); 178 179 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", 180 list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); 181 printk(KERN_ERR "rsb lookup list\n"); 182 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) 183 dlm_print_lkb(lkb); 184 printk(KERN_ERR "rsb grant queue:\n"); 185 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) 186 dlm_print_lkb(lkb); 187 printk(KERN_ERR "rsb convert queue:\n"); 188 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) 189 dlm_print_lkb(lkb); 190 printk(KERN_ERR "rsb wait queue:\n"); 191 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) 192 dlm_print_lkb(lkb); 193} 194 195/* Threads cannot use the lockspace while it's being recovered */ 196 197static inline void lock_recovery(struct dlm_ls *ls) 198{ 199 
down_read(&ls->ls_in_recovery); 200} 201 202static inline void unlock_recovery(struct dlm_ls *ls) 203{ 204 up_read(&ls->ls_in_recovery); 205} 206 207static inline int lock_recovery_try(struct dlm_ls *ls) 208{ 209 return down_read_trylock(&ls->ls_in_recovery); 210} 211 212static inline int can_be_queued(struct dlm_lkb *lkb) 213{ 214 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 215} 216 217static inline int force_blocking_asts(struct dlm_lkb *lkb) 218{ 219 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 220} 221 222static inline int is_demoted(struct dlm_lkb *lkb) 223{ 224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 225} 226 227static inline int is_altmode(struct dlm_lkb *lkb) 228{ 229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); 230} 231 232static inline int is_granted(struct dlm_lkb *lkb) 233{ 234 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 235} 236 237static inline int is_remote(struct dlm_rsb *r) 238{ 239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 240 return !!r->res_nodeid; 241} 242 243static inline int is_process_copy(struct dlm_lkb *lkb) 244{ 245 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 246} 247 248static inline int is_master_copy(struct dlm_lkb *lkb) 249{ 250 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 251 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb);); 252 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 253} 254 255static inline int middle_conversion(struct dlm_lkb *lkb) 256{ 257 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 258 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 259 return 1; 260 return 0; 261} 262 263static inline int down_conversion(struct dlm_lkb *lkb) 264{ 265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 266} 267 268static inline int is_overlap_unlock(struct dlm_lkb *lkb) 269{ 270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; 271} 272 273static inline int is_overlap_cancel(struct dlm_lkb *lkb) 274{ 275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; 276} 277 278static inline int is_overlap(struct dlm_lkb *lkb) 279{ 280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | 281 DLM_IFL_OVERLAP_CANCEL)); 282} 283 284static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 285{ 286 if (is_master_copy(lkb)) 287 return; 288 289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 290 291 lkb->lkb_lksb->sb_status = rv; 292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; 293 294 dlm_add_ast(lkb, AST_COMP); 295} 296 297static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 298{ 299 queue_cast(r, lkb, 300 is_overlap_unlock(lkb) ? 
-DLM_EUNLOCK : -DLM_ECANCEL); 301} 302 303static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 304{ 305 if (is_master_copy(lkb)) 306 send_bast(r, lkb, rqmode); 307 else { 308 lkb->lkb_bastmode = rqmode; 309 dlm_add_ast(lkb, AST_BAST); 310 } 311} 312 313/* 314 * Basic operations on rsb's and lkb's 315 */ 316 317static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) 318{ 319 struct dlm_rsb *r; 320 321 r = allocate_rsb(ls, len); 322 if (!r) 323 return NULL; 324 325 r->res_ls = ls; 326 r->res_length = len; 327 memcpy(r->res_name, name, len); 328 mutex_init(&r->res_mutex); 329 330 INIT_LIST_HEAD(&r->res_lookup); 331 INIT_LIST_HEAD(&r->res_grantqueue); 332 INIT_LIST_HEAD(&r->res_convertqueue); 333 INIT_LIST_HEAD(&r->res_waitqueue); 334 INIT_LIST_HEAD(&r->res_root_list); 335 INIT_LIST_HEAD(&r->res_recover_list); 336 337 return r; 338} 339 340static int search_rsb_list(struct list_head *head, char *name, int len, 341 unsigned int flags, struct dlm_rsb **r_ret) 342{ 343 struct dlm_rsb *r; 344 int error = 0; 345 346 list_for_each_entry(r, head, res_hashchain) { 347 if (len == r->res_length && !memcmp(name, r->res_name, len)) 348 goto found; 349 } 350 return -EBADR; 351 352 found: 353 if (r->res_nodeid && (flags & R_MASTER)) 354 error = -ENOTBLK; 355 *r_ret = r; 356 return error; 357} 358 359static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, 360 unsigned int flags, struct dlm_rsb **r_ret) 361{ 362 struct dlm_rsb *r; 363 int error; 364 365 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r); 366 if (!error) { 367 kref_get(&r->res_ref); 368 goto out; 369 } 370 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 371 if (error) 372 goto out; 373 374 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list); 375 376 if (dlm_no_directory(ls)) 377 goto out; 378 379 if (r->res_nodeid == -1) { 380 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 381 r->res_first_lkid = 0; 382 } else if (r->res_nodeid > 0) { 383 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 384 r->res_first_lkid = 0; 385 } else { 386 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); 387 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); 388 } 389 out: 390 *r_ret = r; 391 return error; 392} 393 394static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, 395 unsigned int flags, struct dlm_rsb **r_ret) 396{ 397 int error; 398 write_lock(&ls->ls_rsbtbl[b].lock); 399 error = _search_rsb(ls, name, len, b, flags, r_ret); 400 write_unlock(&ls->ls_rsbtbl[b].lock); 401 return error; 402} 403 404/* 405 * Find rsb in rsbtbl and potentially create/add one 406 * 407 * Delaying the release of rsb's has a similar benefit to applications keeping 408 * NL locks on an rsb, but without the guarantee that the cached master value 409 * will still be valid when the rsb is reused. Apps aren't always smart enough 410 * to keep NL locks on an rsb that they may lock again shortly; this can lead 411 * to excessive master lookups and removals if we don't delay the release. 412 * 413 * Searching for an rsb means looking through both the normal list and toss 414 * list. When found on the toss list the rsb is moved to the normal list with 415 * ref count of 1; when found on normal list the ref count is incremented. 
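 *
 * A rough sketch of the usual caller pattern (illustrative only; error
 * handling omitted, see request_lock() below for the real thing):
 *
 *	error = find_rsb(ls, name, len, R_CREATE, &r);
 *	lock_rsb(r);
 *	...operate on r and its lkb queues...
 *	unlock_rsb(r);
 *	put_rsb(r);	-- may move the rsb onto the toss list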
416 */ 417 418static int find_rsb(struct dlm_ls *ls, char *name, int namelen, 419 unsigned int flags, struct dlm_rsb **r_ret) 420{ 421 struct dlm_rsb *r, *tmp; 422 uint32_t hash, bucket; 423 int error = 0; 424 425 if (dlm_no_directory(ls)) 426 flags |= R_CREATE; 427 428 hash = jhash(name, namelen, 0); 429 bucket = hash & (ls->ls_rsbtbl_size - 1); 430 431 error = search_rsb(ls, name, namelen, bucket, flags, &r); 432 if (!error) 433 goto out; 434 435 if (error == -EBADR && !(flags & R_CREATE)) 436 goto out; 437 438 /* the rsb was found but wasn't a master copy */ 439 if (error == -ENOTBLK) 440 goto out; 441 442 error = -ENOMEM; 443 r = create_rsb(ls, name, namelen); 444 if (!r) 445 goto out; 446 447 r->res_hash = hash; 448 r->res_bucket = bucket; 449 r->res_nodeid = -1; 450 kref_init(&r->res_ref); 451 452 /* With no directory, the master can be set immediately */ 453 if (dlm_no_directory(ls)) { 454 int nodeid = dlm_dir_nodeid(r); 455 if (nodeid == dlm_our_nodeid()) 456 nodeid = 0; 457 r->res_nodeid = nodeid; 458 } 459 460 write_lock(&ls->ls_rsbtbl[bucket].lock); 461 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); 462 if (!error) { 463 write_unlock(&ls->ls_rsbtbl[bucket].lock); 464 free_rsb(r); 465 r = tmp; 466 goto out; 467 } 468 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); 469 write_unlock(&ls->ls_rsbtbl[bucket].lock); 470 error = 0; 471 out: 472 *r_ret = r; 473 return error; 474} 475 476int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, 477 unsigned int flags, struct dlm_rsb **r_ret) 478{ 479 return find_rsb(ls, name, namelen, flags, r_ret); 480} 481 482/* This is only called to add a reference when the code already holds 483 a valid reference to the rsb, so there's no need for locking. */ 484 485static inline void hold_rsb(struct dlm_rsb *r) 486{ 487 kref_get(&r->res_ref); 488} 489 490void dlm_hold_rsb(struct dlm_rsb *r) 491{ 492 hold_rsb(r); 493} 494 495static void toss_rsb(struct kref *kref) 496{ 497 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 498 struct dlm_ls *ls = r->res_ls; 499 500 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 501 kref_init(&r->res_ref); 502 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); 503 r->res_toss_time = jiffies; 504 if (r->res_lvbptr) { 505 free_lvb(r->res_lvbptr); 506 r->res_lvbptr = NULL; 507 } 508} 509 510/* When all references to the rsb are gone it's transfered to 511 the tossed list for later disposal. */ 512 513static void put_rsb(struct dlm_rsb *r) 514{ 515 struct dlm_ls *ls = r->res_ls; 516 uint32_t bucket = r->res_bucket; 517 518 write_lock(&ls->ls_rsbtbl[bucket].lock); 519 kref_put(&r->res_ref, toss_rsb); 520 write_unlock(&ls->ls_rsbtbl[bucket].lock); 521} 522 523void dlm_put_rsb(struct dlm_rsb *r) 524{ 525 put_rsb(r); 526} 527 528/* See comment for unhold_lkb */ 529 530static void unhold_rsb(struct dlm_rsb *r) 531{ 532 int rv; 533 rv = kref_put(&r->res_ref, toss_rsb); 534 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 535} 536 537static void kill_rsb(struct kref *kref) 538{ 539 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 540 541 /* All work is done after the return from kref_put() so we 542 can release the write_lock before the remove and free. 
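
	   The caller side then looks roughly like this (illustrative
	   sketch, modelled on shrink_bucket() below):

		write_lock(&ls->ls_rsbtbl[b].lock);
		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);
			free_rsb(r);
		} else
			write_unlock(&ls->ls_rsbtbl[b].lock);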
*/ 543 544 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 545 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 546 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 547 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 548 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 549 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 550} 551 552/* Attaching/detaching lkb's from rsb's is for rsb reference counting. 553 The rsb must exist as long as any lkb's for it do. */ 554 555static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 556{ 557 hold_rsb(r); 558 lkb->lkb_resource = r; 559} 560 561static void detach_lkb(struct dlm_lkb *lkb) 562{ 563 if (lkb->lkb_resource) { 564 put_rsb(lkb->lkb_resource); 565 lkb->lkb_resource = NULL; 566 } 567} 568 569static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 570{ 571 struct dlm_lkb *lkb, *tmp; 572 uint32_t lkid = 0; 573 uint16_t bucket; 574 575 lkb = allocate_lkb(ls); 576 if (!lkb) 577 return -ENOMEM; 578 579 lkb->lkb_nodeid = -1; 580 lkb->lkb_grmode = DLM_LOCK_IV; 581 kref_init(&lkb->lkb_ref); 582 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 584 585 get_random_bytes(&bucket, sizeof(bucket)); 586 bucket &= (ls->ls_lkbtbl_size - 1); 587 588 write_lock(&ls->ls_lkbtbl[bucket].lock); 589 590 /* counter can roll over so we must verify lkid is not in use */ 591 592 while (lkid == 0) { 593 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; 594 595 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, 596 lkb_idtbl_list) { 597 if (tmp->lkb_id != lkid) 598 continue; 599 lkid = 0; 600 break; 601 } 602 } 603 604 lkb->lkb_id = lkid; 605 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list); 606 write_unlock(&ls->ls_lkbtbl[bucket].lock); 607 608 *lkb_ret = lkb; 609 return 0; 610} 611 612static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) 613{ 614 struct dlm_lkb *lkb; 615 uint16_t bucket = (lkid >> 16); 616 617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { 618 if (lkb->lkb_id == lkid) 619 return lkb; 620 } 621 return NULL; 622} 623 624static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 625{ 626 struct dlm_lkb *lkb; 627 uint16_t bucket = (lkid >> 16); 628 629 if (bucket >= ls->ls_lkbtbl_size) 630 return -EBADSLT; 631 632 read_lock(&ls->ls_lkbtbl[bucket].lock); 633 lkb = __find_lkb(ls, lkid); 634 if (lkb) 635 kref_get(&lkb->lkb_ref); 636 read_unlock(&ls->ls_lkbtbl[bucket].lock); 637 638 *lkb_ret = lkb; 639 return lkb ? 
0 : -ENOENT; 640} 641 642static void kill_lkb(struct kref *kref) 643{ 644 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 645 646 /* All work is done after the return from kref_put() so we 647 can release the write_lock before the detach_lkb */ 648 649 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 650} 651 652/* __put_lkb() is used when an lkb may not have an rsb attached to 653 it so we need to provide the lockspace explicitly */ 654 655static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 656{ 657 uint16_t bucket = (lkb->lkb_id >> 16); 658 659 write_lock(&ls->ls_lkbtbl[bucket].lock); 660 if (kref_put(&lkb->lkb_ref, kill_lkb)) { 661 list_del(&lkb->lkb_idtbl_list); 662 write_unlock(&ls->ls_lkbtbl[bucket].lock); 663 664 detach_lkb(lkb); 665 666 /* for local/process lkbs, lvbptr points to caller's lksb */ 667 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 668 free_lvb(lkb->lkb_lvbptr); 669 free_lkb(lkb); 670 return 1; 671 } else { 672 write_unlock(&ls->ls_lkbtbl[bucket].lock); 673 return 0; 674 } 675} 676 677int dlm_put_lkb(struct dlm_lkb *lkb) 678{ 679 struct dlm_ls *ls; 680 681 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 682 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 683 684 ls = lkb->lkb_resource->res_ls; 685 return __put_lkb(ls, lkb); 686} 687 688/* This is only called to add a reference when the code already holds 689 a valid reference to the lkb, so there's no need for locking. */ 690 691static inline void hold_lkb(struct dlm_lkb *lkb) 692{ 693 kref_get(&lkb->lkb_ref); 694} 695 696/* This is called when we need to remove a reference and are certain 697 it's not the last ref. e.g. del_lkb is always called between a 698 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 699 put_lkb would work fine, but would involve unnecessary locking */ 700 701static inline void unhold_lkb(struct dlm_lkb *lkb) 702{ 703 int rv; 704 rv = kref_put(&lkb->lkb_ref, kill_lkb); 705 DLM_ASSERT(!rv, dlm_print_lkb(lkb);); 706} 707 708static void lkb_add_ordered(struct list_head *new, struct list_head *head, 709 int mode) 710{ 711 struct dlm_lkb *lkb = NULL; 712 713 list_for_each_entry(lkb, head, lkb_statequeue) 714 if (lkb->lkb_rqmode < mode) 715 break; 716 717 if (!lkb) 718 list_add_tail(new, head); 719 else 720 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue); 721} 722 723/* add/remove lkb to rsb's grant/convert/wait queue */ 724 725static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 726{ 727 kref_get(&lkb->lkb_ref); 728 729 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 730 731 lkb->lkb_status = status; 732 733 switch (status) { 734 case DLM_LKSTS_WAITING: 735 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 736 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 737 else 738 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 739 break; 740 case DLM_LKSTS_GRANTED: 741 /* convention says granted locks kept in order of grmode */ 742 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 743 lkb->lkb_grmode); 744 break; 745 case DLM_LKSTS_CONVERT: 746 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 747 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 748 else 749 list_add_tail(&lkb->lkb_statequeue, 750 &r->res_convertqueue); 751 break; 752 default: 753 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 754 } 755} 756 757static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 758{ 759 lkb->lkb_status = 0; 760 list_del(&lkb->lkb_statequeue); 761 unhold_lkb(lkb); 762} 763 764static void 
move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 765{ 766 hold_lkb(lkb); 767 del_lkb(r, lkb); 768 add_lkb(r, lkb, sts); 769 unhold_lkb(lkb); 770} 771 772static int msg_reply_type(int mstype) 773{ 774 switch (mstype) { 775 case DLM_MSG_REQUEST: 776 return DLM_MSG_REQUEST_REPLY; 777 case DLM_MSG_CONVERT: 778 return DLM_MSG_CONVERT_REPLY; 779 case DLM_MSG_UNLOCK: 780 return DLM_MSG_UNLOCK_REPLY; 781 case DLM_MSG_CANCEL: 782 return DLM_MSG_CANCEL_REPLY; 783 case DLM_MSG_LOOKUP: 784 return DLM_MSG_LOOKUP_REPLY; 785 } 786 return -1; 787} 788 789/* add/remove lkb from global waiters list of lkb's waiting for 790 a reply from a remote node */ 791 792static int add_to_waiters(struct dlm_lkb *lkb, int mstype) 793{ 794 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 795 int error = 0; 796 797 mutex_lock(&ls->ls_waiters_mutex); 798 799 if (is_overlap_unlock(lkb) || 800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 801 error = -EINVAL; 802 goto out; 803 } 804 805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 806 switch (mstype) { 807 case DLM_MSG_UNLOCK: 808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 809 break; 810 case DLM_MSG_CANCEL: 811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 812 break; 813 default: 814 error = -EBUSY; 815 goto out; 816 } 817 lkb->lkb_wait_count++; 818 hold_lkb(lkb); 819 820 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", 821 lkb->lkb_id, lkb->lkb_wait_type, mstype, 822 lkb->lkb_wait_count, lkb->lkb_flags); 823 goto out; 824 } 825 826 DLM_ASSERT(!lkb->lkb_wait_count, 827 dlm_print_lkb(lkb); 828 printk("wait_count %d\n", lkb->lkb_wait_count);); 829 830 lkb->lkb_wait_count++; 831 lkb->lkb_wait_type = mstype; 832 hold_lkb(lkb); 833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 834 out: 835 if (error) 836 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", 837 lkb->lkb_id, error, lkb->lkb_flags, mstype, 838 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 839 mutex_unlock(&ls->ls_waiters_mutex); 840 return error; 841} 842 843/* We clear the RESEND flag because we might be taking an lkb off the waiters 844 list as part of process_requestqueue (e.g. a lookup that has an optimized 845 request reply on the requestqueue) between dlm_recover_waiters_pre() which 846 set RESEND and dlm_recover_waiters_post() */ 847 848static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) 849{ 850 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 851 int overlap_done = 0; 852 853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { 854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 855 overlap_done = 1; 856 goto out_del; 857 } 858 859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { 860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 861 overlap_done = 1; 862 goto out_del; 863 } 864 865 /* N.B. type of reply may not always correspond to type of original 866 msg due to lookup->request optimization, verify others? 
*/ 867 868 if (lkb->lkb_wait_type) { 869 lkb->lkb_wait_type = 0; 870 goto out_del; 871 } 872 873 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", 874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); 875 return -1; 876 877 out_del: 878 879 if (overlap_done && lkb->lkb_wait_type) { 880 log_error(ls, "remove_from_waiters %x reply %d give up on %d", 881 lkb->lkb_id, mstype, lkb->lkb_wait_type); 882 lkb->lkb_wait_count--; 883 lkb->lkb_wait_type = 0; 884 } 885 886 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 887 888 lkb->lkb_flags &= ~DLM_IFL_RESEND; 889 lkb->lkb_wait_count--; 890 if (!lkb->lkb_wait_count) 891 list_del_init(&lkb->lkb_wait_reply); 892 unhold_lkb(lkb); 893 return 0; 894} 895 896static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 897{ 898 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 899 int error; 900 901 mutex_lock(&ls->ls_waiters_mutex); 902 error = _remove_from_waiters(lkb, mstype); 903 mutex_unlock(&ls->ls_waiters_mutex); 904 return error; 905} 906 907/* Handles situations where we might be processing a "fake" or "stub" reply in 908 which we can't try to take waiters_mutex again. */ 909 910static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) 911{ 912 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 913 int error; 914 915 if (ms != &ls->ls_stub_ms) 916 mutex_lock(&ls->ls_waiters_mutex); 917 error = _remove_from_waiters(lkb, ms->m_type); 918 if (ms != &ls->ls_stub_ms) 919 mutex_unlock(&ls->ls_waiters_mutex); 920 return error; 921} 922 923static void dir_remove(struct dlm_rsb *r) 924{ 925 int to_nodeid; 926 927 if (dlm_no_directory(r->res_ls)) 928 return; 929 930 to_nodeid = dlm_dir_nodeid(r); 931 if (to_nodeid != dlm_our_nodeid()) 932 send_remove(r); 933 else 934 dlm_dir_remove_entry(r->res_ls, to_nodeid, 935 r->res_name, r->res_length); 936} 937 938 939static int shrink_bucket(struct dlm_ls *ls, int b) 940{ 941 struct dlm_rsb *r; 942 int count = 0, found; 943 944 for (;;) { 945 found = 0; 946 write_lock(&ls->ls_rsbtbl[b].lock); 947 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, 948 res_hashchain) { 949 if (!time_after_eq(jiffies, r->res_toss_time + 950 dlm_config.ci_toss_secs * HZ)) 951 continue; 952 found = 1; 953 break; 954 } 955 956 if (!found) { 957 write_unlock(&ls->ls_rsbtbl[b].lock); 958 break; 959 } 960 961 if (kref_put(&r->res_ref, kill_rsb)) { 962 list_del(&r->res_hashchain); 963 write_unlock(&ls->ls_rsbtbl[b].lock); 964 965 if (is_master(r)) 966 dir_remove(r); 967 free_rsb(r); 968 count++; 969 } else { 970 write_unlock(&ls->ls_rsbtbl[b].lock); 971 log_error(ls, "tossed rsb in use %s", r->res_name); 972 } 973 } 974 975 return count; 976} 977 978void dlm_scan_rsbs(struct dlm_ls *ls) 979{ 980 int i; 981 982 if (dlm_locking_stopped(ls)) 983 return; 984 985 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 986 shrink_bucket(ls, i); 987 cond_resched(); 988 } 989} 990 991/* lkb is master or local copy */ 992 993static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 994{ 995 int b, len = r->res_ls->ls_lvblen; 996 997 /* b=1 lvb returned to caller 998 b=0 lvb written to rsb or invalidated 999 b=-1 do nothing */ 1000 1001 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1002 1003 if (b == 1) { 1004 if (!lkb->lkb_lvbptr) 1005 return; 1006 1007 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1008 return; 1009 1010 if (!r->res_lvbptr) 1011 return; 1012 1013 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1014 lkb->lkb_lvbseq = r->res_lvbseq; 1015 1016 } else if (b == 0) { 1017 if 
(lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1018 rsb_set_flag(r, RSB_VALNOTVALID); 1019 return; 1020 } 1021 1022 if (!lkb->lkb_lvbptr) 1023 return; 1024 1025 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1026 return; 1027 1028 if (!r->res_lvbptr) 1029 r->res_lvbptr = allocate_lvb(r->res_ls); 1030 1031 if (!r->res_lvbptr) 1032 return; 1033 1034 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1035 r->res_lvbseq++; 1036 lkb->lkb_lvbseq = r->res_lvbseq; 1037 rsb_clear_flag(r, RSB_VALNOTVALID); 1038 } 1039 1040 if (rsb_flag(r, RSB_VALNOTVALID)) 1041 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; 1042} 1043 1044static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1045{ 1046 if (lkb->lkb_grmode < DLM_LOCK_PW) 1047 return; 1048 1049 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1050 rsb_set_flag(r, RSB_VALNOTVALID); 1051 return; 1052 } 1053 1054 if (!lkb->lkb_lvbptr) 1055 return; 1056 1057 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1058 return; 1059 1060 if (!r->res_lvbptr) 1061 r->res_lvbptr = allocate_lvb(r->res_ls); 1062 1063 if (!r->res_lvbptr) 1064 return; 1065 1066 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 1067 r->res_lvbseq++; 1068 rsb_clear_flag(r, RSB_VALNOTVALID); 1069} 1070 1071/* lkb is process copy (pc) */ 1072 1073static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1074 struct dlm_message *ms) 1075{ 1076 int b; 1077 1078 if (!lkb->lkb_lvbptr) 1079 return; 1080 1081 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1082 return; 1083 1084 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1085 if (b == 1) { 1086 int len = receive_extralen(ms); 1087 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 1088 lkb->lkb_lvbseq = ms->m_lvbseq; 1089 } 1090} 1091 1092/* Manipulate lkb's on rsb's convert/granted/waiting queues 1093 remove_lock -- used for unlock, removes lkb from granted 1094 revert_lock -- used for cancel, moves lkb from convert to granted 1095 grant_lock -- used for request and convert, adds lkb to granted or 1096 moves lkb from convert or waiting to granted 1097 1098 Each of these is used for master or local copy lkb's. There is 1099 also a _pc() variation used to make the corresponding change on 1100 a process copy (pc) lkb. 
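
   An illustrative pairing (master side vs. the corresponding change made
   to the requesting node's process copy when the reply arrives):

	do_unlock()  -> remove_lock()	unlock reply    -> remove_lock_pc()
	do_cancel()  -> revert_lock()	cancel reply    -> revert_lock_pc()
	do_request() -> grant_lock()	grant msg/reply -> grant_lock_pc()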
*/ 1101 1102static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1103{ 1104 del_lkb(r, lkb); 1105 lkb->lkb_grmode = DLM_LOCK_IV; 1106 /* this unhold undoes the original ref from create_lkb() 1107 so this leads to the lkb being freed */ 1108 unhold_lkb(lkb); 1109} 1110 1111static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1112{ 1113 set_lvb_unlock(r, lkb); 1114 _remove_lock(r, lkb); 1115} 1116 1117static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1118{ 1119 _remove_lock(r, lkb); 1120} 1121 1122/* returns: 0 did nothing 1123 1 moved lock to granted 1124 -1 removed lock */ 1125 1126static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1127{ 1128 int rv = 0; 1129 1130 lkb->lkb_rqmode = DLM_LOCK_IV; 1131 1132 switch (lkb->lkb_status) { 1133 case DLM_LKSTS_GRANTED: 1134 break; 1135 case DLM_LKSTS_CONVERT: 1136 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1137 rv = 1; 1138 break; 1139 case DLM_LKSTS_WAITING: 1140 del_lkb(r, lkb); 1141 lkb->lkb_grmode = DLM_LOCK_IV; 1142 /* this unhold undoes the original ref from create_lkb() 1143 so this leads to the lkb being freed */ 1144 unhold_lkb(lkb); 1145 rv = -1; 1146 break; 1147 default: 1148 log_print("invalid status for revert %d", lkb->lkb_status); 1149 } 1150 return rv; 1151} 1152 1153static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1154{ 1155 return revert_lock(r, lkb); 1156} 1157 1158static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1159{ 1160 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 1161 lkb->lkb_grmode = lkb->lkb_rqmode; 1162 if (lkb->lkb_status) 1163 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1164 else 1165 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 1166 } 1167 1168 lkb->lkb_rqmode = DLM_LOCK_IV; 1169} 1170 1171static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1172{ 1173 set_lvb_lock(r, lkb); 1174 _grant_lock(r, lkb); 1175 lkb->lkb_highbast = 0; 1176} 1177 1178static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1179 struct dlm_message *ms) 1180{ 1181 set_lvb_lock_pc(r, lkb, ms); 1182 _grant_lock(r, lkb); 1183} 1184 1185/* called by grant_pending_locks() which means an async grant message must 1186 be sent to the requesting node in addition to granting the lock if the 1187 lkb belongs to a remote node. */ 1188 1189static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 1190{ 1191 grant_lock(r, lkb); 1192 if (is_master_copy(lkb)) 1193 send_grant(r, lkb); 1194 else 1195 queue_cast(r, lkb, 0); 1196} 1197 1198/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 1199 change the granted/requested modes. We're munging things accordingly in 1200 the process copy. 
1201 CONVDEADLK: our grmode may have been forced down to NL to resolve a 1202 conversion deadlock 1203 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 1204 compatible with other granted locks */ 1205 1206static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) 1207{ 1208 if (ms->m_type != DLM_MSG_CONVERT_REPLY) { 1209 log_print("munge_demoted %x invalid reply type %d", 1210 lkb->lkb_id, ms->m_type); 1211 return; 1212 } 1213 1214 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 1215 log_print("munge_demoted %x invalid modes gr %d rq %d", 1216 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 1217 return; 1218 } 1219 1220 lkb->lkb_grmode = DLM_LOCK_NL; 1221} 1222 1223static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 1224{ 1225 if (ms->m_type != DLM_MSG_REQUEST_REPLY && 1226 ms->m_type != DLM_MSG_GRANT) { 1227 log_print("munge_altmode %x invalid reply type %d", 1228 lkb->lkb_id, ms->m_type); 1229 return; 1230 } 1231 1232 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 1233 lkb->lkb_rqmode = DLM_LOCK_PR; 1234 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 1235 lkb->lkb_rqmode = DLM_LOCK_CW; 1236 else { 1237 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 1238 dlm_print_lkb(lkb); 1239 } 1240} 1241 1242static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 1243{ 1244 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 1245 lkb_statequeue); 1246 if (lkb->lkb_id == first->lkb_id) 1247 return 1; 1248 1249 return 0; 1250} 1251 1252/* Check if the given lkb conflicts with another lkb on the queue. */ 1253 1254static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 1255{ 1256 struct dlm_lkb *this; 1257 1258 list_for_each_entry(this, head, lkb_statequeue) { 1259 if (this == lkb) 1260 continue; 1261 if (!modes_compat(this, lkb)) 1262 return 1; 1263 } 1264 return 0; 1265} 1266 1267/* 1268 * "A conversion deadlock arises with a pair of lock requests in the converting 1269 * queue for one resource. The granted mode of each lock blocks the requested 1270 * mode of the other lock." 1271 * 1272 * Part 2: if the granted mode of lkb is preventing the first lkb in the 1273 * convert queue from being granted, then demote lkb (set grmode to NL). 1274 * This second form requires that we check for conv-deadlk even when 1275 * now == 0 in _can_be_granted(). 1276 * 1277 * Example: 1278 * Granted Queue: empty 1279 * Convert Queue: NL->EX (first lock) 1280 * PR->EX (second lock) 1281 * 1282 * The first lock can't be granted because of the granted mode of the second 1283 * lock and the second lock can't be granted because it's not first in the 1284 * list. We demote the granted mode of the second lock (the lkb passed to this 1285 * function). 1286 * 1287 * After the resolution, the "grant pending" function needs to go back and try 1288 * to grant locks on the convert queue again since the first lock can now be 1289 * granted. 1290 */ 1291 1292static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb) 1293{ 1294 struct dlm_lkb *this, *first = NULL, *self = NULL; 1295 1296 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) { 1297 if (!first) 1298 first = this; 1299 if (this == lkb) { 1300 self = lkb; 1301 continue; 1302 } 1303 1304 if (!modes_compat(this, lkb) && !modes_compat(lkb, this)) 1305 return 1; 1306 } 1307 1308 /* if lkb is on the convert queue and is preventing the first 1309 from being granted, then there's deadlock and we demote lkb. 
1310 multiple converting locks may need to do this before the first 1311 converting lock can be granted. */ 1312 1313 if (self && self != first) { 1314 if (!modes_compat(lkb, first) && 1315 !queue_conflict(&rsb->res_grantqueue, first)) 1316 return 1; 1317 } 1318 1319 return 0; 1320} 1321 1322/* 1323 * Return 1 if the lock can be granted, 0 otherwise. 1324 * Also detect and resolve conversion deadlocks. 1325 * 1326 * lkb is the lock to be granted 1327 * 1328 * now is 1 if the function is being called in the context of the 1329 * immediate request, it is 0 if called later, after the lock has been 1330 * queued. 1331 * 1332 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 1333 */ 1334 1335static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) 1336{ 1337 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 1338 1339 /* 1340 * 6-10: Version 5.4 introduced an option to address the phenomenon of 1341 * a new request for a NL mode lock being blocked. 1342 * 1343 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 1344 * request, then it would be granted. In essence, the use of this flag 1345 * tells the Lock Manager to expedite theis request by not considering 1346 * what may be in the CONVERTING or WAITING queues... As of this 1347 * writing, the EXPEDITE flag can be used only with new requests for NL 1348 * mode locks. This flag is not valid for conversion requests. 1349 * 1350 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 1351 * conversion or used with a non-NL requested mode. We also know an 1352 * EXPEDITE request is always granted immediately, so now must always 1353 * be 1. The full condition to grant an expedite request: (now && 1354 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 1355 * therefore be shortened to just checking the flag. 1356 */ 1357 1358 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 1359 return 1; 1360 1361 /* 1362 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 1363 * added to the remaining conditions. 1364 */ 1365 1366 if (queue_conflict(&r->res_grantqueue, lkb)) 1367 goto out; 1368 1369 /* 1370 * 6-3: By default, a conversion request is immediately granted if the 1371 * requested mode is compatible with the modes of all other granted 1372 * locks 1373 */ 1374 1375 if (queue_conflict(&r->res_convertqueue, lkb)) 1376 goto out; 1377 1378 /* 1379 * 6-5: But the default algorithm for deciding whether to grant or 1380 * queue conversion requests does not by itself guarantee that such 1381 * requests are serviced on a "first come first serve" basis. This, in 1382 * turn, can lead to a phenomenon known as "indefinate postponement". 1383 * 1384 * 6-7: This issue is dealt with by using the optional QUECVT flag with 1385 * the system service employed to request a lock conversion. This flag 1386 * forces certain conversion requests to be queued, even if they are 1387 * compatible with the granted modes of other locks on the same 1388 * resource. Thus, the use of this flag results in conversion requests 1389 * being ordered on a "first come first servce" basis. 1390 * 1391 * DCT: This condition is all about new conversions being able to occur 1392 * "in place" while the lock remains on the granted queue (assuming 1393 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 1394 * doesn't _have_ to go onto the convert queue where it's processed in 1395 * order. 
The "now" variable is necessary to distinguish converts 1396 * being received and processed for the first time now, because once a 1397 * convert is moved to the conversion queue the condition below applies 1398 * requiring fifo granting. 1399 */ 1400 1401 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 1402 return 1; 1403 1404 /* 1405 * The NOORDER flag is set to avoid the standard vms rules on grant 1406 * order. 1407 */ 1408 1409 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 1410 return 1; 1411 1412 /* 1413 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 1414 * granted until all other conversion requests ahead of it are granted 1415 * and/or canceled. 1416 */ 1417 1418 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 1419 return 1; 1420 1421 /* 1422 * 6-4: By default, a new request is immediately granted only if all 1423 * three of the following conditions are satisfied when the request is 1424 * issued: 1425 * - The queue of ungranted conversion requests for the resource is 1426 * empty. 1427 * - The queue of ungranted new requests for the resource is empty. 1428 * - The mode of the new request is compatible with the most 1429 * restrictive mode of all granted locks on the resource. 1430 */ 1431 1432 if (now && !conv && list_empty(&r->res_convertqueue) && 1433 list_empty(&r->res_waitqueue)) 1434 return 1; 1435 1436 /* 1437 * 6-4: Once a lock request is in the queue of ungranted new requests, 1438 * it cannot be granted until the queue of ungranted conversion 1439 * requests is empty, all ungranted new requests ahead of it are 1440 * granted and/or canceled, and it is compatible with the granted mode 1441 * of the most restrictive lock granted on the resource. 1442 */ 1443 1444 if (!now && !conv && list_empty(&r->res_convertqueue) && 1445 first_in_list(lkb, &r->res_waitqueue)) 1446 return 1; 1447 1448 out: 1449 /* 1450 * The following, enabled by CONVDEADLK, departs from VMS. 1451 */ 1452 1453 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) && 1454 conversion_deadlock_detect(r, lkb)) { 1455 lkb->lkb_grmode = DLM_LOCK_NL; 1456 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 1457 } 1458 1459 return 0; 1460} 1461 1462/* 1463 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a 1464 * simple way to provide a big optimization to applications that can use them. 
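 *
 * For example (an illustrative sketch, not a specific caller): a request
 * for DLM_LOCK_PW made with DLM_LKF_ALTPR that cannot be granted is
 * retried internally as DLM_LOCK_PR; if that succeeds the lock is granted
 * in PR and DLM_SBF_ALTMODE is set in the lksb flags so the caller can
 * tell which mode it actually holds.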
1465 */ 1466 1467static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) 1468{ 1469 uint32_t flags = lkb->lkb_exflags; 1470 int rv; 1471 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 1472 1473 rv = _can_be_granted(r, lkb, now); 1474 if (rv) 1475 goto out; 1476 1477 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED) 1478 goto out; 1479 1480 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR) 1481 alt = DLM_LOCK_PR; 1482 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW) 1483 alt = DLM_LOCK_CW; 1484 1485 if (alt) { 1486 lkb->lkb_rqmode = alt; 1487 rv = _can_be_granted(r, lkb, now); 1488 if (rv) 1489 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 1490 else 1491 lkb->lkb_rqmode = rqmode; 1492 } 1493 out: 1494 return rv; 1495} 1496 1497static int grant_pending_convert(struct dlm_rsb *r, int high) 1498{ 1499 struct dlm_lkb *lkb, *s; 1500 int hi, demoted, quit, grant_restart, demote_restart; 1501 1502 quit = 0; 1503 restart: 1504 grant_restart = 0; 1505 demote_restart = 0; 1506 hi = DLM_LOCK_IV; 1507 1508 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 1509 demoted = is_demoted(lkb); 1510 if (can_be_granted(r, lkb, 0)) { 1511 grant_lock_pending(r, lkb); 1512 grant_restart = 1; 1513 } else { 1514 hi = max_t(int, lkb->lkb_rqmode, hi); 1515 if (!demoted && is_demoted(lkb)) 1516 demote_restart = 1; 1517 } 1518 } 1519 1520 if (grant_restart) 1521 goto restart; 1522 if (demote_restart && !quit) { 1523 quit = 1; 1524 goto restart; 1525 } 1526 1527 return max_t(int, high, hi); 1528} 1529 1530static int grant_pending_wait(struct dlm_rsb *r, int high) 1531{ 1532 struct dlm_lkb *lkb, *s; 1533 1534 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 1535 if (can_be_granted(r, lkb, 0)) 1536 grant_lock_pending(r, lkb); 1537 else 1538 high = max_t(int, lkb->lkb_rqmode, high); 1539 } 1540 1541 return high; 1542} 1543 1544static void grant_pending_locks(struct dlm_rsb *r) 1545{ 1546 struct dlm_lkb *lkb, *s; 1547 int high = DLM_LOCK_IV; 1548 1549 DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); 1550 1551 high = grant_pending_convert(r, high); 1552 high = grant_pending_wait(r, high); 1553 1554 if (high == DLM_LOCK_IV) 1555 return; 1556 1557 1558 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 1559 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) && 1560 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) { 1561 queue_bast(r, lkb, high); 1562 lkb->lkb_highbast = high; 1563 } 1564 } 1565} 1566 1567static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 1568 struct dlm_lkb *lkb) 1569{ 1570 struct dlm_lkb *gr; 1571 1572 list_for_each_entry(gr, head, lkb_statequeue) { 1573 if (gr->lkb_bastaddr && 1574 gr->lkb_highbast < lkb->lkb_rqmode && 1575 !modes_compat(gr, lkb)) { 1576 queue_bast(r, gr, lkb->lkb_rqmode); 1577 gr->lkb_highbast = lkb->lkb_rqmode; 1578 } 1579 } 1580} 1581 1582static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 1583{ 1584 send_bast_queue(r, &r->res_grantqueue, lkb); 1585} 1586 1587static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 1588{ 1589 send_bast_queue(r, &r->res_grantqueue, lkb); 1590 send_bast_queue(r, &r->res_convertqueue, lkb); 1591} 1592 1593/* set_master(r, lkb) -- set the master nodeid of a resource 1594 1595 The purpose of this function is to set the nodeid field in the given 1596 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 1597 known, it can just be copied to the lkb and the function will return 1598 0. 
If the rsb's nodeid is _not_ known, it needs to be looked up 1599 before it can be copied to the lkb. 1600 1601 When the rsb nodeid is being looked up remotely, the initial lkb 1602 causing the lookup is kept on the ls_waiters list waiting for the 1603 lookup reply. Other lkb's waiting for the same rsb lookup are kept 1604 on the rsb's res_lookup list until the master is verified. 1605 1606 Return values: 1607 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 1608 1: the rsb master is not available and the lkb has been placed on 1609 a wait queue 1610*/ 1611 1612static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 1613{ 1614 struct dlm_ls *ls = r->res_ls; 1615 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); 1616 1617 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 1618 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 1619 r->res_first_lkid = lkb->lkb_id; 1620 lkb->lkb_nodeid = r->res_nodeid; 1621 return 0; 1622 } 1623 1624 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 1625 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 1626 return 1; 1627 } 1628 1629 if (r->res_nodeid == 0) { 1630 lkb->lkb_nodeid = 0; 1631 return 0; 1632 } 1633 1634 if (r->res_nodeid > 0) { 1635 lkb->lkb_nodeid = r->res_nodeid; 1636 return 0; 1637 } 1638 1639 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r);); 1640 1641 dir_nodeid = dlm_dir_nodeid(r); 1642 1643 if (dir_nodeid != our_nodeid) { 1644 r->res_first_lkid = lkb->lkb_id; 1645 send_lookup(r, lkb); 1646 return 1; 1647 } 1648 1649 for (;;) { 1650 /* It's possible for dlm_scand to remove an old rsb for 1651 this same resource from the toss list, us to create 1652 a new one, look up the master locally, and find it 1653 already exists just before dlm_scand does the 1654 dir_remove() on the previous rsb. 
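
		   When that race hits, the dlm_dir_lookup() below can fail
		   transiently, hence the retry loop (with a schedule()
		   between attempts) until the directory entry settles.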
*/ 1655 1656 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 1657 r->res_length, &ret_nodeid); 1658 if (!error) 1659 break; 1660 log_debug(ls, "dir_lookup error %d %s", error, r->res_name); 1661 schedule(); 1662 } 1663 1664 if (ret_nodeid == our_nodeid) { 1665 r->res_first_lkid = 0; 1666 r->res_nodeid = 0; 1667 lkb->lkb_nodeid = 0; 1668 } else { 1669 r->res_first_lkid = lkb->lkb_id; 1670 r->res_nodeid = ret_nodeid; 1671 lkb->lkb_nodeid = ret_nodeid; 1672 } 1673 return 0; 1674} 1675 1676static void process_lookup_list(struct dlm_rsb *r) 1677{ 1678 struct dlm_lkb *lkb, *safe; 1679 1680 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 1681 list_del_init(&lkb->lkb_rsb_lookup); 1682 _request_lock(r, lkb); 1683 schedule(); 1684 } 1685} 1686 1687/* confirm_master -- confirm (or deny) an rsb's master nodeid */ 1688 1689static void confirm_master(struct dlm_rsb *r, int error) 1690{ 1691 struct dlm_lkb *lkb; 1692 1693 if (!r->res_first_lkid) 1694 return; 1695 1696 switch (error) { 1697 case 0: 1698 case -EINPROGRESS: 1699 r->res_first_lkid = 0; 1700 process_lookup_list(r); 1701 break; 1702 1703 case -EAGAIN: 1704 /* the remote master didn't queue our NOQUEUE request; 1705 make a waiting lkb the first_lkid */ 1706 1707 r->res_first_lkid = 0; 1708 1709 if (!list_empty(&r->res_lookup)) { 1710 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 1711 lkb_rsb_lookup); 1712 list_del_init(&lkb->lkb_rsb_lookup); 1713 r->res_first_lkid = lkb->lkb_id; 1714 _request_lock(r, lkb); 1715 } else 1716 r->res_nodeid = -1; 1717 break; 1718 1719 default: 1720 log_error(r->res_ls, "confirm_master unknown error %d", error); 1721 } 1722} 1723 1724static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 1725 int namelen, uint32_t parent_lkid, void *ast, 1726 void *astarg, void *bast, struct dlm_args *args) 1727{ 1728 int rv = -EINVAL; 1729 1730 /* check for invalid arg usage */ 1731 1732 if (mode < 0 || mode > DLM_LOCK_EX) 1733 goto out; 1734 1735 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 1736 goto out; 1737 1738 if (flags & DLM_LKF_CANCEL) 1739 goto out; 1740 1741 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 1742 goto out; 1743 1744 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 1745 goto out; 1746 1747 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 1748 goto out; 1749 1750 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 1751 goto out; 1752 1753 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 1754 goto out; 1755 1756 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 1757 goto out; 1758 1759 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 1760 goto out; 1761 1762 if (!ast || !lksb) 1763 goto out; 1764 1765 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 1766 goto out; 1767 1768 /* parent/child locks not yet supported */ 1769 if (parent_lkid) 1770 goto out; 1771 1772 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 1773 goto out; 1774 1775 /* these args will be copied to the lkb in validate_lock_args, 1776 it cannot be done now because when converting locks, fields in 1777 an active lkb cannot be modified before locking the rsb */ 1778 1779 args->flags = flags; 1780 args->astaddr = ast; 1781 args->astparam = (long) astarg; 1782 args->bastaddr = bast; 1783 args->mode = mode; 1784 args->lksb = lksb; 1785 rv = 0; 1786 out: 1787 return rv; 1788} 1789 1790static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 1791{ 1792 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | 
DLM_LKF_IVVALBLK | 1793 DLM_LKF_FORCEUNLOCK)) 1794 return -EINVAL; 1795 1796 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 1797 return -EINVAL; 1798 1799 args->flags = flags; 1800 args->astparam = (long) astarg; 1801 return 0; 1802} 1803 1804static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 1805 struct dlm_args *args) 1806{ 1807 int rv = -EINVAL; 1808 1809 if (args->flags & DLM_LKF_CONVERT) { 1810 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 1811 goto out; 1812 1813 if (args->flags & DLM_LKF_QUECVT && 1814 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 1815 goto out; 1816 1817 rv = -EBUSY; 1818 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 1819 goto out; 1820 1821 if (lkb->lkb_wait_type) 1822 goto out; 1823 1824 if (is_overlap(lkb)) 1825 goto out; 1826 } 1827 1828 lkb->lkb_exflags = args->flags; 1829 lkb->lkb_sbflags = 0; 1830 lkb->lkb_astaddr = args->astaddr; 1831 lkb->lkb_astparam = args->astparam; 1832 lkb->lkb_bastaddr = args->bastaddr; 1833 lkb->lkb_rqmode = args->mode; 1834 lkb->lkb_lksb = args->lksb; 1835 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 1836 lkb->lkb_ownpid = (int) current->pid; 1837 rv = 0; 1838 out: 1839 return rv; 1840} 1841 1842/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 1843 for success */ 1844 1845/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 1846 because there may be a lookup in progress and it's valid to do 1847 cancel/unlockf on it */ 1848 1849static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 1850{ 1851 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1852 int rv = -EINVAL; 1853 1854 if (lkb->lkb_flags & DLM_IFL_MSTCPY) { 1855 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 1856 dlm_print_lkb(lkb); 1857 goto out; 1858 } 1859 1860 /* an lkb may still exist even though the lock is EOL'ed due to a 1861 cancel, unlock or failed noqueue request; an app can't use these 1862 locks; return same error as if the lkid had not been found at all */ 1863 1864 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { 1865 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 1866 rv = -ENOENT; 1867 goto out; 1868 } 1869 1870 /* an lkb may be waiting for an rsb lookup to complete where the 1871 lookup was initiated by another lock */ 1872 1873 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 1874 if (!list_empty(&lkb->lkb_rsb_lookup)) { 1875 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 1876 list_del_init(&lkb->lkb_rsb_lookup); 1877 queue_cast(lkb->lkb_resource, lkb, 1878 args->flags & DLM_LKF_CANCEL ? 1879 -DLM_ECANCEL : -DLM_EUNLOCK); 1880 unhold_lkb(lkb); /* undoes create_lkb() */ 1881 rv = -EBUSY; 1882 goto out; 1883 } 1884 } 1885 1886 /* cancel not allowed with another cancel/unlock in progress */ 1887 1888 if (args->flags & DLM_LKF_CANCEL) { 1889 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 1890 goto out; 1891 1892 if (is_overlap(lkb)) 1893 goto out; 1894 1895 if (lkb->lkb_flags & DLM_IFL_RESEND) { 1896 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 1897 rv = -EBUSY; 1898 goto out; 1899 } 1900 1901 switch (lkb->lkb_wait_type) { 1902 case DLM_MSG_LOOKUP: 1903 case DLM_MSG_REQUEST: 1904 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 1905 rv = -EBUSY; 1906 goto out; 1907 case DLM_MSG_UNLOCK: 1908 case DLM_MSG_CANCEL: 1909 goto out; 1910 } 1911 /* add_to_waiters() will set OVERLAP_CANCEL */ 1912 goto out_ok; 1913 } 1914 1915 /* do we need to allow a force-unlock if there's a normal unlock 1916 already in progress? 
in what conditions could the normal unlock 1917 fail such that we'd want to send a force-unlock to be sure? */ 1918 1919 if (args->flags & DLM_LKF_FORCEUNLOCK) { 1920 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 1921 goto out; 1922 1923 if (is_overlap_unlock(lkb)) 1924 goto out; 1925 1926 if (lkb->lkb_flags & DLM_IFL_RESEND) { 1927 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 1928 rv = -EBUSY; 1929 goto out; 1930 } 1931 1932 switch (lkb->lkb_wait_type) { 1933 case DLM_MSG_LOOKUP: 1934 case DLM_MSG_REQUEST: 1935 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 1936 rv = -EBUSY; 1937 goto out; 1938 case DLM_MSG_UNLOCK: 1939 goto out; 1940 } 1941 /* add_to_waiters() will set OVERLAP_UNLOCK */ 1942 goto out_ok; 1943 } 1944 1945 /* normal unlock not allowed if there's any op in progress */ 1946 rv = -EBUSY; 1947 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 1948 goto out; 1949 1950 out_ok: 1951 /* an overlapping op shouldn't blow away exflags from other op */ 1952 lkb->lkb_exflags |= args->flags; 1953 lkb->lkb_sbflags = 0; 1954 lkb->lkb_astparam = args->astparam; 1955 rv = 0; 1956 out: 1957 if (rv) 1958 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, 1959 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 1960 args->flags, lkb->lkb_wait_type, 1961 lkb->lkb_resource->res_name); 1962 return rv; 1963} 1964 1965/* 1966 * Four stage 4 varieties: 1967 * do_request(), do_convert(), do_unlock(), do_cancel() 1968 * These are called on the master node for the given lock and 1969 * from the central locking logic. 1970 */ 1971 1972static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 1973{ 1974 int error = 0; 1975 1976 if (can_be_granted(r, lkb, 1)) { 1977 grant_lock(r, lkb); 1978 queue_cast(r, lkb, 0); 1979 goto out; 1980 } 1981 1982 if (can_be_queued(lkb)) { 1983 error = -EINPROGRESS; 1984 add_lkb(r, lkb, DLM_LKSTS_WAITING); 1985 send_blocking_asts(r, lkb); 1986 goto out; 1987 } 1988 1989 error = -EAGAIN; 1990 if (force_blocking_asts(lkb)) 1991 send_blocking_asts_all(r, lkb); 1992 queue_cast(r, lkb, -EAGAIN); 1993 1994 out: 1995 return error; 1996} 1997 1998static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 1999{ 2000 int error = 0; 2001 2002 /* changing an existing lock may allow others to be granted */ 2003 2004 if (can_be_granted(r, lkb, 1)) { 2005 grant_lock(r, lkb); 2006 queue_cast(r, lkb, 0); 2007 grant_pending_locks(r); 2008 goto out; 2009 } 2010 2011 /* is_demoted() means the can_be_granted() above set the grmode 2012 to NL, and left us on the granted queue. This auto-demotion 2013 (due to CONVDEADLK) might mean other locks, and/or this lock, are 2014 now grantable. We have to try to grant other converting locks 2015 before we try again to grant this one. 
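
	   The requesting node later mirrors this demotion on its process
	   copy when the convert reply carries DLM_SBF_DEMOTED, roughly
	   (illustrative sketch of the reply path, see munge_demoted()):

		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);	-- grmode becomes DLM_LOCK_NL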
*/ 2016 2017 if (is_demoted(lkb)) { 2018 grant_pending_convert(r, DLM_LOCK_IV); 2019 if (_can_be_granted(r, lkb, 1)) { 2020 grant_lock(r, lkb); 2021 queue_cast(r, lkb, 0); 2022 grant_pending_locks(r); 2023 goto out; 2024 } 2025 /* else fall through and move to convert queue */ 2026 } 2027 2028 if (can_be_queued(lkb)) { 2029 error = -EINPROGRESS; 2030 del_lkb(r, lkb); 2031 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2032 send_blocking_asts(r, lkb); 2033 goto out; 2034 } 2035 2036 error = -EAGAIN; 2037 if (force_blocking_asts(lkb)) 2038 send_blocking_asts_all(r, lkb); 2039 queue_cast(r, lkb, -EAGAIN); 2040 2041 out: 2042 return error; 2043} 2044 2045static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2046{ 2047 remove_lock(r, lkb); 2048 queue_cast(r, lkb, -DLM_EUNLOCK); 2049 grant_pending_locks(r); 2050 return -DLM_EUNLOCK; 2051} 2052 2053/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2054 2055static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2056{ 2057 int error; 2058 2059 error = revert_lock(r, lkb); 2060 if (error) { 2061 queue_cast(r, lkb, -DLM_ECANCEL); 2062 grant_pending_locks(r); 2063 return -DLM_ECANCEL; 2064 } 2065 return 0; 2066} 2067 2068/* 2069 * Four stage 3 varieties: 2070 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 2071 */ 2072 2073/* add a new lkb to a possibly new rsb, called by requesting process */ 2074 2075static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2076{ 2077 int error; 2078 2079 /* set_master: sets lkb nodeid from r */ 2080 2081 error = set_master(r, lkb); 2082 if (error < 0) 2083 goto out; 2084 if (error) { 2085 error = 0; 2086 goto out; 2087 } 2088 2089 if (is_remote(r)) 2090 /* receive_request() calls do_request() on remote node */ 2091 error = send_request(r, lkb); 2092 else 2093 error = do_request(r, lkb); 2094 out: 2095 return error; 2096} 2097 2098/* change some property of an existing lkb, e.g. 
mode */ 2099 2100static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2101{ 2102 int error; 2103 2104 if (is_remote(r)) 2105 /* receive_convert() calls do_convert() on remote node */ 2106 error = send_convert(r, lkb); 2107 else 2108 error = do_convert(r, lkb); 2109 2110 return error; 2111} 2112 2113/* remove an existing lkb from the granted queue */ 2114 2115static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2116{ 2117 int error; 2118 2119 if (is_remote(r)) 2120 /* receive_unlock() calls do_unlock() on remote node */ 2121 error = send_unlock(r, lkb); 2122 else 2123 error = do_unlock(r, lkb); 2124 2125 return error; 2126} 2127 2128/* remove an existing lkb from the convert or wait queue */ 2129 2130static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2131{ 2132 int error; 2133 2134 if (is_remote(r)) 2135 /* receive_cancel() calls do_cancel() on remote node */ 2136 error = send_cancel(r, lkb); 2137 else 2138 error = do_cancel(r, lkb); 2139 2140 return error; 2141} 2142 2143/* 2144 * Four stage 2 varieties: 2145 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 2146 */ 2147 2148static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, 2149 int len, struct dlm_args *args) 2150{ 2151 struct dlm_rsb *r; 2152 int error; 2153 2154 error = validate_lock_args(ls, lkb, args); 2155 if (error) 2156 goto out; 2157 2158 error = find_rsb(ls, name, len, R_CREATE, &r); 2159 if (error) 2160 goto out; 2161 2162 lock_rsb(r); 2163 2164 attach_lkb(r, lkb); 2165 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 2166 2167 error = _request_lock(r, lkb); 2168 2169 unlock_rsb(r); 2170 put_rsb(r); 2171 2172 out: 2173 return error; 2174} 2175 2176static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 2177 struct dlm_args *args) 2178{ 2179 struct dlm_rsb *r; 2180 int error; 2181 2182 r = lkb->lkb_resource; 2183 2184 hold_rsb(r); 2185 lock_rsb(r); 2186 2187 error = validate_lock_args(ls, lkb, args); 2188 if (error) 2189 goto out; 2190 2191 error = _convert_lock(r, lkb); 2192 out: 2193 unlock_rsb(r); 2194 put_rsb(r); 2195 return error; 2196} 2197 2198static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 2199 struct dlm_args *args) 2200{ 2201 struct dlm_rsb *r; 2202 int error; 2203 2204 r = lkb->lkb_resource; 2205 2206 hold_rsb(r); 2207 lock_rsb(r); 2208 2209 error = validate_unlock_args(lkb, args); 2210 if (error) 2211 goto out; 2212 2213 error = _unlock_lock(r, lkb); 2214 out: 2215 unlock_rsb(r); 2216 put_rsb(r); 2217 return error; 2218} 2219 2220static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 2221 struct dlm_args *args) 2222{ 2223 struct dlm_rsb *r; 2224 int error; 2225 2226 r = lkb->lkb_resource; 2227 2228 hold_rsb(r); 2229 lock_rsb(r); 2230 2231 error = validate_unlock_args(lkb, args); 2232 if (error) 2233 goto out; 2234 2235 error = _cancel_lock(r, lkb); 2236 out: 2237 unlock_rsb(r); 2238 put_rsb(r); 2239 return error; 2240} 2241 2242/* 2243 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 2244 */ 2245 2246int dlm_lock(dlm_lockspace_t *lockspace, 2247 int mode, 2248 struct dlm_lksb *lksb, 2249 uint32_t flags, 2250 void *name, 2251 unsigned int namelen, 2252 uint32_t parent_lkid, 2253 void (*ast) (void *astarg), 2254 void *astarg, 2255 void (*bast) (void *astarg, int mode)) 2256{ 2257 struct dlm_ls *ls; 2258 struct dlm_lkb *lkb; 2259 struct dlm_args args; 2260 int error, convert = flags & DLM_LKF_CONVERT; 2261 2262 ls = dlm_find_lockspace_local(lockspace); 2263 if (!ls) 2264 return -EINVAL; 2265 2266 lock_recovery(ls); 2267 
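	/* a conversion reuses the existing lkb identified by lksb->sb_lkid;
	   a new request allocates a fresh lkb */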
2268 if (convert) 2269 error = find_lkb(ls, lksb->sb_lkid, &lkb); 2270 else 2271 error = create_lkb(ls, &lkb); 2272 2273 if (error) 2274 goto out; 2275 2276 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast, 2277 astarg, bast, &args); 2278 if (error) 2279 goto out_put; 2280 2281 if (convert) 2282 error = convert_lock(ls, lkb, &args); 2283 else 2284 error = request_lock(ls, lkb, name, namelen, &args); 2285 2286 if (error == -EINPROGRESS) 2287 error = 0; 2288 out_put: 2289 if (convert || error) 2290 __put_lkb(ls, lkb); 2291 if (error == -EAGAIN) 2292 error = 0; 2293 out: 2294 unlock_recovery(ls); 2295 dlm_put_lockspace(ls); 2296 return error; 2297} 2298 2299int dlm_unlock(dlm_lockspace_t *lockspace, 2300 uint32_t lkid, 2301 uint32_t flags, 2302 struct dlm_lksb *lksb, 2303 void *astarg) 2304{ 2305 struct dlm_ls *ls; 2306 struct dlm_lkb *lkb; 2307 struct dlm_args args; 2308 int error; 2309 2310 ls = dlm_find_lockspace_local(lockspace); 2311 if (!ls) 2312 return -EINVAL; 2313 2314 lock_recovery(ls); 2315 2316 error = find_lkb(ls, lkid, &lkb); 2317 if (error) 2318 goto out; 2319 2320 error = set_unlock_args(flags, astarg, &args); 2321 if (error) 2322 goto out_put; 2323 2324 if (flags & DLM_LKF_CANCEL) 2325 error = cancel_lock(ls, lkb, &args); 2326 else 2327 error = unlock_lock(ls, lkb, &args); 2328 2329 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 2330 error = 0; 2331 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 2332 error = 0; 2333 out_put: 2334 dlm_put_lkb(lkb); 2335 out: 2336 unlock_recovery(ls); 2337 dlm_put_lockspace(ls); 2338 return error; 2339} 2340 2341/* 2342 * send/receive routines for remote operations and replies 2343 * 2344 * send_args 2345 * send_common 2346 * send_request receive_request 2347 * send_convert receive_convert 2348 * send_unlock receive_unlock 2349 * send_cancel receive_cancel 2350 * send_grant receive_grant 2351 * send_bast receive_bast 2352 * send_lookup receive_lookup 2353 * send_remove receive_remove 2354 * 2355 * send_common_reply 2356 * receive_request_reply send_request_reply 2357 * receive_convert_reply send_convert_reply 2358 * receive_unlock_reply send_unlock_reply 2359 * receive_cancel_reply send_cancel_reply 2360 * receive_lookup_reply send_lookup_reply 2361 */ 2362 2363static int _create_message(struct dlm_ls *ls, int mb_len, 2364 int to_nodeid, int mstype, 2365 struct dlm_message **ms_ret, 2366 struct dlm_mhandle **mh_ret) 2367{ 2368 struct dlm_message *ms; 2369 struct dlm_mhandle *mh; 2370 char *mb; 2371 2372 /* get_buffer gives us a message handle (mh) that we need to 2373 pass into lowcomms_commit and a message buffer (mb) that we 2374 write our data into */ 2375 2376 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb); 2377 if (!mh) 2378 return -ENOBUFS; 2379 2380 memset(mb, 0, mb_len); 2381 2382 ms = (struct dlm_message *) mb; 2383 2384 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 2385 ms->m_header.h_lockspace = ls->ls_global_id; 2386 ms->m_header.h_nodeid = dlm_our_nodeid(); 2387 ms->m_header.h_length = mb_len; 2388 ms->m_header.h_cmd = DLM_MSG; 2389 2390 ms->m_type = mstype; 2391 2392 *mh_ret = mh; 2393 *ms_ret = ms; 2394 return 0; 2395} 2396 2397static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 2398 int to_nodeid, int mstype, 2399 struct dlm_message **ms_ret, 2400 struct dlm_mhandle **mh_ret) 2401{ 2402 int mb_len = sizeof(struct dlm_message); 2403 2404 switch (mstype) { 2405 case DLM_MSG_REQUEST: 2406 case DLM_MSG_LOOKUP: 2407 case DLM_MSG_REMOVE: 
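		/* these message types carry the resource name in m_extra */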
2408 mb_len += r->res_length; 2409 break; 2410 case DLM_MSG_CONVERT: 2411 case DLM_MSG_UNLOCK: 2412 case DLM_MSG_REQUEST_REPLY: 2413 case DLM_MSG_CONVERT_REPLY: 2414 case DLM_MSG_GRANT: 2415 if (lkb && lkb->lkb_lvbptr) 2416 mb_len += r->res_ls->ls_lvblen; 2417 break; 2418 } 2419 2420 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 2421 ms_ret, mh_ret); 2422} 2423 2424/* further lowcomms enhancements or alternate implementations may make 2425 the return value from this function useful at some point */ 2426 2427static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) 2428{ 2429 dlm_message_out(ms); 2430 dlm_lowcomms_commit_buffer(mh); 2431 return 0; 2432} 2433 2434static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 2435 struct dlm_message *ms) 2436{ 2437 ms->m_nodeid = lkb->lkb_nodeid; 2438 ms->m_pid = lkb->lkb_ownpid; 2439 ms->m_lkid = lkb->lkb_id; 2440 ms->m_remid = lkb->lkb_remid; 2441 ms->m_exflags = lkb->lkb_exflags; 2442 ms->m_sbflags = lkb->lkb_sbflags; 2443 ms->m_flags = lkb->lkb_flags; 2444 ms->m_lvbseq = lkb->lkb_lvbseq; 2445 ms->m_status = lkb->lkb_status; 2446 ms->m_grmode = lkb->lkb_grmode; 2447 ms->m_rqmode = lkb->lkb_rqmode; 2448 ms->m_hash = r->res_hash; 2449 2450 /* m_result and m_bastmode are set from function args, 2451 not from lkb fields */ 2452 2453 if (lkb->lkb_bastaddr) 2454 ms->m_asts |= AST_BAST; 2455 if (lkb->lkb_astaddr) 2456 ms->m_asts |= AST_COMP; 2457 2458 /* compare with switch in create_message; send_remove() doesn't 2459 use send_args() */ 2460 2461 switch (ms->m_type) { 2462 case DLM_MSG_REQUEST: 2463 case DLM_MSG_LOOKUP: 2464 memcpy(ms->m_extra, r->res_name, r->res_length); 2465 break; 2466 case DLM_MSG_CONVERT: 2467 case DLM_MSG_UNLOCK: 2468 case DLM_MSG_REQUEST_REPLY: 2469 case DLM_MSG_CONVERT_REPLY: 2470 case DLM_MSG_GRANT: 2471 if (!lkb->lkb_lvbptr) 2472 break; 2473 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 2474 break; 2475 } 2476} 2477 2478static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 2479{ 2480 struct dlm_message *ms; 2481 struct dlm_mhandle *mh; 2482 int to_nodeid, error; 2483 2484 error = add_to_waiters(lkb, mstype); 2485 if (error) 2486 return error; 2487 2488 to_nodeid = r->res_nodeid; 2489 2490 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 2491 if (error) 2492 goto fail; 2493 2494 send_args(r, lkb, ms); 2495 2496 error = send_message(mh, ms); 2497 if (error) 2498 goto fail; 2499 return 0; 2500 2501 fail: 2502 remove_from_waiters(lkb, msg_reply_type(mstype)); 2503 return error; 2504} 2505 2506static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 2507{ 2508 return send_common(r, lkb, DLM_MSG_REQUEST); 2509} 2510 2511static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 2512{ 2513 int error; 2514 2515 error = send_common(r, lkb, DLM_MSG_CONVERT); 2516 2517 /* down conversions go without a reply from the master */ 2518 if (!error && down_conversion(lkb)) { 2519 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 2520 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 2521 r->res_ls->ls_stub_ms.m_result = 0; 2522 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; 2523 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2524 } 2525 2526 return error; 2527} 2528 2529 2530static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2531{ 2532 return send_common(r, lkb, DLM_MSG_UNLOCK); 2533} 2534 2535static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2536{ 2537 return send_common(r, lkb, DLM_MSG_CANCEL); 2538} 2539 
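/* send_grant() and send_bast() below are one-way messages from the master to
   the lock holder; unlike the send_common() operations above, nothing is
   added to the waiters list because no reply is expected */
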
2540static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 2541{ 2542 struct dlm_message *ms; 2543 struct dlm_mhandle *mh; 2544 int to_nodeid, error; 2545 2546 to_nodeid = lkb->lkb_nodeid; 2547 2548 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 2549 if (error) 2550 goto out; 2551 2552 send_args(r, lkb, ms); 2553 2554 ms->m_result = 0; 2555 2556 error = send_message(mh, ms); 2557 out: 2558 return error; 2559} 2560 2561static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 2562{ 2563 struct dlm_message *ms; 2564 struct dlm_mhandle *mh; 2565 int to_nodeid, error; 2566 2567 to_nodeid = lkb->lkb_nodeid; 2568 2569 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 2570 if (error) 2571 goto out; 2572 2573 send_args(r, lkb, ms); 2574 2575 ms->m_bastmode = mode; 2576 2577 error = send_message(mh, ms); 2578 out: 2579 return error; 2580} 2581 2582static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 2583{ 2584 struct dlm_message *ms; 2585 struct dlm_mhandle *mh; 2586 int to_nodeid, error; 2587 2588 error = add_to_waiters(lkb, DLM_MSG_LOOKUP); 2589 if (error) 2590 return error; 2591 2592 to_nodeid = dlm_dir_nodeid(r); 2593 2594 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 2595 if (error) 2596 goto fail; 2597 2598 send_args(r, lkb, ms); 2599 2600 error = send_message(mh, ms); 2601 if (error) 2602 goto fail; 2603 return 0; 2604 2605 fail: 2606 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 2607 return error; 2608} 2609 2610static int send_remove(struct dlm_rsb *r) 2611{ 2612 struct dlm_message *ms; 2613 struct dlm_mhandle *mh; 2614 int to_nodeid, error; 2615 2616 to_nodeid = dlm_dir_nodeid(r); 2617 2618 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 2619 if (error) 2620 goto out; 2621 2622 memcpy(ms->m_extra, r->res_name, r->res_length); 2623 ms->m_hash = r->res_hash; 2624 2625 error = send_message(mh, ms); 2626 out: 2627 return error; 2628} 2629 2630static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 2631 int mstype, int rv) 2632{ 2633 struct dlm_message *ms; 2634 struct dlm_mhandle *mh; 2635 int to_nodeid, error; 2636 2637 to_nodeid = lkb->lkb_nodeid; 2638 2639 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 2640 if (error) 2641 goto out; 2642 2643 send_args(r, lkb, ms); 2644 2645 ms->m_result = rv; 2646 2647 error = send_message(mh, ms); 2648 out: 2649 return error; 2650} 2651 2652static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2653{ 2654 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 2655} 2656 2657static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2658{ 2659 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 2660} 2661 2662static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2663{ 2664 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 2665} 2666 2667static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2668{ 2669 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 2670} 2671 2672static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 2673 int ret_nodeid, int rv) 2674{ 2675 struct dlm_rsb *r = &ls->ls_stub_rsb; 2676 struct dlm_message *ms; 2677 struct dlm_mhandle *mh; 2678 int error, nodeid = ms_in->m_header.h_nodeid; 2679 2680 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 2681 if (error) 2682 goto out; 2683 2684 ms->m_lkid = ms_in->m_lkid; 2685 ms->m_result = rv; 
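	/* m_nodeid carries the master node found by the directory lookup */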
2686 ms->m_nodeid = ret_nodeid; 2687 2688 error = send_message(mh, ms); 2689 out: 2690 return error; 2691} 2692 2693/* which args we save from a received message depends heavily on the type 2694 of message, unlike the send side where we can safely send everything about 2695 the lkb for any type of message */ 2696 2697static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 2698{ 2699 lkb->lkb_exflags = ms->m_exflags; 2700 lkb->lkb_sbflags = ms->m_sbflags; 2701 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 2702 (ms->m_flags & 0x0000FFFF); 2703} 2704 2705static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 2706{ 2707 lkb->lkb_sbflags = ms->m_sbflags; 2708 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 2709 (ms->m_flags & 0x0000FFFF); 2710} 2711 2712static int receive_extralen(struct dlm_message *ms) 2713{ 2714 return (ms->m_header.h_length - sizeof(struct dlm_message)); 2715} 2716 2717static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 2718 struct dlm_message *ms) 2719{ 2720 int len; 2721 2722 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 2723 if (!lkb->lkb_lvbptr) 2724 lkb->lkb_lvbptr = allocate_lvb(ls); 2725 if (!lkb->lkb_lvbptr) 2726 return -ENOMEM; 2727 len = receive_extralen(ms); 2728 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2729 } 2730 return 0; 2731} 2732 2733static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2734 struct dlm_message *ms) 2735{ 2736 lkb->lkb_nodeid = ms->m_header.h_nodeid; 2737 lkb->lkb_ownpid = ms->m_pid; 2738 lkb->lkb_remid = ms->m_lkid; 2739 lkb->lkb_grmode = DLM_LOCK_IV; 2740 lkb->lkb_rqmode = ms->m_rqmode; 2741 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); 2742 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); 2743 2744 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); 2745 2746 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 2747 /* lkb was just created so there won't be an lvb yet */ 2748 lkb->lkb_lvbptr = allocate_lvb(ls); 2749 if (!lkb->lkb_lvbptr) 2750 return -ENOMEM; 2751 } 2752 2753 return 0; 2754} 2755 2756static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2757 struct dlm_message *ms) 2758{ 2759 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { 2760 log_error(ls, "convert_args nodeid %d %d lkid %x %x", 2761 lkb->lkb_nodeid, ms->m_header.h_nodeid, 2762 lkb->lkb_id, lkb->lkb_remid); 2763 return -EINVAL; 2764 } 2765 2766 if (!is_master_copy(lkb)) 2767 return -EINVAL; 2768 2769 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2770 return -EBUSY; 2771 2772 if (receive_lvb(ls, lkb, ms)) 2773 return -ENOMEM; 2774 2775 lkb->lkb_rqmode = ms->m_rqmode; 2776 lkb->lkb_lvbseq = ms->m_lvbseq; 2777 2778 return 0; 2779} 2780 2781static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2782 struct dlm_message *ms) 2783{ 2784 if (!is_master_copy(lkb)) 2785 return -EINVAL; 2786 if (receive_lvb(ls, lkb, ms)) 2787 return -ENOMEM; 2788 return 0; 2789} 2790 2791/* We fill in the stub-lkb fields with the info that send_xxxx_reply() 2792 uses to send a reply and that the remote end uses to process the reply. 
*/ 2793 2794static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 2795{ 2796 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 2797 lkb->lkb_nodeid = ms->m_header.h_nodeid; 2798 lkb->lkb_remid = ms->m_lkid; 2799} 2800 2801static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 2802{ 2803 struct dlm_lkb *lkb; 2804 struct dlm_rsb *r; 2805 int error, namelen; 2806 2807 error = create_lkb(ls, &lkb); 2808 if (error) 2809 goto fail; 2810 2811 receive_flags(lkb, ms); 2812 lkb->lkb_flags |= DLM_IFL_MSTCPY; 2813 error = receive_request_args(ls, lkb, ms); 2814 if (error) { 2815 __put_lkb(ls, lkb); 2816 goto fail; 2817 } 2818 2819 namelen = receive_extralen(ms); 2820 2821 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); 2822 if (error) { 2823 __put_lkb(ls, lkb); 2824 goto fail; 2825 } 2826 2827 lock_rsb(r); 2828 2829 attach_lkb(r, lkb); 2830 error = do_request(r, lkb); 2831 send_request_reply(r, lkb, error); 2832 2833 unlock_rsb(r); 2834 put_rsb(r); 2835 2836 if (error == -EINPROGRESS) 2837 error = 0; 2838 if (error) 2839 dlm_put_lkb(lkb); 2840 return; 2841 2842 fail: 2843 setup_stub_lkb(ls, ms); 2844 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2845} 2846 2847static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 2848{ 2849 struct dlm_lkb *lkb; 2850 struct dlm_rsb *r; 2851 int error, reply = 1; 2852 2853 error = find_lkb(ls, ms->m_remid, &lkb); 2854 if (error) 2855 goto fail; 2856 2857 r = lkb->lkb_resource; 2858 2859 hold_rsb(r); 2860 lock_rsb(r); 2861 2862 receive_flags(lkb, ms); 2863 error = receive_convert_args(ls, lkb, ms); 2864 if (error) 2865 goto out; 2866 reply = !down_conversion(lkb); 2867 2868 error = do_convert(r, lkb); 2869 out: 2870 if (reply) 2871 send_convert_reply(r, lkb, error); 2872 2873 unlock_rsb(r); 2874 put_rsb(r); 2875 dlm_put_lkb(lkb); 2876 return; 2877 2878 fail: 2879 setup_stub_lkb(ls, ms); 2880 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2881} 2882 2883static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 2884{ 2885 struct dlm_lkb *lkb; 2886 struct dlm_rsb *r; 2887 int error; 2888 2889 error = find_lkb(ls, ms->m_remid, &lkb); 2890 if (error) 2891 goto fail; 2892 2893 r = lkb->lkb_resource; 2894 2895 hold_rsb(r); 2896 lock_rsb(r); 2897 2898 receive_flags(lkb, ms); 2899 error = receive_unlock_args(ls, lkb, ms); 2900 if (error) 2901 goto out; 2902 2903 error = do_unlock(r, lkb); 2904 out: 2905 send_unlock_reply(r, lkb, error); 2906 2907 unlock_rsb(r); 2908 put_rsb(r); 2909 dlm_put_lkb(lkb); 2910 return; 2911 2912 fail: 2913 setup_stub_lkb(ls, ms); 2914 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2915} 2916 2917static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 2918{ 2919 struct dlm_lkb *lkb; 2920 struct dlm_rsb *r; 2921 int error; 2922 2923 error = find_lkb(ls, ms->m_remid, &lkb); 2924 if (error) 2925 goto fail; 2926 2927 receive_flags(lkb, ms); 2928 2929 r = lkb->lkb_resource; 2930 2931 hold_rsb(r); 2932 lock_rsb(r); 2933 2934 error = do_cancel(r, lkb); 2935 send_cancel_reply(r, lkb, error); 2936 2937 unlock_rsb(r); 2938 put_rsb(r); 2939 dlm_put_lkb(lkb); 2940 return; 2941 2942 fail: 2943 setup_stub_lkb(ls, ms); 2944 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2945} 2946 2947static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 2948{ 2949 struct dlm_lkb *lkb; 2950 struct dlm_rsb *r; 2951 int error; 2952 2953 error = find_lkb(ls, ms->m_remid, &lkb); 2954 if (error) { 2955 log_error(ls, "receive_grant no 
lkb"); 2956 return; 2957 } 2958 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2959 2960 r = lkb->lkb_resource; 2961 2962 hold_rsb(r); 2963 lock_rsb(r); 2964 2965 receive_flags_reply(lkb, ms); 2966 if (is_altmode(lkb)) 2967 munge_altmode(lkb, ms); 2968 grant_lock_pc(r, lkb, ms); 2969 queue_cast(r, lkb, 0); 2970 2971 unlock_rsb(r); 2972 put_rsb(r); 2973 dlm_put_lkb(lkb); 2974} 2975 2976static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 2977{ 2978 struct dlm_lkb *lkb; 2979 struct dlm_rsb *r; 2980 int error; 2981 2982 error = find_lkb(ls, ms->m_remid, &lkb); 2983 if (error) { 2984 log_error(ls, "receive_bast no lkb"); 2985 return; 2986 } 2987 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2988 2989 r = lkb->lkb_resource; 2990 2991 hold_rsb(r); 2992 lock_rsb(r); 2993 2994 queue_bast(r, lkb, ms->m_bastmode); 2995 2996 unlock_rsb(r); 2997 put_rsb(r); 2998 dlm_put_lkb(lkb); 2999} 3000 3001static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3002{ 3003 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 3004 3005 from_nodeid = ms->m_header.h_nodeid; 3006 our_nodeid = dlm_our_nodeid(); 3007 3008 len = receive_extralen(ms); 3009 3010 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 3011 if (dir_nodeid != our_nodeid) { 3012 log_error(ls, "lookup dir_nodeid %d from %d", 3013 dir_nodeid, from_nodeid); 3014 error = -EINVAL; 3015 ret_nodeid = -1; 3016 goto out; 3017 } 3018 3019 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid); 3020 3021 /* Optimization: we're master so treat lookup as a request */ 3022 if (!error && ret_nodeid == our_nodeid) { 3023 receive_request(ls, ms); 3024 return; 3025 } 3026 out: 3027 send_lookup_reply(ls, ms, ret_nodeid, error); 3028} 3029 3030static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 3031{ 3032 int len, dir_nodeid, from_nodeid; 3033 3034 from_nodeid = ms->m_header.h_nodeid; 3035 3036 len = receive_extralen(ms); 3037 3038 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 3039 if (dir_nodeid != dlm_our_nodeid()) { 3040 log_error(ls, "remove dir entry dir_nodeid %d from %d", 3041 dir_nodeid, from_nodeid); 3042 return; 3043 } 3044 3045 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3046} 3047 3048static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 3049{ 3050 do_purge(ls, ms->m_nodeid, ms->m_pid); 3051} 3052 3053static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3054{ 3055 struct dlm_lkb *lkb; 3056 struct dlm_rsb *r; 3057 int error, mstype, result; 3058 3059 error = find_lkb(ls, ms->m_remid, &lkb); 3060 if (error) { 3061 log_error(ls, "receive_request_reply no lkb"); 3062 return; 3063 } 3064 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3065 3066 r = lkb->lkb_resource; 3067 hold_rsb(r); 3068 lock_rsb(r); 3069 3070 mstype = lkb->lkb_wait_type; 3071 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3072 if (error) 3073 goto out; 3074 3075 /* Optimization: the dir node was also the master, so it took our 3076 lookup as a request and sent request reply instead of lookup reply */ 3077 if (mstype == DLM_MSG_LOOKUP) { 3078 r->res_nodeid = ms->m_header.h_nodeid; 3079 lkb->lkb_nodeid = r->res_nodeid; 3080 } 3081 3082 /* this is the value returned from do_request() on the master */ 3083 result = ms->m_result; 3084 3085 switch (result) { 3086 case -EAGAIN: 3087 /* request would block (be queued) on remote master */ 3088 queue_cast(r, lkb, -EAGAIN); 3089 confirm_master(r, -EAGAIN); 3090 unhold_lkb(lkb); /* undoes 
create_lkb() */ 3091 break; 3092 3093 case -EINPROGRESS: 3094 case 0: 3095 /* request was queued or granted on remote master */ 3096 receive_flags_reply(lkb, ms); 3097 lkb->lkb_remid = ms->m_lkid; 3098 if (is_altmode(lkb)) 3099 munge_altmode(lkb, ms); 3100 if (result) 3101 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3102 else { 3103 grant_lock_pc(r, lkb, ms); 3104 queue_cast(r, lkb, 0); 3105 } 3106 confirm_master(r, result); 3107 break; 3108 3109 case -EBADR: 3110 case -ENOTBLK: 3111 /* find_rsb failed to find rsb or rsb wasn't master */ 3112 log_debug(ls, "receive_request_reply %x %x master diff %d %d", 3113 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); 3114 r->res_nodeid = -1; 3115 lkb->lkb_nodeid = -1; 3116 3117 if (is_overlap(lkb)) { 3118 /* we'll ignore error in cancel/unlock reply */ 3119 queue_cast_overlap(r, lkb); 3120 unhold_lkb(lkb); /* undoes create_lkb() */ 3121 } else 3122 _request_lock(r, lkb); 3123 break; 3124 3125 default: 3126 log_error(ls, "receive_request_reply %x error %d", 3127 lkb->lkb_id, result); 3128 } 3129 3130 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { 3131 log_debug(ls, "receive_request_reply %x result %d unlock", 3132 lkb->lkb_id, result); 3133 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 3134 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 3135 send_unlock(r, lkb); 3136 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { 3137 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 3138 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 3139 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 3140 send_cancel(r, lkb); 3141 } else { 3142 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 3143 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 3144 } 3145 out: 3146 unlock_rsb(r); 3147 put_rsb(r); 3148 dlm_put_lkb(lkb); 3149} 3150 3151static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3152 struct dlm_message *ms) 3153{ 3154 /* this is the value returned from do_convert() on the master */ 3155 switch (ms->m_result) { 3156 case -EAGAIN: 3157 /* convert would block (be queued) on remote master */ 3158 queue_cast(r, lkb, -EAGAIN); 3159 break; 3160 3161 case -EINPROGRESS: 3162 /* convert was queued on remote master */ 3163 receive_flags_reply(lkb, ms); 3164 if (is_demoted(lkb)) 3165 munge_demoted(lkb, ms); 3166 del_lkb(r, lkb); 3167 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3168 break; 3169 3170 case 0: 3171 /* convert was granted on remote master */ 3172 receive_flags_reply(lkb, ms); 3173 if (is_demoted(lkb)) 3174 munge_demoted(lkb, ms); 3175 grant_lock_pc(r, lkb, ms); 3176 queue_cast(r, lkb, 0); 3177 break; 3178 3179 default: 3180 log_error(r->res_ls, "receive_convert_reply %x error %d", 3181 lkb->lkb_id, ms->m_result); 3182 } 3183} 3184 3185static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3186{ 3187 struct dlm_rsb *r = lkb->lkb_resource; 3188 int error; 3189 3190 hold_rsb(r); 3191 lock_rsb(r); 3192 3193 /* stub reply can happen with waiters_mutex held */ 3194 error = remove_from_waiters_ms(lkb, ms); 3195 if (error) 3196 goto out; 3197 3198 __receive_convert_reply(r, lkb, ms); 3199 out: 3200 unlock_rsb(r); 3201 put_rsb(r); 3202} 3203 3204static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 3205{ 3206 struct dlm_lkb *lkb; 3207 int error; 3208 3209 error = find_lkb(ls, ms->m_remid, &lkb); 3210 if (error) { 3211 log_error(ls, "receive_convert_reply no lkb"); 3212 return; 3213 } 3214 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3215 3216 _receive_convert_reply(lkb, ms); 3217 
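	/* drop the reference taken by find_lkb() */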
dlm_put_lkb(lkb); 3218} 3219 3220static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3221{ 3222 struct dlm_rsb *r = lkb->lkb_resource; 3223 int error; 3224 3225 hold_rsb(r); 3226 lock_rsb(r); 3227 3228 /* stub reply can happen with waiters_mutex held */ 3229 error = remove_from_waiters_ms(lkb, ms); 3230 if (error) 3231 goto out; 3232 3233 /* this is the value returned from do_unlock() on the master */ 3234 3235 switch (ms->m_result) { 3236 case -DLM_EUNLOCK: 3237 receive_flags_reply(lkb, ms); 3238 remove_lock_pc(r, lkb); 3239 queue_cast(r, lkb, -DLM_EUNLOCK); 3240 break; 3241 case -ENOENT: 3242 break; 3243 default: 3244 log_error(r->res_ls, "receive_unlock_reply %x error %d", 3245 lkb->lkb_id, ms->m_result); 3246 } 3247 out: 3248 unlock_rsb(r); 3249 put_rsb(r); 3250} 3251 3252static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 3253{ 3254 struct dlm_lkb *lkb; 3255 int error; 3256 3257 error = find_lkb(ls, ms->m_remid, &lkb); 3258 if (error) { 3259 log_error(ls, "receive_unlock_reply no lkb"); 3260 return; 3261 } 3262 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3263 3264 _receive_unlock_reply(lkb, ms); 3265 dlm_put_lkb(lkb); 3266} 3267 3268static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3269{ 3270 struct dlm_rsb *r = lkb->lkb_resource; 3271 int error; 3272 3273 hold_rsb(r); 3274 lock_rsb(r); 3275 3276 /* stub reply can happen with waiters_mutex held */ 3277 error = remove_from_waiters_ms(lkb, ms); 3278 if (error) 3279 goto out; 3280 3281 /* this is the value returned from do_cancel() on the master */ 3282 3283 switch (ms->m_result) { 3284 case -DLM_ECANCEL: 3285 receive_flags_reply(lkb, ms); 3286 revert_lock_pc(r, lkb); 3287 if (ms->m_result) 3288 queue_cast(r, lkb, -DLM_ECANCEL); 3289 break; 3290 case 0: 3291 break; 3292 default: 3293 log_error(r->res_ls, "receive_cancel_reply %x error %d", 3294 lkb->lkb_id, ms->m_result); 3295 } 3296 out: 3297 unlock_rsb(r); 3298 put_rsb(r); 3299} 3300 3301static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 3302{ 3303 struct dlm_lkb *lkb; 3304 int error; 3305 3306 error = find_lkb(ls, ms->m_remid, &lkb); 3307 if (error) { 3308 log_error(ls, "receive_cancel_reply no lkb"); 3309 return; 3310 } 3311 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3312 3313 _receive_cancel_reply(lkb, ms); 3314 dlm_put_lkb(lkb); 3315} 3316 3317static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 3318{ 3319 struct dlm_lkb *lkb; 3320 struct dlm_rsb *r; 3321 int error, ret_nodeid; 3322 3323 error = find_lkb(ls, ms->m_lkid, &lkb); 3324 if (error) { 3325 log_error(ls, "receive_lookup_reply no lkb"); 3326 return; 3327 } 3328 3329 3330 r = lkb->lkb_resource; 3331 hold_rsb(r); 3332 lock_rsb(r); 3333 3334 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3335 if (error) 3336 goto out; 3337 3338 ret_nodeid = ms->m_nodeid; 3339 if (ret_nodeid == dlm_our_nodeid()) { 3340 r->res_nodeid = 0; 3341 ret_nodeid = 0; 3342 r->res_first_lkid = 0; 3343 } else { 3344 /* set_master() will copy res_nodeid to lkb_nodeid */ 3345 r->res_nodeid = ret_nodeid; 3346 } 3347 3348 if (is_overlap(lkb)) { 3349 log_debug(ls, "receive_lookup_reply %x unlock %x", 3350 lkb->lkb_id, lkb->lkb_flags); 3351 queue_cast_overlap(r, lkb); 3352 unhold_lkb(lkb); /* undoes create_lkb() */ 3353 goto out_list; 3354 } 3355 3356 _request_lock(r, lkb); 3357 3358 out_list: 3359 if (!ret_nodeid) 3360 process_lookup_list(r); 3361 out: 3362 unlock_rsb(r); 3363 put_rsb(r); 3364 
dlm_put_lkb(lkb); 3365} 3366 3367int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) 3368{ 3369 struct dlm_message *ms = (struct dlm_message *) hd; 3370 struct dlm_ls *ls; 3371 int error = 0; 3372 3373 if (!recovery) 3374 dlm_message_in(ms); 3375 3376 ls = dlm_find_lockspace_global(hd->h_lockspace); 3377 if (!ls) { 3378 log_print("drop message %d from %d for unknown lockspace %d", 3379 ms->m_type, nodeid, hd->h_lockspace); 3380 return -EINVAL; 3381 } 3382 3383 /* recovery may have just ended leaving a bunch of backed-up requests 3384 in the requestqueue; wait while dlm_recoverd clears them */ 3385 3386 if (!recovery) 3387 dlm_wait_requestqueue(ls); 3388 3389 /* recovery may have just started while there were a bunch of 3390 in-flight requests -- save them in requestqueue to be processed 3391 after recovery. we can't let dlm_recvd block on the recovery 3392 lock. if dlm_recoverd is calling this function to clear the 3393 requestqueue, it needs to be interrupted (-EINTR) if another 3394 recovery operation is starting. */ 3395 3396 while (1) { 3397 if (dlm_locking_stopped(ls)) { 3398 if (recovery) { 3399 error = -EINTR; 3400 goto out; 3401 } 3402 error = dlm_add_requestqueue(ls, nodeid, hd); 3403 if (error == -EAGAIN) 3404 continue; 3405 else { 3406 error = -EINTR; 3407 goto out; 3408 } 3409 } 3410 3411 if (lock_recovery_try(ls)) 3412 break; 3413 schedule(); 3414 } 3415 3416 switch (ms->m_type) { 3417 3418 /* messages sent to a master node */ 3419 3420 case DLM_MSG_REQUEST: 3421 receive_request(ls, ms); 3422 break; 3423 3424 case DLM_MSG_CONVERT: 3425 receive_convert(ls, ms); 3426 break; 3427 3428 case DLM_MSG_UNLOCK: 3429 receive_unlock(ls, ms); 3430 break; 3431 3432 case DLM_MSG_CANCEL: 3433 receive_cancel(ls, ms); 3434 break; 3435 3436 /* messages sent from a master node (replies to above) */ 3437 3438 case DLM_MSG_REQUEST_REPLY: 3439 receive_request_reply(ls, ms); 3440 break; 3441 3442 case DLM_MSG_CONVERT_REPLY: 3443 receive_convert_reply(ls, ms); 3444 break; 3445 3446 case DLM_MSG_UNLOCK_REPLY: 3447 receive_unlock_reply(ls, ms); 3448 break; 3449 3450 case DLM_MSG_CANCEL_REPLY: 3451 receive_cancel_reply(ls, ms); 3452 break; 3453 3454 /* messages sent from a master node (only two types of async msg) */ 3455 3456 case DLM_MSG_GRANT: 3457 receive_grant(ls, ms); 3458 break; 3459 3460 case DLM_MSG_BAST: 3461 receive_bast(ls, ms); 3462 break; 3463 3464 /* messages sent to a dir node */ 3465 3466 case DLM_MSG_LOOKUP: 3467 receive_lookup(ls, ms); 3468 break; 3469 3470 case DLM_MSG_REMOVE: 3471 receive_remove(ls, ms); 3472 break; 3473 3474 /* messages sent from a dir node (remove has no reply) */ 3475 3476 case DLM_MSG_LOOKUP_REPLY: 3477 receive_lookup_reply(ls, ms); 3478 break; 3479 3480 /* other messages */ 3481 3482 case DLM_MSG_PURGE: 3483 receive_purge(ls, ms); 3484 break; 3485 3486 default: 3487 log_error(ls, "unknown message type %d", ms->m_type); 3488 } 3489 3490 unlock_recovery(ls); 3491 out: 3492 dlm_put_lockspace(ls); 3493 dlm_astd_wake(); 3494 return error; 3495} 3496 3497 3498/* 3499 * Recovery related 3500 */ 3501 3502static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 3503{ 3504 if (middle_conversion(lkb)) { 3505 hold_lkb(lkb); 3506 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 3507 ls->ls_stub_ms.m_result = -EINPROGRESS; 3508 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3509 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3510 3511 /* Same special case as in receive_rcom_lock_args() */ 3512 lkb->lkb_grmode = DLM_LOCK_IV; 3513 
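		/* the real granted mode is worked out by recover_conversion()
		   once all locks have been rebuilt on the rsb */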
rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 3514 unhold_lkb(lkb); 3515 3516 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 3517 lkb->lkb_flags |= DLM_IFL_RESEND; 3518 } 3519 3520 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 3521 conversions are async; there's no reply from the remote master */ 3522} 3523 3524/* A waiting lkb needs recovery if the master node has failed, or 3525 the master node is changing (only when no directory is used) */ 3526 3527static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) 3528{ 3529 if (dlm_is_removed(ls, lkb->lkb_nodeid)) 3530 return 1; 3531 3532 if (!dlm_no_directory(ls)) 3533 return 0; 3534 3535 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid) 3536 return 1; 3537 3538 return 0; 3539} 3540 3541/* Recovery for locks that are waiting for replies from nodes that are now 3542 gone. We can just complete unlocks and cancels by faking a reply from the 3543 dead node. Requests and up-conversions we flag to be resent after 3544 recovery. Down-conversions can just be completed with a fake reply like 3545 unlocks. Conversions between PR and CW need special attention. */ 3546 3547void dlm_recover_waiters_pre(struct dlm_ls *ls) 3548{ 3549 struct dlm_lkb *lkb, *safe; 3550 3551 mutex_lock(&ls->ls_waiters_mutex); 3552 3553 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 3554 log_debug(ls, "pre recover waiter lkid %x type %d flags %x", 3555 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); 3556 3557 /* all outstanding lookups, regardless of destination will be 3558 resent after recovery is done */ 3559 3560 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 3561 lkb->lkb_flags |= DLM_IFL_RESEND; 3562 continue; 3563 } 3564 3565 if (!waiter_needs_recovery(ls, lkb)) 3566 continue; 3567 3568 switch (lkb->lkb_wait_type) { 3569 3570 case DLM_MSG_REQUEST: 3571 lkb->lkb_flags |= DLM_IFL_RESEND; 3572 break; 3573 3574 case DLM_MSG_CONVERT: 3575 recover_convert_waiter(ls, lkb); 3576 break; 3577 3578 case DLM_MSG_UNLOCK: 3579 hold_lkb(lkb); 3580 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; 3581 ls->ls_stub_ms.m_result = -DLM_EUNLOCK; 3582 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3583 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3584 dlm_put_lkb(lkb); 3585 break; 3586 3587 case DLM_MSG_CANCEL: 3588 hold_lkb(lkb); 3589 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; 3590 ls->ls_stub_ms.m_result = -DLM_ECANCEL; 3591 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3592 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3593 dlm_put_lkb(lkb); 3594 break; 3595 3596 default: 3597 log_error(ls, "invalid lkb wait_type %d", 3598 lkb->lkb_wait_type); 3599 } 3600 schedule(); 3601 } 3602 mutex_unlock(&ls->ls_waiters_mutex); 3603} 3604 3605static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 3606{ 3607 struct dlm_lkb *lkb; 3608 int found = 0; 3609 3610 mutex_lock(&ls->ls_waiters_mutex); 3611 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 3612 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3613 hold_lkb(lkb); 3614 found = 1; 3615 break; 3616 } 3617 } 3618 mutex_unlock(&ls->ls_waiters_mutex); 3619 3620 if (!found) 3621 lkb = NULL; 3622 return lkb; 3623} 3624 3625/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 3626 master or dir-node for r. Processing the lkb may result in it being placed 3627 back on waiters. */ 3628 3629/* We do this after normal locking has been enabled and any saved messages 3630 (in requestqueue) have been processed. 
   We should be confident that at this point we won't get or process a reply
   to any of these waiting operations. But, new ops may be coming in on the
   rsbs/locks here from userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery. if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock. First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ?
-DLM_EUNLOCK : 3690 -DLM_ECANCEL); 3691 unhold_lkb(lkb); /* undoes create_lkb() */ 3692 break; 3693 case DLM_MSG_CONVERT: 3694 if (oc) { 3695 queue_cast(r, lkb, -DLM_ECANCEL); 3696 } else { 3697 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 3698 _unlock_lock(r, lkb); 3699 } 3700 break; 3701 default: 3702 err = 1; 3703 } 3704 } else { 3705 switch (mstype) { 3706 case DLM_MSG_LOOKUP: 3707 case DLM_MSG_REQUEST: 3708 _request_lock(r, lkb); 3709 if (is_master(r)) 3710 confirm_master(r, 0); 3711 break; 3712 case DLM_MSG_CONVERT: 3713 _convert_lock(r, lkb); 3714 break; 3715 default: 3716 err = 1; 3717 } 3718 } 3719 3720 if (err) 3721 log_error(ls, "recover_waiters_post %x %d %x %d %d", 3722 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); 3723 unlock_rsb(r); 3724 put_rsb(r); 3725 dlm_put_lkb(lkb); 3726 } 3727 3728 return error; 3729} 3730 3731static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 3732 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) 3733{ 3734 struct dlm_ls *ls = r->res_ls; 3735 struct dlm_lkb *lkb, *safe; 3736 3737 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 3738 if (test(ls, lkb)) { 3739 rsb_set_flag(r, RSB_LOCKS_PURGED); 3740 del_lkb(r, lkb); 3741 /* this put should free the lkb */ 3742 if (!dlm_put_lkb(lkb)) 3743 log_error(ls, "purged lkb not released"); 3744 } 3745 } 3746} 3747 3748static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 3749{ 3750 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 3751} 3752 3753static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 3754{ 3755 return is_master_copy(lkb); 3756} 3757 3758static void purge_dead_locks(struct dlm_rsb *r) 3759{ 3760 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 3761 purge_queue(r, &r->res_convertqueue, &purge_dead_test); 3762 purge_queue(r, &r->res_waitqueue, &purge_dead_test); 3763} 3764 3765void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 3766{ 3767 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 3768 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 3769 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); 3770} 3771 3772/* Get rid of locks held by nodes that are gone. 
*/ 3773 3774int dlm_purge_locks(struct dlm_ls *ls) 3775{ 3776 struct dlm_rsb *r; 3777 3778 log_debug(ls, "dlm_purge_locks"); 3779 3780 down_write(&ls->ls_root_sem); 3781 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 3782 hold_rsb(r); 3783 lock_rsb(r); 3784 if (is_master(r)) 3785 purge_dead_locks(r); 3786 unlock_rsb(r); 3787 unhold_rsb(r); 3788 3789 schedule(); 3790 } 3791 up_write(&ls->ls_root_sem); 3792 3793 return 0; 3794} 3795 3796static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 3797{ 3798 struct dlm_rsb *r, *r_ret = NULL; 3799 3800 read_lock(&ls->ls_rsbtbl[bucket].lock); 3801 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { 3802 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 3803 continue; 3804 hold_rsb(r); 3805 rsb_clear_flag(r, RSB_LOCKS_PURGED); 3806 r_ret = r; 3807 break; 3808 } 3809 read_unlock(&ls->ls_rsbtbl[bucket].lock); 3810 return r_ret; 3811} 3812 3813void dlm_grant_after_purge(struct dlm_ls *ls) 3814{ 3815 struct dlm_rsb *r; 3816 int bucket = 0; 3817 3818 while (1) { 3819 r = find_purged_rsb(ls, bucket); 3820 if (!r) { 3821 if (bucket == ls->ls_rsbtbl_size - 1) 3822 break; 3823 bucket++; 3824 continue; 3825 } 3826 lock_rsb(r); 3827 if (is_master(r)) { 3828 grant_pending_locks(r); 3829 confirm_master(r, 0); 3830 } 3831 unlock_rsb(r); 3832 put_rsb(r); 3833 schedule(); 3834 } 3835} 3836 3837static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 3838 uint32_t remid) 3839{ 3840 struct dlm_lkb *lkb; 3841 3842 list_for_each_entry(lkb, head, lkb_statequeue) { 3843 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 3844 return lkb; 3845 } 3846 return NULL; 3847} 3848 3849static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 3850 uint32_t remid) 3851{ 3852 struct dlm_lkb *lkb; 3853 3854 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 3855 if (lkb) 3856 return lkb; 3857 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 3858 if (lkb) 3859 return lkb; 3860 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 3861 if (lkb) 3862 return lkb; 3863 return NULL; 3864} 3865 3866static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3867 struct dlm_rsb *r, struct dlm_rcom *rc) 3868{ 3869 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3870 int lvblen; 3871 3872 lkb->lkb_nodeid = rc->rc_header.h_nodeid; 3873 lkb->lkb_ownpid = rl->rl_ownpid; 3874 lkb->lkb_remid = rl->rl_lkid; 3875 lkb->lkb_exflags = rl->rl_exflags; 3876 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF; 3877 lkb->lkb_flags |= DLM_IFL_MSTCPY; 3878 lkb->lkb_lvbseq = rl->rl_lvbseq; 3879 lkb->lkb_rqmode = rl->rl_rqmode; 3880 lkb->lkb_grmode = rl->rl_grmode; 3881 /* don't set lkb_status because add_lkb wants to itself */ 3882 3883 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST); 3884 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); 3885 3886 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3887 lkb->lkb_lvbptr = allocate_lvb(ls); 3888 if (!lkb->lkb_lvbptr) 3889 return -ENOMEM; 3890 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - 3891 sizeof(struct rcom_lock); 3892 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 3893 } 3894 3895 /* Conversions between PR and CW (middle modes) need special handling. 
3896 The real granted mode of these converting locks cannot be determined 3897 until all locks have been rebuilt on the rsb (recover_conversion) */ 3898 3899 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) { 3900 rl->rl_status = DLM_LKSTS_CONVERT; 3901 lkb->lkb_grmode = DLM_LOCK_IV; 3902 rsb_set_flag(r, RSB_RECOVER_CONVERT); 3903 } 3904 3905 return 0; 3906} 3907 3908/* This lkb may have been recovered in a previous aborted recovery so we need 3909 to check if the rsb already has an lkb with the given remote nodeid/lkid. 3910 If so we just send back a standard reply. If not, we create a new lkb with 3911 the given values and send back our lkid. We send back our lkid by sending 3912 back the rcom_lock struct we got but with the remid field filled in. */ 3913 3914int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 3915{ 3916 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3917 struct dlm_rsb *r; 3918 struct dlm_lkb *lkb; 3919 int error; 3920 3921 if (rl->rl_parent_lkid) { 3922 error = -EOPNOTSUPP; 3923 goto out; 3924 } 3925 3926 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r); 3927 if (error) 3928 goto out; 3929 3930 lock_rsb(r); 3931 3932 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid); 3933 if (lkb) { 3934 error = -EEXIST; 3935 goto out_remid; 3936 } 3937 3938 error = create_lkb(ls, &lkb); 3939 if (error) 3940 goto out_unlock; 3941 3942 error = receive_rcom_lock_args(ls, lkb, r, rc); 3943 if (error) { 3944 __put_lkb(ls, lkb); 3945 goto out_unlock; 3946 } 3947 3948 attach_lkb(r, lkb); 3949 add_lkb(r, lkb, rl->rl_status); 3950 error = 0; 3951 3952 out_remid: 3953 /* this is the new value returned to the lock holder for 3954 saving in its process-copy lkb */ 3955 rl->rl_remid = lkb->lkb_id; 3956 3957 out_unlock: 3958 unlock_rsb(r); 3959 put_rsb(r); 3960 out: 3961 if (error) 3962 log_print("recover_master_copy %d %x", error, rl->rl_lkid); 3963 rl->rl_result = error; 3964 return error; 3965} 3966 3967int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 3968{ 3969 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3970 struct dlm_rsb *r; 3971 struct dlm_lkb *lkb; 3972 int error; 3973 3974 error = find_lkb(ls, rl->rl_lkid, &lkb); 3975 if (error) { 3976 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid); 3977 return error; 3978 } 3979 3980 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3981 3982 error = rl->rl_result; 3983 3984 r = lkb->lkb_resource; 3985 hold_rsb(r); 3986 lock_rsb(r); 3987 3988 switch (error) { 3989 case -EBADR: 3990 /* There's a chance the new master received our lock before 3991 dlm_recover_master_reply(), this wouldn't happen if we did 3992 a barrier between recover_masters and recover_locks. 
*/ 3993 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, 3994 (unsigned long)r, r->res_name); 3995 dlm_send_rcom_lock(r, lkb); 3996 goto out; 3997 case -EEXIST: 3998 log_debug(ls, "master copy exists %x", lkb->lkb_id); 3999 /* fall through */ 4000 case 0: 4001 lkb->lkb_remid = rl->rl_remid; 4002 break; 4003 default: 4004 log_error(ls, "dlm_recover_process_copy unknown error %d %x", 4005 error, lkb->lkb_id); 4006 } 4007 4008 /* an ack for dlm_recover_locks() which waits for replies from 4009 all the locks it sends to new masters */ 4010 dlm_recovered_lock(r); 4011 out: 4012 unlock_rsb(r); 4013 put_rsb(r); 4014 dlm_put_lkb(lkb); 4015 4016 return 0; 4017} 4018 4019int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 4020 int mode, uint32_t flags, void *name, unsigned int namelen, 4021 uint32_t parent_lkid) 4022{ 4023 struct dlm_lkb *lkb; 4024 struct dlm_args args; 4025 int error; 4026 4027 lock_recovery(ls); 4028 4029 error = create_lkb(ls, &lkb); 4030 if (error) { 4031 kfree(ua); 4032 goto out; 4033 } 4034 4035 if (flags & DLM_LKF_VALBLK) { 4036 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); 4037 if (!ua->lksb.sb_lvbptr) { 4038 kfree(ua); 4039 __put_lkb(ls, lkb); 4040 error = -ENOMEM; 4041 goto out; 4042 } 4043 } 4044 4045 /* After ua is attached to lkb it will be freed by free_lkb(). 4046 When DLM_IFL_USER is set, the dlm knows that this is a userspace 4047 lock and that lkb_astparam is the dlm_user_args structure. */ 4048 4049 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid, 4050 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); 4051 lkb->lkb_flags |= DLM_IFL_USER; 4052 ua->old_mode = DLM_LOCK_IV; 4053 4054 if (error) { 4055 __put_lkb(ls, lkb); 4056 goto out; 4057 } 4058 4059 error = request_lock(ls, lkb, name, namelen, &args); 4060 4061 switch (error) { 4062 case 0: 4063 break; 4064 case -EINPROGRESS: 4065 error = 0; 4066 break; 4067 case -EAGAIN: 4068 error = 0; 4069 /* fall through */ 4070 default: 4071 __put_lkb(ls, lkb); 4072 goto out; 4073 } 4074 4075 /* add this new lkb to the per-process list of locks */ 4076 spin_lock(&ua->proc->locks_spin); 4077 hold_lkb(lkb); 4078 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 4079 spin_unlock(&ua->proc->locks_spin); 4080 out: 4081 unlock_recovery(ls); 4082 return error; 4083} 4084 4085int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 4086 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 4087{ 4088 struct dlm_lkb *lkb; 4089 struct dlm_args args; 4090 struct dlm_user_args *ua; 4091 int error; 4092 4093 lock_recovery(ls); 4094 4095 error = find_lkb(ls, lkid, &lkb); 4096 if (error) 4097 goto out; 4098 4099 /* user can change the params on its lock when it converts it, or 4100 add an lvb that didn't exist before */ 4101 4102 ua = (struct dlm_user_args *)lkb->lkb_astparam; 4103 4104 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 4105 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); 4106 if (!ua->lksb.sb_lvbptr) { 4107 error = -ENOMEM; 4108 goto out_put; 4109 } 4110 } 4111 if (lvb_in && ua->lksb.sb_lvbptr) 4112 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 4113 4114 ua->castparam = ua_tmp->castparam; 4115 ua->castaddr = ua_tmp->castaddr; 4116 ua->bastparam = ua_tmp->bastparam; 4117 ua->bastaddr = ua_tmp->bastaddr; 4118 ua->user_lksb = ua_tmp->user_lksb; 4119 ua->old_mode = lkb->lkb_grmode; 4120 4121 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST, 4122 ua, DLM_FAKE_USER_AST, &args); 4123 if (error) 4124 goto 
out_put; 4125 4126 error = convert_lock(ls, lkb, &args); 4127 4128 if (error == -EINPROGRESS || error == -EAGAIN) 4129 error = 0; 4130 out_put: 4131 dlm_put_lkb(lkb); 4132 out: 4133 unlock_recovery(ls); 4134 kfree(ua_tmp); 4135 return error; 4136} 4137 4138int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 4139 uint32_t flags, uint32_t lkid, char *lvb_in) 4140{ 4141 struct dlm_lkb *lkb; 4142 struct dlm_args args; 4143 struct dlm_user_args *ua; 4144 int error; 4145 4146 lock_recovery(ls); 4147 4148 error = find_lkb(ls, lkid, &lkb); 4149 if (error) 4150 goto out; 4151 4152 ua = (struct dlm_user_args *)lkb->lkb_astparam; 4153 4154 if (lvb_in && ua->lksb.sb_lvbptr) 4155 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 4156 ua->castparam = ua_tmp->castparam; 4157 ua->user_lksb = ua_tmp->user_lksb; 4158 4159 error = set_unlock_args(flags, ua, &args); 4160 if (error) 4161 goto out_put; 4162 4163 error = unlock_lock(ls, lkb, &args); 4164 4165 if (error == -DLM_EUNLOCK) 4166 error = 0; 4167 /* from validate_unlock_args() */ 4168 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 4169 error = 0; 4170 if (error) 4171 goto out_put; 4172 4173 spin_lock(&ua->proc->locks_spin); 4174 /* dlm_user_add_ast() may have already taken lkb off the proc list */ 4175 if (!list_empty(&lkb->lkb_ownqueue)) 4176 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 4177 spin_unlock(&ua->proc->locks_spin); 4178 out_put: 4179 dlm_put_lkb(lkb); 4180 out: 4181 unlock_recovery(ls); 4182 kfree(ua_tmp); 4183 return error; 4184} 4185 4186int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 4187 uint32_t flags, uint32_t lkid) 4188{ 4189 struct dlm_lkb *lkb; 4190 struct dlm_args args; 4191 struct dlm_user_args *ua; 4192 int error; 4193 4194 lock_recovery(ls); 4195 4196 error = find_lkb(ls, lkid, &lkb); 4197 if (error) 4198 goto out; 4199 4200 ua = (struct dlm_user_args *)lkb->lkb_astparam; 4201 ua->castparam = ua_tmp->castparam; 4202 ua->user_lksb = ua_tmp->user_lksb; 4203 4204 error = set_unlock_args(flags, ua, &args); 4205 if (error) 4206 goto out_put; 4207 4208 error = cancel_lock(ls, lkb, &args); 4209 4210 if (error == -DLM_ECANCEL) 4211 error = 0; 4212 /* from validate_unlock_args() */ 4213 if (error == -EBUSY) 4214 error = 0; 4215 out_put: 4216 dlm_put_lkb(lkb); 4217 out: 4218 unlock_recovery(ls); 4219 kfree(ua_tmp); 4220 return error; 4221} 4222 4223/* lkb's that are removed from the waiters list by revert are just left on the 4224 orphans list with the granted orphan locks, to be freed by purge */ 4225 4226static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 4227{ 4228 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; 4229 struct dlm_args args; 4230 int error; 4231 4232 hold_lkb(lkb); 4233 mutex_lock(&ls->ls_orphans_mutex); 4234 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); 4235 mutex_unlock(&ls->ls_orphans_mutex); 4236 4237 set_unlock_args(0, ua, &args); 4238 4239 error = cancel_lock(ls, lkb, &args); 4240 if (error == -DLM_ECANCEL) 4241 error = 0; 4242 return error; 4243} 4244 4245/* The force flag allows the unlock to go ahead even if the lkb isn't granted. 4246 Regardless of what rsb queue the lock is on, it's removed and freed. 

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
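
/* Marking an lkb DEAD (or ORPHAN) before unlocking it above is what keeps
   further callbacks from being queued for it: the ast delivery path in
   user.c (dlm_user_add_ast) is expected to skip lkbs carrying the
   DLM_IFL_DEAD or DLM_IFL_ORPHAN flags. */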

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		unlock_recovery(ls);
	}
	return error;
}
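
/*
 * The remote half of the purge path sits earlier in this file: do_purge()
 * is forward-declared near the top because the DLM_MSG_PURGE receive path
 * invokes it with the nodeid and pid carried in the message built by
 * send_purge().  A sketch of a caller of dlm_user_purge() (hypothetical,
 * e.g. device-layer code that parses a purge request from userspace):
 *
 *	error = dlm_user_purge(ls, proc, target_nodeid, target_pid);
 *
 * A target_pid of 0 purges every orphan, as in do_purge() above; a
 * target_nodeid other than our own turns the call into a DLM_MSG_PURGE
 * message rather than a local purge.
 */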