// SPDX-License-Identifier: GPL-2.0-only
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.


*/

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched/signal.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include <linux/part_stat.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_peer_device *, int);
static int make_resync_request(struct drbd_peer_device *, int);

34/* endio handlers:
35 *   drbd_md_endio (defined here)
36 *   drbd_request_endio (defined here)
37 *   drbd_peer_request_endio (defined here)
38 *   drbd_bm_endio (defined in drbd_bitmap.c)
39 *
40 * For all these callbacks, note the following:
41 * The callbacks will be called in irq context by the IDE drivers,
42 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43 * Try to get the locking right :)
44 *
45 */
46
47/* used for synchronous meta data and bitmap IO
48 * submitted by drbd_md_sync_page_io()
49 */
50void drbd_md_endio(struct bio *bio)
51{
52	struct drbd_device *device;
53
54	device = bio->bi_private;
55	device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57	/* special case: drbd_md_read() during drbd_adm_attach() */
58	if (device->ldev)
59		put_ldev(device);
60	bio_put(bio);
61
62	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63	 * to timeout on the lower level device, and eventually detach from it.
64	 * If this io completion runs after that timeout expired, this
65	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
66	 * During normal operation, this only puts that extra reference
67	 * down to 1 again.
68	 * Make sure we first drop the reference, and only then signal
69	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70	 * next drbd_md_sync_page_io(), that we trigger the
71	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72	 */
73	drbd_md_put_buffer(device);
74	device->md_io.done = 1;
75	wake_up(&device->misc_wait);
76}
77
78/* reads on behalf of the partner,
79 * "submitted" by the receiver
80 */
81static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82{
83	unsigned long flags = 0;
84	struct drbd_peer_device *peer_device = peer_req->peer_device;
85	struct drbd_device *device = peer_device->device;
86
87	spin_lock_irqsave(&device->resource->req_lock, flags);
88	device->read_cnt += peer_req->i.size >> 9;
89	list_del(&peer_req->w.list);
90	if (list_empty(&device->read_ee))
91		wake_up(&device->ee_wait);
92	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93		__drbd_chk_io_error(device, DRBD_READ_ERROR);
94	spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97	put_ldev(device);
98}
99
100/* writes on behalf of the partner, or resync writes,
101 * "submitted" by the receiver, final stage.  */
102void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103{
104	unsigned long flags = 0;
105	struct drbd_peer_device *peer_device = peer_req->peer_device;
106	struct drbd_device *device = peer_device->device;
107	struct drbd_connection *connection = peer_device->connection;
108	struct drbd_interval i;
109	int do_wake;
110	u64 block_id;
111	int do_al_complete_io;
112
113	/* after we moved peer_req to done_ee,
114	 * we may no longer access it,
115	 * it may be freed/reused already!
116	 * (as soon as we release the req_lock) */
117	i = peer_req->i;
118	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119	block_id = peer_req->block_id;
120	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122	if (peer_req->flags & EE_WAS_ERROR) {
		/* In protocol != C, we usually do not send write acks.
		 * In case of a write error, send the neg ack anyway. */
125		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126			inc_unacked(device);
127		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
128	}
129
130	spin_lock_irqsave(&device->resource->req_lock, flags);
131	device->writ_cnt += peer_req->i.size >> 9;
132	list_move_tail(&peer_req->w.list, &device->done_ee);
133
	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * The request is removed from the tree in drbd_process_done_ee(),
	 * within the appropriate dw.cb (e_end_block/e_end_resync_block),
	 * or in _drbd_clear_done_ee.
	 */
141
142	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
145	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146	if (peer_req->flags & EE_WAS_ERROR)
147		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149	if (connection->cstate >= C_WF_REPORT_PARAMS) {
150		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152			kref_put(&device->kref, drbd_destroy_device);
153	}
154	spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156	if (block_id == ID_SYNCER)
157		drbd_rs_complete_io(device, i.sector);
158
159	if (do_wake)
160		wake_up(&device->ee_wait);
161
162	if (do_al_complete_io)
163		drbd_al_complete_io(device, &i);
164
165	put_ldev(device);
166}
167
168/* writes on behalf of the partner, or resync writes,
169 * "submitted" by the receiver.
170 */
171void drbd_peer_request_endio(struct bio *bio)
172{
173	struct drbd_peer_request *peer_req = bio->bi_private;
174	struct drbd_device *device = peer_req->peer_device->device;
175	bool is_write = bio_data_dir(bio) == WRITE;
176	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177			  bio_op(bio) == REQ_OP_DISCARD;
178
179	if (bio->bi_status && drbd_ratelimit())
180		drbd_warn(device, "%s: error=%d s=%llus\n",
181				is_write ? (is_discard ? "discard" : "write")
182					: "read", bio->bi_status,
183				(unsigned long long)peer_req->i.sector);
184
185	if (bio->bi_status)
186		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188	bio_put(bio); /* no need for the bio anymore */
189	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190		if (is_write)
191			drbd_endio_write_sec_final(peer_req);
192		else
193			drbd_endio_read_sec_final(peer_req);
194	}
195}
196
197static void
198drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199{
200	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201		device->minor, device->resource->name, device->vnr);
202}
203
204/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205 */
206void drbd_request_endio(struct bio *bio)
207{
208	unsigned long flags;
209	struct drbd_request *req = bio->bi_private;
210	struct drbd_device *device = req->device;
211	struct bio_and_error m;
212	enum drbd_req_event what;
213
214	/* If this request was aborted locally before,
215	 * but now was completed "successfully",
216	 * chances are that this caused arbitrary data corruption.
217	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
226	 *
227	 * By completing these requests, we allow the upper layers to re-use
228	 * the associated data pages.
229	 *
230	 * If later the local backing device "recovers", and now DMAs some data
231	 * from disk into the original request pages, in the best case it will
232	 * just put random data into unused pages; but typically it will corrupt
233	 * meanwhile completely unrelated data, causing all sorts of damage.
234	 *
235	 * Which means delayed successful completion,
236	 * especially for READ requests,
237	 * is a reason to panic().
238	 *
239	 * We assume that a delayed *error* completion is OK,
240	 * though we still will complain noisily about it.
241	 */
242	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243		if (drbd_ratelimit())
244			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246		if (!bio->bi_status)
247			drbd_panic_after_delayed_completion_of_aborted_request(device);
248	}
249
250	/* to avoid recursion in __req_mod */
251	if (unlikely(bio->bi_status)) {
252		switch (bio_op(bio)) {
253		case REQ_OP_WRITE_ZEROES:
254		case REQ_OP_DISCARD:
255			if (bio->bi_status == BLK_STS_NOTSUPP)
256				what = DISCARD_COMPLETED_NOTSUPP;
257			else
258				what = DISCARD_COMPLETED_WITH_ERROR;
259			break;
260		case REQ_OP_READ:
261			if (bio->bi_opf & REQ_RAHEAD)
262				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263			else
264				what = READ_COMPLETED_WITH_ERROR;
265			break;
266		default:
267			what = WRITE_COMPLETED_WITH_ERROR;
268			break;
269		}
270	} else {
271		what = COMPLETED_OK;
272	}
273
274	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275	bio_put(bio);
276
277	/* not req_mod(), we need irqsave here! */
278	spin_lock_irqsave(&device->resource->req_lock, flags);
279	__req_mod(req, what, NULL, &m);
280	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281	put_ldev(device);
282
283	if (m.bio)
284		complete_master_bio(device, &m);
285}
286
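/* Compute a digest over the payload of a peer request.  All pages of the
 * page chain except the last one are hashed in full; the last page is hashed
 * only up to i.size % PAGE_SIZE, or in full if the request size happens to
 * be a multiple of PAGE_SIZE (the "len ?: PAGE_SIZE" below). */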
287void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288{
289	SHASH_DESC_ON_STACK(desc, tfm);
290	struct page *page = peer_req->pages;
291	struct page *tmp;
292	unsigned len;
293	void *src;
294
295	desc->tfm = tfm;
296
297	crypto_shash_init(desc);
298
299	src = kmap_atomic(page);
300	while ((tmp = page_chain_next(page))) {
301		/* all but the last page will be fully used */
302		crypto_shash_update(desc, src, PAGE_SIZE);
303		kunmap_atomic(src);
304		page = tmp;
305		src = kmap_atomic(page);
306	}
307	/* and now the last, possibly only partially used page */
308	len = peer_req->i.size & (PAGE_SIZE - 1);
309	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310	kunmap_atomic(src);
311
312	crypto_shash_final(desc, digest);
313	shash_desc_zero(desc);
314}
315
316void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317{
318	SHASH_DESC_ON_STACK(desc, tfm);
319	struct bio_vec bvec;
320	struct bvec_iter iter;
321
322	desc->tfm = tfm;
323
324	crypto_shash_init(desc);
325
326	bio_for_each_segment(bvec, bio, iter) {
327		u8 *src;
328
329		src = bvec_kmap_local(&bvec);
330		crypto_shash_update(desc, src, bvec.bv_len);
331		kunmap_local(src);
332	}
333	crypto_shash_final(desc, digest);
334	shash_desc_zero(desc);
335}
336
337/* MAYBE merge common code with w_e_end_ov_req */
338static int w_e_send_csum(struct drbd_work *w, int cancel)
339{
340	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341	struct drbd_peer_device *peer_device = peer_req->peer_device;
342	struct drbd_device *device = peer_device->device;
343	int digest_size;
344	void *digest;
345	int err = 0;
346
347	if (unlikely(cancel))
348		goto out;
349
350	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351		goto out;
352
353	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354	digest = kmalloc(digest_size, GFP_NOIO);
355	if (digest) {
356		sector_t sector = peer_req->i.sector;
357		unsigned int size = peer_req->i.size;
358		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359		/* Free peer_req and pages before send.
360		 * In case we block on congestion, we could otherwise run into
361		 * some distributed deadlock, if the other side blocks on
362		 * congestion as well, because our receiver blocks in
363		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
364		drbd_free_peer_req(device, peer_req);
365		peer_req = NULL;
366		inc_rs_pending(peer_device);
367		err = drbd_send_drequest_csum(peer_device, sector, size,
368					      digest, digest_size,
369					      P_CSUM_RS_REQUEST);
370		kfree(digest);
371	} else {
372		drbd_err(device, "kmalloc() of digest failed.\n");
373		err = -ENOMEM;
374	}
375
376out:
377	if (peer_req)
378		drbd_free_peer_req(device, peer_req);
379
380	if (unlikely(err))
381		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382	return err;
383}
384
385#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
386
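/* For checksum-based resync: read the local data for one resync request into
 * a freshly allocated peer request and submit it.  On completion,
 * w_e_send_csum() hashes the data and sends only the digest to the peer
 * (P_CSUM_RS_REQUEST) instead of asking for the full block right away. */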
387static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388{
389	struct drbd_device *device = peer_device->device;
390	struct drbd_peer_request *peer_req;
391
392	if (!get_ldev(device))
393		return -EIO;
394
395	/* GFP_TRY, because if there is no memory available right now, this may
396	 * be rescheduled for later. It is "only" background resync, after all. */
397	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398				       size, size, GFP_TRY);
399	if (!peer_req)
400		goto defer;
401
402	peer_req->w.cb = w_e_send_csum;
403	peer_req->opf = REQ_OP_READ;
404	spin_lock_irq(&device->resource->req_lock);
405	list_add_tail(&peer_req->w.list, &device->read_ee);
406	spin_unlock_irq(&device->resource->req_lock);
407
408	atomic_add(size >> 9, &device->rs_sect_ev);
409	if (drbd_submit_peer_request(peer_req) == 0)
410		return 0;
411
412	/* If it failed because of ENOMEM, retry should help.  If it failed
413	 * because bio_add_page failed (probably broken lower level driver),
414	 * retry may or may not help.
415	 * If it does not, you may need to force disconnect. */
416	spin_lock_irq(&device->resource->req_lock);
417	list_del(&peer_req->w.list);
418	spin_unlock_irq(&device->resource->req_lock);
419
420	drbd_free_peer_req(device, peer_req);
421defer:
422	put_ldev(device);
423	return -EAGAIN;
424}
425
426int w_resync_timer(struct drbd_work *w, int cancel)
427{
428	struct drbd_device *device =
429		container_of(w, struct drbd_device, resync_work);
430
431	switch (device->state.conn) {
432	case C_VERIFY_S:
433		make_ov_request(first_peer_device(device), cancel);
434		break;
435	case C_SYNC_TARGET:
436		make_resync_request(first_peer_device(device), cancel);
437		break;
438	}
439
440	return 0;
441}
442
443void resync_timer_fn(struct timer_list *t)
444{
445	struct drbd_device *device = from_timer(device, t, resync_timer);
446
447	drbd_queue_work_if_unqueued(
448		&first_peer_device(device)->connection->sender_work,
449		&device->resync_work);
450}
451
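/* The fifo_buffer below is the "plan ahead" ring buffer of the dynamic
 * resync controller: each slot holds a correction (in sectors) planned for
 * one future SLEEP_TIME step.  fifo_push() pushes a new value and returns
 * the oldest one (the correction due in the current step), fifo_add_val()
 * spreads an additional correction evenly over all planned steps, and
 * fifo_set() wipes the plan. */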
452static void fifo_set(struct fifo_buffer *fb, int value)
453{
454	int i;
455
456	for (i = 0; i < fb->size; i++)
457		fb->values[i] = value;
458}
459
460static int fifo_push(struct fifo_buffer *fb, int value)
461{
462	int ov;
463
464	ov = fb->values[fb->head_index];
465	fb->values[fb->head_index++] = value;
466
467	if (fb->head_index >= fb->size)
468		fb->head_index = 0;
469
470	return ov;
471}
472
473static void fifo_add_val(struct fifo_buffer *fb, int value)
474{
475	int i;
476
477	for (i = 0; i < fb->size; i++)
478		fb->values[i] += value;
479}
480
481struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
482{
483	struct fifo_buffer *fb;
484
485	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
486	if (!fb)
487		return NULL;
488
489	fb->head_index = 0;
490	fb->size = fifo_size;
491	fb->total = 0;
492
493	return fb;
494}
495
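/* Dynamic resync speed controller: decide how many sectors to ask for in the
 * next SLEEP_TIME (HZ/10, i.e. 100ms) step so that roughly "want" sectors
 * stay in flight.  "want" is either the configured c-fill-target, or derived
 * from the observed throughput and c-delay-target: e.g. with
 * c_delay_target == 10 (1 second, in 0.1s units) this gives
 * want = sect_in * 10, i.e. ten steps worth of the current incoming rate.
 * The gap between "want" and what is already in flight or planned is spread
 * over the plan fifo, and the per-step result is capped by c-max-rate. */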
496static int drbd_rs_controller(struct drbd_peer_device *peer_device, unsigned int sect_in)
497{
498	struct drbd_device *device = peer_device->device;
499	struct disk_conf *dc;
500	unsigned int want;     /* The number of sectors we want in-flight */
501	int req_sect; /* Number of sectors to request in this turn */
502	int correction; /* Number of sectors more we need in-flight */
503	int cps; /* correction per invocation of drbd_rs_controller() */
504	int steps; /* Number of time steps to plan ahead */
505	int curr_corr;
506	int max_sect;
507	struct fifo_buffer *plan;
508
509	dc = rcu_dereference(device->ldev->disk_conf);
510	plan = rcu_dereference(device->rs_plan_s);
511
512	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
513
514	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
515		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
516	} else { /* normal path */
517		want = dc->c_fill_target ? dc->c_fill_target :
518			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
519	}
520
521	correction = want - device->rs_in_flight - plan->total;
522
523	/* Plan ahead */
524	cps = correction / steps;
525	fifo_add_val(plan, cps);
526	plan->total += cps * steps;
527
528	/* What we do in this step */
529	curr_corr = fifo_push(plan, 0);
530	plan->total -= curr_corr;
531
532	req_sect = sect_in + curr_corr;
533	if (req_sect < 0)
534		req_sect = 0;
535
536	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
537	if (req_sect > max_sect)
538		req_sect = max_sect;
539
540	/*
541	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
542		 sect_in, device->rs_in_flight, want, correction,
543		 steps, cps, device->rs_planed, curr_corr, req_sect);
544	*/
545
546	return req_sect;
547}
548
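/* Convert the controller output (or the static resync_rate, if no plan is
 * configured) into a number of BM_BLOCK_SIZE (4 KiB) requests for the next
 * SLEEP_TIME step, and record the effective rate in c_sync_rate (KiB/s).
 * Note the unit juggling: the controller works in 512-byte sectors, the
 * bitmap in 4 KiB blocks (">> (BM_BLOCK_SHIFT - 9)" divides by 8), and
 * max-buffers counts pages. */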
549static int drbd_rs_number_requests(struct drbd_peer_device *peer_device)
550{
551	struct drbd_device *device = peer_device->device;
552	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
553	int number, mxb;
554
555	sect_in = atomic_xchg(&device->rs_sect_in, 0);
556	device->rs_in_flight -= sect_in;
557
558	rcu_read_lock();
559	mxb = drbd_get_max_buffers(device) / 2;
560	if (rcu_dereference(device->rs_plan_s)->size) {
561		number = drbd_rs_controller(peer_device, sect_in) >> (BM_BLOCK_SHIFT - 9);
562		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
563	} else {
564		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
565		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
566	}
567	rcu_read_unlock();
568
569	/* Don't have more than "max-buffers"/2 in-flight.
570	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
571	 * potentially causing a distributed deadlock on congestion during
572	 * online-verify or (checksum-based) resync, if max-buffers,
573	 * socket buffer sizes and resync rate settings are mis-configured. */
574
575	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
576	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
577	 * "number of pages" (typically also 4k),
578	 * but "rs_in_flight" is in "sectors" (512 Byte). */
579	if (mxb - device->rs_in_flight/8 < number)
580		number = mxb - device->rs_in_flight/8;
581
582	return number;
583}
584
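/* Worker side of a resync target: walk the out-of-sync bitmap from
 * bm_resync_fo onwards and, within the budget computed by
 * drbd_rs_number_requests(), send P_RS_DATA_REQUEST (or P_RS_THIN_REQ, or a
 * local read for checksum-based resync) for each dirty region, merging
 * adjacent dirty bits into bigger, aligned requests where possible.
 * Re-arms the resync timer when the budget or the send buffer is exhausted. */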
585static int make_resync_request(struct drbd_peer_device *const peer_device, int cancel)
586{
587	struct drbd_device *const device = peer_device->device;
588	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
589	unsigned long bit;
590	sector_t sector;
591	const sector_t capacity = get_capacity(device->vdisk);
592	int max_bio_size;
593	int number, rollback_i, size;
594	int align, requeue = 0;
595	int i = 0;
596	int discard_granularity = 0;
597
598	if (unlikely(cancel))
599		return 0;
600
601	if (device->rs_total == 0) {
602		/* empty resync? */
603		drbd_resync_finished(peer_device);
604		return 0;
605	}
606
607	if (!get_ldev(device)) {
		/* Since we only need to access device->resync, a
		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
		   continuing a resync with a broken disk makes no sense at
		   all. */
612		drbd_err(device, "Disk broke down during resync!\n");
613		return 0;
614	}
615
616	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
617		rcu_read_lock();
618		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
619		rcu_read_unlock();
620	}
621
622	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
623	number = drbd_rs_number_requests(peer_device);
624	if (number <= 0)
625		goto requeue;
626
627	for (i = 0; i < number; i++) {
628		/* Stop generating RS requests when half of the send buffer is filled,
629		 * but notify TCP that we'd like to have more space. */
630		mutex_lock(&connection->data.mutex);
631		if (connection->data.socket) {
632			struct sock *sk = connection->data.socket->sk;
633			int queued = sk->sk_wmem_queued;
634			int sndbuf = sk->sk_sndbuf;
635			if (queued > sndbuf / 2) {
636				requeue = 1;
637				if (sk->sk_socket)
638					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
639			}
640		} else
641			requeue = 1;
642		mutex_unlock(&connection->data.mutex);
643		if (requeue)
644			goto requeue;
645
646next_sector:
647		size = BM_BLOCK_SIZE;
648		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
649
650		if (bit == DRBD_END_OF_BITMAP) {
651			device->bm_resync_fo = drbd_bm_bits(device);
652			put_ldev(device);
653			return 0;
654		}
655
656		sector = BM_BIT_TO_SECT(bit);
657
658		if (drbd_try_rs_begin_io(peer_device, sector)) {
659			device->bm_resync_fo = bit;
660			goto requeue;
661		}
662		device->bm_resync_fo = bit + 1;
663
664		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
665			drbd_rs_complete_io(device, sector);
666			goto next_sector;
667		}
668
669#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we already have the maximum request size.
672		 *
673		 * Additionally always align bigger requests, in order to
674		 * be prepared for all stripe sizes of software RAIDs.
675		 */
676		align = 1;
677		rollback_i = i;
678		while (i < number) {
679			if (size + BM_BLOCK_SIZE > max_bio_size)
680				break;
681
682			/* Be always aligned */
683			if (sector & ((1<<(align+3))-1))
684				break;
685
686			if (discard_granularity && size == discard_granularity)
687				break;
688
689			/* do not cross extent boundaries */
690			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
691				break;
692			/* now, is it actually dirty, after all?
693			 * caution, drbd_bm_test_bit is tri-state for some
694			 * obscure reason; ( b == 0 ) would get the out-of-band
695			 * only accidentally right because of the "oddly sized"
696			 * adjustment below */
697			if (drbd_bm_test_bit(device, bit+1) != 1)
698				break;
699			bit++;
700			size += BM_BLOCK_SIZE;
701			if ((BM_BLOCK_SIZE << align) <= size)
702				align++;
703			i++;
704		}
705		/* if we merged some,
706		 * reset the offset to start the next drbd_bm_find_next from */
707		if (size > BM_BLOCK_SIZE)
708			device->bm_resync_fo = bit + 1;
709#endif
710
711		/* adjust very last sectors, in case we are oddly sized */
712		if (sector + (size>>9) > capacity)
713			size = (capacity-sector)<<9;
714
715		if (device->use_csums) {
716			switch (read_for_csum(peer_device, sector, size)) {
717			case -EIO: /* Disk failure */
718				put_ldev(device);
719				return -EIO;
720			case -EAGAIN: /* allocation failed, or ldev busy */
721				drbd_rs_complete_io(device, sector);
722				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
723				i = rollback_i;
724				goto requeue;
725			case 0:
726				/* everything ok */
727				break;
728			default:
729				BUG();
730			}
731		} else {
732			int err;
733
734			inc_rs_pending(peer_device);
735			err = drbd_send_drequest(peer_device,
736						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
737						 sector, size, ID_SYNCER);
738			if (err) {
739				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
740				dec_rs_pending(peer_device);
741				put_ldev(device);
742				return err;
743			}
744		}
745	}
746
747	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
748		/* last syncer _request_ was sent,
749		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
750		 * next sync group will resume), as soon as we receive the last
751		 * resync data block, and the last bit is cleared.
752		 * until then resync "work" is "inactive" ...
753		 */
754		put_ldev(device);
755		return 0;
756	}
757
758 requeue:
759	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
760	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
761	put_ldev(device);
762	return 0;
763}
764
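/* Online-verify counterpart of make_resync_request(): starting at
 * ov_position, send up to "number" P_OV_REQUESTs of BM_BLOCK_SIZE each
 * (no bitmap walk here, verify covers every block), honoring an optional
 * stop sector, and re-arm the resync timer for the next batch. */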
765static int make_ov_request(struct drbd_peer_device *peer_device, int cancel)
766{
767	struct drbd_device *device = peer_device->device;
768	int number, i, size;
769	sector_t sector;
770	const sector_t capacity = get_capacity(device->vdisk);
771	bool stop_sector_reached = false;
772
773	if (unlikely(cancel))
774		return 1;
775
776	number = drbd_rs_number_requests(peer_device);
777
778	sector = device->ov_position;
779	for (i = 0; i < number; i++) {
780		if (sector >= capacity)
781			return 1;
782
783		/* We check for "finished" only in the reply path:
784		 * w_e_end_ov_reply().
785		 * We need to send at least one request out. */
786		stop_sector_reached = i > 0
787			&& verify_can_do_stop_sector(device)
788			&& sector >= device->ov_stop_sector;
789		if (stop_sector_reached)
790			break;
791
792		size = BM_BLOCK_SIZE;
793
794		if (drbd_try_rs_begin_io(peer_device, sector)) {
795			device->ov_position = sector;
796			goto requeue;
797		}
798
799		if (sector + (size>>9) > capacity)
800			size = (capacity-sector)<<9;
801
802		inc_rs_pending(peer_device);
803		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
804			dec_rs_pending(peer_device);
805			return 0;
806		}
807		sector += BM_SECT_PER_BIT;
808	}
809	device->ov_position = sector;
810
811 requeue:
812	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
813	if (i == 0 || !stop_sector_reached)
814		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
815	return 1;
816}
817
818int w_ov_finished(struct drbd_work *w, int cancel)
819{
820	struct drbd_device_work *dw =
821		container_of(w, struct drbd_device_work, w);
822	struct drbd_device *device = dw->device;
823	kfree(dw);
824	ov_out_of_sync_print(first_peer_device(device));
825	drbd_resync_finished(first_peer_device(device));
826
827	return 0;
828}
829
830static int w_resync_finished(struct drbd_work *w, int cancel)
831{
832	struct drbd_device_work *dw =
833		container_of(w, struct drbd_device_work, w);
834	struct drbd_device *device = dw->device;
835	kfree(dw);
836
837	drbd_resync_finished(first_peer_device(device));
838
839	return 0;
840}
841
842static void ping_peer(struct drbd_device *device)
843{
844	struct drbd_connection *connection = first_peer_device(device)->connection;
845
846	clear_bit(GOT_PING_ACK, &connection->flags);
847	request_ping(connection);
848	wait_event(connection->ping_wait,
849		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
850}
851
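/* Called when a resync or online verify run ends (successfully or not):
 * flush the resync LRU (retrying from the worker if that is not yet
 * possible), log throughput statistics, transition back to C_CONNECTED with
 * the appropriate disk/pdsk states, update the UUID set after a successful
 * resync, and possibly invoke the "out-of-sync", "after-resync-target" or
 * "unfence-peer" helpers. */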
852int drbd_resync_finished(struct drbd_peer_device *peer_device)
853{
854	struct drbd_device *device = peer_device->device;
855	struct drbd_connection *connection = peer_device->connection;
856	unsigned long db, dt, dbdt;
857	unsigned long n_oos;
858	union drbd_state os, ns;
859	struct drbd_device_work *dw;
860	char *khelper_cmd = NULL;
861	int verify_done = 0;
862
	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, the entries in the
	 * resync LRU would otherwise be wrong. */
866	if (drbd_rs_del_all(device)) {
		/* In case this is not possible right now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or the read operations for those packets have not
		 * finished yet), retry in 100ms. */
871
872		schedule_timeout_interruptible(HZ / 10);
873		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
874		if (dw) {
875			dw->w.cb = w_resync_finished;
876			dw->device = device;
877			drbd_queue_work(&connection->sender_work, &dw->w);
878			return 1;
879		}
		drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
881	}
882
883	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
884	if (dt <= 0)
885		dt = 1;
886
887	db = device->rs_total;
888	/* adjust for verify start and stop sectors, respective reached position */
889	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
890		db -= device->ov_left;
891
892	dbdt = Bit2KB(db/dt);
893	device->rs_paused /= HZ;
894
895	if (!get_ldev(device))
896		goto out;
897
898	ping_peer(device);
899
900	spin_lock_irq(&device->resource->req_lock);
901	os = drbd_read_state(device);
902
903	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
904
905	/* This protects us against multiple calls (that can happen in the presence
906	   of application IO), and against connectivity loss just before we arrive here. */
907	if (os.conn <= C_CONNECTED)
908		goto out_unlock;
909
910	ns = os;
911	ns.conn = C_CONNECTED;
912
913	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
914	     verify_done ? "Online verify" : "Resync",
915	     dt + device->rs_paused, device->rs_paused, dbdt);
916
917	n_oos = drbd_bm_total_weight(device);
918
919	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
920		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
922			      n_oos, Bit2KB(1));
923			khelper_cmd = "out-of-sync";
924		}
925	} else {
926		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
927
928		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
929			khelper_cmd = "after-resync-target";
930
931		if (device->use_csums && device->rs_total) {
932			const unsigned long s = device->rs_same_csum;
933			const unsigned long t = device->rs_total;
934			const int ratio =
935				(t == 0)     ? 0 :
936			(t < 100000) ? ((s*100)/t) : (s/(t/100));
937			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
938			     "transferred %luK total %luK\n",
939			     ratio,
940			     Bit2KB(device->rs_same_csum),
941			     Bit2KB(device->rs_total - device->rs_same_csum),
942			     Bit2KB(device->rs_total));
943		}
944	}
945
946	if (device->rs_failed) {
947		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
948
949		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
950			ns.disk = D_INCONSISTENT;
951			ns.pdsk = D_UP_TO_DATE;
952		} else {
953			ns.disk = D_UP_TO_DATE;
954			ns.pdsk = D_INCONSISTENT;
955		}
956	} else {
957		ns.disk = D_UP_TO_DATE;
958		ns.pdsk = D_UP_TO_DATE;
959
960		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
961			if (device->p_uuid) {
962				int i;
963				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
964					_drbd_uuid_set(device, i, device->p_uuid[i]);
965				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
966				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
967			} else {
968				drbd_err(device, "device->p_uuid is NULL! BUG\n");
969			}
970		}
971
972		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
973			/* for verify runs, we don't update uuids here,
974			 * so there would be nothing to report. */
975			drbd_uuid_set_bm(device, 0UL);
976			drbd_print_uuids(device, "updated UUIDs");
977			if (device->p_uuid) {
978				/* Now the two UUID sets are equal, update what we
979				 * know of the peer. */
980				int i;
981				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
982					device->p_uuid[i] = device->ldev->md.uuid[i];
983			}
984		}
985	}
986
987	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
988out_unlock:
989	spin_unlock_irq(&device->resource->req_lock);
990
991	/* If we have been sync source, and have an effective fencing-policy,
992	 * once *all* volumes are back in sync, call "unfence". */
993	if (os.conn == C_SYNC_SOURCE) {
994		enum drbd_disk_state disk_state = D_MASK;
995		enum drbd_disk_state pdsk_state = D_MASK;
996		enum drbd_fencing_p fp = FP_DONT_CARE;
997
998		rcu_read_lock();
999		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000		if (fp != FP_DONT_CARE) {
1001			struct drbd_peer_device *peer_device;
1002			int vnr;
1003			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004				struct drbd_device *device = peer_device->device;
1005				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007			}
1008		}
1009		rcu_read_unlock();
1010		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011			conn_khelper(connection, "unfence-peer");
1012	}
1013
1014	put_ldev(device);
1015out:
1016	device->rs_total  = 0;
1017	device->rs_failed = 0;
1018	device->rs_paused = 0;
1019
1020	/* reset start sector, if we reached end of device */
1021	if (verify_done && device->ov_left == 0)
1022		device->ov_start_sector = 0;
1023
1024	drbd_md_sync(device);
1025
1026	if (khelper_cmd)
1027		drbd_khelper(device, khelper_cmd);
1028
1029	return 1;
1030}
1031
/* helper: keep pages that may still be referenced by the network layer
 * (sendpage() not finished) on net_ee, otherwise free the peer request */
1033static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034{
1035	if (drbd_peer_req_has_active_page(peer_req)) {
1036		/* This might happen if sendpage() has not finished */
1037		int i = PFN_UP(peer_req->i.size);
1038		atomic_add(i, &device->pp_in_use_by_net);
1039		atomic_sub(i, &device->pp_in_use);
1040		spin_lock_irq(&device->resource->req_lock);
1041		list_add_tail(&peer_req->w.list, &device->net_ee);
1042		spin_unlock_irq(&device->resource->req_lock);
1043		wake_up(&drbd_pp_wait);
1044	} else
1045		drbd_free_peer_req(device, peer_req);
1046}
1047
1048/**
1049 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050 * @w:		work object.
1051 * @cancel:	The connection will be closed anyways
1052 */
1053int w_e_end_data_req(struct drbd_work *w, int cancel)
1054{
1055	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056	struct drbd_peer_device *peer_device = peer_req->peer_device;
1057	struct drbd_device *device = peer_device->device;
1058	int err;
1059
1060	if (unlikely(cancel)) {
1061		drbd_free_peer_req(device, peer_req);
1062		dec_unacked(device);
1063		return 0;
1064	}
1065
1066	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068	} else {
1069		if (drbd_ratelimit())
1070			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071			    (unsigned long long)peer_req->i.sector);
1072
1073		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074	}
1075
1076	dec_unacked(device);
1077
1078	move_to_net_ee_or_free(device, peer_req);
1079
1080	if (unlikely(err))
1081		drbd_err(device, "drbd_send_block() failed\n");
1082	return err;
1083}
1084
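/* Used on the resync source for thin/discard aware resync: if the block read
 * locally is all zeroes, w_e_end_rsdata_req() below answers a P_RS_THIN_REQ
 * with drbd_send_rs_deallocated() instead of shipping the zeroes over the
 * wire. */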
1085static bool all_zero(struct drbd_peer_request *peer_req)
1086{
1087	struct page *page = peer_req->pages;
1088	unsigned int len = peer_req->i.size;
1089
1090	page_chain_for_each(page) {
1091		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092		unsigned int i, words = l / sizeof(long);
1093		unsigned long *d;
1094
1095		d = kmap_atomic(page);
1096		for (i = 0; i < words; i++) {
1097			if (d[i]) {
1098				kunmap_atomic(d);
1099				return false;
1100			}
1101		}
1102		kunmap_atomic(d);
1103		len -= l;
1104	}
1105
1106	return true;
1107}
1108
1109/**
1110 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111 * @w:		work object.
1112 * @cancel:	The connection will be closed anyways
1113 */
1114int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115{
1116	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117	struct drbd_peer_device *peer_device = peer_req->peer_device;
1118	struct drbd_device *device = peer_device->device;
1119	int err;
1120
1121	if (unlikely(cancel)) {
1122		drbd_free_peer_req(device, peer_req);
1123		dec_unacked(device);
1124		return 0;
1125	}
1126
1127	if (get_ldev_if_state(device, D_FAILED)) {
1128		drbd_rs_complete_io(device, peer_req->i.sector);
1129		put_ldev(device);
1130	}
1131
1132	if (device->state.conn == C_AHEAD) {
1133		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136			inc_rs_pending(peer_device);
1137			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138				err = drbd_send_rs_deallocated(peer_device, peer_req);
1139			else
1140				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141		} else {
1142			if (drbd_ratelimit())
1143				drbd_err(device, "Not sending RSDataReply, "
1144				    "partner DISKLESS!\n");
1145			err = 0;
1146		}
1147	} else {
1148		if (drbd_ratelimit())
1149			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150			    (unsigned long long)peer_req->i.sector);
1151
1152		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153
1154		/* update resync data with failure */
1155		drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
1156	}
1157
1158	dec_unacked(device);
1159
1160	move_to_net_ee_or_free(device, peer_req);
1161
1162	if (unlikely(err))
1163		drbd_err(device, "drbd_send_block() failed\n");
1164	return err;
1165}
1166
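/* Reply path for checksum-based resync on the sync source: compare the
 * digest sent by the sync target (P_CSUM_RS_REQUEST) with a digest of our
 * local data.  If they match, only a P_RS_IS_IN_SYNC ack goes back and the
 * block is marked in sync; otherwise the full block is sent as
 * P_RS_DATA_REPLY. */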
1167int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168{
1169	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170	struct drbd_peer_device *peer_device = peer_req->peer_device;
1171	struct drbd_device *device = peer_device->device;
1172	struct digest_info *di;
1173	int digest_size;
1174	void *digest = NULL;
1175	int err, eq = 0;
1176
1177	if (unlikely(cancel)) {
1178		drbd_free_peer_req(device, peer_req);
1179		dec_unacked(device);
1180		return 0;
1181	}
1182
1183	if (get_ldev(device)) {
1184		drbd_rs_complete_io(device, peer_req->i.sector);
1185		put_ldev(device);
1186	}
1187
1188	di = peer_req->digest;
1189
1190	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191		/* quick hack to try to avoid a race against reconfiguration.
1192		 * a real fix would be much more involved,
1193		 * introducing more locking mechanisms */
1194		if (peer_device->connection->csums_tfm) {
1195			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1196			D_ASSERT(device, digest_size == di->digest_size);
1197			digest = kmalloc(digest_size, GFP_NOIO);
1198		}
1199		if (digest) {
1200			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201			eq = !memcmp(digest, di->digest, digest_size);
1202			kfree(digest);
1203		}
1204
1205		if (eq) {
1206			drbd_set_in_sync(peer_device, peer_req->i.sector, peer_req->i.size);
1207			/* rs_same_csums unit is BM_BLOCK_SIZE */
1208			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210		} else {
1211			inc_rs_pending(peer_device);
1212			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214			kfree(di);
1215			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216		}
1217	} else {
1218		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219		if (drbd_ratelimit())
1220			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221	}
1222
1223	dec_unacked(device);
1224	move_to_net_ee_or_free(device, peer_req);
1225
1226	if (unlikely(err))
1227		drbd_err(device, "drbd_send_block/ack() failed\n");
1228	return err;
1229}
1230
1231int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232{
1233	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234	struct drbd_peer_device *peer_device = peer_req->peer_device;
1235	struct drbd_device *device = peer_device->device;
1236	sector_t sector = peer_req->i.sector;
1237	unsigned int size = peer_req->i.size;
1238	int digest_size;
1239	void *digest;
1240	int err = 0;
1241
1242	if (unlikely(cancel))
1243		goto out;
1244
1245	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1246	digest = kmalloc(digest_size, GFP_NOIO);
1247	if (!digest) {
1248		err = 1;	/* terminate the connection in case the allocation failed */
1249		goto out;
1250	}
1251
1252	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254	else
1255		memset(digest, 0, digest_size);
1256
1257	/* Free e and pages before send.
1258	 * In case we block on congestion, we could otherwise run into
1259	 * some distributed deadlock, if the other side blocks on
1260	 * congestion as well, because our receiver blocks in
1261	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262	drbd_free_peer_req(device, peer_req);
1263	peer_req = NULL;
1264	inc_rs_pending(peer_device);
1265	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266	if (err)
1267		dec_rs_pending(peer_device);
1268	kfree(digest);
1269
1270out:
1271	if (peer_req)
1272		drbd_free_peer_req(device, peer_req);
1273	dec_unacked(device);
1274	return err;
1275}
1276
1277void drbd_ov_out_of_sync_found(struct drbd_peer_device *peer_device, sector_t sector, int size)
1278{
1279	struct drbd_device *device = peer_device->device;
1280	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281		device->ov_last_oos_size += size>>9;
1282	} else {
1283		device->ov_last_oos_start = sector;
1284		device->ov_last_oos_size = size>>9;
1285	}
1286	drbd_set_out_of_sync(peer_device, sector, size);
1287}
1288
1289int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290{
1291	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292	struct drbd_peer_device *peer_device = peer_req->peer_device;
1293	struct drbd_device *device = peer_device->device;
1294	struct digest_info *di;
1295	void *digest;
1296	sector_t sector = peer_req->i.sector;
1297	unsigned int size = peer_req->i.size;
1298	int digest_size;
1299	int err, eq = 0;
1300	bool stop_sector_reached = false;
1301
1302	if (unlikely(cancel)) {
1303		drbd_free_peer_req(device, peer_req);
1304		dec_unacked(device);
1305		return 0;
1306	}
1307
1308	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309	 * the resync lru has been cleaned up already */
1310	if (get_ldev(device)) {
1311		drbd_rs_complete_io(device, peer_req->i.sector);
1312		put_ldev(device);
1313	}
1314
1315	di = peer_req->digest;
1316
1317	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319		digest = kmalloc(digest_size, GFP_NOIO);
1320		if (digest) {
1321			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323			D_ASSERT(device, digest_size == di->digest_size);
1324			eq = !memcmp(digest, di->digest, digest_size);
1325			kfree(digest);
1326		}
1327	}
1328
1329	/* Free peer_req and pages before send.
1330	 * In case we block on congestion, we could otherwise run into
1331	 * some distributed deadlock, if the other side blocks on
1332	 * congestion as well, because our receiver blocks in
1333	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334	drbd_free_peer_req(device, peer_req);
1335	if (!eq)
1336		drbd_ov_out_of_sync_found(peer_device, sector, size);
1337	else
1338		ov_out_of_sync_print(peer_device);
1339
1340	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343	dec_unacked(device);
1344
1345	--device->ov_left;
1346
1347	/* let's advance progress step marks only for every other megabyte */
1348	if ((device->ov_left & 0x200) == 0x200)
1349		drbd_advance_rs_marks(peer_device, device->ov_left);
1350
1351	stop_sector_reached = verify_can_do_stop_sector(device) &&
1352		(sector + (size>>9)) >= device->ov_stop_sector;
1353
1354	if (device->ov_left == 0 || stop_sector_reached) {
1355		ov_out_of_sync_print(peer_device);
1356		drbd_resync_finished(peer_device);
1357	}
1358
1359	return err;
1360}
1361
1362/* FIXME
1363 * We need to track the number of pending barrier acks,
1364 * and to be able to wait for them.
1365 * See also comment in drbd_adm_attach before drbd_suspend_io.
1366 */
1367static int drbd_send_barrier(struct drbd_connection *connection)
1368{
1369	struct p_barrier *p;
1370	struct drbd_socket *sock;
1371
1372	sock = &connection->data;
1373	p = conn_prepare_command(connection, sock);
1374	if (!p)
1375		return -EIO;
1376	p->barrier = connection->send.current_epoch_nr;
1377	p->pad = 0;
1378	connection->send.current_epoch_writes = 0;
1379	connection->send.last_sent_barrier_jif = jiffies;
1380
1381	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382}
1383
1384static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385{
1386	struct drbd_socket *sock = &pd->connection->data;
1387	if (!drbd_prepare_command(pd, sock))
1388		return -EIO;
1389	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390}
1391
1392int w_send_write_hint(struct drbd_work *w, int cancel)
1393{
1394	struct drbd_device *device =
1395		container_of(w, struct drbd_device, unplug_work);
1396
1397	if (cancel)
1398		return 0;
1399	return pd_send_unplug_remote(first_peer_device(device));
1400}
1401
1402static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403{
1404	if (!connection->send.seen_any_write_yet) {
1405		connection->send.seen_any_write_yet = true;
1406		connection->send.current_epoch_nr = epoch;
1407		connection->send.current_epoch_writes = 0;
1408		connection->send.last_sent_barrier_jif = jiffies;
1409	}
1410}
1411
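/* A P_BARRIER is sent whenever the epoch number changes and the previous
 * epoch actually contained writes; this closes the old epoch so the peer can
 * write it out and acknowledge it with a barrier ack.  Reads may close an
 * epoch, too (see w_send_read_req()), but never open one. */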
1412static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413{
	/* nothing to close before the first write on this connection;
	 * re_init_if_first_write() does the initial epoch setup */
1415	if (!connection->send.seen_any_write_yet)
1416		return;
1417	if (connection->send.current_epoch_nr != epoch) {
1418		if (connection->send.current_epoch_writes)
1419			drbd_send_barrier(connection);
1420		connection->send.current_epoch_nr = epoch;
1421	}
1422}
1423
1424int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425{
1426	struct drbd_request *req = container_of(w, struct drbd_request, w);
1427	struct drbd_device *device = req->device;
1428	struct drbd_peer_device *const peer_device = first_peer_device(device);
1429	struct drbd_connection *const connection = peer_device->connection;
1430	int err;
1431
1432	if (unlikely(cancel)) {
1433		req_mod(req, SEND_CANCELED, peer_device);
1434		return 0;
1435	}
1436	req->pre_send_jif = jiffies;
1437
1438	/* this time, no connection->send.current_epoch_writes++;
1439	 * If it was sent, it was the closing barrier for the last
1440	 * replicated epoch, before we went into AHEAD mode.
1441	 * No more barriers will be sent, until we leave AHEAD mode again. */
1442	maybe_send_barrier(connection, req->epoch);
1443
1444	err = drbd_send_out_of_sync(peer_device, req);
1445	req_mod(req, OOS_HANDED_TO_NETWORK, peer_device);
1446
1447	return err;
1448}
1449
1450/**
1451 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452 * @w:		work object.
1453 * @cancel:	The connection will be closed anyways
1454 */
1455int w_send_dblock(struct drbd_work *w, int cancel)
1456{
1457	struct drbd_request *req = container_of(w, struct drbd_request, w);
1458	struct drbd_device *device = req->device;
1459	struct drbd_peer_device *const peer_device = first_peer_device(device);
1460	struct drbd_connection *connection = peer_device->connection;
1461	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462	int err;
1463
1464	if (unlikely(cancel)) {
1465		req_mod(req, SEND_CANCELED, peer_device);
1466		return 0;
1467	}
1468	req->pre_send_jif = jiffies;
1469
1470	re_init_if_first_write(connection, req->epoch);
1471	maybe_send_barrier(connection, req->epoch);
1472	connection->send.current_epoch_writes++;
1473
1474	err = drbd_send_dblock(peer_device, req);
1475	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1476
1477	if (do_send_unplug && !err)
1478		pd_send_unplug_remote(peer_device);
1479
1480	return err;
1481}
1482
1483/**
1484 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485 * @w:		work object.
1486 * @cancel:	The connection will be closed anyways
1487 */
1488int w_send_read_req(struct drbd_work *w, int cancel)
1489{
1490	struct drbd_request *req = container_of(w, struct drbd_request, w);
1491	struct drbd_device *device = req->device;
1492	struct drbd_peer_device *const peer_device = first_peer_device(device);
1493	struct drbd_connection *connection = peer_device->connection;
1494	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495	int err;
1496
1497	if (unlikely(cancel)) {
1498		req_mod(req, SEND_CANCELED, peer_device);
1499		return 0;
1500	}
1501	req->pre_send_jif = jiffies;
1502
1503	/* Even read requests may close a write epoch,
1504	 * if there was any yet. */
1505	maybe_send_barrier(connection, req->epoch);
1506
1507	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508				 (unsigned long)req);
1509
1510	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1511
1512	if (do_send_unplug && !err)
1513		pd_send_unplug_remote(peer_device);
1514
1515	return err;
1516}
1517
1518int w_restart_disk_io(struct drbd_work *w, int cancel)
1519{
1520	struct drbd_request *req = container_of(w, struct drbd_request, w);
1521	struct drbd_device *device = req->device;
1522
1523	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524		drbd_al_begin_io(device, &req->i);
1525
1526	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1527					   req->master_bio, GFP_NOIO,
1528					  &drbd_io_bio_set);
1529	req->private_bio->bi_private = req;
1530	req->private_bio->bi_end_io = drbd_request_endio;
1531	submit_bio_noacct(req->private_bio);
1532
1533	return 0;
1534}
1535
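/* resync-after dependencies: a device may be configured to resync only after
 * another minor has finished its resync.  _drbd_may_sync_now() walks that
 * dependency chain and returns 0 if any device we (transitively) depend on
 * is currently resyncing or has its sync suspended, 1 otherwise. */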
1536static int _drbd_may_sync_now(struct drbd_device *device)
1537{
1538	struct drbd_device *odev = device;
1539	int resync_after;
1540
1541	while (1) {
1542		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1543			return 1;
1544		rcu_read_lock();
1545		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1546		rcu_read_unlock();
1547		if (resync_after == -1)
1548			return 1;
1549		odev = minor_to_device(resync_after);
1550		if (!odev)
1551			return 1;
1552		if ((odev->state.conn >= C_SYNC_SOURCE &&
1553		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1554		    odev->state.aftr_isp || odev->state.peer_isp ||
1555		    odev->state.user_isp)
1556			return 0;
1557	}
1558}
1559
1560/**
1561 * drbd_pause_after() - Pause resync on all devices that may not resync now
1562 * @device:	DRBD device.
1563 *
1564 * Called from process context only (admin command and after_state_ch).
1565 */
1566static bool drbd_pause_after(struct drbd_device *device)
1567{
1568	bool changed = false;
1569	struct drbd_device *odev;
1570	int i;
1571
1572	rcu_read_lock();
1573	idr_for_each_entry(&drbd_devices, odev, i) {
1574		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1575			continue;
1576		if (!_drbd_may_sync_now(odev) &&
1577		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1578				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1579			changed = true;
1580	}
1581	rcu_read_unlock();
1582
1583	return changed;
1584}
1585
1586/**
1587 * drbd_resume_next() - Resume resync on all devices that may resync now
1588 * @device:	DRBD device.
1589 *
1590 * Called from process context only (admin command and worker).
1591 */
1592static bool drbd_resume_next(struct drbd_device *device)
1593{
1594	bool changed = false;
1595	struct drbd_device *odev;
1596	int i;
1597
1598	rcu_read_lock();
1599	idr_for_each_entry(&drbd_devices, odev, i) {
1600		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1601			continue;
1602		if (odev->state.aftr_isp) {
1603			if (_drbd_may_sync_now(odev) &&
1604			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1605					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1606				changed = true;
1607		}
1608	}
1609	rcu_read_unlock();
1610	return changed;
1611}
1612
1613void resume_next_sg(struct drbd_device *device)
1614{
1615	lock_all_resources();
1616	drbd_resume_next(device);
1617	unlock_all_resources();
1618}
1619
1620void suspend_other_sg(struct drbd_device *device)
1621{
1622	lock_all_resources();
1623	drbd_pause_after(device);
1624	unlock_all_resources();
1625}
1626
1627/* caller must lock_all_resources() */
1628enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1629{
1630	struct drbd_device *odev;
1631	int resync_after;
1632
1633	if (o_minor == -1)
1634		return NO_ERROR;
1635	if (o_minor < -1 || o_minor > MINORMASK)
1636		return ERR_RESYNC_AFTER;
1637
1638	/* check for loops */
1639	odev = minor_to_device(o_minor);
1640	while (1) {
1641		if (odev == device)
1642			return ERR_RESYNC_AFTER_CYCLE;
1643
1644		/* You are free to depend on diskless, non-existing,
1645		 * or not yet/no longer existing minors.
1646		 * We only reject dependency loops.
1647		 * We cannot follow the dependency chain beyond a detached or
1648		 * missing minor.
1649		 */
1650		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1651			return NO_ERROR;
1652
1653		rcu_read_lock();
1654		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1655		rcu_read_unlock();
1656		/* dependency chain ends here, no cycles. */
1657		if (resync_after == -1)
1658			return NO_ERROR;
1659
1660		/* follow the dependency chain */
1661		odev = minor_to_device(resync_after);
1662	}
1663}
1664
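/* After a resync-after setting changed, iterate drbd_pause_after() and
 * drbd_resume_next() until a fixed point is reached: pausing one device may
 * allow another one to resume and vice versa, so a single pass is not
 * enough. */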
1665/* caller must lock_all_resources() */
1666void drbd_resync_after_changed(struct drbd_device *device)
1667{
1668	int changed;
1669
1670	do {
1671		changed  = drbd_pause_after(device);
1672		changed |= drbd_resume_next(device);
1673	} while (changed);
1674}
1675
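/* Reset the controller's bookkeeping at the start of a resync: no sectors in
 * flight, no pending corrections in the plan fifo, and a fresh baseline for
 * the backing device's I/O event counter. */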
1676void drbd_rs_controller_reset(struct drbd_peer_device *peer_device)
1677{
1678	struct drbd_device *device = peer_device->device;
1679	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1680	struct fifo_buffer *plan;
1681
1682	atomic_set(&device->rs_sect_in, 0);
1683	atomic_set(&device->rs_sect_ev, 0);
1684	device->rs_in_flight = 0;
1685	device->rs_last_events =
1686		(int)part_stat_read_accum(disk->part0, sectors);
1687
1688	/* Updating the RCU protected object in place is necessary since
1689	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo. */
1692	rcu_read_lock();
1693	plan = rcu_dereference(device->rs_plan_s);
1694	plan->total = 0;
1695	fifo_set(plan, 0);
1696	rcu_read_unlock();
1697}
1698
1699void start_resync_timer_fn(struct timer_list *t)
1700{
1701	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1702	drbd_device_post_work(device, RS_START);
1703}
1704
1705static void do_start_resync(struct drbd_device *device)
1706{
1707	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1708		drbd_warn(device, "postponing start_resync ...\n");
1709		device->start_resync_timer.expires = jiffies + HZ/10;
1710		add_timer(&device->start_resync_timer);
1711		return;
1712	}
1713
1714	drbd_start_resync(device, C_SYNC_SOURCE);
1715	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1716}
1717
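/* Checksum-based resync is only used if both sides speak protocol version 89
 * or newer and a csums-alg is configured; with csums-after-crash-only set it
 * is further restricted to resyncs following a primary crash, where most
 * blocks are typically still identical and the digest exchange pays off. */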
1718static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1719{
1720	bool csums_after_crash_only;
1721	rcu_read_lock();
1722	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1723	rcu_read_unlock();
1724	return connection->agreed_pro_version >= 89 &&		/* supported? */
1725		connection->csums_tfm &&			/* configured? */
1726		(csums_after_crash_only == false		/* use for each resync? */
1727		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1728}
1729
1730/**
1731 * drbd_start_resync() - Start the resync process
1732 * @device:	DRBD device.
1733 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1734 *
1735 * This function might bring you directly into one of the
1736 * C_PAUSED_SYNC_* states.
1737 */
1738void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1739{
1740	struct drbd_peer_device *peer_device = first_peer_device(device);
1741	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1742	union drbd_state ns;
1743	int r;
1744
1745	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1746		drbd_err(device, "Resync already running!\n");
1747		return;
1748	}
1749
1750	if (!connection) {
1751		drbd_err(device, "No connection to peer, aborting!\n");
1752		return;
1753	}
1754
1755	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1756		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID, we are still unmodified.  Before going to
			   C_SYNC_TARGET, which will make our data inconsistent, give the
			   before-resync-target handler a chance to abort. */
1760			r = drbd_khelper(device, "before-resync-target");
1761			r = (r >> 8) & 0xff;
1762			if (r > 0) {
1763				drbd_info(device, "before-resync-target handler returned %d, "
1764					 "dropping connection.\n", r);
1765				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1766				return;
1767			}
1768		} else /* C_SYNC_SOURCE */ {
1769			r = drbd_khelper(device, "before-resync-source");
1770			r = (r >> 8) & 0xff;
1771			if (r > 0) {
1772				if (r == 3) {
1773					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?\n", r);
1775				} else {
1776					drbd_info(device, "before-resync-source handler returned %d, "
1777						 "dropping connection.\n", r);
1778					conn_request_state(connection,
1779							   NS(conn, C_DISCONNECTING), CS_HARD);
1780					return;
1781				}
1782			}
1783		}
1784	}
1785
1786	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take a long time */
1789		if (!mutex_trylock(device->state_mutex)) {
1790			set_bit(B_RS_H_DONE, &device->flags);
1791			device->start_resync_timer.expires = jiffies + HZ/5;
1792			add_timer(&device->start_resync_timer);
1793			return;
1794		}
1795	} else {
1796		mutex_lock(device->state_mutex);
1797	}
1798
1799	lock_all_resources();
1800	clear_bit(B_RS_H_DONE, &device->flags);
1801	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED ||
	    !get_ldev_if_state(device, D_NEGOTIATING)) {
1804		unlock_all_resources();
1805		goto out;
1806	}
1807
1808	ns = drbd_read_state(device);
1809
1810	ns.aftr_isp = !_drbd_may_sync_now(device);
1811
1812	ns.conn = side;
1813
1814	if (side == C_SYNC_TARGET)
1815		ns.disk = D_INCONSISTENT;
1816	else /* side == C_SYNC_SOURCE */
1817		ns.pdsk = D_INCONSISTENT;
1818
1819	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1820	ns = drbd_read_state(device);
1821
1822	if (ns.conn < C_CONNECTED)
1823		r = SS_UNKNOWN_ERROR;
1824
1825	if (r == SS_SUCCESS) {
1826		unsigned long tw = drbd_bm_total_weight(device);
1827		unsigned long now = jiffies;
1828		int i;
1829
1830		device->rs_failed    = 0;
1831		device->rs_paused    = 0;
1832		device->rs_same_csum = 0;
1833		device->rs_last_sect_ev = 0;
1834		device->rs_total     = tw;
1835		device->rs_start     = now;
1836		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1837			device->rs_mark_left[i] = tw;
1838			device->rs_mark_time[i] = now;
1839		}
1840		drbd_pause_after(device);
1841		/* Forget potentially stale cached per resync extent bit-counts.
1842		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1843		 * disabled, and know the disk state is ok. */
1844		spin_lock(&device->al_lock);
1845		lc_reset(device->resync);
1846		device->resync_locked = 0;
1847		device->resync_wenr = LC_FREE;
1848		spin_unlock(&device->al_lock);
1849	}
1850	unlock_all_resources();
1851
1852	if (r == SS_SUCCESS) {
1853		wake_up(&device->al_wait); /* for lc_reset() above */
1854		/* reset rs_last_bcast when a resync or verify is started,
1855		 * to deal with potential jiffies wrap. */
1856		device->rs_last_bcast = jiffies - HZ;
1857
1858		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1859		     drbd_conn_str(ns.conn),
1860		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1861		     (unsigned long) device->rs_total);
1862		if (side == C_SYNC_TARGET) {
1863			device->bm_resync_fo = 0;
1864			device->use_csums = use_checksum_based_resync(connection, device);
1865		} else {
1866			device->use_csums = false;
1867		}
1868
1869		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1870		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1875		 * and from after_state_ch otherwise. */
1876		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1877			drbd_gen_and_send_sync_uuid(peer_device);
1878
1879		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1880			/* This still has a race (about when exactly the peers
1881			 * detect connection loss) that can lead to a full sync
1882			 * on next handshake. In 8.3.9 we fixed this with explicit
1883			 * resync-finished notifications, but the fix
1884			 * introduces a protocol change.  Sleeping for some
1885			 * time longer than the ping interval + timeout on the
1886			 * SyncSource, to give the SyncTarget the chance to
1887			 * detect connection loss, then waiting for a ping
1888			 * response (implicit in drbd_resync_finished) reduces
1889			 * the race considerably, but does not solve it. */
1890			if (side == C_SYNC_SOURCE) {
1891				struct net_conf *nc;
1892				int timeo;
1893
1894				rcu_read_lock();
1895				nc = rcu_dereference(connection->net_conf);
1896				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1897				rcu_read_unlock();
1898				schedule_timeout_interruptible(timeo);
1899			}
1900			drbd_resync_finished(peer_device);
1901		}
1902
1903		drbd_rs_controller_reset(peer_device);
1904		/* ns.conn may already be != device->state.conn,
1905		 * we may have been paused in between, or become paused until
1906		 * the timer triggers.
1907		 * No matter, that is handled in resync_timer_fn() */
1908		if (ns.conn == C_SYNC_TARGET)
1909			mod_timer(&device->resync_timer, jiffies);
1910
1911		drbd_md_sync(device);
1912	}
1913	put_ldev(device);
1914out:
1915	mutex_unlock(device->state_mutex);
1916}
1917
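/* Lazily write out changed bitmap pages and broadcast sync progress
 * to userspace; if the resync just completed, finish it up here. */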
1918static void update_on_disk_bitmap(struct drbd_peer_device *peer_device, bool resync_done)
1919{
1920	struct drbd_device *device = peer_device->device;
1921	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1922	device->rs_last_bcast = jiffies;
1923
1924	if (!get_ldev(device))
1925		return;
1926
1927	drbd_bm_write_lazy(device, 0);
1928	if (resync_done && is_sync_state(device->state.conn))
1929		drbd_resync_finished(peer_device);
1930
1931	drbd_bcast_event(device, &sib);
1932	/* update timestamp, in case it took a while to write out stuff */
1933	device->rs_last_bcast = jiffies;
1934	put_ldev(device);
1935}
1936
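/* Final stage of detach: destroy the resync and activity log caches,
 * free the backing device, and wake up anyone waiting for that. */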
1937static void drbd_ldev_destroy(struct drbd_device *device)
1938{
1939	lc_destroy(device->resync);
1940	device->resync = NULL;
1941	lc_destroy(device->act_log);
1942	device->act_log = NULL;
1943
1944	__acquire(local);
1945	drbd_backing_dev_free(device, device->ldev);
1946	device->ldev = NULL;
1947	__release(local);
1948
1949	clear_bit(GOING_DISKLESS, &device->flags);
1950	wake_up(&device->misc_wait);
1951}
1952
1953static void go_diskless(struct drbd_device *device)
1954{
1955	struct drbd_peer_device *peer_device = first_peer_device(device);
1956	D_ASSERT(device, device->state.disk == D_FAILED);
1957	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1958	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1959	 * the protected members anymore, though, so once put_ldev reaches zero
1960	 * again, it will be safe to free them. */
1961
1962	/* Try to write changed bitmap pages, read errors may have just
1963	 * set some bits outside the area covered by the activity log.
1964	 *
1965	 * If we have an IO error during the bitmap writeout,
1966	 * we will want a full sync next time, just in case.
1967	 * (Do we want a specific meta data flag for this?)
1968	 *
1969	 * If that does not make it to stable storage either,
1970	 * we cannot do anything about that anymore.
1971	 *
1972	 * We still need to check if both bitmap and ldev are present, we may
1973	 * end up here after a failed attach, before ldev was even assigned.
1974	 */
1975	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
1977		 * while we detach.
1978		 * Any modifications would not be expected anymore, though.
1979		 */
1980		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1981					"detach", BM_LOCKED_TEST_ALLOWED, peer_device)) {
1982			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1983				drbd_md_set_flag(device, MDF_FULL_SYNC);
1984				drbd_md_sync(device);
1985			}
1986		}
1987	}
1988
1989	drbd_force_state(device, NS(disk, D_DISKLESS));
1990}
1991
1992static int do_md_sync(struct drbd_device *device)
1993{
1994	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1995	drbd_md_sync(device);
1996	return 0;
1997}
1998
1999/* only called from drbd_worker thread, no locking */
2000void __update_timing_details(
2001		struct drbd_thread_timing_details *tdp,
2002		unsigned int *cb_nr,
2003		void *cb,
2004		const char *fn, const unsigned int line)
2005{
2006	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2007	struct drbd_thread_timing_details *td = tdp + i;
2008
2009	td->start_jif = jiffies;
2010	td->cb_addr = cb;
2011	td->caller_fn = fn;
2012	td->line = line;
2013	td->cb_nr = *cb_nr;
2014
2015	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2016	td = tdp + i;
2017	memset(td, 0, sizeof(*td));
2018
2019	++(*cb_nr);
2020}
2021
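/* Run the device work indicated by the bits collected in @todo. */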
2022static void do_device_work(struct drbd_device *device, const unsigned long todo)
2023{
2024	if (test_bit(MD_SYNC, &todo))
2025		do_md_sync(device);
2026	if (test_bit(RS_DONE, &todo) ||
2027	    test_bit(RS_PROGRESS, &todo))
2028		update_on_disk_bitmap(first_peer_device(device), test_bit(RS_DONE, &todo));
2029	if (test_bit(GO_DISKLESS, &todo))
2030		go_diskless(device);
2031	if (test_bit(DESTROY_DISK, &todo))
2032		drbd_ldev_destroy(device);
2033	if (test_bit(RS_START, &todo))
2034		do_start_resync(device);
2035}
2036
2037#define DRBD_DEVICE_WORK_MASK	\
2038	((1UL << GO_DISKLESS)	\
2039	|(1UL << DESTROY_DISK)	\
2040	|(1UL << MD_SYNC)	\
2041	|(1UL << RS_START)	\
2042	|(1UL << RS_PROGRESS)	\
2043	|(1UL << RS_DONE)	\
2044	)
2045
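/* Atomically fetch and clear the device work bits in *flags; cmpxchg
 * protects against concurrent updates of the other flag bits. */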
2046static unsigned long get_work_bits(unsigned long *flags)
2047{
2048	unsigned long old, new;
2049	do {
2050		old = *flags;
2051		new = old & ~DRBD_DEVICE_WORK_MASK;
2052	} while (cmpxchg(flags, old, new) != old);
2053	return old & DRBD_DEVICE_WORK_MASK;
2054}
2055
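/* Handle pending device work for all peer devices of this connection.
 * Hold a device reference and drop the RCU read lock while doing the
 * actual work. */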
2056static void do_unqueued_work(struct drbd_connection *connection)
2057{
2058	struct drbd_peer_device *peer_device;
2059	int vnr;
2060
2061	rcu_read_lock();
2062	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2063		struct drbd_device *device = peer_device->device;
2064		unsigned long todo = get_work_bits(&device->flags);
2065		if (!todo)
2066			continue;
2067
2068		kref_get(&device->kref);
2069		rcu_read_unlock();
2070		do_device_work(device, todo);
2071		kref_put(&device->kref, drbd_destroy_device);
2072		rcu_read_lock();
2073	}
2074	rcu_read_unlock();
2075}
2076
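/* Move all currently queued work items over to @work_list in one go;
 * returns true if anything was dequeued. */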
2077static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2078{
2079	spin_lock_irq(&queue->q_lock);
2080	list_splice_tail_init(&queue->q, work_list);
2081	spin_unlock_irq(&queue->q_lock);
2082	return !list_empty(work_list);
2083}
2084
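/* Wait until there is something for the sender to do.  While idle,
 * uncork the data socket (if corking is enabled), close the current
 * epoch with a barrier when needed, and restore the cork setting
 * before returning. */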
2085static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2086{
2087	DEFINE_WAIT(wait);
2088	struct net_conf *nc;
2089	int uncork, cork;
2090
2091	dequeue_work_batch(&connection->sender_work, work_list);
2092	if (!list_empty(work_list))
2093		return;
2094
2095	/* Still nothing to do?
2096	 * Maybe we still need to close the current epoch,
2097	 * even if no new requests are queued yet.
2098	 *
2099	 * Also, poke TCP, just in case.
2100	 * Then wait for new work (or signal). */
2101	rcu_read_lock();
2102	nc = rcu_dereference(connection->net_conf);
2103	uncork = nc ? nc->tcp_cork : 0;
2104	rcu_read_unlock();
2105	if (uncork) {
2106		mutex_lock(&connection->data.mutex);
2107		if (connection->data.socket)
2108			tcp_sock_set_cork(connection->data.socket->sk, false);
2109		mutex_unlock(&connection->data.mutex);
2110	}
2111
2112	for (;;) {
2113		int send_barrier;
2114		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2115		spin_lock_irq(&connection->resource->req_lock);
2116		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2117		if (!list_empty(&connection->sender_work.q))
2118			list_splice_tail_init(&connection->sender_work.q, work_list);
2119		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2120		if (!list_empty(work_list) || signal_pending(current)) {
2121			spin_unlock_irq(&connection->resource->req_lock);
2122			break;
2123		}
2124
2125		/* We found nothing new to do, no to-be-communicated request,
2126		 * no other work item.  We may still need to close the last
2127		 * epoch.  Next incoming request epoch will be connection ->
2128		 * current transfer log epoch number.  If that is different
2129		 * from the epoch of the last request we communicated, it is
2130		 * safe to send the epoch separating barrier now.
2131		 */
2132		send_barrier =
2133			atomic_read(&connection->current_tle_nr) !=
2134			connection->send.current_epoch_nr;
2135		spin_unlock_irq(&connection->resource->req_lock);
2136
2137		if (send_barrier)
2138			maybe_send_barrier(connection,
2139					connection->send.current_epoch_nr + 1);
2140
2141		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2142			break;
2143
2144		/* drbd_send() may have called flush_signals() */
2145		if (get_t_state(&connection->worker) != RUNNING)
2146			break;
2147
2148		schedule();
		/* We may be woken up for things other than new work, too,
		 * e.g. if the current epoch got closed.
		 * In that case we send the barrier above. */
2152	}
2153	finish_wait(&connection->sender_work.q_wait, &wait);
2154
2155	/* someone may have changed the config while we have been waiting above. */
2156	rcu_read_lock();
2157	nc = rcu_dereference(connection->net_conf);
2158	cork = nc ? nc->tcp_cork : 0;
2159	rcu_read_unlock();
2160	mutex_lock(&connection->data.mutex);
2161	if (connection->data.socket) {
2162		if (cork)
2163			tcp_sock_set_cork(connection->data.socket->sk, true);
2164		else if (!uncork)
2165			tcp_sock_set_cork(connection->data.socket->sk, false);
2166	}
2167	mutex_unlock(&connection->data.mutex);
2168}
2169
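/* Main loop of the per-connection worker thread: wait for work, handle
 * unqueued device work, and run queued work callbacks until told to
 * stop; then drain the remaining work and clean up all devices. */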
2170int drbd_worker(struct drbd_thread *thi)
2171{
2172	struct drbd_connection *connection = thi->connection;
2173	struct drbd_work *w = NULL;
2174	struct drbd_peer_device *peer_device;
2175	LIST_HEAD(work_list);
2176	int vnr;
2177
2178	while (get_t_state(thi) == RUNNING) {
2179		drbd_thread_current_set_cpu(thi);
2180
2181		if (list_empty(&work_list)) {
2182			update_worker_timing_details(connection, wait_for_work);
2183			wait_for_work(connection, &work_list);
2184		}
2185
2186		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2187			update_worker_timing_details(connection, do_unqueued_work);
2188			do_unqueued_work(connection);
2189		}
2190
2191		if (signal_pending(current)) {
2192			flush_signals(current);
2193			if (get_t_state(thi) == RUNNING) {
2194				drbd_warn(connection, "Worker got an unexpected signal\n");
2195				continue;
2196			}
2197			break;
2198		}
2199
2200		if (get_t_state(thi) != RUNNING)
2201			break;
2202
2203		if (!list_empty(&work_list)) {
2204			w = list_first_entry(&work_list, struct drbd_work, list);
2205			list_del_init(&w->list);
2206			update_worker_timing_details(connection, w->cb);
2207			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2208				continue;
2209			if (connection->cstate >= C_WF_REPORT_PARAMS)
2210				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2211		}
2212	}
2213
2214	do {
2215		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2216			update_worker_timing_details(connection, do_unqueued_work);
2217			do_unqueued_work(connection);
2218		}
2219		if (!list_empty(&work_list)) {
2220			w = list_first_entry(&work_list, struct drbd_work, list);
2221			list_del_init(&w->list);
2222			update_worker_timing_details(connection, w->cb);
2223			w->cb(w, 1);
2224		} else
2225			dequeue_work_batch(&connection->sender_work, &work_list);
2226	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2227
2228	rcu_read_lock();
2229	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2230		struct drbd_device *device = peer_device->device;
2231		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2232		kref_get(&device->kref);
2233		rcu_read_unlock();
2234		drbd_device_cleanup(device);
2235		kref_put(&device->kref, drbd_destroy_device);
2236		rcu_read_lock();
2237	}
2238	rcu_read_unlock();
2239
2240	return 0;
2241}
2242