/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
	int error = -EINTR;

	spin_lock(&ls->ls_recover_lock);
	if (ls->ls_recover_seq == seq) {
		set_bit(LSFL_RUNNING, &ls->ls_flags);
		/* unblocks processes waiting to enter the dlm */
		up_write(&ls->ls_in_recovery);
		error = 0;
	}
	spin_unlock(&ls->ls_recover_lock);
	return error;
}

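/* Run the full sequence of recovery steps for one recovery event (rv),
   serialized against dlm_recoverd_suspend() by ls_recoverd_active.  If any
   step fails we abandon this attempt; a later start will queue a new event
   and we'll be called again. */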
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
	unsigned long start;
	int error, neg = 0;

	log_debug(ls, "recover %llx", (unsigned long long)rv->seq);

	mutex_lock(&ls->ls_recoverd_active);

	/*
	 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
	 * will be processed by dlm_astd during recovery.
	 */

	dlm_astd_suspend();
	dlm_astd_resume();

	/*
	 * This list of root rsb's will be the basis of most of the recovery
	 * routines.
	 */

	dlm_create_root_list(ls);

	/*
	 * Free all the tossed rsb's so we don't have to recover them.
	 */

	dlm_clear_toss_list(ls);

	/*
	 * Add or remove nodes from the lockspace's ls_nodes list.
	 * Also waits for all nodes to complete dlm_recover_members.
	 */

	error = dlm_recover_members(ls, rv, &neg);
	if (error) {
		log_debug(ls, "recover_members failed %d", error);
		goto fail;
	}
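	/* note the time; the completion message below reports elapsed ms */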
	start = jiffies;

	/*
	 * Rebuild our own share of the directory by collecting from all other
	 * nodes their master rsb names that hash to us.
	 */

	error = dlm_recover_directory(ls);
	if (error) {
		log_debug(ls, "recover_directory failed %d", error);
		goto fail;
	}

	/*
	 * Wait for all nodes to complete directory rebuild.
	 */

	error = dlm_recover_directory_wait(ls);
	if (error) {
		log_debug(ls, "recover_directory_wait failed %d", error);
		goto fail;
	}

	/*
	 * We may have outstanding operations that are waiting for a reply from
	 * a failed node.  Mark these to be resent after recovery.  Unlock and
	 * cancel ops can just be completed.
	 */

	dlm_recover_waiters_pre(ls);

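	/* If a newer stop has arrived while we were working, bail out now;
	   the next start will queue a fresh recovery event. */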
	error = dlm_recovery_stopped(ls);
	if (error)
		goto fail;

	if (neg || dlm_no_directory(ls)) {
		/*
		 * Clear lkb's for departed nodes.
		 */

		dlm_purge_locks(ls);

		/*
		 * Get new master nodeid's for rsb's that were mastered on
		 * departed nodes.
		 */

		error = dlm_recover_masters(ls);
		if (error) {
			log_debug(ls, "recover_masters failed %d", error);
			goto fail;
		}

		/*
		 * Send our locks on remastered rsb's to the new masters.
		 */

		error = dlm_recover_locks(ls);
		if (error) {
			log_debug(ls, "recover_locks failed %d", error);
			goto fail;
		}

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}

		/*
		 * Finalize state in master rsb's now that all locks can be
		 * checked.  This includes conversion resolution and lvb
		 * settings.
		 */

		dlm_recover_rsbs(ls);
	} else {
		/*
		 * Other lockspace members may be going through the "neg" steps
		 * while also adding us to the lockspace, in which case they'll
		 * be doing the recover_locks (RS_LOCKS) barrier.
		 */
		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}
	}

	dlm_release_root_list(ls);

	/*
	 * Purge directory-related requests that are saved in requestqueue.
	 * All dir requests from before recovery are invalid now due to the dir
	 * rebuild and will be resent by the requesting nodes.
	 */

	dlm_purge_requestqueue(ls);

	dlm_set_recover_status(ls, DLM_RS_DONE);
	error = dlm_recover_done_wait(ls);
	if (error) {
		log_debug(ls, "recover_done_wait failed %d", error);
		goto fail;
	}

	dlm_clear_members_gone(ls);

	error = enable_locking(ls, rv->seq);
	if (error) {
		log_debug(ls, "enable_locking failed %d", error);
		goto fail;
	}

	error = dlm_process_requestqueue(ls);
	if (error) {
		log_debug(ls, "process_requestqueue failed %d", error);
		goto fail;
	}

	error = dlm_recover_waiters_post(ls);
	if (error) {
		log_debug(ls, "recover_waiters_post failed %d", error);
		goto fail;
	}

	dlm_grant_after_purge(ls);

	dlm_astd_wake();

	log_debug(ls, "recover %llx done: %u ms",
		  (unsigned long long)rv->seq,
		  jiffies_to_msecs(jiffies - start));
	mutex_unlock(&ls->ls_recoverd_active);

	return 0;

 fail:
	dlm_release_root_list(ls);
	log_debug(ls, "recover %llx error %d",
		  (unsigned long long)rv->seq, error);
	mutex_unlock(&ls->ls_recoverd_active);
	return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL;

	spin_lock(&ls->ls_recover_lock);
	rv = ls->ls_recover_args;
	ls->ls_recover_args = NULL;
	if (rv && ls->ls_recover_seq == rv->seq)
		clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
	spin_unlock(&ls->ls_recover_lock);

	if (rv) {
		ls_recover(ls, rv);
		kfree(rv->nodeids);
		kfree(rv);
	}
}

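/* The recovery daemon: one kthread per lockspace.  It sleeps until
   dlm_recoverd_kick() sets LSFL_WORK, then runs do_ls_recovery().  The task
   state is set to TASK_INTERRUPTIBLE before LSFL_WORK is tested so that a
   kick arriving between the test and schedule() is not lost. */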
static int dlm_recoverd(void *arg)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(arg);
	if (!ls) {
		log_print("dlm_recoverd: no lockspace %p", arg);
		return -1;
	}

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (!test_bit(LSFL_WORK, &ls->ls_flags))
			schedule();
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
			do_ls_recovery(ls);
	}

	dlm_put_lockspace(ls);
	return 0;
}

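/* Flag that there is recovery work to do and wake the daemon. */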
void dlm_recoverd_kick(struct dlm_ls *ls)
{
	set_bit(LSFL_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
}

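/* Create the per-lockspace recovery daemon; the task is saved in
   ls_recoverd_task so it can later be woken and stopped. */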
int dlm_recoverd_start(struct dlm_ls *ls)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		ls->ls_recoverd_task = p;
	return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
	kthread_stop(ls->ls_recoverd_task);
}

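/* Suspend/resume bracket code that must not run concurrently with
   ls_recover(): holding ls_recoverd_active keeps the daemon out of
   ls_recover() until dlm_recoverd_resume().  The wake_up lets a recovery
   step blocked on ls_wait_general notice and return first. */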
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}