Lines matching defs:lkb (only in /netgear-WNDR4500v2-V1.0.0.60_1.0.38/src/linux/linux-2.6/fs/dlm/)

18    request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
48 given rsb and lkb and queues callbacks.
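The hierarchy above is the file's own map of its four-stage call chain: dlm_lock()/dlm_unlock() validate arguments (stage 1) and split into the xxxx_lock() entry points, which find and lock the rsb (stage 2); _xxxx_lock() decides local versus remote (stage 3); and do_xxxx() manipulates the given rsb and lkb and queues callbacks (stage 4). Below is a minimal sketch of the stage-1 split, reconstructed from the dlm_lock() fragments later in this listing (around source line 2258); lock_recovery(), unlock_recovery(), set_lock_args() and dlm_find_lockspace_local() are assumed from the surrounding file, so treat this as a sketch of the 2.6-era code rather than a verbatim quote.

    /* sketch: stage 1 picks an existing lkb (convert) or creates one
       (request), then dispatches to the matching xxxx_lock() */
    int dlm_lock(dlm_lockspace_t *lockspace, int mode, struct dlm_lksb *lksb,
                 uint32_t flags, void *name, unsigned int namelen,
                 uint32_t parent_lkid, void (*ast) (void *astarg),
                 void *astarg, void (*bast) (void *astarg, int mode))
    {
            struct dlm_ls *ls;
            struct dlm_lkb *lkb;
            struct dlm_args args;
            int error, convert = flags & DLM_LKF_CONVERT;

            ls = dlm_find_lockspace_local(lockspace);
            if (!ls)
                    return -EINVAL;

            lock_recovery(ls);

            if (convert)
                    error = find_lkb(ls, lksb->sb_lkid, &lkb); /* existing lkb */
            else
                    error = create_lkb(ls, &lkb);              /* new lkb */
            if (error)
                    goto out;

            error = set_lock_args(mode, lksb, flags, namelen, parent_lkid,
                                  ast, astarg, bast, &args);
            if (error)
                    goto out_put;

            if (convert)
                    error = convert_lock(ls, lkb, &args);
            else
                    error = request_lock(ls, lkb, name, namelen, &args);

            if (error == -EINPROGRESS)
                    error = 0;
     out_put:
            if (error)
                    __put_lkb(ls, lkb);
     out:
            unlock_recovery(ls);
            dlm_put_lockspace(ls);
            return error;
    }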
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
157 void dlm_print_lkb(struct dlm_lkb *lkb)
159 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
161 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
175 struct dlm_lkb *lkb;
182 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183 dlm_print_lkb(lkb);
185 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186 dlm_print_lkb(lkb);
188 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189 dlm_print_lkb(lkb);
191 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192 dlm_print_lkb(lkb);
212 static inline int can_be_queued(struct dlm_lkb *lkb)
214 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
219 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 static inline int is_demoted(struct dlm_lkb *lkb)
224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 static inline int is_altmode(struct dlm_lkb *lkb)
229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 static inline int is_granted(struct dlm_lkb *lkb)
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 static inline int is_process_copy(struct dlm_lkb *lkb)
245 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 static inline int is_master_copy(struct dlm_lkb *lkb)
250 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 static inline int middle_conversion(struct dlm_lkb *lkb)
257 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263 static inline int down_conversion(struct dlm_lkb *lkb)
265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 static inline int is_overlap(struct dlm_lkb *lkb)
280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
286 if (is_master_copy(lkb))
289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
291 lkb->lkb_lksb->sb_status = rv;
292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
294 dlm_add_ast(lkb, AST_COMP);
297 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
299 queue_cast(r, lkb,
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
303 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
305 if (is_master_copy(lkb))
306 send_bast(r, lkb, rqmode);
308 lkb->lkb_bastmode = rqmode;
309 dlm_add_ast(lkb, AST_BAST);
314 * Basic operations on rsb's and lkb's
552 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553 The rsb must exist as long as any lkb's for it do. */
555 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
558 lkb->lkb_resource = r;
561 static void detach_lkb(struct dlm_lkb *lkb)
563 if (lkb->lkb_resource) {
564 put_rsb(lkb->lkb_resource);
565 lkb->lkb_resource = NULL;
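Only fragments of the two helpers behind the comment at source line 552 are matched above. A plausible reconstruction of the pair; hold_rsb() is an assumption inferred from the put_rsb() fragment at source line 564:

    /* sketch: attach takes an rsb reference, detach drops it, which is
       what keeps the rsb alive as long as any of its lkb's exist */
    static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
    {
            hold_rsb(r);                    /* assumed helper name */
            lkb->lkb_resource = r;
    }

    static void detach_lkb(struct dlm_lkb *lkb)
    {
            if (lkb->lkb_resource) {
                    put_rsb(lkb->lkb_resource);
                    lkb->lkb_resource = NULL;
            }
    }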
571 struct dlm_lkb *lkb, *tmp;
575 lkb = allocate_lkb(ls);
576 if (!lkb)
579 lkb->lkb_nodeid = -1;
580 lkb->lkb_grmode = DLM_LOCK_IV;
581 kref_init(&lkb->lkb_ref);
582 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
604 lkb->lkb_id = lkid;
605 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
608 *lkb_ret = lkb;
614 struct dlm_lkb *lkb;
617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618 if (lkb->lkb_id == lkid)
619 return lkb;
626 struct dlm_lkb *lkb;
633 lkb = __find_lkb(ls, lkid);
634 if (lkb)
635 kref_get(&lkb->lkb_ref);
638 *lkb_ret = lkb;
639 return lkb ? 0 : -ENOENT;
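The fragments above show __find_lkb() scanning one bucket list and find_lkb() taking a kref. A hedged sketch of the lookup path, assuming the bucket is the high 16 bits of the lkid (compare the lkb->lkb_id >> 16 fragment at source line 657) and that each ls_lkbtbl bucket carries a lock alongside its list; the exact lock type in this tree is an assumption:

    /* sketch: lkid -> bucket, then a locked scan with a reference
       taken before the bucket lock is dropped */
    static int find_lkb(struct dlm_ls *ls, uint32_t lkid,
                        struct dlm_lkb **lkb_ret)
    {
            struct dlm_lkb *lkb;
            uint16_t bucket = (lkid >> 16);

            if (bucket >= ls->ls_lkbtbl_size)
                    return -EBADSLT;

            read_lock(&ls->ls_lkbtbl[bucket].lock);  /* lock assumed */
            lkb = __find_lkb(ls, lkid);
            if (lkb)
                    kref_get(&lkb->lkb_ref);
            read_unlock(&ls->ls_lkbtbl[bucket].lock);

            *lkb_ret = lkb;
            return lkb ? 0 : -ENOENT;
    }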
644 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
649 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
652 /* __put_lkb() is used when an lkb may not have an rsb attached to
655 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
657 uint16_t bucket = (lkb->lkb_id >> 16);
660 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661 list_del(&lkb->lkb_idtbl_list);
664 detach_lkb(lkb);
667 if (lkb->lkb_lvbptr && is_master_copy(lkb))
668 free_lvb(lkb->lkb_lvbptr);
669 free_lkb(lkb);
677 int dlm_put_lkb(struct dlm_lkb *lkb)
681 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
684 ls = lkb->lkb_resource->res_ls;
685 return __put_lkb(ls, lkb);
689 a valid reference to the lkb, so there's no need for locking. */
691 static inline void hold_lkb(struct dlm_lkb *lkb)
693 kref_get(&lkb->lkb_ref);
701 static inline void unhold_lkb(struct dlm_lkb *lkb)
704 rv = kref_put(&lkb->lkb_ref, kill_lkb);
705 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
711 struct dlm_lkb *lkb = NULL;
713 list_for_each_entry(lkb, head, lkb_statequeue)
714 if (lkb->lkb_rqmode < mode)
717 if (!lkb)
720 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
723 /* add/remove lkb to rsb's grant/convert/wait queue */
725 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
727 kref_get(&lkb->lkb_ref);
729 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
731 lkb->lkb_status = status;
735 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
738 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
742 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
743 lkb->lkb_grmode);
746 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
749 list_add_tail(&lkb->lkb_statequeue,
753 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
757 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
759 lkb->lkb_status = 0;
760 list_del(&lkb->lkb_statequeue);
761 unhold_lkb(lkb);
764 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
766 hold_lkb(lkb);
767 del_lkb(r, lkb);
768 add_lkb(r, lkb, sts);
769 unhold_lkb(lkb);
789 /* add/remove lkb from global waiters list of lkb's waiting for
792 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
794 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
799 if (is_overlap_unlock(lkb) ||
800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
817 lkb->lkb_wait_count++;
818 hold_lkb(lkb);
821 lkb->lkb_id, lkb->lkb_wait_type, mstype,
822 lkb->lkb_wait_count, lkb->lkb_flags);
826 DLM_ASSERT(!lkb->lkb_wait_count,
827 dlm_print_lkb(lkb);
828 printk("wait_count %d\n", lkb->lkb_wait_count););
830 lkb->lkb_wait_count++;
831 lkb->lkb_wait_type = mstype;
832 hold_lkb(lkb);
833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
837 lkb->lkb_id, error, lkb->lkb_flags, mstype,
838 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
843 /* We clear the RESEND flag because we might be taking an lkb off the waiters
848 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
850 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
868 if (lkb->lkb_wait_type) {
869 lkb->lkb_wait_type = 0;
874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
879 if (overlap_done && lkb->lkb_wait_type) {
881 lkb->lkb_id, mstype, lkb->lkb_wait_type);
882 lkb->lkb_wait_count--;
883 lkb->lkb_wait_type = 0;
886 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
888 lkb->lkb_flags &= ~DLM_IFL_RESEND;
889 lkb->lkb_wait_count--;
890 if (!lkb->lkb_wait_count)
891 list_del_init(&lkb->lkb_wait_reply);
892 unhold_lkb(lkb);
896 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
898 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
902 error = _remove_from_waiters(lkb, mstype);
910 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
912 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917 error = _remove_from_waiters(lkb, ms->m_type);
991 /* lkb is master or local copy */
993 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1001 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1004 if (!lkb->lkb_lvbptr)
1007 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1013 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1014 lkb->lkb_lvbseq = r->res_lvbseq;
1017 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1022 if (!lkb->lkb_lvbptr)
1025 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1034 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1036 lkb->lkb_lvbseq = r->res_lvbseq;
1041 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1044 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1046 if (lkb->lkb_grmode < DLM_LOCK_PW)
1049 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1054 if (!lkb->lkb_lvbptr)
1057 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1066 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1071 /* lkb is process copy (pc) */
1073 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1078 if (!lkb->lkb_lvbptr)
1081 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1084 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1087 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1088 lkb->lkb_lvbseq = ms->m_lvbseq;
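set_lvb_lock_pc() above is the process-copy counterpart of set_lvb_lock(): the master has already run the LVB operations table, so the reply side only copies the value block back. A sketch filling in the unmatched lines; receive_extralen() and the b == 1 (copy-to-lkb) convention of dlm_lvb_operations are assumptions from the surrounding file:

    /* sketch: on a process copy, copy the lvb out of the reply
       message only when the operations table says the lock side
       receives the value block */
    static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
    {
            int b;

            if (!lkb->lkb_lvbptr)
                    return;

            if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                    return;

            b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
            if (b == 1) {
                    int len = receive_extralen(ms);  /* assumed helper */
                    memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                    lkb->lkb_lvbseq = ms->m_lvbseq;
            }
    }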
1092 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1093 remove_lock -- used for unlock, removes lkb from granted
1094 revert_lock -- used for cancel, moves lkb from convert to granted
1095 grant_lock -- used for request and convert, adds lkb to granted or
1096 moves lkb from convert or waiting to granted
1098 Each of these is used for master or local copy lkb's. There is
1100 a process copy (pc) lkb. */
1102 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1104 del_lkb(r, lkb);
1105 lkb->lkb_grmode = DLM_LOCK_IV;
1107 so this leads to the lkb being freed */
1108 unhold_lkb(lkb);
1111 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1113 set_lvb_unlock(r, lkb);
1114 _remove_lock(r, lkb);
1117 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1119 _remove_lock(r, lkb);
1126 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1130 lkb->lkb_rqmode = DLM_LOCK_IV;
1132 switch (lkb->lkb_status) {
1136 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1140 del_lkb(r, lkb);
1141 lkb->lkb_grmode = DLM_LOCK_IV;
1143 so this leads to the lkb being freed */
1144 unhold_lkb(lkb);
1148 log_print("invalid status for revert %d", lkb->lkb_status);
1153 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1155 return revert_lock(r, lkb);
1158 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1160 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1161 lkb->lkb_grmode = lkb->lkb_rqmode;
1162 if (lkb->lkb_status)
1163 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1165 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1168 lkb->lkb_rqmode = DLM_LOCK_IV;
1171 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1173 set_lvb_lock(r, lkb);
1174 _grant_lock(r, lkb);
1175 lkb->lkb_highbast = 0;
1178 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1181 set_lvb_lock_pc(r, lkb, ms);
1182 _grant_lock(r, lkb);
1187 lkb belongs to a remote node. */
1189 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1191 grant_lock(r, lkb);
1192 if (is_master_copy(lkb))
1193 send_grant(r, lkb);
1195 queue_cast(r, lkb, 0);
1206 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1210 lkb->lkb_id, ms->m_type);
1214 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1216 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1220 lkb->lkb_grmode = DLM_LOCK_NL;
1223 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1228 lkb->lkb_id, ms->m_type);
1232 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1233 lkb->lkb_rqmode = DLM_LOCK_PR;
1234 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1235 lkb->lkb_rqmode = DLM_LOCK_CW;
1237 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1238 dlm_print_lkb(lkb);
1242 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1246 if (lkb->lkb_id == first->lkb_id)
1252 /* Check if the given lkb conflicts with another lkb on the queue. */
1254 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1259 if (this == lkb)
1261 if (!modes_compat(this, lkb))
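A sketch of queue_conflict() with the loop the matched lines above belong to; this is the straightforward scan the comment at source line 1252 describes:

    /* sketch: return nonzero if any other lock on this queue holds a
       mode incompatible with lkb's requested mode */
    static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
    {
            struct dlm_lkb *this;

            list_for_each_entry(this, head, lkb_statequeue) {
                    if (this == lkb)
                            continue;       /* never conflict with self */
                    if (!modes_compat(this, lkb))
                            return 1;
            }
            return 0;
    }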
1272 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1273 * convert queue from being granted, then demote lkb (set grmode to NL).
1284 * list. We demote the granted mode of the second lock (the lkb passed to this
1292 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1299 if (this == lkb) {
1300 self = lkb;
1304 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1308 /* if lkb is on the convert queue and is preventing the first
1309 from being granted, then there's deadlock and we demote lkb.
1314 if (!modes_compat(lkb, first) &&
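The matched lines above come from conversion_deadlock_detect(). A hedged reconstruction of the whole scan, combining the two-part test the comments describe: mutual incompatibility with another converter, or lkb blocking the first converter that nothing on the grant queue blocks.

    /* sketch: detect a conversion deadlock that can be broken by
       demoting lkb's granted mode to NL */
    static int conversion_deadlock_detect(struct dlm_rsb *rsb,
                                          struct dlm_lkb *lkb)
    {
            struct dlm_lkb *this, *first = NULL, *self = NULL;

            list_for_each_entry(this, &rsb->res_convertqueue,
                                lkb_statequeue) {
                    if (!first)
                            first = this;
                    if (this == lkb) {
                            self = lkb;
                            continue;
                    }
                    /* part 1: mutually incompatible converters */
                    if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
                            return 1;
            }

            /* part 2: lkb's granted mode alone is holding up the first
               converter, so demoting lkb breaks the cycle */
            if (self && self != first) {
                    if (!modes_compat(lkb, first) &&
                        !queue_conflict(&rsb->res_grantqueue, first))
                            return 1;
            }
            return 0;
    }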
1326 * lkb is the lock to be granted
1335 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1337 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1354 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1358 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1362 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1366 if (queue_conflict(&r->res_grantqueue, lkb))
1375 if (queue_conflict(&r->res_convertqueue, lkb))
1401 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1409 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1418 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1445 first_in_list(lkb, &r->res_waitqueue))
1453 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1454 conversion_deadlock_detect(r, lkb)) {
1455 lkb->lkb_grmode = DLM_LOCK_NL;
1456 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1467 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1469 uint32_t flags = lkb->lkb_exflags;
1471 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1473 rv = _can_be_granted(r, lkb, now);
1477 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1486 lkb->lkb_rqmode = alt;
1487 rv = _can_be_granted(r, lkb, now);
1489 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1491 lkb->lkb_rqmode = rqmode;
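can_be_granted() wraps _can_be_granted() with the ALTPR/ALTCW retry visible in the fragments above: when the requested mode cannot be granted, an alternate PR or CW mode is tried and DLM_SBF_ALTMODE is set on success. A sketch of the whole wrapper; the exact guard order is assumed:

    /* sketch: try the requested mode, then fall back to the
       caller-permitted alternate mode */
    static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
    {
            uint32_t flags = lkb->lkb_exflags;
            int rv;
            int8_t alt = 0, rqmode = lkb->lkb_rqmode;

            rv = _can_be_granted(r, lkb, now);
            if (rv)
                    goto out;

            if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
                    goto out;       /* already demoted; don't munge again */

            if (rqmode != DLM_LOCK_PR && (flags & DLM_LKF_ALTPR))
                    alt = DLM_LOCK_PR;
            else if (rqmode != DLM_LOCK_CW && (flags & DLM_LKF_ALTCW))
                    alt = DLM_LOCK_CW;

            if (alt) {
                    lkb->lkb_rqmode = alt;
                    rv = _can_be_granted(r, lkb, now);
                    if (rv)
                            lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                    else
                            lkb->lkb_rqmode = rqmode;  /* restore */
            }
     out:
            return rv;
    }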
1499 struct dlm_lkb *lkb, *s;
1508 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1509 demoted = is_demoted(lkb);
1510 if (can_be_granted(r, lkb, 0)) {
1511 grant_lock_pending(r, lkb);
1514 hi = max_t(int, lkb->lkb_rqmode, hi);
1515 if (!demoted && is_demoted(lkb))
1532 struct dlm_lkb *lkb, *s;
1534 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1535 if (can_be_granted(r, lkb, 0))
1536 grant_lock_pending(r, lkb);
1538 high = max_t(int, lkb->lkb_rqmode, high);
1546 struct dlm_lkb *lkb, *s;
1558 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1559 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1560 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1561 queue_bast(r, lkb, high);
1562 lkb->lkb_highbast = high;
1568 struct dlm_lkb *lkb)
1574 gr->lkb_highbast < lkb->lkb_rqmode &&
1575 !modes_compat(gr, lkb)) {
1576 queue_bast(r, gr, lkb->lkb_rqmode);
1577 gr->lkb_highbast = lkb->lkb_rqmode;
1582 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1584 send_bast_queue(r, &r->res_grantqueue, lkb);
1587 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1589 send_bast_queue(r, &r->res_grantqueue, lkb);
1590 send_bast_queue(r, &r->res_convertqueue, lkb);
1593 /* set_master(r, lkb) -- set the master nodeid of a resource
1596 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1597 known, it can just be copied to the lkb and the function will return
1599 before it can be copied to the lkb.
1601 When the rsb nodeid is being looked up remotely, the initial lkb
1603 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1607 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1608 1: the rsb master is not available and the lkb has been placed on
1612 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1619 r->res_first_lkid = lkb->lkb_id;
1620 lkb->lkb_nodeid = r->res_nodeid;
1624 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1625 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1630 lkb->lkb_nodeid = 0;
1635 lkb->lkb_nodeid = r->res_nodeid;
1644 r->res_first_lkid = lkb->lkb_id;
1645 send_lookup(r, lkb);
1667 lkb->lkb_nodeid = 0;
1669 r->res_first_lkid = lkb->lkb_id;
1671 lkb->lkb_nodeid = ret_nodeid;
1678 struct dlm_lkb *lkb, *safe;
1680 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1681 list_del_init(&lkb->lkb_rsb_lookup);
1682 _request_lock(r, lkb);
1691 struct dlm_lkb *lkb;
1705 make a waiting lkb the first_lkid */
1710 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1712 list_del_init(&lkb->lkb_rsb_lookup);
1713 r->res_first_lkid = lkb->lkb_id;
1714 _request_lock(r, lkb);
1775 /* these args will be copied to the lkb in validate_lock_args,
1777 an active lkb cannot be modified before locking the rsb */
1804 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1810 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1814 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1818 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1821 if (lkb->lkb_wait_type)
1824 if (is_overlap(lkb))
1828 lkb->lkb_exflags = args->flags;
1829 lkb->lkb_sbflags = 0;
1830 lkb->lkb_astaddr = args->astaddr;
1831 lkb->lkb_astparam = args->astparam;
1832 lkb->lkb_bastaddr = args->bastaddr;
1833 lkb->lkb_rqmode = args->mode;
1834 lkb->lkb_lksb = args->lksb;
1835 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1836 lkb->lkb_ownpid = (int) current->pid;
1849 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1851 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1854 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1855 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1856 dlm_print_lkb(lkb);
1860 /* an lkb may still exist even though the lock is EOL'ed due to a
1864 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1865 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1870 /* an lkb may be waiting for an rsb lookup to complete where the
1874 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1875 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1876 list_del_init(&lkb->lkb_rsb_lookup);
1877 queue_cast(lkb->lkb_resource, lkb,
1880 unhold_lkb(lkb); /* undoes create_lkb() */
1889 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1892 if (is_overlap(lkb))
1895 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1896 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1901 switch (lkb->lkb_wait_type) {
1904 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1920 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1923 if (is_overlap_unlock(lkb))
1926 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1927 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1932 switch (lkb->lkb_wait_type) {
1935 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1952 lkb->lkb_exflags |= args->flags;
1953 lkb->lkb_sbflags = 0;
1954 lkb->lkb_astparam = args->astparam;
1959 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1960 args->flags, lkb->lkb_wait_type,
1961 lkb->lkb_resource->res_name);
1972 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1976 if (can_be_granted(r, lkb, 1)) {
1977 grant_lock(r, lkb);
1978 queue_cast(r, lkb, 0);
1982 if (can_be_queued(lkb)) {
1984 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1985 send_blocking_asts(r, lkb);
1990 if (force_blocking_asts(lkb))
1991 send_blocking_asts_all(r, lkb);
1992 queue_cast(r, lkb, -EAGAIN);
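The do_request() fragments above cover its three outcomes. A reconstruction of the full function for readability: grant immediately, queue with -EINPROGRESS, or fail with -EAGAIN (with blocking ASTs forced for NOQUEUEBAST callers):

    /* sketch: stage 4 for a new request, run on the master node */
    static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
    {
            int error = 0;

            if (can_be_granted(r, lkb, 1)) {
                    grant_lock(r, lkb);
                    queue_cast(r, lkb, 0);     /* completion ast, status 0 */
                    goto out;
            }

            if (can_be_queued(lkb)) {          /* !DLM_LKF_NOQUEUE */
                    error = -EINPROGRESS;
                    add_lkb(r, lkb, DLM_LKSTS_WAITING);
                    send_blocking_asts(r, lkb);
                    goto out;
            }

            error = -EAGAIN;
            if (force_blocking_asts(lkb))      /* DLM_LKF_NOQUEUEBAST */
                    send_blocking_asts_all(r, lkb);
            queue_cast(r, lkb, -EAGAIN);
     out:
            return error;
    }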
1998 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2004 if (can_be_granted(r, lkb, 1)) {
2005 grant_lock(r, lkb);
2006 queue_cast(r, lkb, 0);
2017 if (is_demoted(lkb)) {
2019 if (_can_be_granted(r, lkb, 1)) {
2020 grant_lock(r, lkb);
2021 queue_cast(r, lkb, 0);
2028 if (can_be_queued(lkb)) {
2030 del_lkb(r, lkb);
2031 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2032 send_blocking_asts(r, lkb);
2037 if (force_blocking_asts(lkb))
2038 send_blocking_asts_all(r, lkb);
2039 queue_cast(r, lkb, -EAGAIN);
2045 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2047 remove_lock(r, lkb);
2048 queue_cast(r, lkb, -DLM_EUNLOCK);
2055 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 error = revert_lock(r, lkb);
2061 queue_cast(r, lkb, -DLM_ECANCEL);
2073 /* add a new lkb to a possibly new rsb, called by requesting process */
2075 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2079 /* set_master: sets lkb nodeid from r */
2081 error = set_master(r, lkb);
2091 error = send_request(r, lkb);
2093 error = do_request(r, lkb);
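_request_lock() above is stage 3 for requests: set_master() returns a negative error, 1 when the lkb was parked on r->res_lookup awaiting a remote lookup reply, or 0 when the nodeid is usable (see the comment block at source line 1593). A sketch of the whole function; is_remote() is assumed from the surrounding file:

    /* sketch: stage 3 decides local vs remote from set_master()'s result */
    static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
    {
            int error;

            error = set_master(r, lkb);  /* sets lkb nodeid from r */
            if (error < 0)
                    goto out;
            if (error) {
                    error = 0;           /* parked; lookup reply resumes it */
                    goto out;
            }

            if (is_remote(r))
                    /* receive_request() calls do_request() on remote node */
                    error = send_request(r, lkb);
            else
                    error = do_request(r, lkb);
     out:
            return error;
    }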
2098 /* change some property of an existing lkb, e.g. mode */
2100 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 error = send_convert(r, lkb);
2108 error = do_convert(r, lkb);
2113 /* remove an existing lkb from the granted queue */
2115 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2121 error = send_unlock(r, lkb);
2123 error = do_unlock(r, lkb);
2128 /* remove an existing lkb from the convert or wait queue */
2130 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2136 error = send_cancel(r, lkb);
2138 error = do_cancel(r, lkb);
2148 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2154 error = validate_lock_args(ls, lkb, args);
2164 attach_lkb(r, lkb);
2165 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2167 error = _request_lock(r, lkb);
2176 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2182 r = lkb->lkb_resource;
2187 error = validate_lock_args(ls, lkb, args);
2191 error = _convert_lock(r, lkb);
2198 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2204 r = lkb->lkb_resource;
2209 error = validate_unlock_args(lkb, args);
2213 error = _unlock_lock(r, lkb);
2220 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2226 r = lkb->lkb_resource;
2231 error = validate_unlock_args(lkb, args);
2235 error = _cancel_lock(r, lkb);
2258 struct dlm_lkb *lkb;
2269 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2271 error = create_lkb(ls, &lkb);
2282 error = convert_lock(ls, lkb, &args);
2284 error = request_lock(ls, lkb, name, namelen, &args);
2290 __put_lkb(ls, lkb);
2306 struct dlm_lkb *lkb;
2316 error = find_lkb(ls, lkid, &lkb);
2325 error = cancel_lock(ls, lkb, &args);
2327 error = unlock_lock(ls, lkb, &args);
2334 dlm_put_lkb(lkb);
2397 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2415 if (lkb && lkb->lkb_lvbptr)
2434 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2437 ms->m_nodeid = lkb->lkb_nodeid;
2438 ms->m_pid = lkb->lkb_ownpid;
2439 ms->m_lkid = lkb->lkb_id;
2440 ms->m_remid = lkb->lkb_remid;
2441 ms->m_exflags = lkb->lkb_exflags;
2442 ms->m_sbflags = lkb->lkb_sbflags;
2443 ms->m_flags = lkb->lkb_flags;
2444 ms->m_lvbseq = lkb->lkb_lvbseq;
2445 ms->m_status = lkb->lkb_status;
2446 ms->m_grmode = lkb->lkb_grmode;
2447 ms->m_rqmode = lkb->lkb_rqmode;
2451 not from lkb fields */
2453 if (lkb->lkb_bastaddr)
2455 if (lkb->lkb_astaddr)
2471 if (!lkb->lkb_lvbptr)
2473 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2478 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2484 error = add_to_waiters(lkb, mstype);
2490 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2494 send_args(r, lkb, ms);
2502 remove_from_waiters(lkb, msg_reply_type(mstype));
2506 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2508 return send_common(r, lkb, DLM_MSG_REQUEST);
2511 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2515 error = send_common(r, lkb, DLM_MSG_CONVERT);
2518 if (!error && down_conversion(lkb)) {
2519 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2522 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2523 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2530 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 return send_common(r, lkb, DLM_MSG_UNLOCK);
2535 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2537 return send_common(r, lkb, DLM_MSG_CANCEL);
2540 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2546 to_nodeid = lkb->lkb_nodeid;
2548 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2552 send_args(r, lkb, ms);
2561 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2567 to_nodeid = lkb->lkb_nodeid;
2573 send_args(r, lkb, ms);
2582 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2588 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2598 send_args(r, lkb, ms);
2606 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2630 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2637 to_nodeid = lkb->lkb_nodeid;
2639 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2643 send_args(r, lkb, ms);
2652 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2654 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2657 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2659 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2662 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2664 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2667 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2669 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2695 the lkb for any type of message */
2697 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2699 lkb->lkb_exflags = ms->m_exflags;
2700 lkb->lkb_sbflags = ms->m_sbflags;
2701 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2705 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2707 lkb->lkb_sbflags = ms->m_sbflags;
2708 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
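receive_flags() and receive_flags_reply() above preserve the upper 16 lkb flag bits, which are local-only, and take the lower 16 from the wire. The complete pair, reconstructed from the fragments:

    /* sketch: merge wire flags into the local lkb without clobbering
       the node-local upper 16 bits */
    static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
    {
            lkb->lkb_exflags = ms->m_exflags;
            lkb->lkb_sbflags = ms->m_sbflags;
            lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
                             (ms->m_flags & 0x0000FFFF);
    }

    static void receive_flags_reply(struct dlm_lkb *lkb,
                                    struct dlm_message *ms)
    {
            lkb->lkb_sbflags = ms->m_sbflags;
            lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
                             (ms->m_flags & 0x0000FFFF);
    }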
2717 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2722 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2723 if (!lkb->lkb_lvbptr)
2724 lkb->lkb_lvbptr = allocate_lvb(ls);
2725 if (!lkb->lkb_lvbptr)
2728 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2733 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2736 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2737 lkb->lkb_ownpid = ms->m_pid;
2738 lkb->lkb_remid = ms->m_lkid;
2739 lkb->lkb_grmode = DLM_LOCK_IV;
2740 lkb->lkb_rqmode = ms->m_rqmode;
2741 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2742 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2744 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2746 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2747 /* lkb was just created so there won't be an lvb yet */
2748 lkb->lkb_lvbptr = allocate_lvb(ls);
2749 if (!lkb->lkb_lvbptr)
2756 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2759 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2761 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2762 lkb->lkb_id, lkb->lkb_remid);
2766 if (!is_master_copy(lkb))
2769 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2772 if (receive_lvb(ls, lkb, ms))
2775 lkb->lkb_rqmode = ms->m_rqmode;
2776 lkb->lkb_lvbseq = ms->m_lvbseq;
2781 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2784 if (!is_master_copy(lkb))
2786 if (receive_lvb(ls, lkb, ms))
2791 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2796 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2797 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2798 lkb->lkb_remid = ms->m_lkid;
2803 struct dlm_lkb *lkb;
2807 error = create_lkb(ls, &lkb);
2811 receive_flags(lkb, ms);
2812 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2813 error = receive_request_args(ls, lkb, ms);
2815 __put_lkb(ls, lkb);
2823 __put_lkb(ls, lkb);
2829 attach_lkb(r, lkb);
2830 error = do_request(r, lkb);
2831 send_request_reply(r, lkb, error);
2839 dlm_put_lkb(lkb);
2849 struct dlm_lkb *lkb;
2853 error = find_lkb(ls, ms->m_remid, &lkb);
2857 r = lkb->lkb_resource;
2862 receive_flags(lkb, ms);
2863 error = receive_convert_args(ls, lkb, ms);
2866 reply = !down_conversion(lkb);
2868 error = do_convert(r, lkb);
2871 send_convert_reply(r, lkb, error);
2875 dlm_put_lkb(lkb);
2885 struct dlm_lkb *lkb;
2889 error = find_lkb(ls, ms->m_remid, &lkb);
2893 r = lkb->lkb_resource;
2898 receive_flags(lkb, ms);
2899 error = receive_unlock_args(ls, lkb, ms);
2903 error = do_unlock(r, lkb);
2905 send_unlock_reply(r, lkb, error);
2909 dlm_put_lkb(lkb);
2919 struct dlm_lkb *lkb;
2923 error = find_lkb(ls, ms->m_remid, &lkb);
2927 receive_flags(lkb, ms);
2929 r = lkb->lkb_resource;
2934 error = do_cancel(r, lkb);
2935 send_cancel_reply(r, lkb, error);
2939 dlm_put_lkb(lkb);
2949 struct dlm_lkb *lkb;
2953 error = find_lkb(ls, ms->m_remid, &lkb);
2955 log_error(ls, "receive_grant no lkb");
2958 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2960 r = lkb->lkb_resource;
2965 receive_flags_reply(lkb, ms);
2966 if (is_altmode(lkb))
2967 munge_altmode(lkb, ms);
2968 grant_lock_pc(r, lkb, ms);
2969 queue_cast(r, lkb, 0);
2973 dlm_put_lkb(lkb);
2978 struct dlm_lkb *lkb;
2982 error = find_lkb(ls, ms->m_remid, &lkb);
2984 log_error(ls, "receive_bast no lkb");
2987 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2989 r = lkb->lkb_resource;
2994 queue_bast(r, lkb, ms->m_bastmode);
2998 dlm_put_lkb(lkb);
3055 struct dlm_lkb *lkb;
3059 error = find_lkb(ls, ms->m_remid, &lkb);
3061 log_error(ls, "receive_request_reply no lkb");
3064 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3066 r = lkb->lkb_resource;
3070 mstype = lkb->lkb_wait_type;
3071 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3079 lkb->lkb_nodeid = r->res_nodeid;
3088 queue_cast(r, lkb, -EAGAIN);
3090 unhold_lkb(lkb); /* undoes create_lkb() */
3096 receive_flags_reply(lkb, ms);
3097 lkb->lkb_remid = ms->m_lkid;
3098 if (is_altmode(lkb))
3099 munge_altmode(lkb, ms);
3101 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3103 grant_lock_pc(r, lkb, ms);
3104 queue_cast(r, lkb, 0);
3113 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3115 lkb->lkb_nodeid = -1;
3117 if (is_overlap(lkb)) {
3119 queue_cast_overlap(r, lkb);
3120 unhold_lkb(lkb); /* undoes create_lkb() */
3122 _request_lock(r, lkb);
3127 lkb->lkb_id, result);
3130 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3132 lkb->lkb_id, result);
3133 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3134 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3135 send_unlock(r, lkb);
3136 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3137 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3138 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3139 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3140 send_cancel(r, lkb);
3142 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3143 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 dlm_put_lkb(lkb);
3151 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3158 queue_cast(r, lkb, -EAGAIN);
3163 receive_flags_reply(lkb, ms);
3164 if (is_demoted(lkb))
3165 munge_demoted(lkb, ms);
3166 del_lkb(r, lkb);
3167 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3172 receive_flags_reply(lkb, ms);
3173 if (is_demoted(lkb))
3174 munge_demoted(lkb, ms);
3175 grant_lock_pc(r, lkb, ms);
3176 queue_cast(r, lkb, 0);
3181 lkb->lkb_id, ms->m_result);
3185 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3187 struct dlm_rsb *r = lkb->lkb_resource;
3194 error = remove_from_waiters_ms(lkb, ms);
3198 __receive_convert_reply(r, lkb, ms);
3206 struct dlm_lkb *lkb;
3209 error = find_lkb(ls, ms->m_remid, &lkb);
3211 log_error(ls, "receive_convert_reply no lkb");
3214 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3216 _receive_convert_reply(lkb, ms);
3217 dlm_put_lkb(lkb);
3220 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3222 struct dlm_rsb *r = lkb->lkb_resource;
3229 error = remove_from_waiters_ms(lkb, ms);
3237 receive_flags_reply(lkb, ms);
3238 remove_lock_pc(r, lkb);
3239 queue_cast(r, lkb, -DLM_EUNLOCK);
3245 lkb->lkb_id, ms->m_result);
3254 struct dlm_lkb *lkb;
3257 error = find_lkb(ls, ms->m_remid, &lkb);
3259 log_error(ls, "receive_unlock_reply no lkb");
3262 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3264 _receive_unlock_reply(lkb, ms);
3265 dlm_put_lkb(lkb);
3268 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3270 struct dlm_rsb *r = lkb->lkb_resource;
3277 error = remove_from_waiters_ms(lkb, ms);
3285 receive_flags_reply(lkb, ms);
3286 revert_lock_pc(r, lkb);
3288 queue_cast(r, lkb, -DLM_ECANCEL);
3294 lkb->lkb_id, ms->m_result);
3303 struct dlm_lkb *lkb;
3306 error = find_lkb(ls, ms->m_remid, &lkb);
3308 log_error(ls, "receive_cancel_reply no lkb");
3311 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3313 _receive_cancel_reply(lkb, ms);
3314 dlm_put_lkb(lkb);
3319 struct dlm_lkb *lkb;
3323 error = find_lkb(ls, ms->m_lkid, &lkb);
3325 log_error(ls, "receive_lookup_reply no lkb");
3330 r = lkb->lkb_resource;
3334 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3348 if (is_overlap(lkb)) {
3350 lkb->lkb_id, lkb->lkb_flags);
3351 queue_cast_overlap(r, lkb);
3352 unhold_lkb(lkb); /* undoes create_lkb() */
3356 _request_lock(r, lkb);
3364 dlm_put_lkb(lkb);
3502 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3504 if (middle_conversion(lkb)) {
3505 hold_lkb(lkb);
3508 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3509 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3512 lkb->lkb_grmode = DLM_LOCK_IV;
3513 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3514 unhold_lkb(lkb);
3516 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3517 lkb->lkb_flags |= DLM_IFL_RESEND;
3520 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3524 /* A waiting lkb needs recovery if the master node has failed, or
3527 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3529 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3535 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3549 struct dlm_lkb *lkb, *safe;
3553 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3555 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3560 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3561 lkb->lkb_flags |= DLM_IFL_RESEND;
3565 if (!waiter_needs_recovery(ls, lkb))
3568 switch (lkb->lkb_wait_type) {
3571 lkb->lkb_flags |= DLM_IFL_RESEND;
3575 recover_convert_waiter(ls, lkb);
3579 hold_lkb(lkb);
3582 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3583 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3584 dlm_put_lkb(lkb);
3588 hold_lkb(lkb);
3591 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3592 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3593 dlm_put_lkb(lkb);
3597 log_error(ls, "invalid lkb wait_type %d",
3598 lkb->lkb_wait_type);
3607 struct dlm_lkb *lkb;
3611 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3612 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3613 hold_lkb(lkb);
3621 lkb = NULL;
3622 return lkb;
3625 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3626 master or dir-node for r. Processing the lkb may result in it being placed
3636 recovery. if before, the lkb may still have a pos wait_count; if after, the
3643 struct dlm_lkb *lkb;
3654 lkb = find_resend_waiter(ls);
3655 if (!lkb)
3658 r = lkb->lkb_resource;
3662 mstype = lkb->lkb_wait_type;
3663 oc = is_overlap_cancel(lkb);
3664 ou = is_overlap_unlock(lkb);
3668 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3674 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3675 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3676 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3677 lkb->lkb_wait_type = 0;
3678 lkb->lkb_wait_count = 0;
3680 list_del_init(&lkb->lkb_wait_reply);
3682 unhold_lkb(lkb); /* for waiters list */
3689 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3691 unhold_lkb(lkb); /* undoes create_lkb() */
3695 queue_cast(r, lkb, -DLM_ECANCEL);
3697 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3698 _unlock_lock(r, lkb);
3708 _request_lock(r, lkb);
3713 _convert_lock(r, lkb);
3722 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3725 dlm_put_lkb(lkb);
3732 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3735 struct dlm_lkb *lkb, *safe;
3737 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3738 if (test(ls, lkb)) {
3740 del_lkb(r, lkb);
3741 /* this put should free the lkb */
3742 if (!dlm_put_lkb(lkb))
3743 log_error(ls, "purged lkb not released");
3748 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3750 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3753 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3755 return is_master_copy(lkb);
3840 struct dlm_lkb *lkb;
3842 list_for_each_entry(lkb, head, lkb_statequeue) {
3843 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3844 return lkb;
3852 struct dlm_lkb *lkb;
3854 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3855 if (lkb)
3856 return lkb;
3857 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3858 if (lkb)
3859 return lkb;
3860 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3861 if (lkb)
3862 return lkb;
3866 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3872 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3873 lkb->lkb_ownpid = rl->rl_ownpid;
3874 lkb->lkb_remid = rl->rl_lkid;
3875 lkb->lkb_exflags = rl->rl_exflags;
3876 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3877 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3878 lkb->lkb_lvbseq = rl->rl_lvbseq;
3879 lkb->lkb_rqmode = rl->rl_rqmode;
3880 lkb->lkb_grmode = rl->rl_grmode;
3883 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3884 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3886 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3887 lkb->lkb_lvbptr = allocate_lvb(ls);
3888 if (!lkb->lkb_lvbptr)
3892 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3899 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3901 lkb->lkb_grmode = DLM_LOCK_IV;
3908 /* This lkb may have been recovered in a previous aborted recovery so we need
3909 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3910 If so we just send back a standard reply. If not, we create a new lkb with
3918 struct dlm_lkb *lkb;
3932 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3933 if (lkb) {
3938 error = create_lkb(ls, &lkb);
3942 error = receive_rcom_lock_args(ls, lkb, r, rc);
3944 __put_lkb(ls, lkb);
3948 attach_lkb(r, lkb);
3949 add_lkb(r, lkb, rl->rl_status);
3954 saving in its process-copy lkb */
3955 rl->rl_remid = lkb->lkb_id;
3971 struct dlm_lkb *lkb;
3974 error = find_lkb(ls, rl->rl_lkid, &lkb);
3980 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3984 r = lkb->lkb_resource;
3993 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3995 dlm_send_rcom_lock(r, lkb);
3998 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4001 lkb->lkb_remid = rl->rl_remid;
4005 error, lkb->lkb_id);
4014 dlm_put_lkb(lkb);
4023 struct dlm_lkb *lkb;
4029 error = create_lkb(ls, &lkb);
4039 __put_lkb(ls, lkb);
4045 /* After ua is attached to lkb it will be freed by free_lkb().
4051 lkb->lkb_flags |= DLM_IFL_USER;
4055 __put_lkb(ls, lkb);
4059 error = request_lock(ls, lkb, name, namelen, &args);
4071 __put_lkb(ls, lkb);
4075 /* add this new lkb to the per-process list of locks */
4077 hold_lkb(lkb);
4078 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4088 struct dlm_lkb *lkb;
4095 error = find_lkb(ls, lkid, &lkb);
4102 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4119 ua->old_mode = lkb->lkb_grmode;
4126 error = convert_lock(ls, lkb, &args);
4131 dlm_put_lkb(lkb);
4141 struct dlm_lkb *lkb;
4148 error = find_lkb(ls, lkid, &lkb);
4152 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4163 error = unlock_lock(ls, lkb, &args);
4174 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4175 if (!list_empty(&lkb->lkb_ownqueue))
4176 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4179 dlm_put_lkb(lkb);
4189 struct dlm_lkb *lkb;
4196 error = find_lkb(ls, lkid, &lkb);
4200 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4208 error = cancel_lock(ls, lkb, &args);
4216 dlm_put_lkb(lkb);
4223 /* lkb's that are removed from the waiters list by revert are just left on the
4226 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4228 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4232 hold_lkb(lkb);
4234 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4239 error = cancel_lock(ls, lkb, &args);
4245 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4248 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4250 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4256 error = unlock_lock(ls, lkb, &args);
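A hedged sketch of unlock_proc_lock(), whose fragments appear above: during process teardown the unlock is forced regardless of grant state; set_unlock_args() and the -DLM_EUNLOCK normalization are assumptions from the surrounding file:

    /* sketch: forced unlock of a user lock during process cleanup */
    static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
    {
            struct dlm_user_args *ua =
                    (struct dlm_user_args *)lkb->lkb_astparam;
            struct dlm_args args;
            int error;

            set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);  /* assumed */

            error = unlock_lock(ls, lkb, &args);
            if (error == -DLM_EUNLOCK)
                    error = 0;   /* a completed unlock is success here */
            return error;
    }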
4269 struct dlm_lkb *lkb = NULL;
4275 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4276 list_del_init(&lkb->lkb_ownqueue);
4278 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4279 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4281 lkb->lkb_flags |= DLM_IFL_DEAD;
4284 return lkb;
4288 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4294 struct dlm_lkb *lkb, *safe;
4299 lkb = del_proc_lock(ls, proc);
4300 if (!lkb)
4302 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4303 orphan_proc_lock(ls, lkb);
4305 unlock_proc_lock(ls, lkb);
4308 added by dlm_user_request, it may result in the lkb
4311 dlm_put_lkb(lkb);
4317 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4318 list_del_init(&lkb->lkb_ownqueue);
4319 lkb->lkb_flags |= DLM_IFL_DEAD;
4320 dlm_put_lkb(lkb);
4323 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4324 list_del(&lkb->lkb_astqueue);
4325 dlm_put_lkb(lkb);
4334 struct dlm_lkb *lkb, *safe;
4337 lkb = NULL;
4340 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4342 list_del_init(&lkb->lkb_ownqueue);
4346 if (!lkb)
4349 lkb->lkb_flags |= DLM_IFL_DEAD;
4350 unlock_proc_lock(ls, lkb);
4351 dlm_put_lkb(lkb); /* ref from proc->locks list */
4355 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4356 list_del_init(&lkb->lkb_ownqueue);
4357 lkb->lkb_flags |= DLM_IFL_DEAD;
4358 dlm_put_lkb(lkb);
4363 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4364 list_del(&lkb->lkb_astqueue);
4365 dlm_put_lkb(lkb);
4374 struct dlm_lkb *lkb, *safe;
4377 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4378 if (pid && lkb->lkb_ownpid != pid)
4380 unlock_proc_lock(ls, lkb);
4381 list_del_init(&lkb->lkb_ownqueue);
4382 dlm_put_lkb(lkb);