1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 5** 6** This copyrighted material is made available to anyone wishing to use, 7** modify, copy, or redistribute it subject to the terms and conditions 8** of the GNU General Public License v.2. 9** 10******************************************************************************* 11******************************************************************************/ 12 13#include "dlm_internal.h" 14#include "member.h" 15#include "lock.h" 16#include "dir.h" 17#include "config.h" 18#include "requestqueue.h" 19 20struct rq_entry { 21 struct list_head list; 22 int nodeid; 23 char request[1]; 24}; 25 26/* 27 * Requests received while the lockspace is in recovery get added to the 28 * request queue and processed when recovery is complete. This happens when 29 * the lockspace is suspended on some nodes before it is on others, or the 30 * lockspace is enabled on some while still suspended on others. 31 */ 32 33int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) 34{ 35 struct rq_entry *e; 36 int length = hd->h_length; 37 int rv = 0; 38 39 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); 40 if (!e) { 41 log_print("dlm_add_requestqueue: out of memory\n"); 42 return 0; 43 } 44 45 e->nodeid = nodeid; 46 memcpy(e->request, hd, length); 47 48 /* We need to check dlm_locking_stopped() after taking the mutex to 49 avoid a race where dlm_recoverd enables locking and runs 50 process_requestqueue between our earlier dlm_locking_stopped check 51 and this addition to the requestqueue. */ 52 53 mutex_lock(&ls->ls_requestqueue_mutex); 54 if (dlm_locking_stopped(ls)) 55 list_add_tail(&e->list, &ls->ls_requestqueue); 56 else { 57 log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid); 58 kfree(e); 59 rv = -EAGAIN; 60 } 61 mutex_unlock(&ls->ls_requestqueue_mutex); 62 return rv; 63} 64 65int dlm_process_requestqueue(struct dlm_ls *ls) 66{ 67 struct rq_entry *e; 68 struct dlm_header *hd; 69 int error = 0; 70 71 mutex_lock(&ls->ls_requestqueue_mutex); 72 73 for (;;) { 74 if (list_empty(&ls->ls_requestqueue)) { 75 mutex_unlock(&ls->ls_requestqueue_mutex); 76 error = 0; 77 break; 78 } 79 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 80 mutex_unlock(&ls->ls_requestqueue_mutex); 81 82 hd = (struct dlm_header *) e->request; 83 error = dlm_receive_message(hd, e->nodeid, 1); 84 85 if (error == -EINTR) { 86 /* entry is left on requestqueue */ 87 log_debug(ls, "process_requestqueue abort eintr"); 88 break; 89 } 90 91 mutex_lock(&ls->ls_requestqueue_mutex); 92 list_del(&e->list); 93 kfree(e); 94 95 if (dlm_locking_stopped(ls)) { 96 log_debug(ls, "process_requestqueue abort running"); 97 mutex_unlock(&ls->ls_requestqueue_mutex); 98 error = -EINTR; 99 break; 100 } 101 schedule(); 102 } 103 104 return error; 105} 106 107/* 108 * After recovery is done, locking is resumed and dlm_recoverd takes all the 109 * saved requests and processes them as they would have been by dlm_recvd. At 110 * the same time, dlm_recvd will start receiving new requests from remote 111 * nodes. We want to delay dlm_recvd processing new requests until 112 * dlm_recoverd has finished processing the old saved requests. 113 */ 114 115void dlm_wait_requestqueue(struct dlm_ls *ls) 116{ 117 for (;;) { 118 mutex_lock(&ls->ls_requestqueue_mutex); 119 if (list_empty(&ls->ls_requestqueue)) 120 break; 121 if (dlm_locking_stopped(ls)) 122 break; 123 mutex_unlock(&ls->ls_requestqueue_mutex); 124 schedule(); 125 } 126 mutex_unlock(&ls->ls_requestqueue_mutex); 127} 128 129static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) 130{ 131 uint32_t type = ms->m_type; 132 133 /* the ls is being cleaned up and freed by release_lockspace */ 134 if (!ls->ls_count) 135 return 1; 136 137 if (dlm_is_removed(ls, nodeid)) 138 return 1; 139 140 /* directory operations are always purged because the directory is 141 always rebuilt during recovery and the lookups resent */ 142 143 if (type == DLM_MSG_REMOVE || 144 type == DLM_MSG_LOOKUP || 145 type == DLM_MSG_LOOKUP_REPLY) 146 return 1; 147 148 if (!dlm_no_directory(ls)) 149 return 0; 150 151 /* with no directory, the master is likely to change as a part of 152 recovery; requests to/from the defunct master need to be purged */ 153 154 switch (type) { 155 case DLM_MSG_REQUEST: 156 case DLM_MSG_CONVERT: 157 case DLM_MSG_UNLOCK: 158 case DLM_MSG_CANCEL: 159 /* we're no longer the master of this resource, the sender 160 will resend to the new master (see waiter_needs_recovery) */ 161 162 if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid()) 163 return 1; 164 break; 165 166 case DLM_MSG_REQUEST_REPLY: 167 case DLM_MSG_CONVERT_REPLY: 168 case DLM_MSG_UNLOCK_REPLY: 169 case DLM_MSG_CANCEL_REPLY: 170 case DLM_MSG_GRANT: 171 /* this reply is from the former master of the resource, 172 we'll resend to the new master if needed */ 173 174 if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid) 175 return 1; 176 break; 177 } 178 179 return 0; 180} 181 182void dlm_purge_requestqueue(struct dlm_ls *ls) 183{ 184 struct dlm_message *ms; 185 struct rq_entry *e, *safe; 186 187 mutex_lock(&ls->ls_requestqueue_mutex); 188 list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) { 189 ms = (struct dlm_message *) e->request; 190 191 if (purge_request(ls, ms, e->nodeid)) { 192 list_del(&e->list); 193 kfree(e); 194 } 195 } 196 mutex_unlock(&ls->ls_requestqueue_mutex); 197} 198