// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * journal.c
 *
 * Defines functions of the journalling API
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/kthread.h>
#include <linux/time.h>
#include <linux/random.h>
#include <linux/delay.h>

#include <cluster/masklog.h>

#include "ocfs2.h"

#include "alloc.h"
#include "blockcheck.h"
#include "dir.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "localalloc.h"
#include "slot_map.h"
#include "super.h"
#include "sysfile.h"
#include "uptodate.h"
#include "quota.h"
#include "file.h"
#include "namei.h"

#include "buffer_head_io.h"
#include "ocfs2_trace.h"

DEFINE_SPINLOCK(trans_inc_lock);

#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

static int ocfs2_force_read_journal(struct inode *inode);
static int ocfs2_recover_node(struct ocfs2_super *osb,
			      int node_num, int slot_num);
static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
				      int dirty, int replayed);
static int ocfs2_trylock_journal(struct ocfs2_super *osb,
				 int slot_num);
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
				 int slot,
				 enum ocfs2_orphan_reco_type orphan_reco_type);
static int ocfs2_commit_thread(void *arg);
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
					    int slot_num,
					    struct ocfs2_dinode *la_dinode,
					    struct ocfs2_dinode *tl_dinode,
					    struct ocfs2_quota_recovery *qrec,
					    enum ocfs2_orphan_reco_type orphan_reco_type);

static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
{
	return __ocfs2_wait_on_mount(osb, 0);
}

static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
{
	return __ocfs2_wait_on_mount(osb, 1);
}

/*
 * This replay_map tracks online/offline slots, so that we can recover
 * offline slots during recovery and mount.
 */

enum ocfs2_replay_state {
	REPLAY_UNNEEDED = 0,	/* Replay is not needed, so ignore this map */
	REPLAY_NEEDED,		/* Replay slots marked in rm_replay_slots */
	REPLAY_DONE		/* Replay was already queued */
};

struct ocfs2_replay_map {
	unsigned int rm_slots;
	enum ocfs2_replay_state rm_state;
	unsigned char rm_replay_slots[];
};
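
/*
 * A minimal allocation sketch for the flexible rm_replay_slots[] array,
 * mirroring the kzalloc() in ocfs2_compute_replay_slots() below;
 * struct_size() from <linux/overflow.h> is the overflow-safe spelling.
 * Illustrative only, nothing in this file allocates it this way:
 *
 *	struct ocfs2_replay_map *map;
 *
 *	map = kzalloc(struct_size(map, rm_replay_slots, osb->max_slots),
 *		      GFP_KERNEL);
 *	if (!map)
 *		return -ENOMEM;
 */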

static void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
{
	if (!osb->replay_map)
		return;

	/* If we've already queued the replay, we don't have any more to do */
	if (osb->replay_map->rm_state == REPLAY_DONE)
		return;

	osb->replay_map->rm_state = state;
}

int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
{
	struct ocfs2_replay_map *replay_map;
	int i, node_num;

	/* If replay map is already set, we don't do it again */
	if (osb->replay_map)
		return 0;

	replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
			     (osb->max_slots * sizeof(char)), GFP_KERNEL);

	if (!replay_map) {
		mlog_errno(-ENOMEM);
		return -ENOMEM;
	}

	spin_lock(&osb->osb_lock);

	replay_map->rm_slots = osb->max_slots;
	replay_map->rm_state = REPLAY_UNNEEDED;

	/* set rm_replay_slots for offline slot(s) */
	for (i = 0; i < replay_map->rm_slots; i++) {
		if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
			replay_map->rm_replay_slots[i] = 1;
	}

	osb->replay_map = replay_map;
	spin_unlock(&osb->osb_lock);
	return 0;
}

static void ocfs2_queue_replay_slots(struct ocfs2_super *osb,
		enum ocfs2_orphan_reco_type orphan_reco_type)
{
	struct ocfs2_replay_map *replay_map = osb->replay_map;
	int i;

	if (!replay_map)
		return;

	if (replay_map->rm_state != REPLAY_NEEDED)
		return;

	for (i = 0; i < replay_map->rm_slots; i++)
		if (replay_map->rm_replay_slots[i])
			ocfs2_queue_recovery_completion(osb->journal, i, NULL,
							NULL, NULL,
							orphan_reco_type);
	replay_map->rm_state = REPLAY_DONE;
}

static void ocfs2_free_replay_slots(struct ocfs2_super *osb)
{
	struct ocfs2_replay_map *replay_map = osb->replay_map;

	if (!osb->replay_map)
		return;

	kfree(replay_map);
	osb->replay_map = NULL;
}

int ocfs2_recovery_init(struct ocfs2_super *osb)
{
	struct ocfs2_recovery_map *rm;

	mutex_init(&osb->recovery_lock);
	osb->disable_recovery = 0;
	osb->recovery_thread_task = NULL;
	init_waitqueue_head(&osb->recovery_event);

	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
		     osb->max_slots * sizeof(unsigned int),
		     GFP_KERNEL);
	if (!rm) {
		mlog_errno(-ENOMEM);
		return -ENOMEM;
	}

	rm->rm_entries = (unsigned int *)((char *)rm +
					  sizeof(struct ocfs2_recovery_map));
	osb->recovery_map = rm;

	return 0;
}

/* we can't grab the goofy sem lock from inside wait_event, so we use
 * memory barriers to make sure that we'll see the null task before
 * being woken up */
static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
{
	mb();
	return osb->recovery_thread_task != NULL;
}

void ocfs2_recovery_exit(struct ocfs2_super *osb)
{
	struct ocfs2_recovery_map *rm;

	/* disable any new recovery threads and wait for any currently
	 * running ones to exit. Do this before setting the vol_state. */
	mutex_lock(&osb->recovery_lock);
	osb->disable_recovery = 1;
	mutex_unlock(&osb->recovery_lock);
	wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));

	/* At this point, we know that no more recovery threads can be
	 * launched, so wait for any recovery completion work to
	 * complete. */
	if (osb->ocfs2_wq)
		flush_workqueue(osb->ocfs2_wq);

	/*
	 * Now that recovery is shut down, and the osb is about to be
	 * freed, the osb_lock is not taken here.
	 */
	rm = osb->recovery_map;
	/* XXX: Should we bug if there are dirty entries? */

	kfree(rm);
}

static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
				     unsigned int node_num)
{
	int i;
	struct ocfs2_recovery_map *rm = osb->recovery_map;

	assert_spin_locked(&osb->osb_lock);

	for (i = 0; i < rm->rm_used; i++) {
		if (rm->rm_entries[i] == node_num)
			return 1;
	}

	return 0;
}

/* Behaves like test-and-set.  Returns the previous value */
static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
				  unsigned int node_num)
{
	struct ocfs2_recovery_map *rm = osb->recovery_map;

	spin_lock(&osb->osb_lock);
	if (__ocfs2_recovery_map_test(osb, node_num)) {
		spin_unlock(&osb->osb_lock);
		return 1;
	}

	/* XXX: Can this be exploited? Not from o2dlm... */
	BUG_ON(rm->rm_used >= osb->max_slots);

	rm->rm_entries[rm->rm_used] = node_num;
	rm->rm_used++;
	spin_unlock(&osb->osb_lock);

	return 0;
}
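
/*
 * Illustrative sketch of the test-and-set semantics above (hypothetical
 * caller; the return value is the previous map membership):
 *
 *	if (ocfs2_recovery_map_set(osb, node_num))
 *		return;
 *
 * A zero return means this caller newly added node_num and should kick
 * off recovery; a nonzero return means it was already queued.
 */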

static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
				     unsigned int node_num)
{
	int i;
	struct ocfs2_recovery_map *rm = osb->recovery_map;

	spin_lock(&osb->osb_lock);

	for (i = 0; i < rm->rm_used; i++) {
		if (rm->rm_entries[i] == node_num)
			break;
	}

	if (i < rm->rm_used) {
		/* XXX: be careful with the pointer math */
		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
			(rm->rm_used - i - 1) * sizeof(unsigned int));
		rm->rm_used--;
	}

	spin_unlock(&osb->osb_lock);
}

static int ocfs2_commit_cache(struct ocfs2_super *osb)
{
	int status = 0;
	unsigned int flushed;
	struct ocfs2_journal *journal = NULL;

	journal = osb->journal;

	/* Flush all pending commits and checkpoint the journal. */
	down_write(&journal->j_trans_barrier);

	flushed = atomic_read(&journal->j_num_trans);
	trace_ocfs2_commit_cache_begin(flushed);
	if (flushed == 0) {
		up_write(&journal->j_trans_barrier);
		goto finally;
	}

	jbd2_journal_lock_updates(journal->j_journal);
	status = jbd2_journal_flush(journal->j_journal, 0);
	jbd2_journal_unlock_updates(journal->j_journal);
	if (status < 0) {
		up_write(&journal->j_trans_barrier);
		mlog_errno(status);
		goto finally;
	}

	ocfs2_inc_trans_id(journal);

	flushed = atomic_read(&journal->j_num_trans);
	atomic_set(&journal->j_num_trans, 0);
	up_write(&journal->j_trans_barrier);

	trace_ocfs2_commit_cache_end(journal->j_trans_id, flushed);

	ocfs2_wake_downconvert_thread(osb);
	wake_up(&journal->j_checkpointed);
finally:
	return status;
}

handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
{
	journal_t *journal = osb->journal->j_journal;
	handle_t *handle;

	BUG_ON(!osb || !osb->journal->j_journal);

	if (ocfs2_is_hard_readonly(osb))
		return ERR_PTR(-EROFS);

	BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
	BUG_ON(max_buffs <= 0);

	/* Nested transaction? Just return the handle... */
	if (journal_current_handle())
		return jbd2_journal_start(journal, max_buffs);

	sb_start_intwrite(osb->sb);

	down_read(&osb->journal->j_trans_barrier);

	handle = jbd2_journal_start(journal, max_buffs);
	if (IS_ERR(handle)) {
		up_read(&osb->journal->j_trans_barrier);
		sb_end_intwrite(osb->sb);

		mlog_errno(PTR_ERR(handle));

		if (is_journal_aborted(journal)) {
			ocfs2_abort(osb->sb, "Detected aborted journal\n");
			handle = ERR_PTR(-EROFS);
		}
	} else {
		if (!ocfs2_mount_local(osb))
			atomic_inc(&(osb->journal->j_num_trans));
	}

	return handle;
}

int ocfs2_commit_trans(struct ocfs2_super *osb,
		       handle_t *handle)
{
	int ret, nested;
	struct ocfs2_journal *journal = osb->journal;

	BUG_ON(!handle);

	nested = handle->h_ref > 1;
	ret = jbd2_journal_stop(handle);
	if (ret < 0)
		mlog_errno(ret);

	if (!nested) {
		up_read(&journal->j_trans_barrier);
		sb_end_intwrite(osb->sb);
	}

	return ret;
}

/*
 * 'nblocks' is what you want to add to the current transaction.
 *
 * This might call jbd2_journal_restart() which will commit dirty buffers
 * and then restart the transaction. Before calling
 * ocfs2_extend_trans(), any changed blocks should have been
 * dirtied. After calling it, all blocks which need to be changed must
 * go through another set of journal_access/journal_dirty calls.
 *
 * WARNING: This will not release any semaphores or disk locks taken
 * during the transaction, so make sure they were taken *before*
 * start_trans or we'll have ordering deadlocks.
 *
 * WARNING2: Note that we do *not* drop j_trans_barrier here. This is
 * good because transaction ids haven't yet been recorded on the
 * cluster locks associated with this handle.
 */
int ocfs2_extend_trans(handle_t *handle, int nblocks)
{
	int status, old_nblocks;

	BUG_ON(!handle);
	BUG_ON(nblocks < 0);

	if (!nblocks)
		return 0;

	old_nblocks = jbd2_handle_buffer_credits(handle);

	trace_ocfs2_extend_trans(old_nblocks, nblocks);

#ifdef CONFIG_OCFS2_DEBUG_FS
	status = 1;
#else
	status = jbd2_journal_extend(handle, nblocks, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
#endif

	if (status > 0) {
		trace_ocfs2_extend_trans_restart(old_nblocks + nblocks);
		status = jbd2_journal_restart(handle,
					      old_nblocks + nblocks);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	status = 0;
bail:
	return status;
}
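
/*
 * A minimal usage sketch of the pattern described above (hypothetical
 * caller; ci, di_bh and extra_credits are stand-ins).  Changed blocks
 * are dirtied before the extend, and journal access is re-taken after
 * it, since a restart may have committed the running transaction:
 *
 *	ocfs2_journal_dirty(handle, di_bh);
 *
 *	ret = ocfs2_extend_trans(handle, extra_credits);
 *	if (ret < 0)
 *		goto out;
 *
 *	ret = ocfs2_journal_access_di(handle, ci, di_bh,
 *				      OCFS2_JOURNAL_ACCESS_WRITE);
 */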

/*
 * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA.
 * If that fails, restart the transaction & regain write access for the
 * buffer head which is used for metadata modifications.
 * Taken from Ext4: extend_or_restart_transaction()
 */
int ocfs2_allocate_extend_trans(handle_t *handle, int thresh)
{
	int status, old_nblks;

	BUG_ON(!handle);

	old_nblks = jbd2_handle_buffer_credits(handle);
	trace_ocfs2_allocate_extend_trans(old_nblks, thresh);

	if (old_nblks < thresh)
		return 0;

	status = jbd2_journal_extend(handle, OCFS2_MAX_TRANS_DATA, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	if (status > 0) {
		status = jbd2_journal_restart(handle, OCFS2_MAX_TRANS_DATA);
		if (status < 0)
			mlog_errno(status);
	}

bail:
	return status;
}
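
/*
 * Sketch of the intended caller pattern, after the ext4 helper this was
 * modelled on (hypothetical caller; ci and bh are stand-ins).  As with
 * ocfs2_extend_trans(), write access must be re-taken after a possible
 * restart:
 *
 *	ret = ocfs2_allocate_extend_trans(handle, thresh);
 *	if (ret < 0)
 *		goto out;
 *
 *	ret = ocfs2_journal_access_di(handle, ci, bh,
 *				      OCFS2_JOURNAL_ACCESS_WRITE);
 */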


struct ocfs2_triggers {
	struct jbd2_buffer_trigger_type	ot_triggers;
	int				ot_offset;
};

static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers)
{
	return container_of(triggers, struct ocfs2_triggers, ot_triggers);
}

static void ocfs2_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
				 struct buffer_head *bh,
				 void *data, size_t size)
{
	struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers);

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, data + ot->ot_offset);
}

/*
 * Quota blocks have their own trigger because the struct ocfs2_block_check
 * offset depends on the blocksize.
 */
static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
				 struct buffer_head *bh,
				 void *data, size_t size)
{
	struct ocfs2_disk_dqtrailer *dqt =
		ocfs2_block_dqtrailer(size, data);

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, &dqt->dq_check);
}

/*
 * Directory blocks also have their own trigger because the
 * struct ocfs2_block_check offset depends on the blocksize.
 */
static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers,
				 struct buffer_head *bh,
				 void *data, size_t size)
{
	struct ocfs2_dir_block_trailer *trailer =
		ocfs2_dir_trailer_from_size(size, data);

	/*
	 * We aren't guaranteed to have the superblock here, so we
	 * must unconditionally compute the ecc data.
	 * __ocfs2_journal_access() will only set the triggers if
	 * metaecc is enabled.
	 */
	ocfs2_block_check_compute(data, size, &trailer->db_check);
}

static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers,
				struct buffer_head *bh)
{
	mlog(ML_ERROR,
	     "ocfs2_abort_trigger called by JBD2.  bh = 0x%lx, "
	     "bh->b_blocknr = %llu\n",
	     (unsigned long)bh,
	     (unsigned long long)bh->b_blocknr);

	ocfs2_error(bh->b_bdev->bd_super,
		    "JBD2 has aborted our journal, ocfs2 cannot continue\n");
}

static struct ocfs2_triggers di_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_dinode, i_check),
};

static struct ocfs2_triggers eb_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_extent_block, h_check),
};

static struct ocfs2_triggers rb_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_refcount_block, rf_check),
};

static struct ocfs2_triggers gd_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_group_desc, bg_check),
};

static struct ocfs2_triggers db_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_db_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
};

static struct ocfs2_triggers xb_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_xattr_block, xb_check),
};

static struct ocfs2_triggers dq_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_dq_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
};

static struct ocfs2_triggers dr_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_dx_root_block, dr_check),
};

static struct ocfs2_triggers dl_triggers = {
	.ot_triggers = {
		.t_frozen = ocfs2_frozen_trigger,
		.t_abort = ocfs2_abort_trigger,
	},
	.ot_offset	= offsetof(struct ocfs2_dx_leaf, dl_check),
};

static int __ocfs2_journal_access(handle_t *handle,
				  struct ocfs2_caching_info *ci,
				  struct buffer_head *bh,
				  struct ocfs2_triggers *triggers,
				  int type)
{
	int status;
	struct ocfs2_super *osb =
		OCFS2_SB(ocfs2_metadata_cache_get_super(ci));

	BUG_ON(!ci || !ci->ci_ops);
	BUG_ON(!handle);
	BUG_ON(!bh);

	trace_ocfs2_journal_access(
		(unsigned long long)ocfs2_metadata_cache_owner(ci),
		(unsigned long long)bh->b_blocknr, type, bh->b_size);

	/* we can safely remove this assertion after testing. */
	if (!buffer_uptodate(bh)) {
		mlog(ML_ERROR, "giving me a buffer that's not uptodate!\n");
		mlog(ML_ERROR, "b_blocknr=%llu, b_state=0x%lx\n",
		     (unsigned long long)bh->b_blocknr, bh->b_state);

		lock_buffer(bh);
		/*
		 * If a previous transaction failed to checkpoint some of
		 * its buffer heads, they were all marked BH_Write_EIO.
		 * This bh may be one of those error bhs.  We can't simply
		 * clear its BH_Write_EIO and reuse it, since the other bhs
		 * have not been written to disk yet, and doing so would
		 * leave the metadata inconsistent.  Set the fs read-only
		 * to avoid further damage.
		 */
		if (buffer_write_io_error(bh) && !buffer_uptodate(bh)) {
			unlock_buffer(bh);
			return ocfs2_error(osb->sb, "A previous attempt to "
					"write this buffer head failed\n");
		}
		unlock_buffer(bh);
	}

	/* Set the current transaction information on the ci so
	 * that the locking code knows whether it can drop its locks
	 * on this ci or not. We're protected from the commit
	 * thread updating the current transaction id until
	 * ocfs2_commit_trans() because ocfs2_start_trans() took
	 * j_trans_barrier for us. */
	ocfs2_set_ci_lock_trans(osb->journal, ci);

	ocfs2_metadata_cache_io_lock(ci);
	switch (type) {
	case OCFS2_JOURNAL_ACCESS_CREATE:
	case OCFS2_JOURNAL_ACCESS_WRITE:
		status = jbd2_journal_get_write_access(handle, bh);
		break;

	case OCFS2_JOURNAL_ACCESS_UNDO:
		status = jbd2_journal_get_undo_access(handle, bh);
		break;

	default:
		status = -EINVAL;
		mlog(ML_ERROR, "Unknown access type!\n");
	}
	if (!status && ocfs2_meta_ecc(osb) && triggers)
		jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
	ocfs2_metadata_cache_io_unlock(ci);

	if (status < 0)
		mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
		     status, type);

	return status;
}

int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type);
}

int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type);
}

int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &rb_triggers,
				      type);
}

int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type);
}

int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type);
}

int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type);
}

int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type);
}

int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type);
}

int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci,
			    struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type);
}

int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci,
			 struct buffer_head *bh, int type)
{
	return __ocfs2_journal_access(handle, ci, bh, NULL, type);
}

void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh)
{
	int status;

	trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr);

	status = jbd2_journal_dirty_metadata(handle, bh);
	if (status) {
		mlog_errno(status);
		if (!is_handle_aborted(handle)) {
			journal_t *journal = handle->h_transaction->t_journal;
			struct super_block *sb = bh->b_bdev->bd_super;

			mlog(ML_ERROR, "jbd2_journal_dirty_metadata failed. "
					"Aborting transaction and journal.\n");
			handle->h_err = status;
			jbd2_journal_abort_handle(handle);
			jbd2_journal_abort(journal, status);
			ocfs2_abort(sb, "Journal already aborted.\n");
		}
	}
}

#define OCFS2_DEFAULT_COMMIT_INTERVAL	(HZ * JBD2_DEFAULT_MAX_COMMIT_AGE)

void ocfs2_set_journal_params(struct ocfs2_super *osb)
{
	journal_t *journal = osb->journal->j_journal;
	unsigned long commit_interval = OCFS2_DEFAULT_COMMIT_INTERVAL;

	if (osb->osb_commit_interval)
		commit_interval = osb->osb_commit_interval;

	write_lock(&journal->j_state_lock);
	journal->j_commit_interval = commit_interval;
	if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER)
		journal->j_flags |= JBD2_BARRIER;
	else
		journal->j_flags &= ~JBD2_BARRIER;
	write_unlock(&journal->j_state_lock);
}

int ocfs2_journal_init(struct ocfs2_super *osb, int *dirty)
{
	int status = -1;
	struct inode *inode = NULL; /* the journal inode */
	journal_t *j_journal = NULL;
	struct ocfs2_journal *journal = NULL;
	struct ocfs2_dinode *di = NULL;
	struct buffer_head *bh = NULL;
	int inode_lock = 0;

	/* initialize our journal structure */
	journal = kzalloc(sizeof(struct ocfs2_journal), GFP_KERNEL);
	if (!journal) {
		mlog(ML_ERROR, "unable to alloc journal\n");
		status = -ENOMEM;
		goto done;
	}
	osb->journal = journal;
	journal->j_osb = osb;

	atomic_set(&journal->j_num_trans, 0);
	init_rwsem(&journal->j_trans_barrier);
	init_waitqueue_head(&journal->j_checkpointed);
	spin_lock_init(&journal->j_lock);
	journal->j_trans_id = 1UL;
	INIT_LIST_HEAD(&journal->j_la_cleanups);
	INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery);
	journal->j_state = OCFS2_JOURNAL_FREE;

	/* already have the inode for our journal */
	inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
					    osb->slot_num);
	if (inode == NULL) {
		status = -EACCES;
		mlog_errno(status);
		goto done;
	}
	if (is_bad_inode(inode)) {
		mlog(ML_ERROR, "access error (bad inode)\n");
		iput(inode);
		inode = NULL;
		status = -EACCES;
		goto done;
	}

	SET_INODE_JOURNAL(inode);
	OCFS2_I(inode)->ip_open_count++;

	/* Skip recovery waits here - journal inode metadata never
	 * changes in a live cluster so it can be considered an
	 * exception to the rule. */
	status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
	if (status < 0) {
		if (status != -ERESTARTSYS)
			mlog(ML_ERROR, "Could not get lock on journal!\n");
		goto done;
	}

	inode_lock = 1;
	di = (struct ocfs2_dinode *)bh->b_data;

	if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) {
		mlog(ML_ERROR, "Journal file size (%lld) is too small!\n",
		     i_size_read(inode));
		status = -EINVAL;
		goto done;
	}

	trace_ocfs2_journal_init(i_size_read(inode),
				 (unsigned long long)inode->i_blocks,
				 OCFS2_I(inode)->ip_clusters);

	/* call the kernel's journal init function now */
	j_journal = jbd2_journal_init_inode(inode);
	if (j_journal == NULL) {
		mlog(ML_ERROR, "Linux journal layer error\n");
		status = -EINVAL;
		goto done;
	}

	trace_ocfs2_journal_init_maxlen(j_journal->j_total_len);

	*dirty = (le32_to_cpu(di->id1.journal1.ij_flags) &
		  OCFS2_JOURNAL_DIRTY_FL);

	journal->j_journal = j_journal;
	journal->j_journal->j_submit_inode_data_buffers =
		jbd2_journal_submit_inode_data_buffers;
	journal->j_journal->j_finish_inode_data_buffers =
		jbd2_journal_finish_inode_data_buffers;
	journal->j_inode = inode;
	journal->j_bh = bh;

	ocfs2_set_journal_params(osb);

	journal->j_state = OCFS2_JOURNAL_LOADED;

	status = 0;
done:
	if (status < 0) {
		if (inode_lock)
			ocfs2_inode_unlock(inode, 1);
		brelse(bh);
		if (inode) {
			OCFS2_I(inode)->ip_open_count--;
			iput(inode);
		}
	}

	return status;
}

static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di)
{
	le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1);
}

static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di)
{
	return le32_to_cpu(di->id1.journal1.ij_recovery_generation);
}

static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
				      int dirty, int replayed)
{
	int status;
	unsigned int flags;
	struct ocfs2_journal *journal = osb->journal;
	struct buffer_head *bh = journal->j_bh;
	struct ocfs2_dinode *fe;

	fe = (struct ocfs2_dinode *)bh->b_data;

	/* The journal bh on the osb always comes from ocfs2_journal_init()
	 * and was validated there inside ocfs2_inode_lock_full().  It's a
	 * code bug if we mess it up. */
	BUG_ON(!OCFS2_IS_VALID_DINODE(fe));

	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
	if (dirty)
		flags |= OCFS2_JOURNAL_DIRTY_FL;
	else
		flags &= ~OCFS2_JOURNAL_DIRTY_FL;
	fe->id1.journal1.ij_flags = cpu_to_le32(flags);

	if (replayed)
		ocfs2_bump_recovery_generation(fe);

	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
	status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode));
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * If the journal has been kmalloc'd it needs to be freed after this
 * call.
 */
void ocfs2_journal_shutdown(struct ocfs2_super *osb)
{
	struct ocfs2_journal *journal = NULL;
	int status = 0;
	struct inode *inode = NULL;
	int num_running_trans = 0;

	BUG_ON(!osb);

	journal = osb->journal;
	if (!journal)
		goto done;

	inode = journal->j_inode;

	if (journal->j_state != OCFS2_JOURNAL_LOADED)
		goto done;

	/* need to inc inode use count - jbd2_journal_destroy will iput. */
	if (!igrab(inode))
		BUG();

	num_running_trans = atomic_read(&(osb->journal->j_num_trans));
	trace_ocfs2_journal_shutdown(num_running_trans);

	/* Do a commit_cache here. It will flush our journal *and*
	 * release any locks that are still held.
	 * Set the SHUTDOWN flag and release the trans lock;
	 * the commit thread will take the trans lock for us below. */
	journal->j_state = OCFS2_JOURNAL_IN_SHUTDOWN;

	/* OCFS2_JOURNAL_IN_SHUTDOWN signals commit_cache not to
	 * drop the trans_lock (which we want to hold until we
	 * completely destroy the journal). */
	if (osb->commit_task) {
		/* Wait for the commit thread */
		trace_ocfs2_journal_shutdown_wait(osb->commit_task);
		kthread_stop(osb->commit_task);
		osb->commit_task = NULL;
	}

	BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);

	if (ocfs2_mount_local(osb)) {
		jbd2_journal_lock_updates(journal->j_journal);
		status = jbd2_journal_flush(journal->j_journal, 0);
		jbd2_journal_unlock_updates(journal->j_journal);
		if (status < 0)
			mlog_errno(status);
	}

	/* Shutdown the kernel journal system */
	if (!jbd2_journal_destroy(journal->j_journal) && !status) {
		/*
		 * Do not toggle if the flush was unsuccessful; otherwise
		 * we would leave dirty metadata in a "clean" journal
		 */
		status = ocfs2_journal_toggle_dirty(osb, 0, 0);
		if (status < 0)
			mlog_errno(status);
	}
	journal->j_journal = NULL;

	OCFS2_I(inode)->ip_open_count--;

	/* unlock our journal */
	ocfs2_inode_unlock(inode, 1);

	brelse(journal->j_bh);
	journal->j_bh = NULL;

	journal->j_state = OCFS2_JOURNAL_FREE;

done:
	iput(inode);
	kfree(journal);
	osb->journal = NULL;
}

static void ocfs2_clear_journal_error(struct super_block *sb,
				      journal_t *journal,
				      int slot)
{
	int olderr;

	olderr = jbd2_journal_errno(journal);
	if (olderr) {
		mlog(ML_ERROR, "File system error %d recorded in "
		     "journal %u.\n", olderr, slot);
		mlog(ML_ERROR, "File system on device %s needs checking.\n",
		     sb->s_id);

		jbd2_journal_ack_err(journal);
		jbd2_journal_clear_err(journal);
	}
}

int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed)
{
	int status = 0;
	struct ocfs2_super *osb;

	BUG_ON(!journal);

	osb = journal->j_osb;

	status = jbd2_journal_load(journal->j_journal);
	if (status < 0) {
		mlog(ML_ERROR, "Failed to load journal!\n");
		goto done;
	}

	ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num);

	if (replayed) {
		jbd2_journal_lock_updates(journal->j_journal);
		status = jbd2_journal_flush(journal->j_journal, 0);
		jbd2_journal_unlock_updates(journal->j_journal);
		if (status < 0)
			mlog_errno(status);
	}

	status = ocfs2_journal_toggle_dirty(osb, 1, replayed);
	if (status < 0) {
		mlog_errno(status);
		goto done;
	}

	/* Launch the commit thread */
	if (!local) {
		osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
				"ocfs2cmt-%s", osb->uuid_str);
		if (IS_ERR(osb->commit_task)) {
			status = PTR_ERR(osb->commit_task);
			osb->commit_task = NULL;
			mlog(ML_ERROR, "unable to launch ocfs2commit thread, "
			     "error=%d", status);
			goto done;
		}
	} else
		osb->commit_task = NULL;

done:
	return status;
}


/* 'full' flag tells us whether we clear out all blocks or if we just
 * mark the journal clean */
int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
{
	int status;

	BUG_ON(!journal);

	status = jbd2_journal_wipe(journal->j_journal, full);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0);
	if (status < 0)
		mlog_errno(status);

bail:
	return status;
}

static int ocfs2_recovery_completed(struct ocfs2_super *osb)
{
	int empty;
	struct ocfs2_recovery_map *rm = osb->recovery_map;

	spin_lock(&osb->osb_lock);
	empty = (rm->rm_used == 0);
	spin_unlock(&osb->osb_lock);

	return empty;
}

void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
{
	wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
}

/*
 * JBD2 might read a cached version of another node's journal file. We
 * don't want this, as this file changes often and we get no
 * notification of those changes. The only way to be sure we have the
 * most up-to-date version of those blocks is to force-read them off
 * disk. Just searching through the buffer cache won't work, as there
 * may be pages backing this file which are still marked up to date. We
 * know things can't change on this file underneath us as we have the
 * lock by now :)
 */
static int ocfs2_force_read_journal(struct inode *inode)
{
	int status = 0;
	int i;
	u64 v_blkno, p_blkno, p_blocks, num_blocks;
	struct buffer_head *bh = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode));
	v_blkno = 0;
	while (v_blkno < num_blocks) {
		status = ocfs2_extent_map_get_blocks(inode, v_blkno,
						     &p_blkno, &p_blocks, NULL);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}

		for (i = 0; i < p_blocks; i++, p_blkno++) {
			bh = __find_get_block(osb->sb->s_bdev, p_blkno,
					osb->sb->s_blocksize);
			/* block not cached. */
			if (!bh)
				continue;

			brelse(bh);
			bh = NULL;
			/* We are reading journal data which should not
			 * be put in the uptodate cache.
			 */
			status = ocfs2_read_blocks_sync(osb, p_blkno, 1, &bh);
			if (status < 0) {
				mlog_errno(status);
				goto bail;
			}

			brelse(bh);
			bh = NULL;
		}

		v_blkno += p_blocks;
	}

bail:
	return status;
}

struct ocfs2_la_recovery_item {
	struct list_head	lri_list;
	int			lri_slot;
	struct ocfs2_dinode	*lri_la_dinode;
	struct ocfs2_dinode	*lri_tl_dinode;
	struct ocfs2_quota_recovery *lri_qrec;
	enum ocfs2_orphan_reco_type  lri_orphan_reco_type;
};

/* Does the second half of the recovery process. By this point, the
 * node is marked clean and can actually be considered recovered,
 * hence it's no longer in the recovery map, but there's still some
 * cleanup we can do which shouldn't happen within the recovery thread
 * as locking in that context becomes very difficult if we are to take
 * recovering nodes into account.
 *
 * NOTE: This function can and will sleep on recovery of other nodes
 * during cluster locking, just like any other ocfs2 process.
 */
void ocfs2_complete_recovery(struct work_struct *work)
{
	int ret = 0;
	struct ocfs2_journal *journal =
		container_of(work, struct ocfs2_journal, j_recovery_work);
	struct ocfs2_super *osb = journal->j_osb;
	struct ocfs2_dinode *la_dinode, *tl_dinode;
	struct ocfs2_la_recovery_item *item, *n;
	struct ocfs2_quota_recovery *qrec;
	enum ocfs2_orphan_reco_type orphan_reco_type;
	LIST_HEAD(tmp_la_list);

	trace_ocfs2_complete_recovery(
		(unsigned long long)OCFS2_I(journal->j_inode)->ip_blkno);

	spin_lock(&journal->j_lock);
	list_splice_init(&journal->j_la_cleanups, &tmp_la_list);
	spin_unlock(&journal->j_lock);

	list_for_each_entry_safe(item, n, &tmp_la_list, lri_list) {
		list_del_init(&item->lri_list);

		ocfs2_wait_on_quotas(osb);

		la_dinode = item->lri_la_dinode;
		tl_dinode = item->lri_tl_dinode;
		qrec = item->lri_qrec;
		orphan_reco_type = item->lri_orphan_reco_type;

		trace_ocfs2_complete_recovery_slot(item->lri_slot,
			la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0,
			tl_dinode ? le64_to_cpu(tl_dinode->i_blkno) : 0,
			qrec);

		if (la_dinode) {
			ret = ocfs2_complete_local_alloc_recovery(osb,
								  la_dinode);
			if (ret < 0)
				mlog_errno(ret);

			kfree(la_dinode);
		}

		if (tl_dinode) {
			ret = ocfs2_complete_truncate_log_recovery(osb,
								   tl_dinode);
			if (ret < 0)
				mlog_errno(ret);

			kfree(tl_dinode);
		}

		ret = ocfs2_recover_orphans(osb, item->lri_slot,
				orphan_reco_type);
		if (ret < 0)
			mlog_errno(ret);

		if (qrec) {
			ret = ocfs2_finish_quota_recovery(osb, qrec,
							  item->lri_slot);
			if (ret < 0)
				mlog_errno(ret);
			/* Recovery info is already freed now */
		}

		kfree(item);
	}

	trace_ocfs2_complete_recovery_end(ret);
}

/* NOTE: This function always eats your references to la_dinode and
 * tl_dinode, either manually on error, or by passing them to
 * ocfs2_complete_recovery */
static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
					    int slot_num,
					    struct ocfs2_dinode *la_dinode,
					    struct ocfs2_dinode *tl_dinode,
					    struct ocfs2_quota_recovery *qrec,
					    enum ocfs2_orphan_reco_type orphan_reco_type)
{
	struct ocfs2_la_recovery_item *item;

	item = kmalloc(sizeof(struct ocfs2_la_recovery_item), GFP_NOFS);
	if (!item) {
		/* Though we wish to avoid it, we are in fact safe in
		 * skipping local alloc cleanup as fsck.ocfs2 is more
		 * than capable of reclaiming unused space. */
		kfree(la_dinode);
		kfree(tl_dinode);

		if (qrec)
			ocfs2_free_quota_recovery(qrec);

		mlog_errno(-ENOMEM);
		return;
	}

	INIT_LIST_HEAD(&item->lri_list);
	item->lri_la_dinode = la_dinode;
	item->lri_slot = slot_num;
	item->lri_tl_dinode = tl_dinode;
	item->lri_qrec = qrec;
	item->lri_orphan_reco_type = orphan_reco_type;

	spin_lock(&journal->j_lock);
	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
	queue_work(journal->j_osb->ocfs2_wq, &journal->j_recovery_work);
	spin_unlock(&journal->j_lock);
}

/* Called by the mount code to queue the last part of recovery for its
 * own slot and any offline slot(s). */
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
{
	struct ocfs2_journal *journal = osb->journal;

	if (ocfs2_is_hard_readonly(osb))
		return;

	/* No need to queue up our truncate_log as regular cleanup will catch
	 * that */
	ocfs2_queue_recovery_completion(journal, osb->slot_num,
					osb->local_alloc_copy, NULL, NULL,
					ORPHAN_NEED_TRUNCATE);
	ocfs2_schedule_truncate_log_flush(osb, 0);

	osb->local_alloc_copy = NULL;

	/* queue to recover orphan slots for all offline slots */
	ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
	ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);
	ocfs2_free_replay_slots(osb);
}

void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
{
	if (osb->quota_rec) {
		ocfs2_queue_recovery_completion(osb->journal,
						osb->slot_num,
						NULL,
						NULL,
						osb->quota_rec,
						ORPHAN_NEED_TRUNCATE);
		osb->quota_rec = NULL;
	}
}

static int __ocfs2_recovery_thread(void *arg)
{
	int status, node_num, slot_num;
	struct ocfs2_super *osb = arg;
	struct ocfs2_recovery_map *rm = osb->recovery_map;
	int *rm_quota = NULL;
	int rm_quota_used = 0, i;
	struct ocfs2_quota_recovery *qrec;

	/* Whether quota is supported. */
	int quota_enabled = OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
			OCFS2_FEATURE_RO_COMPAT_USRQUOTA)
		|| OCFS2_HAS_RO_COMPAT_FEATURE(osb->sb,
			OCFS2_FEATURE_RO_COMPAT_GRPQUOTA);

	status = ocfs2_wait_on_mount(osb);
	if (status < 0) {
		goto bail;
	}

	if (quota_enabled) {
		rm_quota = kcalloc(osb->max_slots, sizeof(int), GFP_NOFS);
		if (!rm_quota) {
			status = -ENOMEM;
			goto bail;
		}
	}
restart:
	status = ocfs2_super_lock(osb, 1);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = ocfs2_compute_replay_slots(osb);
	if (status < 0)
		mlog_errno(status);

	/* queue recovery for our own slot */
	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
					NULL, NULL, ORPHAN_NO_NEED_TRUNCATE);

	spin_lock(&osb->osb_lock);
	while (rm->rm_used) {
		/* It's always safe to remove entry zero, as we won't
		 * clear it until ocfs2_recover_node() has succeeded. */
		node_num = rm->rm_entries[0];
		spin_unlock(&osb->osb_lock);
		slot_num = ocfs2_node_num_to_slot(osb, node_num);
		trace_ocfs2_recovery_thread_node(node_num, slot_num);
		if (slot_num == -ENOENT) {
			status = 0;
			goto skip_recovery;
		}

		/* It is a bit subtle with quota recovery. We cannot do it
		 * immediately because we have to obtain cluster locks from
		 * quota files and we also don't want to just skip it because
		 * then quota usage would be out of sync until some node takes
		 * the slot. So we remember which nodes need quota recovery
		 * and when everything else is done, we recover quotas. */
		if (quota_enabled) {
			for (i = 0; i < rm_quota_used
					&& rm_quota[i] != slot_num; i++)
				;

			if (i == rm_quota_used)
				rm_quota[rm_quota_used++] = slot_num;
		}

		status = ocfs2_recover_node(osb, node_num, slot_num);
skip_recovery:
		if (!status) {
			ocfs2_recovery_map_clear(osb, node_num);
		} else {
			mlog(ML_ERROR,
			     "Error %d recovering node %d on device (%u,%u)!\n",
			     status, node_num,
			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
			mlog(ML_ERROR, "Volume requires unmount.\n");
		}

		spin_lock(&osb->osb_lock);
	}
	spin_unlock(&osb->osb_lock);
	trace_ocfs2_recovery_thread_end(status);

	/* Refresh all journal recovery generations from disk */
	status = ocfs2_check_journals_nolocks(osb);
	status = (status == -EROFS) ? 0 : status;
	if (status < 0)
		mlog_errno(status);

	/* Now it is right time to recover quotas... We have to do this under
	 * superblock lock so that no one can start using the slot (and crash)
	 * before we recover it */
	if (quota_enabled) {
		for (i = 0; i < rm_quota_used; i++) {
			qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
			if (IS_ERR(qrec)) {
				status = PTR_ERR(qrec);
				mlog_errno(status);
				continue;
			}
			ocfs2_queue_recovery_completion(osb->journal,
					rm_quota[i],
					NULL, NULL, qrec,
					ORPHAN_NEED_TRUNCATE);
		}
	}

	ocfs2_super_unlock(osb, 1);

	/* queue recovery for offline slots */
	ocfs2_queue_replay_slots(osb, ORPHAN_NEED_TRUNCATE);

bail:
	mutex_lock(&osb->recovery_lock);
	if (!status && !ocfs2_recovery_completed(osb)) {
		mutex_unlock(&osb->recovery_lock);
		goto restart;
	}

	ocfs2_free_replay_slots(osb);
	osb->recovery_thread_task = NULL;
	mb(); /* sync with ocfs2_recovery_thread_running */
	wake_up(&osb->recovery_event);

	mutex_unlock(&osb->recovery_lock);

	if (quota_enabled)
		kfree(rm_quota);

	return status;
}

void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
{
	mutex_lock(&osb->recovery_lock);

	trace_ocfs2_recovery_thread(node_num, osb->node_num,
		osb->disable_recovery, osb->recovery_thread_task,
		osb->disable_recovery ?
		-1 : ocfs2_recovery_map_set(osb, node_num));

	if (osb->disable_recovery)
		goto out;

	if (osb->recovery_thread_task)
		goto out;

	osb->recovery_thread_task = kthread_run(__ocfs2_recovery_thread, osb,
			"ocfs2rec-%s", osb->uuid_str);
	if (IS_ERR(osb->recovery_thread_task)) {
		mlog_errno((int)PTR_ERR(osb->recovery_thread_task));
		osb->recovery_thread_task = NULL;
	}

out:
	mutex_unlock(&osb->recovery_lock);
	wake_up(&osb->recovery_event);
}

static int ocfs2_read_journal_inode(struct ocfs2_super *osb,
				    int slot_num,
				    struct buffer_head **bh,
				    struct inode **ret_inode)
{
	int status = -EACCES;
	struct inode *inode = NULL;

	BUG_ON(slot_num >= osb->max_slots);

	inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
					    slot_num);
	if (!inode || is_bad_inode(inode)) {
		mlog_errno(status);
		goto bail;
	}
	SET_INODE_JOURNAL(inode);

	status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	status = 0;

bail:
	if (inode) {
		if (status || !ret_inode)
			iput(inode);
		else
			*ret_inode = inode;
	}
	return status;
}

/* Does the actual journal replay and marks the journal inode as
 * clean. Will only replay if the journal inode is marked dirty. */
static int ocfs2_replay_journal(struct ocfs2_super *osb,
				int node_num,
				int slot_num)
{
	int status;
	int got_lock = 0;
	unsigned int flags;
	struct inode *inode = NULL;
	struct ocfs2_dinode *fe;
	journal_t *journal = NULL;
	struct buffer_head *bh = NULL;
	u32 slot_reco_gen;

	status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode);
	if (status) {
		mlog_errno(status);
		goto done;
	}

	fe = (struct ocfs2_dinode *)bh->b_data;
	slot_reco_gen = ocfs2_get_recovery_generation(fe);
	brelse(bh);
	bh = NULL;

	/*
	 * As the fs recovery is asynchronous, there is a small chance that
	 * another node mounted (and recovered) the slot before the recovery
	 * thread could get the lock. To handle that, we dirty read the journal
	 * inode for that slot to get the recovery generation. If it is
	 * different than what we expected, the slot has been recovered.
	 * If not, it needs recovery.
	 */
	if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) {
		trace_ocfs2_replay_journal_recovered(slot_num,
		     osb->slot_recovery_generations[slot_num], slot_reco_gen);
		osb->slot_recovery_generations[slot_num] = slot_reco_gen;
		status = -EBUSY;
		goto done;
	}

	/* Continue with recovery as the journal has not yet been recovered */

	status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
	if (status < 0) {
		trace_ocfs2_replay_journal_lock_err(status);
		if (status != -ERESTARTSYS)
			mlog(ML_ERROR, "Could not lock journal!\n");
		goto done;
	}
	got_lock = 1;

	fe = (struct ocfs2_dinode *) bh->b_data;

	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
	slot_reco_gen = ocfs2_get_recovery_generation(fe);

	if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) {
		trace_ocfs2_replay_journal_skip(node_num);
		/* Refresh recovery generation for the slot */
		osb->slot_recovery_generations[slot_num] = slot_reco_gen;
		goto done;
	}

	/* we need to run complete recovery for offline orphan slots */
	ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);

	printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\
	       "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
	       MINOR(osb->sb->s_dev));

	OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters);

	status = ocfs2_force_read_journal(inode);
	if (status < 0) {
		mlog_errno(status);
		goto done;
	}

	journal = jbd2_journal_init_inode(inode);
	if (journal == NULL) {
		mlog(ML_ERROR, "Linux journal layer error\n");
		status = -EIO;
		goto done;
	}

	status = jbd2_journal_load(journal);
	if (status < 0) {
		mlog_errno(status);
		BUG_ON(!igrab(inode));
		jbd2_journal_destroy(journal);
		goto done;
	}

	ocfs2_clear_journal_error(osb->sb, journal, slot_num);

	/* wipe the journal */
	jbd2_journal_lock_updates(journal);
	status = jbd2_journal_flush(journal, 0);
	jbd2_journal_unlock_updates(journal);
	if (status < 0)
		mlog_errno(status);

	/* This will mark the node clean */
	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
	flags &= ~OCFS2_JOURNAL_DIRTY_FL;
	fe->id1.journal1.ij_flags = cpu_to_le32(flags);

	/* Increment recovery generation to indicate successful recovery */
	ocfs2_bump_recovery_generation(fe);
	osb->slot_recovery_generations[slot_num] =
					ocfs2_get_recovery_generation(fe);

	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
	status = ocfs2_write_block(osb, bh, INODE_CACHE(inode));
	if (status < 0)
		mlog_errno(status);

	BUG_ON(!igrab(inode));

	jbd2_journal_destroy(journal);

	printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\
	       "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
	       MINOR(osb->sb->s_dev));
done:
	/* drop the lock on this node's journal */
	if (got_lock)
		ocfs2_inode_unlock(inode, 1);

	iput(inode);
	brelse(bh);

	return status;
}

/*
 * Do the most important parts of node recovery:
 *  - Replay its journal
 *  - Stamp a clean local allocator file
 *  - Stamp a clean truncate log
 *  - Mark the node clean
 *
 * If this function completes without error, a node in OCFS2 can be
 * said to have been safely recovered. As a result, failure during the
 * second part of a node's recovery process (local alloc recovery) is
 * far less concerning.
 */
static int ocfs2_recover_node(struct ocfs2_super *osb,
			      int node_num, int slot_num)
{
	int status = 0;
	struct ocfs2_dinode *la_copy = NULL;
	struct ocfs2_dinode *tl_copy = NULL;

	trace_ocfs2_recover_node(node_num, slot_num, osb->node_num);

	/* Should not ever be called to recover ourselves -- in that
	 * case we should've called ocfs2_journal_load instead. */
	BUG_ON(osb->node_num == node_num);

	status = ocfs2_replay_journal(osb, node_num, slot_num);
	if (status < 0) {
		if (status == -EBUSY) {
			trace_ocfs2_recover_node_skip(slot_num, node_num);
			status = 0;
			goto done;
		}
		mlog_errno(status);
		goto done;
	}

	/* Stamp a clean local alloc file AFTER recovering the journal... */
	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
	if (status < 0) {
		mlog_errno(status);
		goto done;
	}

	/* An error from begin_truncate_log_recovery is not
	 * serious enough to warrant halting the rest of
	 * recovery. */
	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
	if (status < 0)
		mlog_errno(status);

	/* Likewise, this would be a strange but ultimately not so
	 * harmful place to get an error... */
	status = ocfs2_clear_slot(osb, slot_num);
	if (status < 0)
		mlog_errno(status);

	/* This will kfree the memory pointed to by la_copy and tl_copy */
	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
					tl_copy, NULL, ORPHAN_NEED_TRUNCATE);

	status = 0;
done:

	return status;
}

/* Test node liveness by trylocking his journal. If we get the lock,
 * we drop it here. Return 0 if we got the lock, -EAGAIN if node is
 * still alive (we couldn't get the lock) and < 0 on error. */
static int ocfs2_trylock_journal(struct ocfs2_super *osb,
				 int slot_num)
{
	int status, flags;
	struct inode *inode = NULL;

	inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
					    slot_num);
	if (inode == NULL) {
		mlog(ML_ERROR, "access error\n");
		status = -EACCES;
		goto bail;
	}
	if (is_bad_inode(inode)) {
		mlog(ML_ERROR, "access error (bad inode)\n");
		iput(inode);
		inode = NULL;
		status = -EACCES;
		goto bail;
	}
	SET_INODE_JOURNAL(inode);

	flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE;
	status = ocfs2_inode_lock_full(inode, NULL, 1, flags);
	if (status < 0) {
		if (status != -EAGAIN)
			mlog_errno(status);
		goto bail;
	}

	ocfs2_inode_unlock(inode, 1);
bail:
	iput(inode);

	return status;
}

/* Call this underneath ocfs2_super_lock. It also assumes that the
 * slot info struct has been updated from disk. */
int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
{
	unsigned int node_num;
	int status, i;
	u32 gen;
	struct buffer_head *bh = NULL;
	struct ocfs2_dinode *di;

	/* This is called with the super block cluster lock, so we
	 * know that the slot map can't change underneath us. */

	for (i = 0; i < osb->max_slots; i++) {
		/* Read journal inode to get the recovery generation */
		status = ocfs2_read_journal_inode(osb, i, &bh, NULL);
		if (status) {
			mlog_errno(status);
			goto bail;
		}
		di = (struct ocfs2_dinode *)bh->b_data;
		gen = ocfs2_get_recovery_generation(di);
		brelse(bh);
		bh = NULL;

		spin_lock(&osb->osb_lock);
		osb->slot_recovery_generations[i] = gen;

		trace_ocfs2_mark_dead_nodes(i,
					    osb->slot_recovery_generations[i]);

		if (i == osb->slot_num) {
			spin_unlock(&osb->osb_lock);
			continue;
		}

		status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
		if (status == -ENOENT) {
			spin_unlock(&osb->osb_lock);
			continue;
		}

		if (__ocfs2_recovery_map_test(osb, node_num)) {
			spin_unlock(&osb->osb_lock);
			continue;
		}
		spin_unlock(&osb->osb_lock);

		/* Ok, we have a slot occupied by another node which
		 * is not in the recovery map. We trylock his journal
		 * file here to test if he's alive. */
		status = ocfs2_trylock_journal(osb, i);
		if (!status) {
			/* Since we're called from mount, we know that
			 * the recovery thread can't race us on
			 * setting / checking the recovery bits. */
			ocfs2_recovery_thread(osb, node_num);
		} else if ((status < 0) && (status != -EAGAIN)) {
			mlog_errno(status);
			goto bail;
		}
	}

	status = 0;
bail:
	return status;
}

/*
 * The scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT
 * milliseconds. Add some randomness (up to ~5s) to the timeout to
 * minimize multiple nodes firing the timer at the same time.
 */
static inline unsigned long ocfs2_orphan_scan_timeout(void)
{
	unsigned long time;

	get_random_bytes(&time, sizeof(time));
	time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
	return msecs_to_jiffies(time);
}

/*
 * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
 * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
 * is done to catch any orphans that are left over in orphan directories.
 *
 * It scans all slots, even ones that are in use. It does so to handle the
 * case described below:
 *
 *   Node 1 has an inode it was using. The dentry went away due to memory
 *   pressure.  Node 1 closes the inode, but it's on the free list. The node
 *   has the open lock.
 *   Node 2 unlinks the inode. It grabs the dentry lock to notify others,
 *   but node 1 has no dentry and doesn't get the message. It trylocks the
 *   open lock, sees that another node has a PR, and does nothing.
 *   Later node 2 runs its orphan dir. It igets the inode, trylocks the
 *   open lock, sees the PR still, and does nothing.
 *   Basically, we have to trigger an orphan iput on node 1. The only way
 *   for this to happen is if node 1 runs node 2's orphan dir.
 *
 * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
 * milliseconds.  It takes an EX lock on os_lockres and checks the sequence
 * number stored in the LVB. If the sequence number has changed, it means
 * some other node has done the scan.  This node skips the scan and tracks
 * the sequence number.  If the sequence number didn't change, it means a
 * scan hasn't happened.  The node queues a scan and increments the
 * sequence number in the LVB.
 */
static void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
{
	struct ocfs2_orphan_scan *os;
	int status, i;
	u32 seqno = 0;

	os = &osb->osb_orphan_scan;

	if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
		goto out;

	trace_ocfs2_queue_orphan_scan_begin(os->os_count, os->os_seqno,
					    atomic_read(&os->os_state));

	status = ocfs2_orphan_scan_lock(osb, &seqno);
	if (status < 0) {
		if (status != -EAGAIN)
			mlog_errno(status);
		goto out;
	}

	/* Do not queue the tasks if the volume is being unmounted */
	if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
		goto unlock;

	if (os->os_seqno != seqno) {
		os->os_seqno = seqno;
		goto unlock;
	}

	for (i = 0; i < osb->max_slots; i++)
		ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
						NULL, ORPHAN_NO_NEED_TRUNCATE);
	/*
	 * We queued a recovery on the orphan slots; increment the sequence
	 * number and update the LVB so other nodes will skip the scan for a
	 * while.
	 */
	seqno++;
	os->os_count++;
	os->os_scantime = ktime_get_seconds();
unlock:
	ocfs2_orphan_scan_unlock(osb, seqno);
out:
	trace_ocfs2_queue_orphan_scan_end(os->os_count, os->os_seqno,
					  atomic_read(&os->os_state));
	return;
}

/* Worker task that fires every ORPHAN_SCAN_SCHEDULE_TIMEOUT milliseconds */
static void ocfs2_orphan_scan_work(struct work_struct *work)
{
	struct ocfs2_orphan_scan *os;
	struct ocfs2_super *osb;

	os = container_of(work, struct ocfs2_orphan_scan,
			  os_orphan_scan_work.work);
	osb = os->os_osb;

	mutex_lock(&os->os_lock);
	ocfs2_queue_orphan_scan(osb);
	if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
		queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
				   ocfs2_orphan_scan_timeout());
	mutex_unlock(&os->os_lock);
}

void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
{
	struct ocfs2_orphan_scan *os;

	os = &osb->osb_orphan_scan;
	if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) {
		atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
		mutex_lock(&os->os_lock);
		cancel_delayed_work(&os->os_orphan_scan_work);
		mutex_unlock(&os->os_lock);
	}
}

void ocfs2_orphan_scan_init(struct ocfs2_super *osb)
{
	struct ocfs2_orphan_scan *os;

	os = &osb->osb_orphan_scan;
	os->os_osb = osb;
	os->os_count = 0;
	os->os_seqno = 0;
	mutex_init(&os->os_lock);
	INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work);
}
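
/*
 * Expected lifecycle of the scan machinery (a sketch inferred from the
 * helpers above and below, not a verbatim call chain from the mount
 * paths):
 *
 *	ocfs2_orphan_scan_init(osb);	// set up state and the work item
 *	ocfs2_orphan_scan_start(osb);	// arm the timer once mounted
 *	...
 *	ocfs2_orphan_scan_stop(osb);	// disarm at dismount
 */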

void ocfs2_orphan_scan_start(struct ocfs2_super *osb)
{
	struct ocfs2_orphan_scan *os;

	os = &osb->osb_orphan_scan;
	os->os_scantime = ktime_get_seconds();
	if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
		atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
	else {
		atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
		queue_delayed_work(osb->ocfs2_wq, &os->os_orphan_scan_work,
				   ocfs2_orphan_scan_timeout());
	}
}

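/*
 * Private state passed to ocfs2_dir_foreach(); the embedded dir_context
 * lets ocfs2_orphan_filldir() recover the whole structure from the
 * callback's ctx argument via container_of().
 */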
struct ocfs2_orphan_filldir_priv {
	struct dir_context	ctx;
	struct inode		*head;
	struct ocfs2_super	*osb;
	enum ocfs2_orphan_reco_type orphan_reco_type;
};

static int ocfs2_orphan_filldir(struct dir_context *ctx, const char *name,
				int name_len, loff_t pos, u64 ino,
				unsigned type)
{
	struct ocfs2_orphan_filldir_priv *p =
		container_of(ctx, struct ocfs2_orphan_filldir_priv, ctx);
	struct inode *iter;

	if (name_len == 1 && !strncmp(".", name, 1))
		return 0;
	if (name_len == 2 && !strncmp("..", name, 2))
		return 0;

	/* Do not include dio entries when doing an orphan scan */
	if ((p->orphan_reco_type == ORPHAN_NO_NEED_TRUNCATE) &&
			(!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
			OCFS2_DIO_ORPHAN_PREFIX_LEN)))
		return 0;

	/* Skip bad inodes so that recovery can continue */
	iter = ocfs2_iget(p->osb, ino,
			  OCFS2_FI_FLAG_ORPHAN_RECOVERY, 0);
	if (IS_ERR(iter))
		return 0;

	if (!strncmp(name, OCFS2_DIO_ORPHAN_PREFIX,
			OCFS2_DIO_ORPHAN_PREFIX_LEN))
		OCFS2_I(iter)->ip_flags |= OCFS2_INODE_DIO_ORPHAN_ENTRY;

	/* Skip inodes which have already been added to the recovery list,
	 * since dio may happen concurrently with unlink/rename */
	if (OCFS2_I(iter)->ip_next_orphan) {
		iput(iter);
		return 0;
	}

	trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno);
	/* No locking is required for the next_orphan queue as there
	 * is only ever a single process doing orphan recovery. */
	OCFS2_I(iter)->ip_next_orphan = p->head;
	p->head = iter;

	return 0;
}

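/*
 * The singly-linked list built above is consumed LIFO by
 * ocfs2_recover_orphans(), roughly:
 *
 *	while (inode) {
 *		iter = OCFS2_I(inode)->ip_next_orphan;
 *		...handle the orphaned inode...
 *		iput(inode);
 *		inode = iter;
 *	}
 */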
static int ocfs2_queue_orphans(struct ocfs2_super *osb,
			       int slot,
			       struct inode **head,
			       enum ocfs2_orphan_reco_type orphan_reco_type)
{
	int status;
	struct inode *orphan_dir_inode = NULL;
	struct ocfs2_orphan_filldir_priv priv = {
		.ctx.actor = ocfs2_orphan_filldir,
		.osb = osb,
		.head = *head,
		.orphan_reco_type = orphan_reco_type
	};

	orphan_dir_inode = ocfs2_get_system_file_inode(osb,
						       ORPHAN_DIR_SYSTEM_INODE,
						       slot);
	if (!orphan_dir_inode) {
		status = -ENOENT;
		mlog_errno(status);
		return status;
	}

	inode_lock(orphan_dir_inode);
	status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0);
	if (status < 0) {
		mlog_errno(status);
		goto out;
	}

	status = ocfs2_dir_foreach(orphan_dir_inode, &priv.ctx);
	if (status) {
		mlog_errno(status);
		goto out_cluster;
	}

	*head = priv.head;

out_cluster:
	ocfs2_inode_unlock(orphan_dir_inode, 0);
out:
	inode_unlock(orphan_dir_inode);
	iput(orphan_dir_inode);
	return status;
}

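/*
 * Predicate for the wait in ocfs2_mark_recovering_orphan_dir() below:
 * orphan recovery may proceed once no process is in the middle of an
 * orphan wipe on this slot. The counter is re-read under osb_lock on
 * every wakeup.
 */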
static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
					      int slot)
{
	int ret;

	spin_lock(&osb->osb_lock);
	ret = !osb->osb_orphan_wipes[slot];
	spin_unlock(&osb->osb_lock);
	return ret;
}

static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
					     int slot)
{
	spin_lock(&osb->osb_lock);
	/* Mark ourselves such that new processes in delete_inode()
	 * know to quit early. */
	ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
	while (osb->osb_orphan_wipes[slot]) {
		/* If any processes are already in the middle of an
		 * orphan wipe on this dir, then we need to wait for
		 * them. */
		spin_unlock(&osb->osb_lock);
		wait_event_interruptible(osb->osb_wipe_event,
					 ocfs2_orphan_recovery_can_continue(osb, slot));
		spin_lock(&osb->osb_lock);
	}
	spin_unlock(&osb->osb_lock);
}

static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
					      int slot)
{
	ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
}

/*
 * Orphan recovery. Each mounted node has its own orphan dir which we
 * must run during recovery. Our strategy here is to build a list of
 * the inodes in the orphan dir and iget/iput them. The VFS does
 * most of the rest of the work.
 *
 * Orphan recovery can happen at any time, not just mount, so we have a
 * couple of extra considerations.
 *
 * - We grab as many inodes as we can under the orphan dir lock -
 *   doing iget() outside the orphan dir risks getting a reference on
 *   an invalid inode.
 * - We must be sure not to deadlock with other processes on the
 *   system wanting to run delete_inode(). This can happen when they go
 *   to lock the orphan dir and the orphan recovery process attempts to
 *   iget() inside the orphan dir lock. This can be avoided by
 *   advertising our state to ocfs2_delete_inode().
 */
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
				 int slot,
				 enum ocfs2_orphan_reco_type orphan_reco_type)
{
	int ret = 0;
	struct inode *inode = NULL;
	struct inode *iter;
	struct ocfs2_inode_info *oi;
	struct buffer_head *di_bh = NULL;
	struct ocfs2_dinode *di = NULL;

	trace_ocfs2_recover_orphans(slot);

	ocfs2_mark_recovering_orphan_dir(osb, slot);
	ret = ocfs2_queue_orphans(osb, slot, &inode, orphan_reco_type);
	ocfs2_clear_recovering_orphan_dir(osb, slot);

	/* Error here should be noted, but we want to continue with as
	 * many queued inodes as we've got. */
	if (ret)
		mlog_errno(ret);

	while (inode) {
		oi = OCFS2_I(inode);
		trace_ocfs2_recover_orphans_iput(
					(unsigned long long)oi->ip_blkno);

		iter = oi->ip_next_orphan;
		oi->ip_next_orphan = NULL;

		if (oi->ip_flags & OCFS2_INODE_DIO_ORPHAN_ENTRY) {
			inode_lock(inode);
			ret = ocfs2_rw_lock(inode, 1);
			if (ret < 0) {
				mlog_errno(ret);
				goto unlock_mutex;
			}
			/*
			 * We need to take and drop the inode lock to
			 * force a re-read of the inode from disk.
			 */
			ret = ocfs2_inode_lock(inode, &di_bh, 1);
			if (ret) {
				mlog_errno(ret);
				goto unlock_rw;
			}

			di = (struct ocfs2_dinode *)di_bh->b_data;

			if (di->i_flags & cpu_to_le32(OCFS2_DIO_ORPHANED_FL)) {
				ret = ocfs2_truncate_file(inode, di_bh,
						i_size_read(inode));
				if (ret < 0) {
					if (ret != -ENOSPC)
						mlog_errno(ret);
					goto unlock_inode;
				}

				ret = ocfs2_del_inode_from_orphan(osb, inode,
						di_bh, 0, 0);
				if (ret)
					mlog_errno(ret);
			}
unlock_inode:
			ocfs2_inode_unlock(inode, 1);
			brelse(di_bh);
			di_bh = NULL;
unlock_rw:
			ocfs2_rw_unlock(inode, 1);
unlock_mutex:
			inode_unlock(inode);

			/* clear dio flag in ocfs2_inode_info */
			oi->ip_flags &= ~OCFS2_INODE_DIO_ORPHAN_ENTRY;
		} else {
			spin_lock(&oi->ip_lock);
			/* Set the proper information to get us going into
			 * ocfs2_delete_inode. */
			oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
			spin_unlock(&oi->ip_lock);
		}

		iput(inode);
		inode = iter;
	}

	return ret;
}

static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
{
	/* This check is good because ocfs2 will wait on our recovery
	 * thread before changing it to something other than MOUNTED
	 * or DISABLED. */
	wait_event(osb->osb_mount_event,
		  (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
		   atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
		   atomic_read(&osb->vol_state) == VOLUME_DISABLED);

	/* If there's an error on mount, then we may never get to the
	 * MOUNTED flag, but this is set right before
	 * dismount_volume() so we can trust it. */
	if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) {
		trace_ocfs2_wait_on_mount(VOLUME_DISABLED);
		mlog(0, "mount error, exiting!\n");
		return -EBUSY;
	}

	return 0;
}

static int ocfs2_commit_thread(void *arg)
{
	int status;
	struct ocfs2_super *osb = arg;
	struct ocfs2_journal *journal = osb->journal;

	/* we can trust j_num_trans here because _should_stop() is only set in
	 * shutdown and nobody other than ourselves should be able to start
	 * transactions.  committing on shutdown might take a few iterations
	 * as final transactions put deleted inodes on the list */
	while (!(kthread_should_stop() &&
		 atomic_read(&journal->j_num_trans) == 0)) {

		wait_event_interruptible(osb->checkpoint_event,
					 atomic_read(&journal->j_num_trans)
					 || kthread_should_stop());

		status = ocfs2_commit_cache(osb);
		if (status < 0) {
			static unsigned long abort_warn_time;

			/* Warn about this once per minute */
			if (printk_timed_ratelimit(&abort_warn_time, 60*HZ))
				mlog(ML_ERROR, "status = %d, journal is "
						"already aborted.\n", status);
			/*
			 * After ocfs2_commit_cache() fails, j_num_trans has a
			 * non-zero value.  Sleep here to avoid a busy-wait
			 * loop.
			 */
			msleep_interruptible(1000);
		}

		if (kthread_should_stop() && atomic_read(&journal->j_num_trans)) {
			mlog(ML_KTHREAD,
			     "commit_thread: %u transactions pending on "
			     "shutdown\n",
			     atomic_read(&journal->j_num_trans));
		}
	}

	return 0;
}

/* Reads all the journal inodes without taking any cluster locks. Used
 * for hard readonly access to determine whether any journal requires
 * recovery. Also used to refresh the recovery generation numbers after
 * a journal has been recovered by another node.
 */
int ocfs2_check_journals_nolocks(struct ocfs2_super *osb)
{
	int ret = 0;
	unsigned int slot;
	struct buffer_head *di_bh = NULL;
	struct ocfs2_dinode *di;
	int journal_dirty = 0;

	for (slot = 0; slot < osb->max_slots; slot++) {
		ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL);
		if (ret) {
			mlog_errno(ret);
			goto out;
		}

		di = (struct ocfs2_dinode *) di_bh->b_data;

		osb->slot_recovery_generations[slot] =
					ocfs2_get_recovery_generation(di);

		if (le32_to_cpu(di->id1.journal1.ij_flags) &
		    OCFS2_JOURNAL_DIRTY_FL)
			journal_dirty = 1;

		brelse(di_bh);
		di_bh = NULL;
	}

out:
	if (journal_dirty)
		ret = -EROFS;
	return ret;
}

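/*
 * A caller sketch (illustrative only; assumes a mount-time check, not a
 * call chain taken verbatim from this file):
 *
 *	if (ocfs2_is_hard_readonly(osb) &&
 *	    ocfs2_check_journals_nolocks(osb) == -EROFS) {
 *		// Some journal is still marked dirty and needs replay;
 *		// refuse the hard-readonly mount rather than expose
 *		// potentially stale metadata.
 *	}
 */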