1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmdomain.c 5 * 6 * defines domain join / leave apis 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27#include <linux/module.h> 28#include <linux/types.h> 29#include <linux/slab.h> 30#include <linux/highmem.h> 31#include <linux/init.h> 32#include <linux/spinlock.h> 33#include <linux/delay.h> 34#include <linux/err.h> 35#include <linux/debugfs.h> 36 37#include "cluster/heartbeat.h" 38#include "cluster/nodemanager.h" 39#include "cluster/tcp.h" 40 41#include "dlmapi.h" 42#include "dlmcommon.h" 43#include "dlmdomain.h" 44#include "dlmdebug.h" 45 46#include "dlmver.h" 47 48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 49#include "cluster/masklog.h" 50 51static inline void byte_set_bit(u8 nr, u8 map[]) 52{ 53 map[nr >> 3] |= (1UL << (nr & 7)); 54} 55 56static inline int byte_test_bit(u8 nr, u8 map[]) 57{ 58 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0; 59} 60 61static inline void byte_copymap(u8 dmap[], unsigned long smap[], 62 unsigned int sz) 63{ 64 unsigned int nn; 65 66 if (!sz) 67 return; 68 69 memset(dmap, 0, ((sz + 7) >> 3)); 70 for (nn = 0 ; nn < sz; nn++) 71 if (test_bit(nn, smap)) 72 byte_set_bit(nn, dmap); 73} 74 75static void dlm_free_pagevec(void **vec, int pages) 76{ 77 while (pages--) 78 free_page((unsigned long)vec[pages]); 79 kfree(vec); 80} 81 82static void **dlm_alloc_pagevec(int pages) 83{ 84 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); 85 int i; 86 87 if (!vec) 88 return NULL; 89 90 for (i = 0; i < pages; i++) 91 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) 92 goto out_free; 93 94 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", 95 pages, (unsigned long)DLM_HASH_PAGES, 96 (unsigned long)DLM_BUCKETS_PER_PAGE); 97 return vec; 98out_free: 99 dlm_free_pagevec(vec, i); 100 return NULL; 101} 102 103/* 104 * 105 * spinlock lock ordering: if multiple locks are needed, obey this ordering: 106 * dlm_domain_lock 107 * struct dlm_ctxt->spinlock 108 * struct dlm_lock_resource->spinlock 109 * struct dlm_ctxt->master_lock 110 * struct dlm_ctxt->ast_lock 111 * dlm_master_list_entry->spinlock 112 * dlm_lock->spinlock 113 * 114 */ 115 116DEFINE_SPINLOCK(dlm_domain_lock); 117LIST_HEAD(dlm_domains); 118static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 119 120/* 121 * The supported protocol version for DLM communication. Running domains 122 * will have a negotiated version with the same major number and a minor 123 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should 124 * be used to determine what a running domain is actually using. 125 */ 126static const struct dlm_protocol_version dlm_protocol = { 127 .pv_major = 1, 128 .pv_minor = 0, 129}; 130 131#define DLM_DOMAIN_BACKOFF_MS 200 132 133static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 134 void **ret_data); 135static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 136 void **ret_data); 137static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 138 void **ret_data); 139static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 140 void **ret_data); 141static int dlm_protocol_compare(struct dlm_protocol_version *existing, 142 struct dlm_protocol_version *request); 143 144static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 145 146void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) 147{ 148 if (!hlist_unhashed(&lockres->hash_node)) { 149 hlist_del_init(&lockres->hash_node); 150 dlm_lockres_put(lockres); 151 } 152} 153 154void __dlm_insert_lockres(struct dlm_ctxt *dlm, 155 struct dlm_lock_resource *res) 156{ 157 struct hlist_head *bucket; 158 struct qstr *q; 159 160 assert_spin_locked(&dlm->spinlock); 161 162 q = &res->lockname; 163 bucket = dlm_lockres_hash(dlm, q->hash); 164 165 /* get a reference for our hashtable */ 166 dlm_lockres_get(res); 167 168 hlist_add_head(&res->hash_node, bucket); 169} 170 171struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, 172 const char *name, 173 unsigned int len, 174 unsigned int hash) 175{ 176 struct hlist_head *bucket; 177 struct hlist_node *list; 178 179 mlog_entry("%.*s\n", len, name); 180 181 assert_spin_locked(&dlm->spinlock); 182 183 bucket = dlm_lockres_hash(dlm, hash); 184 185 hlist_for_each(list, bucket) { 186 struct dlm_lock_resource *res = hlist_entry(list, 187 struct dlm_lock_resource, hash_node); 188 if (res->lockname.name[0] != name[0]) 189 continue; 190 if (unlikely(res->lockname.len != len)) 191 continue; 192 if (memcmp(res->lockname.name + 1, name + 1, len - 1)) 193 continue; 194 dlm_lockres_get(res); 195 return res; 196 } 197 return NULL; 198} 199 200/* intended to be called by functions which do not care about lock 201 * resources which are being purged (most net _handler functions). 202 * this will return NULL for any lock resource which is found but 203 * currently in the process of dropping its mastery reference. 204 * use __dlm_lookup_lockres_full when you need the lock resource 205 * regardless (e.g. dlm_get_lock_resource) */ 206struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 207 const char *name, 208 unsigned int len, 209 unsigned int hash) 210{ 211 struct dlm_lock_resource *res = NULL; 212 213 mlog_entry("%.*s\n", len, name); 214 215 assert_spin_locked(&dlm->spinlock); 216 217 res = __dlm_lookup_lockres_full(dlm, name, len, hash); 218 if (res) { 219 spin_lock(&res->spinlock); 220 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 221 spin_unlock(&res->spinlock); 222 dlm_lockres_put(res); 223 return NULL; 224 } 225 spin_unlock(&res->spinlock); 226 } 227 228 return res; 229} 230 231struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 232 const char *name, 233 unsigned int len) 234{ 235 struct dlm_lock_resource *res; 236 unsigned int hash = dlm_lockid_hash(name, len); 237 238 spin_lock(&dlm->spinlock); 239 res = __dlm_lookup_lockres(dlm, name, len, hash); 240 spin_unlock(&dlm->spinlock); 241 return res; 242} 243 244static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len) 245{ 246 struct dlm_ctxt *tmp = NULL; 247 struct list_head *iter; 248 249 assert_spin_locked(&dlm_domain_lock); 250 251 /* tmp->name here is always NULL terminated, 252 * but domain may not be! */ 253 list_for_each(iter, &dlm_domains) { 254 tmp = list_entry (iter, struct dlm_ctxt, list); 255 if (strlen(tmp->name) == len && 256 memcmp(tmp->name, domain, len)==0) 257 break; 258 tmp = NULL; 259 } 260 261 return tmp; 262} 263 264/* For null terminated domain strings ONLY */ 265static struct dlm_ctxt * __dlm_lookup_domain(const char *domain) 266{ 267 assert_spin_locked(&dlm_domain_lock); 268 269 return __dlm_lookup_domain_full(domain, strlen(domain)); 270} 271 272 273/* returns true on one of two conditions: 274 * 1) the domain does not exist 275 * 2) the domain exists and it's state is "joined" */ 276static int dlm_wait_on_domain_helper(const char *domain) 277{ 278 int ret = 0; 279 struct dlm_ctxt *tmp = NULL; 280 281 spin_lock(&dlm_domain_lock); 282 283 tmp = __dlm_lookup_domain(domain); 284 if (!tmp) 285 ret = 1; 286 else if (tmp->dlm_state == DLM_CTXT_JOINED) 287 ret = 1; 288 289 spin_unlock(&dlm_domain_lock); 290 return ret; 291} 292 293static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) 294{ 295 dlm_destroy_debugfs_subroot(dlm); 296 297 if (dlm->lockres_hash) 298 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 299 300 if (dlm->master_hash) 301 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); 302 303 if (dlm->name) 304 kfree(dlm->name); 305 306 kfree(dlm); 307} 308 309/* A little strange - this function will be called while holding 310 * dlm_domain_lock and is expected to be holding it on the way out. We 311 * will however drop and reacquire it multiple times */ 312static void dlm_ctxt_release(struct kref *kref) 313{ 314 struct dlm_ctxt *dlm; 315 316 dlm = container_of(kref, struct dlm_ctxt, dlm_refs); 317 318 BUG_ON(dlm->num_joins); 319 BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED); 320 321 /* we may still be in the list if we hit an error during join. */ 322 list_del_init(&dlm->list); 323 324 spin_unlock(&dlm_domain_lock); 325 326 mlog(0, "freeing memory from domain %s\n", dlm->name); 327 328 wake_up(&dlm_domain_events); 329 330 dlm_free_ctxt_mem(dlm); 331 332 spin_lock(&dlm_domain_lock); 333} 334 335void dlm_put(struct dlm_ctxt *dlm) 336{ 337 spin_lock(&dlm_domain_lock); 338 kref_put(&dlm->dlm_refs, dlm_ctxt_release); 339 spin_unlock(&dlm_domain_lock); 340} 341 342static void __dlm_get(struct dlm_ctxt *dlm) 343{ 344 kref_get(&dlm->dlm_refs); 345} 346 347/* given a questionable reference to a dlm object, gets a reference if 348 * it can find it in the list, otherwise returns NULL in which case 349 * you shouldn't trust your pointer. */ 350struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm) 351{ 352 struct list_head *iter; 353 struct dlm_ctxt *target = NULL; 354 355 spin_lock(&dlm_domain_lock); 356 357 list_for_each(iter, &dlm_domains) { 358 target = list_entry (iter, struct dlm_ctxt, list); 359 360 if (target == dlm) { 361 __dlm_get(target); 362 break; 363 } 364 365 target = NULL; 366 } 367 368 spin_unlock(&dlm_domain_lock); 369 370 return target; 371} 372 373int dlm_domain_fully_joined(struct dlm_ctxt *dlm) 374{ 375 int ret; 376 377 spin_lock(&dlm_domain_lock); 378 ret = (dlm->dlm_state == DLM_CTXT_JOINED) || 379 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN); 380 spin_unlock(&dlm_domain_lock); 381 382 return ret; 383} 384 385static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) 386{ 387 if (dlm->dlm_worker) { 388 flush_workqueue(dlm->dlm_worker); 389 destroy_workqueue(dlm->dlm_worker); 390 dlm->dlm_worker = NULL; 391 } 392} 393 394static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) 395{ 396 dlm_unregister_domain_handlers(dlm); 397 dlm_debug_shutdown(dlm); 398 dlm_complete_thread(dlm); 399 dlm_complete_recovery_thread(dlm); 400 dlm_destroy_dlm_worker(dlm); 401 402 /* We've left the domain. Now we can take ourselves out of the 403 * list and allow the kref stuff to help us free the 404 * memory. */ 405 spin_lock(&dlm_domain_lock); 406 list_del_init(&dlm->list); 407 spin_unlock(&dlm_domain_lock); 408 409 /* Wake up anyone waiting for us to remove this domain */ 410 wake_up(&dlm_domain_events); 411} 412 413static int dlm_migrate_all_locks(struct dlm_ctxt *dlm) 414{ 415 int i, num, n, ret = 0; 416 struct dlm_lock_resource *res; 417 struct hlist_node *iter; 418 struct hlist_head *bucket; 419 int dropped; 420 421 mlog(0, "Migrating locks from domain %s\n", dlm->name); 422 423 num = 0; 424 spin_lock(&dlm->spinlock); 425 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 426redo_bucket: 427 n = 0; 428 bucket = dlm_lockres_hash(dlm, i); 429 iter = bucket->first; 430 while (iter) { 431 n++; 432 res = hlist_entry(iter, struct dlm_lock_resource, 433 hash_node); 434 dlm_lockres_get(res); 435 /* migrate, if necessary. this will drop the dlm 436 * spinlock and retake it if it does migration. */ 437 dropped = dlm_empty_lockres(dlm, res); 438 439 spin_lock(&res->spinlock); 440 __dlm_lockres_calc_usage(dlm, res); 441 iter = res->hash_node.next; 442 spin_unlock(&res->spinlock); 443 444 dlm_lockres_put(res); 445 446 if (dropped) 447 goto redo_bucket; 448 } 449 cond_resched_lock(&dlm->spinlock); 450 num += n; 451 mlog(0, "%s: touched %d lockreses in bucket %d " 452 "(tot=%d)\n", dlm->name, n, i, num); 453 } 454 spin_unlock(&dlm->spinlock); 455 wake_up(&dlm->dlm_thread_wq); 456 457 /* let the dlm thread take care of purging, keep scanning until 458 * nothing remains in the hash */ 459 if (num) { 460 mlog(0, "%s: %d lock resources in hash last pass\n", 461 dlm->name, num); 462 ret = -EAGAIN; 463 } 464 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 465 return ret; 466} 467 468static int dlm_no_joining_node(struct dlm_ctxt *dlm) 469{ 470 int ret; 471 472 spin_lock(&dlm->spinlock); 473 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN; 474 spin_unlock(&dlm->spinlock); 475 476 return ret; 477} 478 479static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm) 480{ 481 /* Yikes, a double spinlock! I need domain_lock for the dlm 482 * state and the dlm spinlock for join state... Sorry! */ 483again: 484 spin_lock(&dlm_domain_lock); 485 spin_lock(&dlm->spinlock); 486 487 if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 488 mlog(0, "Node %d is joining, we wait on it.\n", 489 dlm->joining_node); 490 spin_unlock(&dlm->spinlock); 491 spin_unlock(&dlm_domain_lock); 492 493 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm)); 494 goto again; 495 } 496 497 dlm->dlm_state = DLM_CTXT_LEAVING; 498 spin_unlock(&dlm->spinlock); 499 spin_unlock(&dlm_domain_lock); 500} 501 502static void __dlm_print_nodes(struct dlm_ctxt *dlm) 503{ 504 int node = -1; 505 506 assert_spin_locked(&dlm->spinlock); 507 508 printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name); 509 510 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 511 node + 1)) < O2NM_MAX_NODES) { 512 printk("%d ", node); 513 } 514 printk("\n"); 515} 516 517static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 518 void **ret_data) 519{ 520 struct dlm_ctxt *dlm = data; 521 unsigned int node; 522 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; 523 524 mlog_entry("%p %u %p", msg, len, data); 525 526 if (!dlm_grab(dlm)) 527 return 0; 528 529 node = exit_msg->node_idx; 530 531 printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name); 532 533 spin_lock(&dlm->spinlock); 534 clear_bit(node, dlm->domain_map); 535 __dlm_print_nodes(dlm); 536 537 /* notify anything attached to the heartbeat events */ 538 dlm_hb_event_notify_attached(dlm, node, 0); 539 540 spin_unlock(&dlm->spinlock); 541 542 dlm_put(dlm); 543 544 return 0; 545} 546 547static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, 548 unsigned int node) 549{ 550 int status; 551 struct dlm_exit_domain leave_msg; 552 553 mlog(0, "Asking node %u if we can leave the domain %s me = %u\n", 554 node, dlm->name, dlm->node_num); 555 556 memset(&leave_msg, 0, sizeof(leave_msg)); 557 leave_msg.node_idx = dlm->node_num; 558 559 status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key, 560 &leave_msg, sizeof(leave_msg), node, 561 NULL); 562 if (status < 0) 563 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 564 "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node); 565 mlog(0, "status return %d from o2net_send_message\n", status); 566 567 return status; 568} 569 570 571static void dlm_leave_domain(struct dlm_ctxt *dlm) 572{ 573 int node, clear_node, status; 574 575 /* At this point we've migrated away all our locks and won't 576 * accept mastership of new ones. The dlm is responsible for 577 * almost nothing now. We make sure not to confuse any joining 578 * nodes and then commence shutdown procedure. */ 579 580 spin_lock(&dlm->spinlock); 581 /* Clear ourselves from the domain map */ 582 clear_bit(dlm->node_num, dlm->domain_map); 583 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 584 0)) < O2NM_MAX_NODES) { 585 /* Drop the dlm spinlock. This is safe wrt the domain_map. 586 * -nodes cannot be added now as the 587 * query_join_handlers knows to respond with OK_NO_MAP 588 * -we catch the right network errors if a node is 589 * removed from the map while we're sending him the 590 * exit message. */ 591 spin_unlock(&dlm->spinlock); 592 593 clear_node = 1; 594 595 status = dlm_send_one_domain_exit(dlm, node); 596 if (status < 0 && 597 status != -ENOPROTOOPT && 598 status != -ENOTCONN) { 599 mlog(ML_NOTICE, "Error %d sending domain exit message " 600 "to node %d\n", status, node); 601 602 /* Not sure what to do here but lets sleep for 603 * a bit in case this was a transient 604 * error... */ 605 msleep(DLM_DOMAIN_BACKOFF_MS); 606 clear_node = 0; 607 } 608 609 spin_lock(&dlm->spinlock); 610 /* If we're not clearing the node bit then we intend 611 * to loop back around to try again. */ 612 if (clear_node) 613 clear_bit(node, dlm->domain_map); 614 } 615 spin_unlock(&dlm->spinlock); 616} 617 618int dlm_joined(struct dlm_ctxt *dlm) 619{ 620 int ret = 0; 621 622 spin_lock(&dlm_domain_lock); 623 624 if (dlm->dlm_state == DLM_CTXT_JOINED) 625 ret = 1; 626 627 spin_unlock(&dlm_domain_lock); 628 629 return ret; 630} 631 632int dlm_shutting_down(struct dlm_ctxt *dlm) 633{ 634 int ret = 0; 635 636 spin_lock(&dlm_domain_lock); 637 638 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN) 639 ret = 1; 640 641 spin_unlock(&dlm_domain_lock); 642 643 return ret; 644} 645 646void dlm_unregister_domain(struct dlm_ctxt *dlm) 647{ 648 int leave = 0; 649 struct dlm_lock_resource *res; 650 651 spin_lock(&dlm_domain_lock); 652 BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED); 653 BUG_ON(!dlm->num_joins); 654 655 dlm->num_joins--; 656 if (!dlm->num_joins) { 657 /* We mark it "in shutdown" now so new register 658 * requests wait until we've completely left the 659 * domain. Don't use DLM_CTXT_LEAVING yet as we still 660 * want new domain joins to communicate with us at 661 * least until we've completed migration of our 662 * resources. */ 663 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN; 664 leave = 1; 665 } 666 spin_unlock(&dlm_domain_lock); 667 668 if (leave) { 669 mlog(0, "shutting down domain %s\n", dlm->name); 670 671 /* We changed dlm state, notify the thread */ 672 dlm_kick_thread(dlm, NULL); 673 674 while (dlm_migrate_all_locks(dlm)) { 675 /* Give dlm_thread time to purge the lockres' */ 676 msleep(500); 677 mlog(0, "%s: more migration to do\n", dlm->name); 678 } 679 680 /* This list should be empty. If not, print remaining lockres */ 681 if (!list_empty(&dlm->tracking_list)) { 682 mlog(ML_ERROR, "Following lockres' are still on the " 683 "tracking list:\n"); 684 list_for_each_entry(res, &dlm->tracking_list, tracking) 685 dlm_print_one_lock_resource(res); 686 } 687 688 dlm_mark_domain_leaving(dlm); 689 dlm_leave_domain(dlm); 690 dlm_force_free_mles(dlm); 691 dlm_complete_dlm_shutdown(dlm); 692 } 693 dlm_put(dlm); 694} 695EXPORT_SYMBOL_GPL(dlm_unregister_domain); 696 697static int dlm_query_join_proto_check(char *proto_type, int node, 698 struct dlm_protocol_version *ours, 699 struct dlm_protocol_version *request) 700{ 701 int rc; 702 struct dlm_protocol_version proto = *request; 703 704 if (!dlm_protocol_compare(ours, &proto)) { 705 mlog(0, 706 "node %u wanted to join with %s locking protocol " 707 "%u.%u, we respond with %u.%u\n", 708 node, proto_type, 709 request->pv_major, 710 request->pv_minor, 711 proto.pv_major, proto.pv_minor); 712 request->pv_minor = proto.pv_minor; 713 rc = 0; 714 } else { 715 mlog(ML_NOTICE, 716 "Node %u wanted to join with %s locking " 717 "protocol %u.%u, but we have %u.%u, disallowing\n", 718 node, proto_type, 719 request->pv_major, 720 request->pv_minor, 721 ours->pv_major, 722 ours->pv_minor); 723 rc = 1; 724 } 725 726 return rc; 727} 728 729/* 730 * struct dlm_query_join_packet is made up of four one-byte fields. They 731 * are effectively in big-endian order already. However, little-endian 732 * machines swap them before putting the packet on the wire (because 733 * query_join's response is a status, and that status is treated as a u32 734 * on the wire). Thus, a big-endian and little-endian machines will treat 735 * this structure differently. 736 * 737 * The solution is to have little-endian machines swap the structure when 738 * converting from the structure to the u32 representation. This will 739 * result in the structure having the correct format on the wire no matter 740 * the host endian format. 741 */ 742static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet, 743 u32 *wire) 744{ 745 union dlm_query_join_response response; 746 747 response.packet = *packet; 748 *wire = cpu_to_be32(response.intval); 749} 750 751static void dlm_query_join_wire_to_packet(u32 wire, 752 struct dlm_query_join_packet *packet) 753{ 754 union dlm_query_join_response response; 755 756 response.intval = cpu_to_be32(wire); 757 *packet = response.packet; 758} 759 760static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 761 void **ret_data) 762{ 763 struct dlm_query_join_request *query; 764 struct dlm_query_join_packet packet = { 765 .code = JOIN_DISALLOW, 766 }; 767 struct dlm_ctxt *dlm = NULL; 768 u32 response; 769 u8 nodenum; 770 771 query = (struct dlm_query_join_request *) msg->buf; 772 773 mlog(0, "node %u wants to join domain %s\n", query->node_idx, 774 query->domain); 775 776 /* 777 * If heartbeat doesn't consider the node live, tell it 778 * to back off and try again. This gives heartbeat a chance 779 * to catch up. 780 */ 781 if (!o2hb_check_node_heartbeating(query->node_idx)) { 782 mlog(0, "node %u is not in our live map yet\n", 783 query->node_idx); 784 785 packet.code = JOIN_DISALLOW; 786 goto respond; 787 } 788 789 packet.code = JOIN_OK_NO_MAP; 790 791 spin_lock(&dlm_domain_lock); 792 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 793 if (!dlm) 794 goto unlock_respond; 795 796 /* 797 * There is a small window where the joining node may not see the 798 * node(s) that just left but still part of the cluster. DISALLOW 799 * join request if joining node has different node map. 800 */ 801 nodenum=0; 802 while (nodenum < O2NM_MAX_NODES) { 803 if (test_bit(nodenum, dlm->domain_map)) { 804 if (!byte_test_bit(nodenum, query->node_map)) { 805 mlog(0, "disallow join as node %u does not " 806 "have node %u in its nodemap\n", 807 query->node_idx, nodenum); 808 packet.code = JOIN_DISALLOW; 809 goto unlock_respond; 810 } 811 } 812 nodenum++; 813 } 814 815 /* Once the dlm ctxt is marked as leaving then we don't want 816 * to be put in someone's domain map. 817 * Also, explicitly disallow joining at certain troublesome 818 * times (ie. during recovery). */ 819 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { 820 int bit = query->node_idx; 821 spin_lock(&dlm->spinlock); 822 823 if (dlm->dlm_state == DLM_CTXT_NEW && 824 dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) { 825 /*If this is a brand new context and we 826 * haven't started our join process yet, then 827 * the other node won the race. */ 828 packet.code = JOIN_OK_NO_MAP; 829 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 830 /* Disallow parallel joins. */ 831 packet.code = JOIN_DISALLOW; 832 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 833 mlog(0, "node %u trying to join, but recovery " 834 "is ongoing.\n", bit); 835 packet.code = JOIN_DISALLOW; 836 } else if (test_bit(bit, dlm->recovery_map)) { 837 mlog(0, "node %u trying to join, but it " 838 "still needs recovery.\n", bit); 839 packet.code = JOIN_DISALLOW; 840 } else if (test_bit(bit, dlm->domain_map)) { 841 mlog(0, "node %u trying to join, but it " 842 "is still in the domain! needs recovery?\n", 843 bit); 844 packet.code = JOIN_DISALLOW; 845 } else { 846 /* Alright we're fully a part of this domain 847 * so we keep some state as to who's joining 848 * and indicate to him that needs to be fixed 849 * up. */ 850 851 /* Make sure we speak compatible locking protocols. */ 852 if (dlm_query_join_proto_check("DLM", bit, 853 &dlm->dlm_locking_proto, 854 &query->dlm_proto)) { 855 packet.code = JOIN_PROTOCOL_MISMATCH; 856 } else if (dlm_query_join_proto_check("fs", bit, 857 &dlm->fs_locking_proto, 858 &query->fs_proto)) { 859 packet.code = JOIN_PROTOCOL_MISMATCH; 860 } else { 861 packet.dlm_minor = query->dlm_proto.pv_minor; 862 packet.fs_minor = query->fs_proto.pv_minor; 863 packet.code = JOIN_OK; 864 __dlm_set_joining_node(dlm, query->node_idx); 865 } 866 } 867 868 spin_unlock(&dlm->spinlock); 869 } 870unlock_respond: 871 spin_unlock(&dlm_domain_lock); 872 873respond: 874 mlog(0, "We respond with %u\n", packet.code); 875 876 dlm_query_join_packet_to_wire(&packet, &response); 877 return response; 878} 879 880static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 881 void **ret_data) 882{ 883 struct dlm_assert_joined *assert; 884 struct dlm_ctxt *dlm = NULL; 885 886 assert = (struct dlm_assert_joined *) msg->buf; 887 888 mlog(0, "node %u asserts join on domain %s\n", assert->node_idx, 889 assert->domain); 890 891 spin_lock(&dlm_domain_lock); 892 dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len); 893 if (dlm) { 894 spin_lock(&dlm->spinlock); 895 896 /* Alright, this node has officially joined our 897 * domain. Set him in the map and clean up our 898 * leftover join state. */ 899 BUG_ON(dlm->joining_node != assert->node_idx); 900 set_bit(assert->node_idx, dlm->domain_map); 901 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 902 903 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n", 904 assert->node_idx, dlm->name); 905 __dlm_print_nodes(dlm); 906 907 /* notify anything attached to the heartbeat events */ 908 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1); 909 910 spin_unlock(&dlm->spinlock); 911 } 912 spin_unlock(&dlm_domain_lock); 913 914 return 0; 915} 916 917static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 918 void **ret_data) 919{ 920 struct dlm_cancel_join *cancel; 921 struct dlm_ctxt *dlm = NULL; 922 923 cancel = (struct dlm_cancel_join *) msg->buf; 924 925 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx, 926 cancel->domain); 927 928 spin_lock(&dlm_domain_lock); 929 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len); 930 931 if (dlm) { 932 spin_lock(&dlm->spinlock); 933 934 /* Yikes, this guy wants to cancel his join. No 935 * problem, we simply cleanup our join state. */ 936 BUG_ON(dlm->joining_node != cancel->node_idx); 937 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 938 939 spin_unlock(&dlm->spinlock); 940 } 941 spin_unlock(&dlm_domain_lock); 942 943 return 0; 944} 945 946static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm, 947 unsigned int node) 948{ 949 int status; 950 struct dlm_cancel_join cancel_msg; 951 952 memset(&cancel_msg, 0, sizeof(cancel_msg)); 953 cancel_msg.node_idx = dlm->node_num; 954 cancel_msg.name_len = strlen(dlm->name); 955 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len); 956 957 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 958 &cancel_msg, sizeof(cancel_msg), node, 959 NULL); 960 if (status < 0) { 961 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 962 "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 963 node); 964 goto bail; 965 } 966 967bail: 968 return status; 969} 970 971/* map_size should be in bytes. */ 972static int dlm_send_join_cancels(struct dlm_ctxt *dlm, 973 unsigned long *node_map, 974 unsigned int map_size) 975{ 976 int status, tmpstat; 977 unsigned int node; 978 979 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) * 980 sizeof(unsigned long))) { 981 mlog(ML_ERROR, 982 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n", 983 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES)); 984 return -EINVAL; 985 } 986 987 status = 0; 988 node = -1; 989 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 990 node + 1)) < O2NM_MAX_NODES) { 991 if (node == dlm->node_num) 992 continue; 993 994 tmpstat = dlm_send_one_join_cancel(dlm, node); 995 if (tmpstat) { 996 mlog(ML_ERROR, "Error return %d cancelling join on " 997 "node %d\n", tmpstat, node); 998 if (!status) 999 status = tmpstat; 1000 } 1001 } 1002 1003 if (status) 1004 mlog_errno(status); 1005 return status; 1006} 1007 1008static int dlm_request_join(struct dlm_ctxt *dlm, 1009 int node, 1010 enum dlm_query_join_response_code *response) 1011{ 1012 int status; 1013 struct dlm_query_join_request join_msg; 1014 struct dlm_query_join_packet packet; 1015 u32 join_resp; 1016 1017 mlog(0, "querying node %d\n", node); 1018 1019 memset(&join_msg, 0, sizeof(join_msg)); 1020 join_msg.node_idx = dlm->node_num; 1021 join_msg.name_len = strlen(dlm->name); 1022 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 1023 join_msg.dlm_proto = dlm->dlm_locking_proto; 1024 join_msg.fs_proto = dlm->fs_locking_proto; 1025 1026 /* copy live node map to join message */ 1027 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); 1028 1029 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 1030 sizeof(join_msg), node, &join_resp); 1031 if (status < 0 && status != -ENOPROTOOPT) { 1032 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1033 "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1034 node); 1035 goto bail; 1036 } 1037 dlm_query_join_wire_to_packet(join_resp, &packet); 1038 1039 /* -ENOPROTOOPT from the net code means the other side isn't 1040 listening for our message type -- that's fine, it means 1041 his dlm isn't up, so we can consider him a 'yes' but not 1042 joined into the domain. */ 1043 if (status == -ENOPROTOOPT) { 1044 status = 0; 1045 *response = JOIN_OK_NO_MAP; 1046 } else if (packet.code == JOIN_DISALLOW || 1047 packet.code == JOIN_OK_NO_MAP) { 1048 *response = packet.code; 1049 } else if (packet.code == JOIN_PROTOCOL_MISMATCH) { 1050 mlog(ML_NOTICE, 1051 "This node requested DLM locking protocol %u.%u and " 1052 "filesystem locking protocol %u.%u. At least one of " 1053 "the protocol versions on node %d is not compatible, " 1054 "disconnecting\n", 1055 dlm->dlm_locking_proto.pv_major, 1056 dlm->dlm_locking_proto.pv_minor, 1057 dlm->fs_locking_proto.pv_major, 1058 dlm->fs_locking_proto.pv_minor, 1059 node); 1060 status = -EPROTO; 1061 *response = packet.code; 1062 } else if (packet.code == JOIN_OK) { 1063 *response = packet.code; 1064 /* Use the same locking protocol as the remote node */ 1065 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor; 1066 dlm->fs_locking_proto.pv_minor = packet.fs_minor; 1067 mlog(0, 1068 "Node %d responds JOIN_OK with DLM locking protocol " 1069 "%u.%u and fs locking protocol %u.%u\n", 1070 node, 1071 dlm->dlm_locking_proto.pv_major, 1072 dlm->dlm_locking_proto.pv_minor, 1073 dlm->fs_locking_proto.pv_major, 1074 dlm->fs_locking_proto.pv_minor); 1075 } else { 1076 status = -EINVAL; 1077 mlog(ML_ERROR, "invalid response %d from node %u\n", 1078 packet.code, node); 1079 } 1080 1081 mlog(0, "status %d, node %d response is %d\n", status, node, 1082 *response); 1083 1084bail: 1085 return status; 1086} 1087 1088static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, 1089 unsigned int node) 1090{ 1091 int status; 1092 struct dlm_assert_joined assert_msg; 1093 1094 mlog(0, "Sending join assert to node %u\n", node); 1095 1096 memset(&assert_msg, 0, sizeof(assert_msg)); 1097 assert_msg.node_idx = dlm->node_num; 1098 assert_msg.name_len = strlen(dlm->name); 1099 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len); 1100 1101 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1102 &assert_msg, sizeof(assert_msg), node, 1103 NULL); 1104 if (status < 0) 1105 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 1106 "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1107 node); 1108 1109 return status; 1110} 1111 1112static void dlm_send_join_asserts(struct dlm_ctxt *dlm, 1113 unsigned long *node_map) 1114{ 1115 int status, node, live; 1116 1117 status = 0; 1118 node = -1; 1119 while ((node = find_next_bit(node_map, O2NM_MAX_NODES, 1120 node + 1)) < O2NM_MAX_NODES) { 1121 if (node == dlm->node_num) 1122 continue; 1123 1124 do { 1125 /* It is very important that this message be 1126 * received so we spin until either the node 1127 * has died or it gets the message. */ 1128 status = dlm_send_one_join_assert(dlm, node); 1129 1130 spin_lock(&dlm->spinlock); 1131 live = test_bit(node, dlm->live_nodes_map); 1132 spin_unlock(&dlm->spinlock); 1133 1134 if (status) { 1135 mlog(ML_ERROR, "Error return %d asserting " 1136 "join on node %d\n", status, node); 1137 1138 /* give us some time between errors... */ 1139 if (live) 1140 msleep(DLM_DOMAIN_BACKOFF_MS); 1141 } 1142 } while (status && live); 1143 } 1144} 1145 1146struct domain_join_ctxt { 1147 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1148 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1149}; 1150 1151static int dlm_should_restart_join(struct dlm_ctxt *dlm, 1152 struct domain_join_ctxt *ctxt, 1153 enum dlm_query_join_response_code response) 1154{ 1155 int ret; 1156 1157 if (response == JOIN_DISALLOW) { 1158 mlog(0, "Latest response of disallow -- should restart\n"); 1159 return 1; 1160 } 1161 1162 spin_lock(&dlm->spinlock); 1163 /* For now, we restart the process if the node maps have 1164 * changed at all */ 1165 ret = memcmp(ctxt->live_map, dlm->live_nodes_map, 1166 sizeof(dlm->live_nodes_map)); 1167 spin_unlock(&dlm->spinlock); 1168 1169 if (ret) 1170 mlog(0, "Node maps changed -- should restart\n"); 1171 1172 return ret; 1173} 1174 1175static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) 1176{ 1177 int status = 0, tmpstat, node; 1178 struct domain_join_ctxt *ctxt; 1179 enum dlm_query_join_response_code response = JOIN_DISALLOW; 1180 1181 mlog_entry("%p", dlm); 1182 1183 ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL); 1184 if (!ctxt) { 1185 status = -ENOMEM; 1186 mlog_errno(status); 1187 goto bail; 1188 } 1189 1190 /* group sem locking should work for us here -- we're already 1191 * registered for heartbeat events so filling this should be 1192 * atomic wrt getting those handlers called. */ 1193 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map)); 1194 1195 spin_lock(&dlm->spinlock); 1196 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map)); 1197 1198 __dlm_set_joining_node(dlm, dlm->node_num); 1199 1200 spin_unlock(&dlm->spinlock); 1201 1202 node = -1; 1203 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES, 1204 node + 1)) < O2NM_MAX_NODES) { 1205 if (node == dlm->node_num) 1206 continue; 1207 1208 status = dlm_request_join(dlm, node, &response); 1209 if (status < 0) { 1210 mlog_errno(status); 1211 goto bail; 1212 } 1213 1214 /* Ok, either we got a response or the node doesn't have a 1215 * dlm up. */ 1216 if (response == JOIN_OK) 1217 set_bit(node, ctxt->yes_resp_map); 1218 1219 if (dlm_should_restart_join(dlm, ctxt, response)) { 1220 status = -EAGAIN; 1221 goto bail; 1222 } 1223 } 1224 1225 mlog(0, "Yay, done querying nodes!\n"); 1226 1227 /* Yay, everyone agree's we can join the domain. My domain is 1228 * comprised of all nodes who were put in the 1229 * yes_resp_map. Copy that into our domain map and send a join 1230 * assert message to clean up everyone elses state. */ 1231 spin_lock(&dlm->spinlock); 1232 memcpy(dlm->domain_map, ctxt->yes_resp_map, 1233 sizeof(ctxt->yes_resp_map)); 1234 set_bit(dlm->node_num, dlm->domain_map); 1235 spin_unlock(&dlm->spinlock); 1236 1237 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1238 1239 /* Joined state *must* be set before the joining node 1240 * information, otherwise the query_join handler may read no 1241 * current joiner but a state of NEW and tell joining nodes 1242 * we're not in the domain. */ 1243 spin_lock(&dlm_domain_lock); 1244 dlm->dlm_state = DLM_CTXT_JOINED; 1245 dlm->num_joins++; 1246 spin_unlock(&dlm_domain_lock); 1247 1248bail: 1249 spin_lock(&dlm->spinlock); 1250 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1251 if (!status) 1252 __dlm_print_nodes(dlm); 1253 spin_unlock(&dlm->spinlock); 1254 1255 if (ctxt) { 1256 /* Do we need to send a cancel message to any nodes? */ 1257 if (status < 0) { 1258 tmpstat = dlm_send_join_cancels(dlm, 1259 ctxt->yes_resp_map, 1260 sizeof(ctxt->yes_resp_map)); 1261 if (tmpstat < 0) 1262 mlog_errno(tmpstat); 1263 } 1264 kfree(ctxt); 1265 } 1266 1267 mlog(0, "returning %d\n", status); 1268 return status; 1269} 1270 1271static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm) 1272{ 1273 o2hb_unregister_callback(NULL, &dlm->dlm_hb_up); 1274 o2hb_unregister_callback(NULL, &dlm->dlm_hb_down); 1275 o2net_unregister_handler_list(&dlm->dlm_domain_handlers); 1276} 1277 1278static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) 1279{ 1280 int status; 1281 1282 mlog(0, "registering handlers.\n"); 1283 1284 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB, 1285 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI); 1286 status = o2hb_register_callback(NULL, &dlm->dlm_hb_down); 1287 if (status) 1288 goto bail; 1289 1290 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB, 1291 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI); 1292 status = o2hb_register_callback(NULL, &dlm->dlm_hb_up); 1293 if (status) 1294 goto bail; 1295 1296 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1297 sizeof(struct dlm_master_request), 1298 dlm_master_request_handler, 1299 dlm, NULL, &dlm->dlm_domain_handlers); 1300 if (status) 1301 goto bail; 1302 1303 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1304 sizeof(struct dlm_assert_master), 1305 dlm_assert_master_handler, 1306 dlm, dlm_assert_master_post_handler, 1307 &dlm->dlm_domain_handlers); 1308 if (status) 1309 goto bail; 1310 1311 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1312 sizeof(struct dlm_create_lock), 1313 dlm_create_lock_handler, 1314 dlm, NULL, &dlm->dlm_domain_handlers); 1315 if (status) 1316 goto bail; 1317 1318 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1319 DLM_CONVERT_LOCK_MAX_LEN, 1320 dlm_convert_lock_handler, 1321 dlm, NULL, &dlm->dlm_domain_handlers); 1322 if (status) 1323 goto bail; 1324 1325 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1326 DLM_UNLOCK_LOCK_MAX_LEN, 1327 dlm_unlock_lock_handler, 1328 dlm, NULL, &dlm->dlm_domain_handlers); 1329 if (status) 1330 goto bail; 1331 1332 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1333 DLM_PROXY_AST_MAX_LEN, 1334 dlm_proxy_ast_handler, 1335 dlm, NULL, &dlm->dlm_domain_handlers); 1336 if (status) 1337 goto bail; 1338 1339 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1340 sizeof(struct dlm_exit_domain), 1341 dlm_exit_domain_handler, 1342 dlm, NULL, &dlm->dlm_domain_handlers); 1343 if (status) 1344 goto bail; 1345 1346 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key, 1347 sizeof(struct dlm_deref_lockres), 1348 dlm_deref_lockres_handler, 1349 dlm, NULL, &dlm->dlm_domain_handlers); 1350 if (status) 1351 goto bail; 1352 1353 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1354 sizeof(struct dlm_migrate_request), 1355 dlm_migrate_request_handler, 1356 dlm, NULL, &dlm->dlm_domain_handlers); 1357 if (status) 1358 goto bail; 1359 1360 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1361 DLM_MIG_LOCKRES_MAX_LEN, 1362 dlm_mig_lockres_handler, 1363 dlm, NULL, &dlm->dlm_domain_handlers); 1364 if (status) 1365 goto bail; 1366 1367 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1368 sizeof(struct dlm_master_requery), 1369 dlm_master_requery_handler, 1370 dlm, NULL, &dlm->dlm_domain_handlers); 1371 if (status) 1372 goto bail; 1373 1374 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1375 sizeof(struct dlm_lock_request), 1376 dlm_request_all_locks_handler, 1377 dlm, NULL, &dlm->dlm_domain_handlers); 1378 if (status) 1379 goto bail; 1380 1381 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1382 sizeof(struct dlm_reco_data_done), 1383 dlm_reco_data_done_handler, 1384 dlm, NULL, &dlm->dlm_domain_handlers); 1385 if (status) 1386 goto bail; 1387 1388 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1389 sizeof(struct dlm_begin_reco), 1390 dlm_begin_reco_handler, 1391 dlm, NULL, &dlm->dlm_domain_handlers); 1392 if (status) 1393 goto bail; 1394 1395 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1396 sizeof(struct dlm_finalize_reco), 1397 dlm_finalize_reco_handler, 1398 dlm, NULL, &dlm->dlm_domain_handlers); 1399 if (status) 1400 goto bail; 1401 1402bail: 1403 if (status) 1404 dlm_unregister_domain_handlers(dlm); 1405 1406 return status; 1407} 1408 1409static int dlm_join_domain(struct dlm_ctxt *dlm) 1410{ 1411 int status; 1412 unsigned int backoff; 1413 unsigned int total_backoff = 0; 1414 1415 BUG_ON(!dlm); 1416 1417 mlog(0, "Join domain %s\n", dlm->name); 1418 1419 status = dlm_register_domain_handlers(dlm); 1420 if (status) { 1421 mlog_errno(status); 1422 goto bail; 1423 } 1424 1425 status = dlm_debug_init(dlm); 1426 if (status < 0) { 1427 mlog_errno(status); 1428 goto bail; 1429 } 1430 1431 status = dlm_launch_thread(dlm); 1432 if (status < 0) { 1433 mlog_errno(status); 1434 goto bail; 1435 } 1436 1437 status = dlm_launch_recovery_thread(dlm); 1438 if (status < 0) { 1439 mlog_errno(status); 1440 goto bail; 1441 } 1442 1443 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq"); 1444 if (!dlm->dlm_worker) { 1445 status = -ENOMEM; 1446 mlog_errno(status); 1447 goto bail; 1448 } 1449 1450 do { 1451 status = dlm_try_to_join_domain(dlm); 1452 1453 /* If we're racing another node to the join, then we 1454 * need to back off temporarily and let them 1455 * complete. */ 1456#define DLM_JOIN_TIMEOUT_MSECS 90000 1457 if (status == -EAGAIN) { 1458 if (signal_pending(current)) { 1459 status = -ERESTARTSYS; 1460 goto bail; 1461 } 1462 1463 if (total_backoff > 1464 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) { 1465 status = -ERESTARTSYS; 1466 mlog(ML_NOTICE, "Timed out joining dlm domain " 1467 "%s after %u msecs\n", dlm->name, 1468 jiffies_to_msecs(total_backoff)); 1469 goto bail; 1470 } 1471 1472 /* 1473 * <chip> After you! 1474 * <dale> No, after you! 1475 * <chip> I insist! 1476 * <dale> But you first! 1477 * ... 1478 */ 1479 backoff = (unsigned int)(jiffies & 0x3); 1480 backoff *= DLM_DOMAIN_BACKOFF_MS; 1481 total_backoff += backoff; 1482 mlog(0, "backoff %d\n", backoff); 1483 msleep(backoff); 1484 } 1485 } while (status == -EAGAIN); 1486 1487 if (status < 0) { 1488 mlog_errno(status); 1489 goto bail; 1490 } 1491 1492 status = 0; 1493bail: 1494 wake_up(&dlm_domain_events); 1495 1496 if (status) { 1497 dlm_unregister_domain_handlers(dlm); 1498 dlm_debug_shutdown(dlm); 1499 dlm_complete_thread(dlm); 1500 dlm_complete_recovery_thread(dlm); 1501 dlm_destroy_dlm_worker(dlm); 1502 } 1503 1504 return status; 1505} 1506 1507static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, 1508 u32 key) 1509{ 1510 int i; 1511 int ret; 1512 struct dlm_ctxt *dlm = NULL; 1513 1514 dlm = kzalloc(sizeof(*dlm), GFP_KERNEL); 1515 if (!dlm) { 1516 mlog_errno(-ENOMEM); 1517 goto leave; 1518 } 1519 1520 dlm->name = kstrdup(domain, GFP_KERNEL); 1521 if (dlm->name == NULL) { 1522 mlog_errno(-ENOMEM); 1523 kfree(dlm); 1524 dlm = NULL; 1525 goto leave; 1526 } 1527 1528 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); 1529 if (!dlm->lockres_hash) { 1530 mlog_errno(-ENOMEM); 1531 kfree(dlm->name); 1532 kfree(dlm); 1533 dlm = NULL; 1534 goto leave; 1535 } 1536 1537 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1538 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); 1539 1540 dlm->master_hash = (struct hlist_head **) 1541 dlm_alloc_pagevec(DLM_HASH_PAGES); 1542 if (!dlm->master_hash) { 1543 mlog_errno(-ENOMEM); 1544 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 1545 kfree(dlm->name); 1546 kfree(dlm); 1547 dlm = NULL; 1548 goto leave; 1549 } 1550 1551 for (i = 0; i < DLM_HASH_BUCKETS; i++) 1552 INIT_HLIST_HEAD(dlm_master_hash(dlm, i)); 1553 1554 dlm->key = key; 1555 dlm->node_num = o2nm_this_node(); 1556 1557 ret = dlm_create_debugfs_subroot(dlm); 1558 if (ret < 0) { 1559 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); 1560 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); 1561 kfree(dlm->name); 1562 kfree(dlm); 1563 dlm = NULL; 1564 goto leave; 1565 } 1566 1567 spin_lock_init(&dlm->spinlock); 1568 spin_lock_init(&dlm->master_lock); 1569 spin_lock_init(&dlm->ast_lock); 1570 spin_lock_init(&dlm->track_lock); 1571 INIT_LIST_HEAD(&dlm->list); 1572 INIT_LIST_HEAD(&dlm->dirty_list); 1573 INIT_LIST_HEAD(&dlm->reco.resources); 1574 INIT_LIST_HEAD(&dlm->reco.received); 1575 INIT_LIST_HEAD(&dlm->reco.node_data); 1576 INIT_LIST_HEAD(&dlm->purge_list); 1577 INIT_LIST_HEAD(&dlm->dlm_domain_handlers); 1578 INIT_LIST_HEAD(&dlm->tracking_list); 1579 dlm->reco.state = 0; 1580 1581 INIT_LIST_HEAD(&dlm->pending_asts); 1582 INIT_LIST_HEAD(&dlm->pending_basts); 1583 1584 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n", 1585 dlm->recovery_map, &(dlm->recovery_map[0])); 1586 1587 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map)); 1588 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map)); 1589 memset(dlm->domain_map, 0, sizeof(dlm->domain_map)); 1590 1591 dlm->dlm_thread_task = NULL; 1592 dlm->dlm_reco_thread_task = NULL; 1593 dlm->dlm_worker = NULL; 1594 init_waitqueue_head(&dlm->dlm_thread_wq); 1595 init_waitqueue_head(&dlm->dlm_reco_thread_wq); 1596 init_waitqueue_head(&dlm->reco.event); 1597 init_waitqueue_head(&dlm->ast_wq); 1598 init_waitqueue_head(&dlm->migration_wq); 1599 INIT_LIST_HEAD(&dlm->mle_hb_events); 1600 1601 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN; 1602 init_waitqueue_head(&dlm->dlm_join_events); 1603 1604 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 1605 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 1606 1607 atomic_set(&dlm->res_tot_count, 0); 1608 atomic_set(&dlm->res_cur_count, 0); 1609 for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) { 1610 atomic_set(&dlm->mle_tot_count[i], 0); 1611 atomic_set(&dlm->mle_cur_count[i], 0); 1612 } 1613 1614 spin_lock_init(&dlm->work_lock); 1615 INIT_LIST_HEAD(&dlm->work_list); 1616 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work); 1617 1618 kref_init(&dlm->dlm_refs); 1619 dlm->dlm_state = DLM_CTXT_NEW; 1620 1621 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks); 1622 1623 mlog(0, "context init: refcount %u\n", 1624 atomic_read(&dlm->dlm_refs.refcount)); 1625 1626leave: 1627 return dlm; 1628} 1629 1630/* 1631 * Compare a requested locking protocol version against the current one. 1632 * 1633 * If the major numbers are different, they are incompatible. 1634 * If the current minor is greater than the request, they are incompatible. 1635 * If the current minor is less than or equal to the request, they are 1636 * compatible, and the requester should run at the current minor version. 1637 */ 1638static int dlm_protocol_compare(struct dlm_protocol_version *existing, 1639 struct dlm_protocol_version *request) 1640{ 1641 if (existing->pv_major != request->pv_major) 1642 return 1; 1643 1644 if (existing->pv_minor > request->pv_minor) 1645 return 1; 1646 1647 if (existing->pv_minor < request->pv_minor) 1648 request->pv_minor = existing->pv_minor; 1649 1650 return 0; 1651} 1652 1653/* 1654 * dlm_register_domain: one-time setup per "domain". 1655 * 1656 * The filesystem passes in the requested locking version via proto. 1657 * If registration was successful, proto will contain the negotiated 1658 * locking protocol. 1659 */ 1660struct dlm_ctxt * dlm_register_domain(const char *domain, 1661 u32 key, 1662 struct dlm_protocol_version *fs_proto) 1663{ 1664 int ret; 1665 struct dlm_ctxt *dlm = NULL; 1666 struct dlm_ctxt *new_ctxt = NULL; 1667 1668 if (strlen(domain) >= O2NM_MAX_NAME_LEN) { 1669 ret = -ENAMETOOLONG; 1670 mlog(ML_ERROR, "domain name length too long\n"); 1671 goto leave; 1672 } 1673 1674 if (!o2hb_check_local_node_heartbeating()) { 1675 mlog(ML_ERROR, "the local node has not been configured, or is " 1676 "not heartbeating\n"); 1677 ret = -EPROTO; 1678 goto leave; 1679 } 1680 1681 mlog(0, "register called for domain \"%s\"\n", domain); 1682 1683retry: 1684 dlm = NULL; 1685 if (signal_pending(current)) { 1686 ret = -ERESTARTSYS; 1687 mlog_errno(ret); 1688 goto leave; 1689 } 1690 1691 spin_lock(&dlm_domain_lock); 1692 1693 dlm = __dlm_lookup_domain(domain); 1694 if (dlm) { 1695 if (dlm->dlm_state != DLM_CTXT_JOINED) { 1696 spin_unlock(&dlm_domain_lock); 1697 1698 mlog(0, "This ctxt is not joined yet!\n"); 1699 wait_event_interruptible(dlm_domain_events, 1700 dlm_wait_on_domain_helper( 1701 domain)); 1702 goto retry; 1703 } 1704 1705 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) { 1706 spin_unlock(&dlm_domain_lock); 1707 mlog(ML_ERROR, 1708 "Requested locking protocol version is not " 1709 "compatible with already registered domain " 1710 "\"%s\"\n", domain); 1711 ret = -EPROTO; 1712 goto leave; 1713 } 1714 1715 __dlm_get(dlm); 1716 dlm->num_joins++; 1717 1718 spin_unlock(&dlm_domain_lock); 1719 1720 ret = 0; 1721 goto leave; 1722 } 1723 1724 /* doesn't exist */ 1725 if (!new_ctxt) { 1726 spin_unlock(&dlm_domain_lock); 1727 1728 new_ctxt = dlm_alloc_ctxt(domain, key); 1729 if (new_ctxt) 1730 goto retry; 1731 1732 ret = -ENOMEM; 1733 mlog_errno(ret); 1734 goto leave; 1735 } 1736 1737 /* a little variable switch-a-roo here... */ 1738 dlm = new_ctxt; 1739 new_ctxt = NULL; 1740 1741 /* add the new domain */ 1742 list_add_tail(&dlm->list, &dlm_domains); 1743 spin_unlock(&dlm_domain_lock); 1744 1745 /* 1746 * Pass the locking protocol version into the join. If the join 1747 * succeeds, it will have the negotiated protocol set. 1748 */ 1749 dlm->dlm_locking_proto = dlm_protocol; 1750 dlm->fs_locking_proto = *fs_proto; 1751 1752 ret = dlm_join_domain(dlm); 1753 if (ret) { 1754 mlog_errno(ret); 1755 dlm_put(dlm); 1756 goto leave; 1757 } 1758 1759 /* Tell the caller what locking protocol we negotiated */ 1760 *fs_proto = dlm->fs_locking_proto; 1761 1762 ret = 0; 1763leave: 1764 if (new_ctxt) 1765 dlm_free_ctxt_mem(new_ctxt); 1766 1767 if (ret < 0) 1768 dlm = ERR_PTR(ret); 1769 1770 return dlm; 1771} 1772EXPORT_SYMBOL_GPL(dlm_register_domain); 1773 1774static LIST_HEAD(dlm_join_handlers); 1775 1776static void dlm_unregister_net_handlers(void) 1777{ 1778 o2net_unregister_handler_list(&dlm_join_handlers); 1779} 1780 1781static int dlm_register_net_handlers(void) 1782{ 1783 int status = 0; 1784 1785 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1786 sizeof(struct dlm_query_join_request), 1787 dlm_query_join_handler, 1788 NULL, NULL, &dlm_join_handlers); 1789 if (status) 1790 goto bail; 1791 1792 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1793 sizeof(struct dlm_assert_joined), 1794 dlm_assert_joined_handler, 1795 NULL, NULL, &dlm_join_handlers); 1796 if (status) 1797 goto bail; 1798 1799 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1800 sizeof(struct dlm_cancel_join), 1801 dlm_cancel_join_handler, 1802 NULL, NULL, &dlm_join_handlers); 1803 1804bail: 1805 if (status < 0) 1806 dlm_unregister_net_handlers(); 1807 1808 return status; 1809} 1810 1811/* Domain eviction callback handling. 1812 * 1813 * The file system requires notification of node death *before* the 1814 * dlm completes it's recovery work, otherwise it may be able to 1815 * acquire locks on resources requiring recovery. Since the dlm can 1816 * evict a node from it's domain *before* heartbeat fires, a similar 1817 * mechanism is required. */ 1818 1819/* Eviction is not expected to happen often, so a per-domain lock is 1820 * not necessary. Eviction callbacks are allowed to sleep for short 1821 * periods of time. */ 1822static DECLARE_RWSEM(dlm_callback_sem); 1823 1824void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm, 1825 int node_num) 1826{ 1827 struct list_head *iter; 1828 struct dlm_eviction_cb *cb; 1829 1830 down_read(&dlm_callback_sem); 1831 list_for_each(iter, &dlm->dlm_eviction_callbacks) { 1832 cb = list_entry(iter, struct dlm_eviction_cb, ec_item); 1833 1834 cb->ec_func(node_num, cb->ec_data); 1835 } 1836 up_read(&dlm_callback_sem); 1837} 1838 1839void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb, 1840 dlm_eviction_func *f, 1841 void *data) 1842{ 1843 INIT_LIST_HEAD(&cb->ec_item); 1844 cb->ec_func = f; 1845 cb->ec_data = data; 1846} 1847EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb); 1848 1849void dlm_register_eviction_cb(struct dlm_ctxt *dlm, 1850 struct dlm_eviction_cb *cb) 1851{ 1852 down_write(&dlm_callback_sem); 1853 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks); 1854 up_write(&dlm_callback_sem); 1855} 1856EXPORT_SYMBOL_GPL(dlm_register_eviction_cb); 1857 1858void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb) 1859{ 1860 down_write(&dlm_callback_sem); 1861 list_del_init(&cb->ec_item); 1862 up_write(&dlm_callback_sem); 1863} 1864EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb); 1865 1866static int __init dlm_init(void) 1867{ 1868 int status; 1869 1870 dlm_print_version(); 1871 1872 status = dlm_init_mle_cache(); 1873 if (status) { 1874 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n"); 1875 goto error; 1876 } 1877 1878 status = dlm_init_master_caches(); 1879 if (status) { 1880 mlog(ML_ERROR, "Could not create o2dlm_lockres and " 1881 "o2dlm_lockname slabcaches\n"); 1882 goto error; 1883 } 1884 1885 status = dlm_init_lock_cache(); 1886 if (status) { 1887 mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n"); 1888 goto error; 1889 } 1890 1891 status = dlm_register_net_handlers(); 1892 if (status) { 1893 mlog(ML_ERROR, "Unable to register network handlers\n"); 1894 goto error; 1895 } 1896 1897 status = dlm_create_debugfs_root(); 1898 if (status) 1899 goto error; 1900 1901 return 0; 1902error: 1903 dlm_unregister_net_handlers(); 1904 dlm_destroy_lock_cache(); 1905 dlm_destroy_master_caches(); 1906 dlm_destroy_mle_cache(); 1907 return -1; 1908} 1909 1910static void __exit dlm_exit (void) 1911{ 1912 dlm_destroy_debugfs_root(); 1913 dlm_unregister_net_handlers(); 1914 dlm_destroy_lock_cache(); 1915 dlm_destroy_master_caches(); 1916 dlm_destroy_mle_cache(); 1917} 1918 1919MODULE_AUTHOR("Oracle"); 1920MODULE_LICENSE("GPL"); 1921 1922module_init(dlm_init); 1923module_exit(dlm_exit); 1924