Lines Matching defs:lkb (fs/dlm/lock.c)

16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
46 given rsb and lkb and queues callbacks.
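
The names above trace lock.c's call tiers: request_lock/convert_lock/unlock_lock/cancel_lock take a lockspace, validate arguments, and find and lock the rsb; the underscored _request_lock/_convert_lock/_unlock_lock/_cancel_lock variants take a held rsb and decide whether to operate locally or send to the master; do_request/do_convert/do_unlock/do_cancel are the guts that manipulate the rsb's queues. A minimal user-space sketch of that layering, using simplified stand-in types (these structs and the master_is_local field are illustrative, not the kernel's definitions):

    #include <stdio.h>

    struct dlm_ls  { int unused; };              /* stub lockspace */
    struct dlm_rsb { int master_is_local; };     /* stub resource */
    struct dlm_lkb { int rqmode; };              /* stub lock */

    /* bottom tier: operate on the rsb's queues (master side) */
    static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
    {
        (void)r;
        printf("do_request: grant, queue, or -EAGAIN for rqmode %d\n",
               lkb->rqmode);
        return 0;
    }

    /* middle tier: rsb is held; choose local vs. remote */
    static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
    {
        if (!r->master_is_local)
            return 0;   /* stands in for send_request(r, lkb) */
        return do_request(r, lkb);
    }

    /* top tier: entered with a lockspace; finds and locks the rsb */
    static int request_lock(struct dlm_ls *ls, struct dlm_rsb *r,
                            struct dlm_lkb *lkb)
    {
        (void)ls;   /* kernel: validate args, find and lock the rsb */
        return _request_lock(r, lkb);
    }

    int main(void)
    {
        struct dlm_ls ls = { 0 };
        struct dlm_rsb r = { .master_is_local = 1 };
        struct dlm_lkb lkb = { .rqmode = 3 };
        return request_lock(&ls, &r, &lkb);
    }
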
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
161 void dlm_print_lkb(struct dlm_lkb *lkb)
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
182 struct dlm_lkb *lkb;
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 dlm_print_lkb(lkb);
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 dlm_print_lkb(lkb);
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 dlm_print_lkb(lkb);
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 dlm_print_lkb(lkb);
219 static inline int can_be_queued(struct dlm_lkb *lkb)
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
229 static inline int is_demoted(struct dlm_lkb *lkb)
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
234 static inline int is_altmode(struct dlm_lkb *lkb)
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
239 static inline int is_granted(struct dlm_lkb *lkb)
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
250 static inline int is_process_copy(struct dlm_lkb *lkb)
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
256 static inline int is_master_copy(struct dlm_lkb *lkb)
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
261 static inline int middle_conversion(struct dlm_lkb *lkb)
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
269 static inline int down_conversion(struct dlm_lkb *lkb)
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
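
middle_conversion() and down_conversion() above key off the numeric ordering of the DLM modes (DLM_LOCK_IV = -1, NL = 0, CR = 1, CW = 2, PR = 3, EX = 4). A conversion is "down" when the requested mode is numerically below the granted mode, with one exception: PR and CW are mutually incompatible even though neither outranks the other, so PR<->CW is a "middle" conversion that cannot be granted in place. A standalone check of the two predicates (local re-implementations for illustration; the constants mirror the DLM uapi values):

    #include <stdio.h>

    enum { LOCK_IV = -1, LOCK_NL, LOCK_CR, LOCK_CW, LOCK_PR, LOCK_EX };

    static int middle_conversion(int gr, int rq)
    {
        return (gr == LOCK_PR && rq == LOCK_CW) ||
               (rq == LOCK_PR && gr == LOCK_CW);
    }

    static int down_conversion(int gr, int rq)
    {
        return !middle_conversion(gr, rq) && rq < gr;
    }

    int main(void)
    {
        /* EX -> NL: plain down-conversion, granted in place */
        printf("EX->NL down=%d\n", down_conversion(LOCK_EX, LOCK_NL));
        /* PR -> CW: neither up nor down; must re-queue like an upgrade */
        printf("PR->CW middle=%d down=%d\n",
               middle_conversion(LOCK_PR, LOCK_CW),
               down_conversion(LOCK_PR, LOCK_CW));
        return 0;
    }
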
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
284 static inline int is_overlap(struct dlm_lkb *lkb)
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
292 if (is_master_copy(lkb))
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
306 queue_cast(r, lkb,
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
320 * Basic operations on rsb's and lkb's
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174 The rsb must exist as long as any lkb's for it do. */
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1179 lkb->lkb_resource = r;
1182 static void detach_lkb(struct dlm_lkb *lkb)
1184 if (lkb->lkb_resource) {
1185 put_rsb(lkb->lkb_resource);
1186 lkb->lkb_resource = NULL;
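
attach_lkb()/detach_lkb() implement the rule in the comment at 1173: every lkb holds a reference on its rsb, so the rsb's refcount cannot reach zero while lkbs still point at it. A user-space sketch of that ownership rule, with a plain integer standing in for the kernel kref (hold_rsb/put_rsb here are simplified stand-ins):

    #include <stdio.h>
    #include <stdlib.h>

    struct rsb { int refcount; };
    struct lkb { struct rsb *resource; };

    static void hold_rsb(struct rsb *r) { r->refcount++; }

    static void put_rsb(struct rsb *r)
    {
        if (--r->refcount == 0) {
            printf("rsb freed\n");
            free(r);
        }
    }

    static void attach_lkb(struct rsb *r, struct lkb *lkb)
    {
        hold_rsb(r);            /* the lkb now pins the rsb */
        lkb->resource = r;
    }

    static void detach_lkb(struct lkb *lkb)
    {
        if (lkb->resource) {
            put_rsb(lkb->resource);
            lkb->resource = NULL;
        }
    }

    int main(void)
    {
        struct rsb *r = calloc(1, sizeof(*r));
        struct lkb a = { 0 }, b = { 0 };

        hold_rsb(r);            /* creator's reference */
        attach_lkb(r, &a);
        attach_lkb(r, &b);
        put_rsb(r);             /* creator drops its ref; rsb lives on */
        detach_lkb(&a);
        detach_lkb(&b);         /* last reference gone -> rsb freed */
        return 0;
    }
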
1193 struct dlm_lkb *lkb;
1196 lkb = dlm_allocate_lkb(ls);
1197 if (!lkb)
1200 lkb->lkb_last_bast_mode = -1;
1201 lkb->lkb_nodeid = -1;
1202 lkb->lkb_grmode = DLM_LOCK_IV;
1203 kref_init(&lkb->lkb_ref);
1204 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207 INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208 spin_lock_init(&lkb->lkb_cb_lock);
1209 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1213 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1215 lkb->lkb_id = rv;
1221 dlm_free_lkb(lkb);
1225 *lkb_ret = lkb;
1236 struct dlm_lkb *lkb;
1239 lkb = idr_find(&ls->ls_lkbidr, lkid);
1240 if (lkb)
1241 kref_get(&lkb->lkb_ref);
1244 *lkb_ret = lkb;
1245 return lkb ? 0 : -ENOENT;
1250 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1255 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1263 uint32_t lkid = lkb->lkb_id;
1266 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1272 detach_lkb(lkb);
1275 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276 dlm_free_lvb(lkb->lkb_lvbptr);
1277 dlm_free_lkb(lkb);
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1287 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1290 ls = lkb->lkb_resource->res_ls;
1291 return __put_lkb(ls, lkb);
1295 a valid reference to the lkb, so there's no need for locking. */
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1299 kref_get(&lkb->lkb_ref);
1304 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1306 DLM_ASSERT(false, dlm_print_lkb(lkb););
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1316 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1322 struct dlm_lkb *lkb = NULL, *iter;
1326 lkb = iter;
1331 if (!lkb)
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1339 kref_get(&lkb->lkb_ref);
1341 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1343 lkb->lkb_timestamp = ktime_get();
1345 lkb->lkb_status = status;
1349 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1352 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1356 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357 lkb->lkb_grmode);
1360 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1363 list_add_tail(&lkb->lkb_statequeue,
1367 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1373 lkb->lkb_status = 0;
1374 list_del(&lkb->lkb_statequeue);
1375 unhold_lkb(lkb);
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1380 hold_lkb(lkb);
1381 del_lkb(r, lkb);
1382 add_lkb(r, lkb, sts);
1383 unhold_lkb(lkb);
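
move_lkb() at 1378 brackets the del_lkb()/add_lkb() pair with hold_lkb()/unhold_lkb() because del_lkb() drops the reference the old queue held: without the temporary hold, an lkb whose only remaining reference was its queue entry would be freed mid-move. A compressed illustration (refcounting reduced to an int; the assert marks the invariant the temporary hold preserves):

    #include <assert.h>
    #include <stdio.h>

    struct lkb { int ref; int status; };

    static void hold(struct lkb *l)   { l->ref++; }

    static void unhold(struct lkb *l)
    {
        l->ref--;
        assert(l->ref > 0);   /* a queue move must never free the lkb */
    }

    static void del_lkb(struct lkb *l)          { l->status = 0; unhold(l); }
    static void add_lkb(struct lkb *l, int sts) { hold(l); l->status = sts; }

    static void move_lkb(struct lkb *l, int sts)
    {
        hold(l);     /* temporary ref: del_lkb() drops the queue's ref */
        del_lkb(l);
        add_lkb(l, sts);
        unhold(l);
    }

    int main(void)
    {
        struct lkb l = { .ref = 1, .status = 1 }; /* queue holds the only ref */
        move_lkb(&l, 2);   /* without the hold(), ref would hit 0 mid-move */
        printf("moved: ref=%d status=%d\n", l.ref, l.status);
        return 0;
    }
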
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1408 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1413 if (is_overlap_unlock(lkb) ||
1414 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1419 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1422 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1425 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1431 lkb->lkb_wait_count++;
1432 hold_lkb(lkb);
1435 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1436 lkb->lkb_wait_count, dlm_iflags_val(lkb));
1440 DLM_ASSERT(!lkb->lkb_wait_count,
1441 dlm_print_lkb(lkb);
1442 printk("wait_count %d\n", lkb->lkb_wait_count););
1444 lkb->lkb_wait_count++;
1445 lkb->lkb_wait_type = mstype;
1446 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1447 hold_lkb(lkb);
1448 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1452 lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1453 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
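
add_to_waiters() at 1406 enforces the rule that an lkb has at most one entry on ls_waiters: if an unlock or cancel is issued while an earlier message is still awaiting its reply, the new intent is folded into the OVERLAP_UNLOCK/OVERLAP_CANCEL bits and wait_count, rather than queuing a second entry. A toy model of that bookkeeping (names abbreviated, error paths simplified; a sketch of the branch structure, not the kernel logic verbatim):

    #include <stdio.h>

    enum { MSG_UNLOCK = 1, MSG_CANCEL = 2 };

    struct lkb {
        int wait_type;       /* message type awaiting a reply, 0 if none */
        int wait_count;
        int overlap_unlock;
        int overlap_cancel;
    };

    static int add_to_waiters(struct lkb *l, int mstype)
    {
        if (l->wait_type) {
            /* already waiting: fold unlock/cancel into the flags */
            if (mstype == MSG_UNLOCK)
                l->overlap_unlock = 1;
            else if (mstype == MSG_CANCEL)
                l->overlap_cancel = 1;
            else
                return -1;      /* kernel returns -EBUSY here */
            l->wait_count++;
            return 0;
        }
        l->wait_type = mstype;
        l->wait_count = 1;
        return 0;
    }

    int main(void)
    {
        struct lkb l = { 0 };
        add_to_waiters(&l, 99);          /* e.g. a convert in flight */
        add_to_waiters(&l, MSG_UNLOCK);  /* unlock overlaps the convert */
        printf("wait_type=%d count=%d overlap_unlock=%d\n",
               l.wait_type, l.wait_count, l.overlap_unlock);
        return 0;
    }
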
1458 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1463 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1466 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1470 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1471 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1477 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1478 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1487 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1489 lkb->lkb_id, lkb->lkb_wait_type);
1502 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1503 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1505 lkb->lkb_id);
1506 lkb->lkb_wait_type = 0;
1507 lkb->lkb_wait_count--;
1508 unhold_lkb(lkb);
1515 if (lkb->lkb_wait_type) {
1516 lkb->lkb_wait_type = 0;
1521 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1522 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1531 if (overlap_done && lkb->lkb_wait_type) {
1533 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1534 lkb->lkb_wait_count--;
1535 unhold_lkb(lkb);
1536 lkb->lkb_wait_type = 0;
1539 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1541 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1542 lkb->lkb_wait_count--;
1543 if (!lkb->lkb_wait_count)
1544 list_del_init(&lkb->lkb_wait_reply);
1545 unhold_lkb(lkb);
1549 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1551 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1555 error = _remove_from_waiters(lkb, mstype, NULL);
1563 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1566 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1571 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1723 /* lkb is master or local copy */
1725 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1733 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1736 if (!lkb->lkb_lvbptr)
1739 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1745 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1746 lkb->lkb_lvbseq = r->res_lvbseq;
1749 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1754 if (!lkb->lkb_lvbptr)
1757 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1766 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1768 lkb->lkb_lvbseq = r->res_lvbseq;
1773 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1776 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1778 if (lkb->lkb_grmode < DLM_LOCK_PW)
1781 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1786 if (!lkb->lkb_lvbptr)
1789 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1798 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1803 /* lkb is process copy (pc) */
1805 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1810 if (!lkb->lkb_lvbptr)
1813 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1816 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1821 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1822 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1826 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1827 remove_lock -- used for unlock, removes lkb from granted
1828 revert_lock -- used for cancel, moves lkb from convert to granted
1829 grant_lock -- used for request and convert, adds lkb to granted or
1830 moves lkb from convert or waiting to granted
1832 Each of these is used for master or local copy lkb's. There is
1834 a process copy (pc) lkb. */
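
The comment at 1826-1834 maps the three queue manipulations to the operations that use them: remove for unlock, revert for cancel, grant for request/convert. Condensed into a single-lkb model where the rsb's three queues are reduced to a status field (modes and statuses mirror the DLM names; the list handling is elided):

    #include <stdio.h>

    enum { LOCK_IV = -1, LOCK_NL, LOCK_CR, LOCK_CW, LOCK_PR, LOCK_EX };
    enum { STS_NONE, STS_WAITING, STS_GRANTED, STS_CONVERT };

    struct lkb { int grmode, rqmode, status; };

    /* unlock: drop the lkb from the granted queue */
    static void remove_lock(struct lkb *l)
    {
        l->status = STS_NONE;
        l->grmode = LOCK_IV;
    }

    /* cancel: a converting lkb goes back to granted at its old mode;
       a waiting lkb is dropped entirely */
    static void revert_lock(struct lkb *l)
    {
        l->rqmode = LOCK_IV;
        if (l->status == STS_CONVERT) {
            l->status = STS_GRANTED;
        } else if (l->status == STS_WAITING) {
            l->status = STS_NONE;
            l->grmode = LOCK_IV;
        }
    }

    /* request/convert granted: take the requested mode */
    static void grant_lock(struct lkb *l)
    {
        l->grmode = l->rqmode;
        l->rqmode = LOCK_IV;
        l->status = STS_GRANTED;
    }

    int main(void)
    {
        struct lkb l = { .grmode = LOCK_PR, .rqmode = LOCK_EX,
                         .status = STS_CONVERT };
        revert_lock(&l);   /* a cancelled convert keeps PR */
        printf("after revert: gr=%d sts=%d\n", l.grmode, l.status);
        l.rqmode = LOCK_EX;
        grant_lock(&l);
        printf("after grant:  gr=%d sts=%d\n", l.grmode, l.status);
        (void)remove_lock;
        return 0;
    }
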
1836 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1838 del_lkb(r, lkb);
1839 lkb->lkb_grmode = DLM_LOCK_IV;
1841 so this leads to the lkb being freed */
1842 unhold_lkb(lkb);
1845 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1847 set_lvb_unlock(r, lkb);
1848 _remove_lock(r, lkb);
1851 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1853 _remove_lock(r, lkb);
1860 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1864 lkb->lkb_rqmode = DLM_LOCK_IV;
1866 switch (lkb->lkb_status) {
1870 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1874 del_lkb(r, lkb);
1875 lkb->lkb_grmode = DLM_LOCK_IV;
1877 so this leads to the lkb being freed */
1878 unhold_lkb(lkb);
1882 log_print("invalid status for revert %d", lkb->lkb_status);
1887 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1889 return revert_lock(r, lkb);
1892 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1894 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1895 lkb->lkb_grmode = lkb->lkb_rqmode;
1896 if (lkb->lkb_status)
1897 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1899 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1902 lkb->lkb_rqmode = DLM_LOCK_IV;
1903 lkb->lkb_highbast = 0;
1906 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1908 set_lvb_lock(r, lkb);
1909 _grant_lock(r, lkb);
1912 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1915 set_lvb_lock_pc(r, lkb, ms);
1916 _grant_lock(r, lkb);
1921 lkb belongs to a remote node. */
1923 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1925 grant_lock(r, lkb);
1926 if (is_master_copy(lkb))
1927 send_grant(r, lkb);
1929 queue_cast(r, lkb, 0);
1940 static void munge_demoted(struct dlm_lkb *lkb)
1942 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1944 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1948 lkb->lkb_grmode = DLM_LOCK_NL;
1951 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1956 lkb->lkb_id, le32_to_cpu(ms->m_type));
1960 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1961 lkb->lkb_rqmode = DLM_LOCK_PR;
1962 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1963 lkb->lkb_rqmode = DLM_LOCK_CW;
1965 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1966 dlm_print_lkb(lkb);
1970 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1974 if (lkb->lkb_id == first->lkb_id)
1980 /* Check if the given lkb conflicts with another lkb on the queue. */
1982 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1987 if (this == lkb)
1989 if (!modes_compat(this, lkb))
2000 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2001 * convert queue from being granted, then deadlk/demote lkb.
2010 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2011 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
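
The deadlock described at 2000-2011 arises because a converting lock keeps its granted mode while it waits: if two locks each hold PR and each requests conversion to EX, each convert is blocked by the other's granted PR, forever. The resolution is to cancel one conversion (-EDEADLK) or, when DLM_LKF_CONVDEADLK is set, demote its granted mode. The incompatibility can be checked against the classic DLM mode-compatibility matrix (reproduced here from the standard DLM semantics; IV omitted for brevity):

    #include <stdio.h>

    enum { NL, CR, CW, PR, EX };

    /* compat[granted][requested]: 1 = modes can coexist */
    static const int compat[5][5] = {
        /*        NL CR CW PR EX */
        /* NL */ { 1, 1, 1, 1, 1 },
        /* CR */ { 1, 1, 1, 1, 0 },
        /* CW */ { 1, 1, 1, 0, 0 },
        /* PR */ { 1, 1, 0, 1, 0 },
        /* EX */ { 1, 0, 0, 0, 0 },
    };

    int main(void)
    {
        /* A and B both hold PR; both request conversion to EX */
        int a_gr = PR, b_gr = PR, a_rq = EX, b_rq = EX;

        /* each convert is blocked by the other's *granted* mode */
        printf("A's EX vs B's granted PR compatible? %d\n", compat[b_gr][a_rq]);
        printf("B's EX vs A's granted PR compatible? %d\n", compat[a_gr][b_rq]);

        /* demoting A's granted mode to NL unblocks B's conversion */
        a_gr = NL;
        printf("B's EX vs demoted A compatible?     %d\n", compat[a_gr][b_rq]);
        return 0;
    }
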
2063 * lkb is the lock to be granted
2075 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2078 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2095 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2099 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2103 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2107 if (queue_conflict(&r->res_grantqueue, lkb))
2116 if (queue_conflict(&r->res_convertqueue, lkb))
2121 * locks for a recovered rsb, on which lkb's have been rebuilt.
2122 * The lkb's may have been rebuilt on the queues in a different
2159 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2167 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2179 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2188 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2215 first_in_list(lkb, &r->res_waitqueue))
2221 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2225 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2226 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2231 rv = _can_be_granted(r, lkb, now, recover);
2241 if (is_convert && can_be_queued(lkb) &&
2242 conversion_deadlock_detect(r, lkb)) {
2243 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2244 lkb->lkb_grmode = DLM_LOCK_NL;
2245 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2250 lkb->lkb_id, now);
2263 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2265 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2269 lkb->lkb_rqmode = alt;
2270 rv = _can_be_granted(r, lkb, now, 0);
2272 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2274 lkb->lkb_rqmode = rqmode;
2286 struct dlm_lkb *lkb, *s;
2297 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2298 demoted = is_demoted(lkb);
2301 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2302 grant_lock_pending(r, lkb);
2309 if (!demoted && is_demoted(lkb)) {
2311 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2322 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2323 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2324 queue_bast(r, lkb, lkb->lkb_rqmode);
2325 lkb->lkb_highbast = lkb->lkb_rqmode;
2329 lkb->lkb_id, lkb->lkb_nodeid,
2336 hi = max_t(int, lkb->lkb_rqmode, hi);
2338 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2355 struct dlm_lkb *lkb, *s;
2357 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2358 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2359 grant_lock_pending(r, lkb);
2363 high = max_t(int, lkb->lkb_rqmode, high);
2364 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2393 struct dlm_lkb *lkb, *s;
2415 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2416 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2418 lkb->lkb_grmode == DLM_LOCK_PR)
2419 queue_bast(r, lkb, DLM_LOCK_CW);
2421 queue_bast(r, lkb, high);
2422 lkb->lkb_highbast = high;
2442 struct dlm_lkb *lkb)
2448 if (gr == lkb)
2450 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2451 queue_bast(r, gr, lkb->lkb_rqmode);
2452 gr->lkb_highbast = lkb->lkb_rqmode;
2457 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2459 send_bast_queue(r, &r->res_grantqueue, lkb);
2462 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2464 send_bast_queue(r, &r->res_grantqueue, lkb);
2465 send_bast_queue(r, &r->res_convertqueue, lkb);
2468 /* set_master(r, lkb) -- set the master nodeid of a resource
2471 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2472 known, it can just be copied to the lkb and the function will return
2474 before it can be copied to the lkb.
2476 When the rsb nodeid is being looked up remotely, the initial lkb
2478 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2482 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2483 1: the rsb master is not available and the lkb has been placed on
2487 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2493 r->res_first_lkid = lkb->lkb_id;
2494 lkb->lkb_nodeid = r->res_nodeid;
2498 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2499 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2504 lkb->lkb_nodeid = 0;
2509 lkb->lkb_nodeid = r->res_master_nodeid;
2521 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2525 lkb->lkb_nodeid = 0;
2529 r->res_first_lkid = lkb->lkb_id;
2530 send_lookup(r, lkb);
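
The 0/1 return contract documented at 2482-2483 means each caller of set_master() must distinguish "nodeid known, proceed" from "lookup in flight, stop and let the reply re-drive the request". A sketch of the canonical caller pattern, modeled on _request_lock() (the stub set_master and the nodeid_known field are illustrative):

    #include <stdio.h>

    struct rsb { int nodeid_known; };
    struct lkb { int id; };

    /* stub: 0 = master known, 1 = lookup sent, lkb parked on res_lookup */
    static int set_master(struct rsb *r, struct lkb *lkb)
    {
        (void)lkb;
        return r->nodeid_known ? 0 : 1;
    }

    static int _request_lock(struct rsb *r, struct lkb *lkb)
    {
        int error = set_master(r, lkb);

        if (error < 0)
            return error;
        if (error) {
            /* the lookup reply re-drives this request through
               process_lookup_list() -> _request_lock() */
            return 0;
        }
        printf("master known: send or perform the request\n");
        return 0;
    }

    int main(void)
    {
        struct rsb r = { .nodeid_known = 0 };
        struct lkb l = { .id = 1 };
        return _request_lock(&r, &l);
    }
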
2536 struct dlm_lkb *lkb, *safe;
2538 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2539 list_del_init(&lkb->lkb_rsb_lookup);
2540 _request_lock(r, lkb);
2549 struct dlm_lkb *lkb;
2566 lkb the first_lkid */
2571 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2573 list_del_init(&lkb->lkb_rsb_lookup);
2574 r->res_first_lkid = lkb->lkb_id;
2575 _request_lock(r, lkb);
2633 /* these args will be copied to the lkb in validate_lock_args,
2635 an active lkb cannot be modified before locking the rsb */
2662 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2668 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2672 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2675 if (is_overlap(lkb))
2679 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2683 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2687 lkb->lkb_exflags = args->flags;
2688 dlm_set_sbflags_val(lkb, 0);
2689 lkb->lkb_astfn = args->astfn;
2690 lkb->lkb_astparam = args->astparam;
2691 lkb->lkb_bastfn = args->bastfn;
2692 lkb->lkb_rqmode = args->mode;
2693 lkb->lkb_lksb = args->lksb;
2694 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2695 lkb->lkb_ownpid = (int) current->pid;
2705 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2706 lkb->lkb_status, lkb->lkb_wait_type,
2707 lkb->lkb_resource->res_name);
2711 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2712 lkb->lkb_status, lkb->lkb_wait_type,
2713 lkb->lkb_resource->res_name);
2727 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2729 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2734 (lkb->lkb_wait_type || lkb->lkb_wait_count))
2737 /* an lkb may be waiting for an rsb lookup to complete where the
2740 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2742 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2743 list_del_init(&lkb->lkb_rsb_lookup);
2744 queue_cast(lkb->lkb_resource, lkb,
2747 unhold_lkb(lkb); /* undoes create_lkb() */
2754 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2755 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2756 dlm_print_lkb(lkb);
2760 /* an lkb may still exist even though the lock is EOL'ed due to a
2765 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2766 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2774 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2777 if (is_overlap(lkb))
2780 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2781 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2787 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2788 !lkb->lkb_wait_type) {
2793 switch (lkb->lkb_wait_type) {
2796 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2812 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2815 if (is_overlap_unlock(lkb))
2818 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2819 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2824 switch (lkb->lkb_wait_type) {
2827 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2838 lkb->lkb_exflags |= args->flags;
2839 dlm_set_sbflags_val(lkb, 0);
2840 lkb->lkb_astparam = args->astparam;
2850 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2851 args->flags, lkb->lkb_wait_type,
2852 lkb->lkb_resource->res_name);
2856 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2857 args->flags, lkb->lkb_wait_type,
2858 lkb->lkb_resource->res_name);
2872 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2876 if (can_be_granted(r, lkb, 1, 0, NULL)) {
2877 grant_lock(r, lkb);
2878 queue_cast(r, lkb, 0);
2882 if (can_be_queued(lkb)) {
2884 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2889 queue_cast(r, lkb, -EAGAIN);
2894 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2899 if (force_blocking_asts(lkb))
2900 send_blocking_asts_all(r, lkb);
2903 send_blocking_asts(r, lkb);
2908 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2915 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2916 grant_lock(r, lkb);
2917 queue_cast(r, lkb, 0);
2925 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2927 revert_lock(r, lkb);
2928 queue_cast(r, lkb, -EDEADLK);
2939 if (is_demoted(lkb)) {
2941 if (_can_be_granted(r, lkb, 1, 0)) {
2942 grant_lock(r, lkb);
2943 queue_cast(r, lkb, 0);
2949 if (can_be_queued(lkb)) {
2951 del_lkb(r, lkb);
2952 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2957 queue_cast(r, lkb, -EAGAIN);
2962 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2971 if (force_blocking_asts(lkb))
2972 send_blocking_asts_all(r, lkb);
2975 send_blocking_asts(r, lkb);
2980 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2982 remove_lock(r, lkb);
2983 queue_cast(r, lkb, -DLM_EUNLOCK);
2987 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2995 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2999 error = revert_lock(r, lkb);
3001 queue_cast(r, lkb, -DLM_ECANCEL);
3007 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3019 /* add a new lkb to a possibly new rsb, called by requesting process */
3021 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3025 /* set_master: sets lkb nodeid from r */
3027 error = set_master(r, lkb);
3037 error = send_request(r, lkb);
3039 error = do_request(r, lkb);
3042 do_request_effects(r, lkb, error);
3048 /* change some property of an existing lkb, e.g. mode */
3050 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3056 error = send_convert(r, lkb);
3058 error = do_convert(r, lkb);
3061 do_convert_effects(r, lkb, error);
3067 /* remove an existing lkb from the granted queue */
3069 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3075 error = send_unlock(r, lkb);
3077 error = do_unlock(r, lkb);
3080 do_unlock_effects(r, lkb, error);
3086 /* remove an existing lkb from the convert or wait queue */
3088 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3094 error = send_cancel(r, lkb);
3096 error = do_cancel(r, lkb);
3099 do_cancel_effects(r, lkb, error);
3110 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3117 error = validate_lock_args(ls, lkb, args);
3127 attach_lkb(r, lkb);
3128 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3130 error = _request_lock(r, lkb);
3137 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3143 r = lkb->lkb_resource;
3148 error = validate_lock_args(ls, lkb, args);
3152 error = _convert_lock(r, lkb);
3159 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3165 r = lkb->lkb_resource;
3170 error = validate_unlock_args(lkb, args);
3174 error = _unlock_lock(r, lkb);
3181 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3187 r = lkb->lkb_resource;
3192 error = validate_unlock_args(lkb, args);
3196 error = _cancel_lock(r, lkb);
3219 struct dlm_lkb *lkb;
3230 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3232 error = create_lkb(ls, &lkb);
3237 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3245 error = convert_lock(ls, lkb, &args);
3247 error = request_lock(ls, lkb, name, namelen, &args);
3252 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3255 __put_lkb(ls, lkb);
3271 struct dlm_lkb *lkb;
3281 error = find_lkb(ls, lkid, &lkb);
3285 trace_dlm_unlock_start(ls, lkb, flags);
3292 error = cancel_lock(ls, lkb, &args);
3294 error = unlock_lock(ls, lkb, &args);
3301 trace_dlm_unlock_end(ls, lkb, flags, error);
3303 dlm_put_lkb(lkb);
3365 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3384 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3403 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3406 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3407 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3408 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3409 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3410 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3411 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3412 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3413 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3414 ms->m_status = cpu_to_le32(lkb->lkb_status);
3415 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3416 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3420 not from lkb fields */
3422 if (lkb->lkb_bastfn)
3424 if (lkb->lkb_astfn)
3440 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3442 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3447 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3455 error = add_to_waiters(lkb, mstype, to_nodeid);
3459 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3463 send_args(r, lkb, ms);
3471 remove_from_waiters(lkb, msg_reply_type(mstype));
3475 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3477 return send_common(r, lkb, DLM_MSG_REQUEST);
3480 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3484 error = send_common(r, lkb, DLM_MSG_CONVERT);
3487 if (!error && down_conversion(lkb)) {
3488 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3491 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
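
send_convert() at 3484-3491 special-cases down-conversions: the master always grants a down-conversion in place, so there is nothing to wait for, and the sender immediately removes itself from the waiters list and feeds a stashed local message through __receive_convert_reply() rather than waiting on the network. The control flow, reduced to a sketch (assuming, as the master would report, a successful in-place grant):

    #include <stdio.h>

    struct lkb { int grmode, rqmode; };

    static int down_conversion(const struct lkb *l)
    {
        return l->rqmode < l->grmode;   /* middle-conversion check elided */
    }

    static void receive_convert_reply_locally(struct lkb *l)
    {
        /* what the master's reply would do: grant in place */
        l->grmode = l->rqmode;
        printf("down-convert granted locally, gr=%d\n", l->grmode);
    }

    static void send_convert(struct lkb *l)
    {
        /* ... convert message goes out to the master ... */
        if (down_conversion(l)) {
            /* don't wait for the reply; synthesize it now */
            receive_convert_reply_locally(l);
        }
    }

    int main(void)
    {
        struct lkb l = { .grmode = 4 /* EX */, .rqmode = 0 /* NL */ };
        send_convert(&l);
        return 0;
    }
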
3497 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3501 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3503 return send_common(r, lkb, DLM_MSG_UNLOCK);
3506 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3508 return send_common(r, lkb, DLM_MSG_CANCEL);
3511 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3517 to_nodeid = lkb->lkb_nodeid;
3519 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3524 send_args(r, lkb, ms);
3533 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3539 to_nodeid = lkb->lkb_nodeid;
3546 send_args(r, lkb, ms);
3555 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3563 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3572 send_args(r, lkb, ms);
3580 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3605 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3612 to_nodeid = lkb->lkb_nodeid;
3614 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3618 send_args(r, lkb, ms);
3627 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3629 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3632 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3634 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3637 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3639 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3642 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3644 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3672 the lkb for any type of message */
3674 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3676 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3677 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3678 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3681 static void receive_flags_reply(struct dlm_lkb *lkb,
3688 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3689 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3698 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3703 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3704 if (!lkb->lkb_lvbptr)
3705 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3706 if (!lkb->lkb_lvbptr)
3711 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3726 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3729 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3730 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3731 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3732 lkb->lkb_grmode = DLM_LOCK_IV;
3733 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3735 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3736 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3738 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3739 /* lkb was just created so there won't be an lvb yet */
3740 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3741 if (!lkb->lkb_lvbptr)
3748 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3751 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3754 if (receive_lvb(ls, lkb, ms))
3757 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3758 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3763 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3766 if (receive_lvb(ls, lkb, ms))
3771 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3776 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3777 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3778 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3782 fields in the lkb. */
3784 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3791 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3792 log_error(lkb->lkb_resource->res_ls,
3802 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3811 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3816 if (!is_process_copy(lkb))
3818 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3828 log_error(lkb->lkb_resource->res_ls,
3830 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3831 lkb->lkb_remid, dlm_iflags_val(lkb),
3832 lkb->lkb_nodeid);
3838 struct dlm_lkb *lkb;
3845 error = create_lkb(ls, &lkb);
3849 receive_flags(lkb, ms);
3850 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3851 error = receive_request_args(ls, lkb, ms);
3853 __put_lkb(ls, lkb);
3868 __put_lkb(ls, lkb);
3879 __put_lkb(ls, lkb);
3884 attach_lkb(r, lkb);
3885 error = do_request(r, lkb);
3886 send_request_reply(r, lkb, error);
3887 do_request_effects(r, lkb, error);
3895 dlm_put_lkb(lkb);
3899 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3917 struct dlm_lkb *lkb;
3921 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3925 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3927 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3928 (unsigned long long)lkb->lkb_recover_seq,
3932 dlm_put_lkb(lkb);
3936 r = lkb->lkb_resource;
3941 error = validate_message(lkb, ms);
3945 receive_flags(lkb, ms);
3947 error = receive_convert_args(ls, lkb, ms);
3949 send_convert_reply(r, lkb, error);
3953 reply = !down_conversion(lkb);
3955 error = do_convert(r, lkb);
3957 send_convert_reply(r, lkb, error);
3958 do_convert_effects(r, lkb, error);
3962 dlm_put_lkb(lkb);
3973 struct dlm_lkb *lkb;
3977 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3981 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3983 lkb->lkb_id, lkb->lkb_remid,
3987 dlm_put_lkb(lkb);
3991 r = lkb->lkb_resource;
3996 error = validate_message(lkb, ms);
4000 receive_flags(lkb, ms);
4002 error = receive_unlock_args(ls, lkb, ms);
4004 send_unlock_reply(r, lkb, error);
4008 error = do_unlock(r, lkb);
4009 send_unlock_reply(r, lkb, error);
4010 do_unlock_effects(r, lkb, error);
4014 dlm_put_lkb(lkb);
4025 struct dlm_lkb *lkb;
4029 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4033 receive_flags(lkb, ms);
4035 r = lkb->lkb_resource;
4040 error = validate_message(lkb, ms);
4044 error = do_cancel(r, lkb);
4045 send_cancel_reply(r, lkb, error);
4046 do_cancel_effects(r, lkb, error);
4050 dlm_put_lkb(lkb);
4061 struct dlm_lkb *lkb;
4065 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4069 r = lkb->lkb_resource;
4074 error = validate_message(lkb, ms);
4078 receive_flags_reply(lkb, ms, false);
4079 if (is_altmode(lkb))
4080 munge_altmode(lkb, ms);
4081 grant_lock_pc(r, lkb, ms);
4082 queue_cast(r, lkb, 0);
4086 dlm_put_lkb(lkb);
4092 struct dlm_lkb *lkb;
4096 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4100 r = lkb->lkb_resource;
4105 error = validate_message(lkb, ms);
4109 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4110 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4114 dlm_put_lkb(lkb);
4234 struct dlm_lkb *lkb;
4239 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4243 r = lkb->lkb_resource;
4247 error = validate_message(lkb, ms);
4251 mstype = lkb->lkb_wait_type;
4252 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4255 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4266 lkb->lkb_nodeid = from_nodeid;
4275 queue_cast(r, lkb, -EAGAIN);
4277 unhold_lkb(lkb); /* undoes create_lkb() */
4283 receive_flags_reply(lkb, ms, false);
4284 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4285 if (is_altmode(lkb))
4286 munge_altmode(lkb, ms);
4288 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4290 grant_lock_pc(r, lkb, ms);
4291 queue_cast(r, lkb, 0);
4300 "master %d dir %d first %x %s", lkb->lkb_id,
4309 lkb->lkb_nodeid = -1;
4312 if (is_overlap(lkb)) {
4314 queue_cast_overlap(r, lkb);
4316 unhold_lkb(lkb); /* undoes create_lkb() */
4318 _request_lock(r, lkb);
4327 lkb->lkb_id, result);
4331 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4333 lkb->lkb_id, result);
4334 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4335 send_unlock(r, lkb);
4338 &lkb->lkb_iflags)) {
4339 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4340 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4341 send_cancel(r, lkb);
4343 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4344 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4349 dlm_put_lkb(lkb);
4353 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4360 queue_cast(r, lkb, -EAGAIN);
4364 receive_flags_reply(lkb, ms, local);
4365 revert_lock_pc(r, lkb);
4366 queue_cast(r, lkb, -EDEADLK);
4371 receive_flags_reply(lkb, ms, local);
4372 if (is_demoted(lkb))
4373 munge_demoted(lkb);
4374 del_lkb(r, lkb);
4375 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4380 receive_flags_reply(lkb, ms, local);
4381 if (is_demoted(lkb))
4382 munge_demoted(lkb);
4383 grant_lock_pc(r, lkb, ms);
4384 queue_cast(r, lkb, 0);
4389 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4393 dlm_print_lkb(lkb);
4397 static void _receive_convert_reply(struct dlm_lkb *lkb,
4400 struct dlm_rsb *r = lkb->lkb_resource;
4406 error = validate_message(lkb, ms);
4411 error = remove_from_waiters_ms(lkb, ms, local);
4415 __receive_convert_reply(r, lkb, ms, local);
4424 struct dlm_lkb *lkb;
4427 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4431 _receive_convert_reply(lkb, ms, false);
4432 dlm_put_lkb(lkb);
4436 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4439 struct dlm_rsb *r = lkb->lkb_resource;
4445 error = validate_message(lkb, ms);
4450 error = remove_from_waiters_ms(lkb, ms, local);
4458 receive_flags_reply(lkb, ms, local);
4459 remove_lock_pc(r, lkb);
4460 queue_cast(r, lkb, -DLM_EUNLOCK);
4466 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4476 struct dlm_lkb *lkb;
4479 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4483 _receive_unlock_reply(lkb, ms, false);
4484 dlm_put_lkb(lkb);
4488 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4491 struct dlm_rsb *r = lkb->lkb_resource;
4497 error = validate_message(lkb, ms);
4502 error = remove_from_waiters_ms(lkb, ms, local);
4510 receive_flags_reply(lkb, ms, local);
4511 revert_lock_pc(r, lkb);
4512 queue_cast(r, lkb, -DLM_ECANCEL);
4518 lkb->lkb_id,
4529 struct dlm_lkb *lkb;
4532 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4536 _receive_cancel_reply(lkb, ms, false);
4537 dlm_put_lkb(lkb);
4544 struct dlm_lkb *lkb;
4549 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4559 r = lkb->lkb_resource;
4563 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4579 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4592 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4595 lkb->lkb_nodeid = -1;
4602 if (is_overlap(lkb)) {
4604 lkb->lkb_id, dlm_iflags_val(lkb));
4605 queue_cast_overlap(r, lkb);
4606 unhold_lkb(lkb); /* undoes create_lkb() */
4610 _request_lock(r, lkb);
4618 dlm_put_lkb(lkb);
4846 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4849 if (middle_conversion(lkb)) {
4850 hold_lkb(lkb);
4854 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4855 _receive_convert_reply(lkb, ms_local, true);
4858 lkb->lkb_grmode = DLM_LOCK_IV;
4859 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4860 unhold_lkb(lkb);
4862 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4863 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4866 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4870 /* A waiting lkb needs recovery if the master node has failed, or
4873 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4879 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4893 struct dlm_lkb *lkb, *safe;
4904 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4906 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4911 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4914 lkb->lkb_id,
4915 lkb->lkb_remid,
4916 lkb->lkb_wait_type,
4917 lkb->lkb_resource->res_nodeid,
4918 lkb->lkb_nodeid,
4919 lkb->lkb_wait_nodeid,
4926 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4927 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4931 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4934 wait_type = lkb->lkb_wait_type;
4944 if (is_overlap_cancel(lkb)) {
4946 if (lkb->lkb_grmode == DLM_LOCK_IV)
4949 if (is_overlap_unlock(lkb)) {
4951 if (lkb->lkb_grmode == DLM_LOCK_IV)
4956 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4963 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4967 recover_convert_waiter(ls, lkb, ms_local);
4971 hold_lkb(lkb);
4975 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4976 _receive_unlock_reply(lkb, ms_local, true);
4977 dlm_put_lkb(lkb);
4981 hold_lkb(lkb);
4985 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4986 _receive_cancel_reply(lkb, ms_local, true);
4987 dlm_put_lkb(lkb);
4991 log_error(ls, "invalid lkb wait_type %d %d",
4992 lkb->lkb_wait_type, wait_type);
5002 struct dlm_lkb *lkb = NULL, *iter;
5008 lkb = iter;
5014 return lkb;
5024 * First, the lkb state for the voided remote operation is forcibly reset,
5026 * . lkb removed from ls_waiters list
5027 * . lkb wait_type cleared
5028 * . lkb waiters_count cleared
5029 * . lkb ref count decremented for each waiters_count (almost always 1,
5031 * two remote replies were being expected for the lkb.)
5033 * Second, the lkb is reprocessed like an original operation would be,
5035 * process the lkb operation locally, or send it to a remote node again
5036 * and put the lkb back onto the waiters list.
5038 * When reprocessing the lkb, we may find that it's flagged for an overlapping
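
The recovery steps listed at 5024-5038 are implemented by the loop that follows: repeatedly pick a RESEND-flagged lkb off ls_waiters, forcibly clear its waiter state (wait_type, wait_count, and the reference each count held), then either complete a flagged overlap unlock/cancel or re-issue the original operation. A skeleton of that loop (the list and locking reduced to a singly linked stand-in; the reprocessing step is a placeholder print):

    #include <stddef.h>
    #include <stdio.h>

    struct lkb {
        int wait_type, wait_count;
        int resend;
        struct lkb *next;       /* stand-in for the ls_waiters list */
    };

    static struct lkb *find_resend_waiter(struct lkb *head)
    {
        for (struct lkb *l = head; l; l = l->next)
            if (l->resend)
                return l;
        return NULL;
    }

    static void recover_waiters_post(struct lkb *waiters)
    {
        struct lkb *l;

        while ((l = find_resend_waiter(waiters)) != NULL) {
            int mstype = l->wait_type;

            /* 1. forcibly reset the voided operation's state */
            l->resend = 0;
            l->wait_type = 0;
            while (l->wait_count)
                l->wait_count--;   /* kernel also drops a ref for each */

            /* 2. reprocess: overlap unlock/cancel complete here,
                  otherwise re-drive the original request/convert */
            printf("re-driving wait_type %d\n", mstype);
        }
    }

    int main(void)
    {
        struct lkb l = { .wait_type = 2, .wait_count = 1, .resend = 1 };
        recover_waiters_post(&l);
        return 0;
    }
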
5046 struct dlm_lkb *lkb;
5058 * Find an lkb from the waiters list that's been affected by
5062 lkb = find_resend_waiter(ls);
5063 if (!lkb)
5066 r = lkb->lkb_resource;
5071 * If the lkb has been flagged for a force unlock or cancel,
5075 mstype = lkb->lkb_wait_type;
5077 &lkb->lkb_iflags);
5079 &lkb->lkb_iflags);
5084 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5085 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5094 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5097 lkb->lkb_wait_type = 0;
5103 * add_to_waiters() finds the lkb is already on the waiters
5106 while (lkb->lkb_wait_count) {
5107 lkb->lkb_wait_count--;
5108 unhold_lkb(lkb);
5113 list_del_init(&lkb->lkb_wait_reply);
5117 * The lkb is now clear of all prior waiters state and can be
5127 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5129 unhold_lkb(lkb); /* undoes create_lkb() */
5133 queue_cast(r, lkb, -DLM_ECANCEL);
5135 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5136 _unlock_lock(r, lkb);
5146 _request_lock(r, lkb);
5151 _convert_lock(r, lkb);
5161 lkb->lkb_id, mstype, r->res_nodeid,
5166 dlm_put_lkb(lkb);
5175 struct dlm_lkb *lkb, *safe;
5177 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5178 if (!is_master_copy(lkb))
5184 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5187 del_lkb(r, lkb);
5189 /* this put should free the lkb */
5190 if (!dlm_put_lkb(lkb))
5191 log_error(ls, "purged mstcpy lkb not released");
5208 struct dlm_lkb *lkb, *safe;
5210 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5211 if (!is_master_copy(lkb))
5214 if ((lkb->lkb_nodeid == nodeid_gone) ||
5215 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5219 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5220 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5224 del_lkb(r, lkb);
5226 /* this put should free the lkb */
5227 if (!dlm_put_lkb(lkb))
5228 log_error(ls, "purged dead lkb not released");
5309 * we are interested in are those with lkb's on either the convert or
5358 struct dlm_lkb *lkb;
5360 list_for_each_entry(lkb, head, lkb_statequeue) {
5361 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5362 return lkb;
5370 struct dlm_lkb *lkb;
5372 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5373 if (lkb)
5374 return lkb;
5375 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5376 if (lkb)
5377 return lkb;
5378 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5379 if (lkb)
5380 return lkb;
5385 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5390 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5391 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5392 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5393 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5394 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5395 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5396 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5397 lkb->lkb_rqmode = rl->rl_rqmode;
5398 lkb->lkb_grmode = rl->rl_grmode;
5401 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5402 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5404 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5409 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5410 if (!lkb->lkb_lvbptr)
5412 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5420 middle_conversion(lkb)) {
5422 lkb->lkb_grmode = DLM_LOCK_IV;
5429 /* This lkb may have been recovered in a previous aborted recovery so we need
5430 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5431 If so we just send back a standard reply. If not, we create a new lkb with
5441 struct dlm_lkb *lkb;
5478 lkb = search_remid(r, from_nodeid, remid);
5479 if (lkb) {
5484 error = create_lkb(ls, &lkb);
5488 error = receive_rcom_lock_args(ls, lkb, r, rc);
5490 __put_lkb(ls, lkb);
5494 attach_lkb(r, lkb);
5495 add_lkb(r, lkb, rl->rl_status);
5503 saving in its process-copy lkb */
5504 *rl_remid = cpu_to_le32(lkb->lkb_id);
5506 lkb->lkb_recover_seq = ls->ls_recover_seq;
5525 struct dlm_lkb *lkb;
5533 error = find_lkb(ls, lkid, &lkb);
5541 r = lkb->lkb_resource;
5545 if (!is_process_copy(lkb)) {
5552 dlm_put_lkb(lkb);
5566 dlm_send_rcom_lock(r, lkb, seq);
5570 lkb->lkb_remid = remid;
5584 dlm_put_lkb(lkb);
5592 struct dlm_lkb *lkb;
5599 error = create_lkb(ls, &lkb);
5605 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5624 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5627 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5628 error = request_lock(ls, lkb, name, namelen, &args);
5643 /* add this new lkb to the per-process list of locks */
5645 hold_lkb(lkb);
5646 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5650 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5652 __put_lkb(ls, lkb);
5661 struct dlm_lkb *lkb;
5668 error = find_lkb(ls, lkid, &lkb);
5672 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5677 ua = lkb->lkb_ua;
5701 error = convert_lock(ls, lkb, &args);
5706 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5707 dlm_put_lkb(lkb);
5724 struct dlm_lkb *lkb = NULL, *iter;
5740 lkb = iter;
5748 if (!lkb && found_other_mode) {
5753 if (!lkb) {
5758 lkb->lkb_exflags = flags;
5759 lkb->lkb_ownpid = (int) current->pid;
5761 ua = lkb->lkb_ua;
5772 * The lkb reference from the ls_orphans list was not
5778 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5788 struct dlm_lkb *lkb;
5795 error = find_lkb(ls, lkid, &lkb);
5799 trace_dlm_unlock_start(ls, lkb, flags);
5801 ua = lkb->lkb_ua;
5813 error = unlock_lock(ls, lkb, &args);
5824 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5825 if (!list_empty(&lkb->lkb_ownqueue))
5826 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5829 trace_dlm_unlock_end(ls, lkb, flags, error);
5830 dlm_put_lkb(lkb);
5840 struct dlm_lkb *lkb;
5847 error = find_lkb(ls, lkid, &lkb);
5851 trace_dlm_unlock_start(ls, lkb, flags);
5853 ua = lkb->lkb_ua;
5862 error = cancel_lock(ls, lkb, &args);
5870 trace_dlm_unlock_end(ls, lkb, flags, error);
5871 dlm_put_lkb(lkb);
5880 struct dlm_lkb *lkb;
5888 error = find_lkb(ls, lkid, &lkb);
5892 trace_dlm_unlock_start(ls, lkb, flags);
5894 ua = lkb->lkb_ua;
5902 r = lkb->lkb_resource;
5906 error = validate_unlock_args(lkb, &args);
5909 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5911 error = _cancel_lock(r, lkb);
5922 trace_dlm_unlock_end(ls, lkb, flags, error);
5923 dlm_put_lkb(lkb);
5929 /* lkb's that are removed from the waiters list by revert are just left on the
5932 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5937 hold_lkb(lkb); /* reference for the ls_orphans list */
5939 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5942 set_unlock_args(0, lkb->lkb_ua, &args);
5944 error = cancel_lock(ls, lkb, &args);
5950 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5955 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5961 lkb->lkb_ua, &args);
5963 error = unlock_lock(ls, lkb, &args);
5976 struct dlm_lkb *lkb = NULL;
5982 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5983 list_del_init(&lkb->lkb_ownqueue);
5985 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5986 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5988 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5991 return lkb;
5995 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5999 list, and no more device_writes should add lkb's to proc->locks list; so we
6006 struct dlm_lkb *lkb, *safe;
6011 lkb = del_proc_lock(ls, proc);
6012 if (!lkb)
6014 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6015 orphan_proc_lock(ls, lkb);
6017 unlock_proc_lock(ls, lkb);
6020 added by dlm_user_request, it may result in the lkb
6023 dlm_put_lkb(lkb);
6029 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6030 list_del_init(&lkb->lkb_ownqueue);
6031 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6032 dlm_put_lkb(lkb);
6035 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6036 dlm_purge_lkb_callbacks(lkb);
6037 list_del_init(&lkb->lkb_cb_list);
6038 dlm_put_lkb(lkb);
6047 struct dlm_lkb *lkb, *safe;
6050 lkb = NULL;
6053 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6055 list_del_init(&lkb->lkb_ownqueue);
6059 if (!lkb)
6062 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6063 unlock_proc_lock(ls, lkb);
6064 dlm_put_lkb(lkb); /* ref from proc->locks list */
6068 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6069 list_del_init(&lkb->lkb_ownqueue);
6070 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6071 dlm_put_lkb(lkb);
6076 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6077 dlm_purge_lkb_callbacks(lkb);
6078 list_del_init(&lkb->lkb_cb_list);
6079 dlm_put_lkb(lkb);
6088 struct dlm_lkb *lkb, *safe;
6091 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6092 if (pid && lkb->lkb_ownpid != pid)
6094 unlock_proc_lock(ls, lkb);
6095 list_del_init(&lkb->lkb_ownqueue);
6096 dlm_put_lkb(lkb);
6140 struct dlm_lkb *lkb;
6152 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6158 dlm_set_dflags_val(lkb, lkb_dflags);
6159 lkb->lkb_nodeid = lkb_nodeid;
6160 lkb->lkb_lksb = lksb;
6163 lkb->lkb_astparam = (void *)0xDEADBEEF;
6168 __put_lkb(ls, lkb);
6173 attach_lkb(r, lkb);
6174 add_lkb(r, lkb, lkb_status);
6184 struct dlm_lkb *lkb;
6187 error = find_lkb(ls, lkb_id, &lkb);
6191 error = add_to_waiters(lkb, mstype, to_nodeid);
6192 dlm_put_lkb(lkb);