/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"

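/* Each saved request is stored as an rq_entry: the nodeid it arrived from,
   followed by a copy of the message (dlm_header plus body) in the space
   allocated past the end of the struct. */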
struct rq_entry {
	struct list_head list;
	int nodeid;
	char request[1];
};

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

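/* Save a message that arrives while locking is stopped.  Returns 0 if the
   message was queued (or dropped because the allocation failed), or -EAGAIN
   if locking has already been re-enabled, in which case the caller can
   deliver the message through the normal path instead. */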
int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
{
	struct rq_entry *e;
	int length = hd->h_length;
	int rv = 0;

	e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
	if (!e) {
		log_print("dlm_add_requestqueue: out of memory");
		return 0;
	}

	e->nodeid = nodeid;
	memcpy(e->request, hd, length);

	/* We need to check dlm_locking_stopped() after taking the mutex to
	   avoid a race where dlm_recoverd enables locking and runs
	   process_requestqueue between our earlier dlm_locking_stopped check
	   and this addition to the requestqueue. */

	mutex_lock(&ls->ls_requestqueue_mutex);
	if (dlm_locking_stopped(ls))
		list_add_tail(&e->list, &ls->ls_requestqueue);
	else {
		log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
		kfree(e);
		rv = -EAGAIN;
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
	return rv;
}

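/* Called once recovery is complete: deliver each saved message with
   dlm_receive_message().  Returns 0 when the queue has been drained, or
   -EINTR if another recovery interrupts processing; unprocessed entries are
   left on the queue for the next pass. */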
int dlm_process_requestqueue(struct dlm_ls *ls)
{
	struct rq_entry *e;
	struct dlm_header *hd;
	int error = 0;

	mutex_lock(&ls->ls_requestqueue_mutex);

	for (;;) {
		if (list_empty(&ls->ls_requestqueue)) {
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = 0;
			break;
		}
		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
		mutex_unlock(&ls->ls_requestqueue_mutex);

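		/* deliver the saved message without holding the mutex so
		   dlm_recvd can still queue new arrivals through
		   dlm_add_requestqueue() */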
		hd = (struct dlm_header *) e->request;
		error = dlm_receive_message(hd, e->nodeid, 1);

		if (error == -EINTR) {
			/* entry is left on requestqueue */
			log_debug(ls, "process_requestqueue abort eintr");
			break;
		}

		mutex_lock(&ls->ls_requestqueue_mutex);
		list_del(&e->list);
		kfree(e);

		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "process_requestqueue abort running");
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = -EINTR;
			break;
		}
		schedule();
	}

	return error;
}

/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as they would have been processed by
 * dlm_recvd.  At the same time, dlm_recvd will start receiving new requests
 * from remote nodes.  We want to delay dlm_recvd from processing new requests
 * until dlm_recoverd has finished processing the old saved requests.
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
	for (;;) {
		mutex_lock(&ls->ls_requestqueue_mutex);
		if (list_empty(&ls->ls_requestqueue))
			break;
		if (dlm_locking_stopped(ls))
			break;
		mutex_unlock(&ls->ls_requestqueue_mutex);
		schedule();
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

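/* Decide whether recovery has made a saved request stale.  Returns 1 if the
   message should be dropped, 0 if it should still be processed. */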
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
	uint32_t type = ms->m_type;

	/* the ls is being cleaned up and freed by release_lockspace */
	if (!ls->ls_count)
		return 1;

	if (dlm_is_removed(ls, nodeid))
		return 1;

	/* directory operations are always purged because the directory is
	   always rebuilt during recovery and the lookups resent */

	if (type == DLM_MSG_REMOVE ||
	    type == DLM_MSG_LOOKUP ||
	    type == DLM_MSG_LOOKUP_REPLY)
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	/* with no directory, the master is likely to change as part of
	   recovery; requests to/from the defunct master need to be purged */

	switch (type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		/* we're no longer the master of this resource; the sender
		   will resend to the new master (see waiter_needs_recovery) */

		if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
			return 1;
		break;

	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
		/* this reply is from the former master of the resource;
		   we'll resend to the new master if needed */

		if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
			return 1;
		break;
	}

	return 0;
}

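/* Walk the saved requests and drop those that recovery has made stale, as
   decided by purge_request() above. */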
void dlm_purge_requestqueue(struct dlm_ls *ls)
{
	struct dlm_message *ms;
	struct rq_entry *e, *safe;

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
		ms = (struct dlm_message *) e->request;

		if (purge_request(ls, ms, e->nodeid)) {
			list_del(&e->list);
			kfree(e);
		}
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}