Lines matching defs:dlm (definitions and uses of the struct dlm_ctxt *dlm context pointer in the OCFS2 DLM master code, fs/ocfs2/dlm/dlmmaster.c). Each entry pairs the file's own line number with the matched source line.

38 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
42 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
48 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
53 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
58 if (dlm != mle->dlm)
75 struct dlm_ctxt *dlm,
81 static int dlm_find_mle(struct dlm_ctxt *dlm,
89 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
93 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
97 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
104 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
106 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
108 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
111 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
150 * dlm's established heartbeat callbacks. the mle is attached
151 * when it is created, and since the dlm->spinlock is held at
154 * dlm->mle_hb_events list as soon as heartbeat events are no
161 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
164 assert_spin_locked(&dlm->spinlock);
166 list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
170 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
178 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
181 spin_lock(&dlm->spinlock);
182 __dlm_mle_detach_hb_events(dlm, mle);
183 spin_unlock(&dlm->spinlock);
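The comment fragments at 150-154 describe the mle heartbeat-event lifecycle: an mle goes onto dlm->mle_hb_events when it is created (dlm->spinlock is already held at that point) and comes off the list as soon as heartbeat events are no longer needed. The matches show the usual kernel pairing of a lockless helper with a locking wrapper; a condensed sketch (the detach helper's body is not in the matches and is assumed here):

    /* caller already holds dlm->spinlock (the __ prefix convention) */
    static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle)
    {
            assert_spin_locked(&dlm->spinlock);
            list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
    }

    /* locking wrapper for callers that do not hold dlm->spinlock */
    static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle)
    {
            spin_lock(&dlm->spinlock);
            __dlm_mle_detach_hb_events(dlm, mle);  /* presumably list_del_init() */
            spin_unlock(&dlm->spinlock);
    }

The assert_spin_locked() in the lockless variant turns a violated locking contract into a loud failure instead of a silent list corruption.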
188 struct dlm_ctxt *dlm;
189 dlm = mle->dlm;
191 assert_spin_locked(&dlm->spinlock);
192 assert_spin_locked(&dlm->master_lock);
199 struct dlm_ctxt *dlm;
200 dlm = mle->dlm;
202 spin_lock(&dlm->spinlock);
203 spin_lock(&dlm->master_lock);
206 spin_unlock(&dlm->master_lock);
207 spin_unlock(&dlm->spinlock);
214 struct dlm_ctxt *dlm;
215 dlm = mle->dlm;
217 assert_spin_locked(&dlm->spinlock);
218 assert_spin_locked(&dlm->master_lock);
233 struct dlm_ctxt *dlm;
234 dlm = mle->dlm;
236 spin_lock(&dlm->spinlock);
237 spin_lock(&dlm->master_lock);
239 spin_unlock(&dlm->master_lock);
240 spin_unlock(&dlm->spinlock);
250 struct dlm_ctxt *dlm,
255 assert_spin_locked(&dlm->spinlock);
257 mle->dlm = dlm;
289 atomic_inc(&dlm->mle_tot_count[mle->type]);
290 atomic_inc(&dlm->mle_cur_count[mle->type]);
293 bitmap_copy(mle->node_map, dlm->domain_map, O2NM_MAX_NODES);
294 bitmap_copy(mle->vote_map, dlm->domain_map, O2NM_MAX_NODES);
295 clear_bit(dlm->node_num, mle->vote_map);
296 clear_bit(dlm->node_num, mle->node_map);
299 __dlm_mle_attach_hb_events(dlm, mle);
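Lines 250-299 (dlm_init_mle()) show how a new mle seeds its view of cluster membership: both bitmaps start as a copy of dlm->domain_map, the local node is cleared from each so it never votes for or waits on itself, and only then are heartbeat events attached. An annotated excerpt of those matches (comments added here; the remaining field setup is elided):

    mle->dlm = dlm;
    atomic_inc(&dlm->mle_tot_count[mle->type]);   /* lifetime stats */
    atomic_inc(&dlm->mle_cur_count[mle->type]);   /* currently live */

    /* snapshot of current domain membership, minus ourselves */
    bitmap_copy(mle->node_map, dlm->domain_map, O2NM_MAX_NODES);
    bitmap_copy(mle->vote_map, dlm->domain_map, O2NM_MAX_NODES);
    clear_bit(dlm->node_num, mle->vote_map);
    clear_bit(dlm->node_num, mle->node_map);

    /* from here on, node up/down events update this mle */
    __dlm_mle_attach_hb_events(dlm, mle);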
302 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
304 assert_spin_locked(&dlm->spinlock);
305 assert_spin_locked(&dlm->master_lock);
311 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
315 assert_spin_locked(&dlm->master_lock);
317 bucket = dlm_master_hash(dlm, mle->mnamehash);
322 static int dlm_find_mle(struct dlm_ctxt *dlm,
330 assert_spin_locked(&dlm->master_lock);
333 bucket = dlm_master_hash(dlm, hash);
335 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
344 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
348 assert_spin_locked(&dlm->spinlock);
350 list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
352 dlm_mle_node_up(dlm, mle, NULL, idx);
354 dlm_mle_node_down(dlm, mle, NULL, idx);
358 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
372 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
406 struct dlm_ctxt *dlm;
409 dlm = mle->dlm;
411 assert_spin_locked(&dlm->spinlock);
412 assert_spin_locked(&dlm->master_lock);
418 __dlm_unlink_mle(dlm, mle);
421 __dlm_mle_detach_hb_events(dlm, mle);
423 atomic_dec(&dlm->mle_cur_count[mle->type]);
467 struct dlm_ctxt *dlm;
470 dlm = res->dlm;
479 atomic_dec(&dlm->res_cur_count);
523 static void dlm_init_lockres(struct dlm_ctxt *dlm,
554 res->dlm = dlm;
558 atomic_inc(&dlm->res_tot_count);
559 atomic_inc(&dlm->res_cur_count);
563 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
570 spin_lock(&dlm->track_lock);
571 list_add_tail(&res->tracking, &dlm->tracking_list);
572 spin_unlock(&dlm->track_lock);
578 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
592 dlm_init_lockres(dlm, res, name, namelen);
601 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
612 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
623 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
628 mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
633 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
637 __dlm_lockres_grab_inflight_ref(dlm, res);
640 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
649 mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
656 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
662 dlm->name, res->lockname.len, res->lockname.name,
666 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
673 dlm->name, res->lockname.len, res->lockname.name,
677 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
681 __dlm_lockres_drop_inflight_worker(dlm, res);
693 * also, do a lookup in the dlm->master_list to see
701 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
723 spin_lock(&dlm->spinlock);
724 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
726 spin_unlock(&dlm->spinlock);
730 * Right after dlm spinlock was released, dlm_thread could have
753 BUG_ON(tmpres->owner == dlm->node_num);
763 dlm_lockres_grab_inflight_ref(dlm, tmpres);
767 spin_lock(&dlm->track_lock);
775 spin_unlock(&dlm->track_lock);
783 spin_unlock(&dlm->spinlock);
789 res = dlm_new_lockres(dlm, lockid, namelen);
801 dlm_change_lockres_owner(dlm, res, dlm->node_num);
802 __dlm_insert_lockres(dlm, res);
803 dlm_lockres_grab_inflight_ref(dlm, res);
805 spin_unlock(&dlm->spinlock);
811 spin_lock(&dlm->master_lock);
814 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
831 BUG_ON(mig && mle->master == dlm->node_num);
835 dlm->name, namelen, lockid,
837 spin_unlock(&dlm->master_lock);
838 spin_unlock(&dlm->spinlock);
842 dlm_mle_detach_hb_events(dlm, mle);
856 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
857 set_bit(dlm->node_num, mle->maybe_map);
858 __dlm_insert_mle(dlm, mle);
860 /* still holding the dlm spinlock, check the recovery map
864 bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
868 dlm->name, namelen, (char *)lockid, bit);
879 __dlm_insert_lockres(dlm, res);
882 __dlm_lockres_grab_inflight_ref(dlm, res);
889 spin_unlock(&dlm->master_lock);
890 spin_unlock(&dlm->spinlock);
895 * dlm spinlock would be detectable by a change on the mle,
899 "master $RECOVERY lock now\n", dlm->name);
900 if (!dlm_pre_master_reco_lockres(dlm, res))
904 "change\n", dlm->name);
910 dlm_kick_recovery_thread(dlm);
912 dlm_wait_for_recovery(dlm);
914 spin_lock(&dlm->spinlock);
915 bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
919 dlm->name, namelen, (char *)lockid, bit);
923 spin_unlock(&dlm->spinlock);
926 dlm_wait_for_node_recovery(dlm, bit, 10000);
948 "master is %u, keep going\n", dlm->name, namelen,
955 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
959 "request now, blocked=%d\n", dlm->name, res->lockname.len,
964 dlm->name, res->lockname.len,
973 mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
979 dlm_mle_detach_hb_events(dlm, mle);
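Lines 701-979 are the centerpiece of this file. dlm_get_lock_resource() first tries to find an existing lockres and pin it with an inflight reference; failing that, it creates one, installs a DLM_MLE_MASTER entry so concurrent lookups serialize on the mastery attempt, and then drives mastery to completion. A heavily condensed control-flow sketch of those matches (error paths, the migration-mle case, the local-only short circuit at 801-805, and the recovery waits at 860-926 are all elided):

    spin_lock(&dlm->spinlock);
    tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
    if (tmpres) {
            spin_unlock(&dlm->spinlock);
            dlm_lockres_grab_inflight_ref(dlm, tmpres);  /* pin and reuse */
            return tmpres;
    }

    res = dlm_new_lockres(dlm, lockid, namelen);

    spin_lock(&dlm->master_lock);
    if (!dlm_find_mle(dlm, &mle, (char *)lockid, namelen)) {
            /* nobody else is mastering or migrating it: become the candidate */
            dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
            set_bit(dlm->node_num, mle->maybe_map);
            __dlm_insert_mle(dlm, mle);
    }
    __dlm_insert_lockres(dlm, res);
    __dlm_lockres_grab_inflight_ref(dlm, res);
    spin_unlock(&dlm->master_lock);
    spin_unlock(&dlm->spinlock);

    /* send master requests to everyone in the vote map, then wait;
     * node death or recovery restarts the request round */
    ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);

    dlm_mle_detach_hb_events(dlm, mle);   /* mastery settled */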
1001 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1018 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1023 if (res->owner != dlm->node_num) {
1048 dlm->name, res->lockname.len, res->lockname.name);
1049 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1053 dlm->name, res->lockname.len, res->lockname.name,
1063 "rechecking now\n", dlm->name, res->lockname.len,
1069 "for %s:%.*s\n", dlm->name, res->lockname.len,
1083 if (dlm->node_num <= bit) {
1087 mle->master = dlm->node_num;
1108 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1119 m = dlm->node_num;
1122 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1140 dlm_change_lockres_owner(dlm, res, m);
1206 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1253 "now\n", dlm->name,
1269 dlm->name,
1287 set_bit(dlm->node_num, mle->maybe_map);
1309 struct dlm_ctxt *dlm = mle->dlm;
1314 request.node_idx = dlm->node_num;
1322 ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1358 "reference\n", dlm->name, res->lockname.len,
1393 * dlm->spinlock
1396 * dlm->master_list
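The fragments at 1393 and 1396 come from the comment above dlm_master_request_handler() enumerating the locks the handler may take; the same list precedes dlm_assert_master_handler() at 1752-1755. Throughout these matches the two dlm-level locks nest in exactly one order and are released in reverse, as at lines 202-207, 236-240 and 1704-1712:

    spin_lock(&dlm->spinlock);       /* domain-wide state, domain_map */
    spin_lock(&dlm->master_lock);    /* mle hash / master list */

    /* ... look up or insert mles, adjust lockres ownership ... */

    spin_unlock(&dlm->master_lock);
    spin_unlock(&dlm->spinlock);

res->spinlock and mle->spinlock from the same comment nest inside these in the full source; their ordering is not visible in the matches above.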
1404 struct dlm_ctxt *dlm = data;
1415 if (!dlm_grab(dlm))
1418 if (!dlm_domain_fully_joined(dlm)) {
1433 spin_lock(&dlm->spinlock);
1434 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1436 spin_unlock(&dlm->spinlock);
1442 * Right after dlm spinlock was released, dlm_thread could have
1463 if (res->owner == dlm->node_num) {
1464 dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1497 spin_lock(&dlm->master_lock);
1498 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1512 if (tmpmle->master == dlm->node_num) {
1523 if (tmpmle->master == dlm->node_num) {
1529 dlm_lockres_set_refmap_bit(dlm, res,
1542 spin_unlock(&dlm->master_lock);
1558 spin_lock(&dlm->master_lock);
1559 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1564 spin_unlock(&dlm->master_lock);
1565 spin_unlock(&dlm->spinlock);
1578 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1580 __dlm_insert_mle(dlm, mle);
1584 if (tmpmle->master == dlm->node_num) {
1600 spin_unlock(&dlm->master_lock);
1601 spin_unlock(&dlm->spinlock);
1616 dlm->node_num, res->lockname.len, res->lockname.name);
1618 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1627 __dlm_lockres_grab_inflight_worker(dlm, res);
1636 dlm_put(dlm);
1650 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1680 assert.node_idx = dlm->node_num;
1685 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1690 DLM_ASSERT_MASTER_MSG, dlm->key, to);
1704 spin_lock(&dlm->spinlock);
1705 spin_lock(&dlm->master_lock);
1706 if (dlm_find_mle(dlm, &mle, (char *)lockname,
1711 spin_unlock(&dlm->master_lock);
1712 spin_unlock(&dlm->spinlock);
1734 dlm_lockres_set_refmap_bit(dlm, res, to);
1752 * dlm->spinlock
1755 * dlm->master_list
1762 struct dlm_ctxt *dlm = data;
1772 if (!dlm_grab(dlm))
1785 spin_lock(&dlm->spinlock);
1791 spin_lock(&dlm->master_lock);
1792 if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1825 dlm->name, namelen, name,
1830 dlm->name, namelen, name,
1833 spin_unlock(&dlm->master_lock);
1834 spin_unlock(&dlm->spinlock);
1839 spin_unlock(&dlm->master_lock);
1843 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1916 if (nn != dlm->node_num && nn != assert->node_idx) {
1934 dlm->node_num, mle->new_master);
1937 dlm_change_lockres_owner(dlm, res, mle->new_master);
1940 dlm_change_lockres_owner(dlm, res, mle->master);
1951 spin_lock(&dlm->master_lock);
1968 "inuse=%d\n", dlm->name, namelen, name,
1972 __dlm_unlink_mle(dlm, mle);
1973 __dlm_mle_detach_hb_events(dlm, mle);
1982 spin_unlock(&dlm->master_lock);
1990 spin_unlock(&dlm->spinlock);
2000 dlm_put(dlm);
2008 assert->node_idx, dlm->name, namelen, name);
2015 dlm->name, namelen, name, assert->node_idx);
2025 spin_lock(&dlm->master_lock);
2028 spin_unlock(&dlm->master_lock);
2029 spin_unlock(&dlm->spinlock);
2031 dlm_put(dlm);
2049 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2060 dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2071 spin_lock(&dlm->work_lock);
2072 list_add_tail(&item->list, &dlm->work_list);
2073 spin_unlock(&dlm->work_lock);
2075 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
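dlm_dispatch_assert_master() (2049-2075) does not send the assert itself; it packages the work and hands it to the domain worker, keeping the potentially blocking broadcast out of the message-handler path. The same three-step dispatch appears again at 2324-2332 for dlm_deref_lockres_worker(). The pattern, straight from the matches:

    /* bind the handler to a work item ... */
    dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);

    /* ... chain it onto the domain's work list under work_lock ... */
    spin_lock(&dlm->work_lock);
    list_add_tail(&item->list, &dlm->work_list);
    spin_unlock(&dlm->work_lock);

    /* ... and kick the worker thread to drain the list */
    queue_work(dlm->dlm_worker, &dlm->dispatched_work);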
2081 struct dlm_ctxt *dlm = data;
2090 dlm = item->dlm;
2096 spin_lock(&dlm->spinlock);
2097 bitmap_copy(nodemap, dlm->domain_map, O2NM_MAX_NODES);
2098 spin_unlock(&dlm->spinlock);
2100 clear_bit(dlm->node_num, nodemap);
2106 bit = dlm->node_num;
2136 res->lockname.len, res->lockname.name, dlm->node_num);
2137 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2145 dlm_lockres_release_ast(dlm, res);
2148 dlm_lockres_drop_inflight_worker(dlm, res);
2165 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2173 spin_lock(&dlm->spinlock);
2174 dlm_node_iter_init(dlm->domain_map, &iter);
2175 spin_unlock(&dlm->spinlock);
2179 if (nodenum == dlm->node_num)
2181 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2193 spin_lock(&dlm->spinlock);
2194 if (test_bit(master, dlm->recovery_map)) {
2198 "lock. must wait.\n", dlm->name,
2202 spin_unlock(&dlm->spinlock);
2203 mlog(0, "%s: reco lock master is %u\n", dlm->name,
2215 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2227 deref.node_idx = dlm->node_num;
2231 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2235 dlm->name, namelen, lockname, ret, res->owner);
2239 dlm->name, namelen, lockname, res->owner, r);
2252 struct dlm_ctxt *dlm = data;
2264 if (!dlm_grab(dlm))
2282 spin_lock(&dlm->spinlock);
2283 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2285 spin_unlock(&dlm->spinlock);
2287 dlm->name, namelen, name);
2290 spin_unlock(&dlm->spinlock);
2298 dlm_lockres_clear_refmap_bit(dlm, res, node);
2306 dlm_lockres_calc_usage(dlm, res);
2309 "but it is already dropped!\n", dlm->name,
2324 dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2328 spin_lock(&dlm->work_lock);
2329 list_add_tail(&item->list, &dlm->work_list);
2330 spin_unlock(&dlm->work_lock);
2332 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2338 dlm_put(dlm);
2346 struct dlm_ctxt *dlm = data;
2356 if (!dlm_grab(dlm))
2374 spin_lock(&dlm->spinlock);
2375 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2377 spin_unlock(&dlm->spinlock);
2379 dlm->name, namelen, name);
2386 spin_unlock(&dlm->spinlock);
2388 "but it is already derefed!\n", dlm->name,
2394 __dlm_do_purge_lockres(dlm, res);
2398 spin_unlock(&dlm->spinlock);
2404 dlm_put(dlm);
2408 static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2421 deref.node_idx = dlm->node_num;
2425 ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2429 " to node %u\n", dlm->name, namelen,
2434 dlm->name, namelen, lockname, node, r);
2441 struct dlm_ctxt *dlm;
2446 dlm = item->dlm;
2454 dlm_lockres_clear_refmap_bit(dlm, res, node);
2459 dlm_drop_lockres_ref_done(dlm, res, node);
2463 dlm->name, res->lockname.len, res->lockname.name, node);
2464 dlm_lockres_calc_usage(dlm, res);
2467 "but it is already dropped!\n", dlm->name,
2482 static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2502 if (res->owner != dlm->node_num)
2508 if (lock->ml.node != dlm->node_num) {
2514 "%s list\n", dlm->name, res->lockname.len,
2529 mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2540 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2552 if (!dlm_grab(dlm))
2558 mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2580 spin_lock(&dlm->spinlock);
2581 spin_lock(&dlm->master_lock);
2582 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2583 namelen, target, dlm->node_num);
2591 spin_unlock(&dlm->master_lock);
2592 spin_unlock(&dlm->spinlock);
2604 if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2618 dlm_mle_detach_hb_events(dlm, oldmle);
2624 dlm_mle_detach_hb_events(dlm, mle);
2642 flush_workqueue(dlm->dlm_worker);
2648 ret = dlm_send_one_lockres(dlm, res, mres, target,
2655 dlm_mle_detach_hb_events(dlm, mle);
2663 dlm_wait_for_node_death(dlm, target,
2691 dlm->name, res->lockname.len, res->lockname.name);
2694 if (dlm_is_node_dead(dlm, target)) {
2697 dlm->name, res->lockname.len,
2701 dlm_mle_detach_hb_events(dlm, mle);
2712 dlm->name, res->lockname.len, res->lockname.name);
2717 dlm_set_lockres_owner(dlm, res, target);
2719 dlm_remove_nonlocal_locks(dlm, res);
2724 dlm_mle_detach_hb_events(dlm, mle);
2728 dlm_lockres_calc_usage(dlm, res);
2733 dlm_kick_thread(dlm, res);
2743 dlm_put(dlm);
2745 mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2755 * Called with the dlm spinlock held, may drop it to do migration, but
2758 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2760 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2761 __must_hold(&dlm->spinlock)
2767 assert_spin_locked(&dlm->spinlock);
2770 if (dlm_is_lockres_migratable(dlm, res))
2771 target = dlm_pick_migration_target(dlm, res);
2778 spin_unlock(&dlm->spinlock);
2780 ret = dlm_migrate_lockres(dlm, res, target);
2783 dlm->name, res->lockname.len, res->lockname.name,
2785 spin_lock(&dlm->spinlock);
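The comment at 2755-2758 and the __must_hold annotation at 2761 document a subtle contract: dlm_empty_lockres() is entered with dlm->spinlock held, may drop it around dlm_migrate_lockres() (2778-2785), and its return value says whether it did. A caller scanning a list under that spinlock must treat every cached pointer as stale when 1 comes back. An illustrative caller shape (hypothetical loop; the real caller restarts its hash-bucket walk the same way):

    int dropped;

    spin_lock(&dlm->spinlock);
redo_bucket:
    /* ... walk one hash bucket of lock resources ... */
    dropped = dlm_empty_lockres(dlm, res);
    if (dropped) {
            /* dlm->spinlock was released and retaken inside:
             * every iterator held across the call is invalid,
             * so restart the bucket walk from the top */
            goto redo_bucket;
    }
    /* ... advance to the next resource ... */
    spin_unlock(&dlm->spinlock);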
2790 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2793 spin_lock(&dlm->ast_lock);
2797 spin_unlock(&dlm->ast_lock);
2801 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2812 spin_lock(&dlm->spinlock);
2813 if (!test_bit(mig_target, dlm->domain_map))
2815 spin_unlock(&dlm->spinlock);
2819 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2830 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2837 res->lockname.len, res->lockname.name, dlm->node_num,
2850 dlm_kick_thread(dlm, res);
2858 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2859 dlm_lockres_release_ast(dlm, res);
2867 ret = wait_event_interruptible_timeout(dlm->migration_wq,
2868 dlm_migration_can_proceed(dlm, res, target),
2873 test_bit(target, dlm->domain_map) ? "no":"yes");
2877 test_bit(target, dlm->domain_map) ? "no":"yes");
2879 if (!dlm_migration_can_proceed(dlm, res, target)) {
2886 spin_lock(&dlm->spinlock);
2887 if (!test_bit(target, dlm->domain_map)) {
2892 spin_unlock(&dlm->spinlock);
2922 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2931 BUG_ON(res->owner == dlm->node_num);
2935 if (lock->ml.node != dlm->node_num) {
2943 dlm_lockres_clear_refmap_bit(dlm, res,
2961 if (bit != dlm->node_num) {
2963 "migrating lockres, clearing\n", dlm->name,
2965 dlm_lockres_clear_refmap_bit(dlm, res, bit);
2976 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2985 assert_spin_locked(&dlm->spinlock);
2992 if (lock->ml.node == dlm->node_num)
2994 if (test_bit(lock->ml.node, dlm->exit_domain_map))
3008 if (noderef == dlm->node_num)
3010 if (test_bit(noderef, dlm->exit_domain_map))
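dlm_pick_migration_target() (2976-3010) works in two passes under dlm->spinlock: first it scans the lock queues for a remote node that actually holds a lock on the resource, then it falls back to the refmap for any node still holding a reference; both passes skip the local node and anything set in dlm->exit_domain_map. A condensed sketch (queue iteration and the refmap bit-scan are simplified, and the real function records a candidate rather than returning mid-loop):

    /* pass 1: prefer a node that holds a lock on this resource */
    list_for_each_entry(lock, queue, list) {
            if (lock->ml.node == dlm->node_num)
                    continue;        /* never migrate to self */
            if (test_bit(lock->ml.node, dlm->exit_domain_map))
                    continue;        /* node is on its way out */
            target = lock->ml.node;
            goto found;
    }

    /* pass 2: fall back to any node with a refmap reference */
    for_each_set_bit(noderef, res->refmap, O2NM_MAX_NODES) {
            if (noderef == dlm->node_num ||
                test_bit(noderef, dlm->exit_domain_map))
                    continue;
            target = noderef;
            goto found;
    }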
3022 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3046 spin_lock(&dlm->spinlock);
3047 skip = (!test_bit(nodenum, dlm->domain_map));
3048 spin_unlock(&dlm->spinlock);
3054 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3059 "MIGRATE_REQUEST to node %u\n", dlm->name,
3076 dlm->name, res->lockname.len, res->lockname.name,
3079 dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3102 struct dlm_ctxt *dlm = data;
3110 if (!dlm_grab(dlm))
3126 spin_lock(&dlm->spinlock);
3127 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3145 spin_lock(&dlm->master_lock);
3147 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3155 spin_unlock(&dlm->master_lock);
3157 spin_unlock(&dlm->spinlock);
3161 dlm_mle_detach_hb_events(dlm, oldmle);
3168 dlm_put(dlm);
3172 /* must be holding dlm->spinlock and dlm->master_lock
3179 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3191 assert_spin_locked(&dlm->spinlock);
3192 assert_spin_locked(&dlm->master_lock);
3195 found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3200 if (master == dlm->node_num) {
3224 __dlm_unlink_mle(dlm, tmp);
3225 __dlm_mle_detach_hb_events(dlm, tmp);
3231 "migration\n", dlm->name,
3240 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3247 __dlm_insert_mle(dlm, mle);
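The fragment at 3172 states the precondition for dlm_add_migration_mle(): both dlm->spinlock and dlm->master_lock must already be held, which the assert_spin_locked() pair at 3191-3192 enforces. The caller in dlm_migrate_lockres() (2580-2592) shows the intended usage:

    spin_lock(&dlm->spinlock);
    spin_lock(&dlm->master_lock);
    ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
                                namelen, target, dlm->node_num);
    spin_unlock(&dlm->master_lock);
    spin_unlock(&dlm->spinlock);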
3255 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3261 res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3264 spin_unlock(&dlm->master_lock);
3268 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3269 dlm_move_lockres_to_recovery_list(dlm, res);
3274 __dlm_mle_detach_hb_events(dlm, mle);
3277 spin_lock(&dlm->master_lock);
3279 spin_unlock(&dlm->master_lock);
3285 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3288 __dlm_mle_detach_hb_events(dlm, mle);
3291 __dlm_unlink_mle(dlm, mle);
3298 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3322 __dlm_mle_detach_hb_events(dlm, mle);
3327 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3335 mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3337 assert_spin_locked(&dlm->spinlock);
3340 spin_lock(&dlm->master_lock);
3342 bucket = dlm_master_hash(dlm, i);
3358 dlm_clean_block_mle(dlm, mle, dead_node);
3381 dlm->name, dead_node,
3388 dlm_clean_migration_mle(dlm, mle);
3391 "%u to %u!\n", dlm->name, dead_node, mle->master,
3399 res = dlm_reset_mleres_owner(dlm, mle);
3408 spin_unlock(&dlm->master_lock);
3411 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3417 spin_lock(&dlm->spinlock);
3418 dlm_node_iter_init(dlm->domain_map, &iter);
3420 clear_bit(dlm->node_num, iter.node_map);
3421 spin_unlock(&dlm->spinlock);
3427 dlm_lockres_set_refmap_bit(dlm, res, old_master);
3431 ret = dlm_do_migrate_request(dlm, res, old_master,
3432 dlm->node_num, &iter);
3442 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3454 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3466 dlm_set_lockres_owner(dlm, res, dlm->node_num);
3470 dlm_kick_thread(dlm, res);
3509 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3525 wake_up(&dlm->migration_wq);
3528 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3537 * set the dlm state to DLM_CTXT_LEAVING. If any mles are still
3541 spin_lock(&dlm->spinlock);
3542 spin_lock(&dlm->master_lock);
3544 BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3545 BUG_ON(find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES);
3548 bucket = dlm_master_hash(dlm, i);
3557 __dlm_unlink_mle(dlm, mle);
3558 __dlm_mle_detach_hb_events(dlm, mle);
3562 spin_unlock(&dlm->master_lock);
3563 spin_unlock(&dlm->spinlock);
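dlm_force_free_mles() (3528-3563) is the last-resort cleanup on domain shutdown. Per the comment fragment at 3537, it may only run once the domain state is DLM_CTXT_LEAVING, and the BUG_ON pair at 3544-3545 additionally demands an empty domain map, i.e. no live peers. The sweep, condensed from the matches with the per-mle loop body filled in as comments:

    spin_lock(&dlm->spinlock);
    spin_lock(&dlm->master_lock);

    BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
    BUG_ON(find_first_bit(dlm->domain_map, O2NM_MAX_NODES) < O2NM_MAX_NODES);

    for (i = 0; i < DLM_HASH_BUCKETS; i++) {      /* bucket count assumed */
            bucket = dlm_master_hash(dlm, i);
            /* for each straggling mle in the bucket:
             *   __dlm_unlink_mle(dlm, mle);
             *   __dlm_mle_detach_hb_events(dlm, mle);
             *   ... then drop the final mle reference ... */
    }

    spin_unlock(&dlm->master_lock);
    spin_unlock(&dlm->spinlock);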