1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * userdlm.c 5 * 6 * Code which implements the kernel side of a minimal userspace 7 * interface to our DLM. 8 * 9 * Many of the functions here are pared down versions of dlmglue.c 10 * functions. 11 * 12 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 13 * 14 * This program is free software; you can redistribute it and/or 15 * modify it under the terms of the GNU General Public 16 * License as published by the Free Software Foundation; either 17 * version 2 of the License, or (at your option) any later version. 18 * 19 * This program is distributed in the hope that it will be useful, 20 * but WITHOUT ANY WARRANTY; without even the implied warranty of 21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 22 * General Public License for more details. 23 * 24 * You should have received a copy of the GNU General Public 25 * License along with this program; if not, write to the 26 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 27 * Boston, MA 021110-1307, USA. 28 */ 29 30#include <linux/signal.h> 31 32#include <linux/module.h> 33#include <linux/fs.h> 34#include <linux/types.h> 35#include <linux/crc32.h> 36 37 38#include "cluster/nodemanager.h" 39#include "cluster/heartbeat.h" 40#include "cluster/tcp.h" 41 42#include "dlmapi.h" 43 44#include "userdlm.h" 45 46#define MLOG_MASK_PREFIX ML_DLMFS 47#include "cluster/masklog.h" 48 49static inline int user_check_wait_flag(struct user_lock_res *lockres, 50 int flag) 51{ 52 int ret; 53 54 spin_lock(&lockres->l_lock); 55 ret = lockres->l_flags & flag; 56 spin_unlock(&lockres->l_lock); 57 58 return ret; 59} 60 61static inline void user_wait_on_busy_lock(struct user_lock_res *lockres) 62 63{ 64 wait_event(lockres->l_event, 65 !user_check_wait_flag(lockres, USER_LOCK_BUSY)); 66} 67 68static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres) 69 70{ 71 wait_event(lockres->l_event, 72 !user_check_wait_flag(lockres, USER_LOCK_BLOCKED)); 73} 74 75/* I heart container_of... */ 76static inline struct dlm_ctxt * 77dlm_ctxt_from_user_lockres(struct user_lock_res *lockres) 78{ 79 struct dlmfs_inode_private *ip; 80 81 ip = container_of(lockres, 82 struct dlmfs_inode_private, 83 ip_lockres); 84 return ip->ip_dlm; 85} 86 87static struct inode * 88user_dlm_inode_from_user_lockres(struct user_lock_res *lockres) 89{ 90 struct dlmfs_inode_private *ip; 91 92 ip = container_of(lockres, 93 struct dlmfs_inode_private, 94 ip_lockres); 95 return &ip->ip_vfs_inode; 96} 97 98static inline void user_recover_from_dlm_error(struct user_lock_res *lockres) 99{ 100 spin_lock(&lockres->l_lock); 101 lockres->l_flags &= ~USER_LOCK_BUSY; 102 spin_unlock(&lockres->l_lock); 103} 104 105#define user_log_dlm_error(_func, _stat, _lockres) do { \ 106 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 107 "resource %.*s: %s\n", dlm_errname(_stat), _func, \ 108 _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \ 109} while (0) 110 111/* WARNING: This function lives in a world where the only three lock 112 * levels are EX, PR, and NL. It *will* have to be adjusted when more 113 * lock types are added. */ 114static inline int user_highest_compat_lock_level(int level) 115{ 116 int new_level = LKM_EXMODE; 117 118 if (level == LKM_EXMODE) 119 new_level = LKM_NLMODE; 120 else if (level == LKM_PRMODE) 121 new_level = LKM_PRMODE; 122 return new_level; 123} 124 125static void user_ast(void *opaque) 126{ 127 struct user_lock_res *lockres = opaque; 128 struct dlm_lockstatus *lksb; 129 130 mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen, 131 lockres->l_name); 132 133 spin_lock(&lockres->l_lock); 134 135 lksb = &(lockres->l_lksb); 136 if (lksb->status != DLM_NORMAL) { 137 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n", 138 lksb->status, lockres->l_namelen, lockres->l_name); 139 spin_unlock(&lockres->l_lock); 140 return; 141 } 142 143 mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE, 144 "Lockres %.*s, requested ivmode. flags 0x%x\n", 145 lockres->l_namelen, lockres->l_name, lockres->l_flags); 146 147 /* we're downconverting. */ 148 if (lockres->l_requested < lockres->l_level) { 149 if (lockres->l_requested <= 150 user_highest_compat_lock_level(lockres->l_blocking)) { 151 lockres->l_blocking = LKM_NLMODE; 152 lockres->l_flags &= ~USER_LOCK_BLOCKED; 153 } 154 } 155 156 lockres->l_level = lockres->l_requested; 157 lockres->l_requested = LKM_IVMODE; 158 lockres->l_flags |= USER_LOCK_ATTACHED; 159 lockres->l_flags &= ~USER_LOCK_BUSY; 160 161 spin_unlock(&lockres->l_lock); 162 163 wake_up(&lockres->l_event); 164} 165 166static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres) 167{ 168 struct inode *inode; 169 inode = user_dlm_inode_from_user_lockres(lockres); 170 if (!igrab(inode)) 171 BUG(); 172} 173 174static void user_dlm_unblock_lock(struct work_struct *work); 175 176static void __user_dlm_queue_lockres(struct user_lock_res *lockres) 177{ 178 if (!(lockres->l_flags & USER_LOCK_QUEUED)) { 179 user_dlm_grab_inode_ref(lockres); 180 181 INIT_WORK(&lockres->l_work, user_dlm_unblock_lock); 182 183 queue_work(user_dlm_worker, &lockres->l_work); 184 lockres->l_flags |= USER_LOCK_QUEUED; 185 } 186} 187 188static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres) 189{ 190 int queue = 0; 191 192 if (!(lockres->l_flags & USER_LOCK_BLOCKED)) 193 return; 194 195 switch (lockres->l_blocking) { 196 case LKM_EXMODE: 197 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 198 queue = 1; 199 break; 200 case LKM_PRMODE: 201 if (!lockres->l_ex_holders) 202 queue = 1; 203 break; 204 default: 205 BUG(); 206 } 207 208 if (queue) 209 __user_dlm_queue_lockres(lockres); 210} 211 212static void user_bast(void *opaque, int level) 213{ 214 struct user_lock_res *lockres = opaque; 215 216 mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n", 217 lockres->l_namelen, lockres->l_name, level); 218 219 spin_lock(&lockres->l_lock); 220 lockres->l_flags |= USER_LOCK_BLOCKED; 221 if (level > lockres->l_blocking) 222 lockres->l_blocking = level; 223 224 __user_dlm_queue_lockres(lockres); 225 spin_unlock(&lockres->l_lock); 226 227 wake_up(&lockres->l_event); 228} 229 230static void user_unlock_ast(void *opaque, enum dlm_status status) 231{ 232 struct user_lock_res *lockres = opaque; 233 234 mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen, 235 lockres->l_name); 236 237 if (status != DLM_NORMAL && status != DLM_CANCELGRANT) 238 mlog(ML_ERROR, "Dlm returns status %d\n", status); 239 240 spin_lock(&lockres->l_lock); 241 /* The teardown flag gets set early during the unlock process, 242 * so test the cancel flag to make sure that this ast isn't 243 * for a concurrent cancel. */ 244 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN 245 && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) { 246 lockres->l_level = LKM_IVMODE; 247 } else if (status == DLM_CANCELGRANT) { 248 /* We tried to cancel a convert request, but it was 249 * already granted. Don't clear the busy flag - the 250 * ast should've done this already. */ 251 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); 252 lockres->l_flags &= ~USER_LOCK_IN_CANCEL; 253 goto out_noclear; 254 } else { 255 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); 256 /* Cancel succeeded, we want to re-queue */ 257 lockres->l_requested = LKM_IVMODE; /* cancel an 258 * upconvert 259 * request. */ 260 lockres->l_flags &= ~USER_LOCK_IN_CANCEL; 261 /* we want the unblock thread to look at it again 262 * now. */ 263 if (lockres->l_flags & USER_LOCK_BLOCKED) 264 __user_dlm_queue_lockres(lockres); 265 } 266 267 lockres->l_flags &= ~USER_LOCK_BUSY; 268out_noclear: 269 spin_unlock(&lockres->l_lock); 270 271 wake_up(&lockres->l_event); 272} 273 274static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres) 275{ 276 struct inode *inode; 277 inode = user_dlm_inode_from_user_lockres(lockres); 278 iput(inode); 279} 280 281static void user_dlm_unblock_lock(struct work_struct *work) 282{ 283 int new_level, status; 284 struct user_lock_res *lockres = 285 container_of(work, struct user_lock_res, l_work); 286 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 287 288 mlog(0, "processing lockres %.*s\n", lockres->l_namelen, 289 lockres->l_name); 290 291 spin_lock(&lockres->l_lock); 292 293 mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED), 294 "Lockres %.*s, flags 0x%x\n", 295 lockres->l_namelen, lockres->l_name, lockres->l_flags); 296 297 /* notice that we don't clear USER_LOCK_BLOCKED here. If it's 298 * set, we want user_ast clear it. */ 299 lockres->l_flags &= ~USER_LOCK_QUEUED; 300 301 /* It's valid to get here and no longer be blocked - if we get 302 * several basts in a row, we might be queued by the first 303 * one, the unblock thread might run and clear the queued 304 * flag, and finally we might get another bast which re-queues 305 * us before our ast for the downconvert is called. */ 306 if (!(lockres->l_flags & USER_LOCK_BLOCKED)) { 307 spin_unlock(&lockres->l_lock); 308 goto drop_ref; 309 } 310 311 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { 312 spin_unlock(&lockres->l_lock); 313 goto drop_ref; 314 } 315 316 if (lockres->l_flags & USER_LOCK_BUSY) { 317 if (lockres->l_flags & USER_LOCK_IN_CANCEL) { 318 spin_unlock(&lockres->l_lock); 319 goto drop_ref; 320 } 321 322 lockres->l_flags |= USER_LOCK_IN_CANCEL; 323 spin_unlock(&lockres->l_lock); 324 325 status = dlmunlock(dlm, 326 &lockres->l_lksb, 327 LKM_CANCEL, 328 user_unlock_ast, 329 lockres); 330 if (status != DLM_NORMAL) 331 user_log_dlm_error("dlmunlock", status, lockres); 332 goto drop_ref; 333 } 334 335 /* If there are still incompat holders, we can exit safely 336 * without worrying about re-queueing this lock as that will 337 * happen on the last call to user_cluster_unlock. */ 338 if ((lockres->l_blocking == LKM_EXMODE) 339 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 340 spin_unlock(&lockres->l_lock); 341 mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n", 342 lockres->l_ro_holders, lockres->l_ex_holders); 343 goto drop_ref; 344 } 345 346 if ((lockres->l_blocking == LKM_PRMODE) 347 && lockres->l_ex_holders) { 348 spin_unlock(&lockres->l_lock); 349 mlog(0, "can't downconvert for pr: ex = %u\n", 350 lockres->l_ex_holders); 351 goto drop_ref; 352 } 353 354 /* yay, we can downconvert now. */ 355 new_level = user_highest_compat_lock_level(lockres->l_blocking); 356 lockres->l_requested = new_level; 357 lockres->l_flags |= USER_LOCK_BUSY; 358 mlog(0, "Downconvert lock from %d to %d\n", 359 lockres->l_level, new_level); 360 spin_unlock(&lockres->l_lock); 361 362 /* need lock downconvert request now... */ 363 status = dlmlock(dlm, 364 new_level, 365 &lockres->l_lksb, 366 LKM_CONVERT|LKM_VALBLK, 367 lockres->l_name, 368 lockres->l_namelen, 369 user_ast, 370 lockres, 371 user_bast); 372 if (status != DLM_NORMAL) { 373 user_log_dlm_error("dlmlock", status, lockres); 374 user_recover_from_dlm_error(lockres); 375 } 376 377drop_ref: 378 user_dlm_drop_inode_ref(lockres); 379} 380 381static inline void user_dlm_inc_holders(struct user_lock_res *lockres, 382 int level) 383{ 384 switch(level) { 385 case LKM_EXMODE: 386 lockres->l_ex_holders++; 387 break; 388 case LKM_PRMODE: 389 lockres->l_ro_holders++; 390 break; 391 default: 392 BUG(); 393 } 394} 395 396/* predict what lock level we'll be dropping down to on behalf 397 * of another node, and return true if the currently wanted 398 * level will be compatible with it. */ 399static inline int 400user_may_continue_on_blocked_lock(struct user_lock_res *lockres, 401 int wanted) 402{ 403 BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED)); 404 405 return wanted <= user_highest_compat_lock_level(lockres->l_blocking); 406} 407 408int user_dlm_cluster_lock(struct user_lock_res *lockres, 409 int level, 410 int lkm_flags) 411{ 412 int status, local_flags; 413 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 414 415 if (level != LKM_EXMODE && 416 level != LKM_PRMODE) { 417 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 418 lockres->l_namelen, lockres->l_name); 419 status = -EINVAL; 420 goto bail; 421 } 422 423 mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n", 424 lockres->l_namelen, lockres->l_name, 425 (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE", 426 lkm_flags); 427 428again: 429 if (signal_pending(current)) { 430 status = -ERESTARTSYS; 431 goto bail; 432 } 433 434 spin_lock(&lockres->l_lock); 435 436 /* We only compare against the currently granted level 437 * here. If the lock is blocked waiting on a downconvert, 438 * we'll get caught below. */ 439 if ((lockres->l_flags & USER_LOCK_BUSY) && 440 (level > lockres->l_level)) { 441 /* is someone sitting in dlm_lock? If so, wait on 442 * them. */ 443 spin_unlock(&lockres->l_lock); 444 445 user_wait_on_busy_lock(lockres); 446 goto again; 447 } 448 449 if ((lockres->l_flags & USER_LOCK_BLOCKED) && 450 (!user_may_continue_on_blocked_lock(lockres, level))) { 451 /* is the lock is currently blocked on behalf of 452 * another node */ 453 spin_unlock(&lockres->l_lock); 454 455 user_wait_on_blocked_lock(lockres); 456 goto again; 457 } 458 459 if (level > lockres->l_level) { 460 local_flags = lkm_flags | LKM_VALBLK; 461 if (lockres->l_level != LKM_IVMODE) 462 local_flags |= LKM_CONVERT; 463 464 lockres->l_requested = level; 465 lockres->l_flags |= USER_LOCK_BUSY; 466 spin_unlock(&lockres->l_lock); 467 468 BUG_ON(level == LKM_IVMODE); 469 BUG_ON(level == LKM_NLMODE); 470 471 /* call dlm_lock to upgrade lock now */ 472 status = dlmlock(dlm, 473 level, 474 &lockres->l_lksb, 475 local_flags, 476 lockres->l_name, 477 lockres->l_namelen, 478 user_ast, 479 lockres, 480 user_bast); 481 if (status != DLM_NORMAL) { 482 if ((lkm_flags & LKM_NOQUEUE) && 483 (status == DLM_NOTQUEUED)) 484 status = -EAGAIN; 485 else { 486 user_log_dlm_error("dlmlock", status, lockres); 487 status = -EINVAL; 488 } 489 user_recover_from_dlm_error(lockres); 490 goto bail; 491 } 492 493 user_wait_on_busy_lock(lockres); 494 goto again; 495 } 496 497 user_dlm_inc_holders(lockres, level); 498 spin_unlock(&lockres->l_lock); 499 500 status = 0; 501bail: 502 return status; 503} 504 505static inline void user_dlm_dec_holders(struct user_lock_res *lockres, 506 int level) 507{ 508 switch(level) { 509 case LKM_EXMODE: 510 BUG_ON(!lockres->l_ex_holders); 511 lockres->l_ex_holders--; 512 break; 513 case LKM_PRMODE: 514 BUG_ON(!lockres->l_ro_holders); 515 lockres->l_ro_holders--; 516 break; 517 default: 518 BUG(); 519 } 520} 521 522void user_dlm_cluster_unlock(struct user_lock_res *lockres, 523 int level) 524{ 525 if (level != LKM_EXMODE && 526 level != LKM_PRMODE) { 527 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 528 lockres->l_namelen, lockres->l_name); 529 return; 530 } 531 532 spin_lock(&lockres->l_lock); 533 user_dlm_dec_holders(lockres, level); 534 __user_dlm_cond_queue_lockres(lockres); 535 spin_unlock(&lockres->l_lock); 536} 537 538void user_dlm_write_lvb(struct inode *inode, 539 const char *val, 540 unsigned int len) 541{ 542 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 543 char *lvb = lockres->l_lksb.lvb; 544 545 BUG_ON(len > DLM_LVB_LEN); 546 547 spin_lock(&lockres->l_lock); 548 549 BUG_ON(lockres->l_level < LKM_EXMODE); 550 memcpy(lvb, val, len); 551 552 spin_unlock(&lockres->l_lock); 553} 554 555void user_dlm_read_lvb(struct inode *inode, 556 char *val, 557 unsigned int len) 558{ 559 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 560 char *lvb = lockres->l_lksb.lvb; 561 562 BUG_ON(len > DLM_LVB_LEN); 563 564 spin_lock(&lockres->l_lock); 565 566 BUG_ON(lockres->l_level < LKM_PRMODE); 567 memcpy(val, lvb, len); 568 569 spin_unlock(&lockres->l_lock); 570} 571 572void user_dlm_lock_res_init(struct user_lock_res *lockres, 573 struct dentry *dentry) 574{ 575 memset(lockres, 0, sizeof(*lockres)); 576 577 spin_lock_init(&lockres->l_lock); 578 init_waitqueue_head(&lockres->l_event); 579 lockres->l_level = LKM_IVMODE; 580 lockres->l_requested = LKM_IVMODE; 581 lockres->l_blocking = LKM_IVMODE; 582 583 /* should have been checked before getting here. */ 584 BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN); 585 586 memcpy(lockres->l_name, 587 dentry->d_name.name, 588 dentry->d_name.len); 589 lockres->l_namelen = dentry->d_name.len; 590} 591 592int user_dlm_destroy_lock(struct user_lock_res *lockres) 593{ 594 int status = -EBUSY; 595 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 596 597 mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name); 598 599 spin_lock(&lockres->l_lock); 600 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { 601 spin_unlock(&lockres->l_lock); 602 return 0; 603 } 604 605 lockres->l_flags |= USER_LOCK_IN_TEARDOWN; 606 607 while (lockres->l_flags & USER_LOCK_BUSY) { 608 spin_unlock(&lockres->l_lock); 609 610 user_wait_on_busy_lock(lockres); 611 612 spin_lock(&lockres->l_lock); 613 } 614 615 if (lockres->l_ro_holders || lockres->l_ex_holders) { 616 spin_unlock(&lockres->l_lock); 617 goto bail; 618 } 619 620 status = 0; 621 if (!(lockres->l_flags & USER_LOCK_ATTACHED)) { 622 spin_unlock(&lockres->l_lock); 623 goto bail; 624 } 625 626 lockres->l_flags &= ~USER_LOCK_ATTACHED; 627 lockres->l_flags |= USER_LOCK_BUSY; 628 spin_unlock(&lockres->l_lock); 629 630 status = dlmunlock(dlm, 631 &lockres->l_lksb, 632 LKM_VALBLK, 633 user_unlock_ast, 634 lockres); 635 if (status != DLM_NORMAL) { 636 user_log_dlm_error("dlmunlock", status, lockres); 637 status = -EINVAL; 638 goto bail; 639 } 640 641 user_wait_on_busy_lock(lockres); 642 643 status = 0; 644bail: 645 return status; 646} 647 648struct dlm_ctxt *user_dlm_register_context(struct qstr *name) 649{ 650 struct dlm_ctxt *dlm; 651 u32 dlm_key; 652 char *domain; 653 654 domain = kmalloc(name->len + 1, GFP_NOFS); 655 if (!domain) { 656 mlog_errno(-ENOMEM); 657 return ERR_PTR(-ENOMEM); 658 } 659 660 dlm_key = crc32_le(0, name->name, name->len); 661 662 snprintf(domain, name->len + 1, "%.*s", name->len, name->name); 663 664 dlm = dlm_register_domain(domain, dlm_key); 665 if (IS_ERR(dlm)) 666 mlog_errno(PTR_ERR(dlm)); 667 668 kfree(domain); 669 return dlm; 670} 671 672void user_dlm_unregister_context(struct dlm_ctxt *dlm) 673{ 674 dlm_unregister_domain(dlm); 675} 676