1/*
2   drbd_receiver.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/string.h>
46#include <linux/scatterlist.h>
47#include "drbd_int.h"
48#include "drbd_req.h"
49
50#include "drbd_vli.h"
51
52struct flush_work {
53	struct drbd_work w;
54	struct drbd_epoch *epoch;
55};
56
57enum finish_epoch {
58	FE_STILL_LIVE,
59	FE_DESTROYED,
60	FE_RECYCLED,
61};
62
63static int drbd_do_handshake(struct drbd_conf *mdev);
64static int drbd_do_auth(struct drbd_conf *mdev);
65
66static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68
69static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70{
71	struct drbd_epoch *prev;
72	spin_lock(&mdev->epoch_lock);
73	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74	if (prev == epoch || prev == mdev->current_epoch)
75		prev = NULL;
76	spin_unlock(&mdev->epoch_lock);
77	return prev;
78}
79
80#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
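/* Note: GFP_TRY has neither __GFP_WAIT nor __GFP_IO/__GFP_FS set, so these
 * allocations never sleep and never trigger reclaim or write-out; they may
 * simply fail, which drbd_pp_alloc() below handles by waiting and retrying. */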
81
82/*
83 * some helper functions to deal with singly linked page lists,
84 * page->private being our "next" pointer.
85 */
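/* For illustration: a chain of three pages handed out by page_chain_del()
 * looks like  head -> p1 -> p2 -> p3 -> 0,  where each arrow is the value
 * stored with set_page_private() and the final 0 is the end-of-chain marker
 * that page_chain_next() reads back. */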
86
87/* If at least n pages are linked at head, get n pages off.
88 * Otherwise, don't modify head, and return NULL.
89 * Locking is the responsibility of the caller.
90 */
91static struct page *page_chain_del(struct page **head, int n)
92{
93	struct page *page;
94	struct page *tmp;
95
96	BUG_ON(!n);
97	BUG_ON(!head);
98
99	page = *head;
100
101	if (!page)
102		return NULL;
103
104	while (page) {
105		tmp = page_chain_next(page);
106		if (--n == 0)
107			break; /* found sufficient pages */
108		if (tmp == NULL)
109			/* insufficient pages, don't use any of them. */
110			return NULL;
111		page = tmp;
112	}
113
114	/* add end of list marker for the returned list */
115	set_page_private(page, 0);
116	/* actual return value, and adjustment of head */
117	page = *head;
118	*head = tmp;
119	return page;
120}
121
122/* may be used outside of locks to find the tail of a (usually short)
123 * "private" page chain, before adding it back to a global chain head
124 * with page_chain_add() under a spinlock. */
125static struct page *page_chain_tail(struct page *page, int *len)
126{
127	struct page *tmp;
128	int i = 1;
129	while ((tmp = page_chain_next(page)))
130		++i, page = tmp;
131	if (len)
132		*len = i;
133	return page;
134}
135
136static int page_chain_free(struct page *page)
137{
138	struct page *tmp;
139	int i = 0;
140	page_chain_for_each_safe(page, tmp) {
141		put_page(page);
142		++i;
143	}
144	return i;
145}
146
147static void page_chain_add(struct page **head,
148		struct page *chain_first, struct page *chain_last)
149{
150	struct page *tmp;
151	tmp = page_chain_tail(chain_first, NULL);
152	BUG_ON(tmp != chain_last);
153
154	/* add chain to head */
155	set_page_private(chain_last, (unsigned long)*head);
156	*head = chain_first;
157}
158
159static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
160{
161	struct page *page = NULL;
162	struct page *tmp = NULL;
163	int i = 0;
164
165	/* Yes, testing drbd_pp_vacant outside the lock is racy.
166	 * So what. It saves a spin_lock. */
167	if (drbd_pp_vacant >= number) {
168		spin_lock(&drbd_pp_lock);
169		page = page_chain_del(&drbd_pp_pool, number);
170		if (page)
171			drbd_pp_vacant -= number;
172		spin_unlock(&drbd_pp_lock);
173		if (page)
174			return page;
175	}
176
177	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
178	 * "criss-cross" setup, that might cause write-out on some other DRBD,
179	 * which in turn might block on the other node at this very place.  */
180	for (i = 0; i < number; i++) {
181		tmp = alloc_page(GFP_TRY);
182		if (!tmp)
183			break;
184		set_page_private(tmp, (unsigned long)page);
185		page = tmp;
186	}
187
188	if (i == number)
189		return page;
190
191	/* Not enough pages immediately available this time.
192	 * No need to jump around here, drbd_pp_alloc will retry this
193	 * function "soon". */
194	if (page) {
195		tmp = page_chain_tail(page, NULL);
196		spin_lock(&drbd_pp_lock);
197		page_chain_add(&drbd_pp_pool, page, tmp);
198		drbd_pp_vacant += i;
199		spin_unlock(&drbd_pp_lock);
200	}
201	return NULL;
202}
203
204/* kick lower level device, if we have more than (arbitrary number)
205 * reference counts on it, which typically are locally submitted io
206 * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
207static void maybe_kick_lo(struct drbd_conf *mdev)
208{
209	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
210		drbd_kick_lo(mdev);
211}
212
213static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
214{
215	struct drbd_epoch_entry *e;
216	struct list_head *le, *tle;
217
218	/* The EEs are always appended to the end of the list. Since
219	   they are sent in order over the wire, they have to finish
220	   in order. As soon as we see the first one that is not finished,
221	   we can stop examining the list... */
222
223	list_for_each_safe(le, tle, &mdev->net_ee) {
224		e = list_entry(le, struct drbd_epoch_entry, w.list);
225		if (drbd_ee_has_active_page(e))
226			break;
227		list_move(le, to_be_freed);
228	}
229}
230
231static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
232{
233	LIST_HEAD(reclaimed);
234	struct drbd_epoch_entry *e, *t;
235
236	maybe_kick_lo(mdev);
237	spin_lock_irq(&mdev->req_lock);
238	reclaim_net_ee(mdev, &reclaimed);
239	spin_unlock_irq(&mdev->req_lock);
240
241	list_for_each_entry_safe(e, t, &reclaimed, w.list)
242		drbd_free_ee(mdev, e);
243}
244
245/**
246 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
247 * @mdev:	DRBD device.
248 * @number:	number of pages requested
249 * @retry:	whether to retry, if not enough pages are available right now
250 *
251 * Tries to allocate number pages, first from our own page pool, then from
252 * the kernel, unless this allocation would exceed the max_buffers setting.
253 * Possibly retry until DRBD frees sufficient pages somewhere else.
254 *
255 * Returns a page chain linked via page->private.
256 */
257static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
258{
259	struct page *page = NULL;
260	DEFINE_WAIT(wait);
261
262	/* Yes, we may run up to @number over max_buffers. If we
263	 * follow it strictly, the admin will get it wrong anyways. */
264	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
265		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
266
267	while (page == NULL) {
268		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
269
270		drbd_kick_lo_and_reclaim_net(mdev);
271
272		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
273			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
274			if (page)
275				break;
276		}
277
278		if (!retry)
279			break;
280
281		if (signal_pending(current)) {
282			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
283			break;
284		}
285
286		schedule();
287	}
288	finish_wait(&drbd_pp_wait, &wait);
289
290	if (page)
291		atomic_add(number, &mdev->pp_in_use);
292	return page;
293}
294
295/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
296 * It is also used from inside another spin_lock_irq(&mdev->req_lock) critical section;
297 * Either links the page chain back to the global pool,
298 * or returns all pages to the system. */
299static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
300{
301	int i;
302	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
303		i = page_chain_free(page);
304	else {
305		struct page *tmp;
306		tmp = page_chain_tail(page, &i);
307		spin_lock(&drbd_pp_lock);
308		page_chain_add(&drbd_pp_pool, page, tmp);
309		drbd_pp_vacant += i;
310		spin_unlock(&drbd_pp_lock);
311	}
312	atomic_sub(i, &mdev->pp_in_use);
313	i = atomic_read(&mdev->pp_in_use);
314	if (i < 0)
315		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
316	wake_up(&drbd_pp_wait);
317}
318
319/*
320You need to hold the req_lock:
321 _drbd_wait_ee_list_empty()
322
323You must not have the req_lock:
324 drbd_free_ee()
325 drbd_alloc_ee()
326 drbd_init_ee()
327 drbd_release_ee()
328 drbd_ee_fix_bhs()
329 drbd_process_done_ee()
330 drbd_clear_done_ee()
331 drbd_wait_ee_list_empty()
332*/
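/* (Naming convention used here: the leading underscore in
 * _drbd_wait_ee_list_empty() means the caller already holds req_lock;
 * the variant without the underscore takes and drops the lock itself.) */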
333
334struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
335				     u64 id,
336				     sector_t sector,
337				     unsigned int data_size,
338				     gfp_t gfp_mask) __must_hold(local)
339{
340	struct drbd_epoch_entry *e;
341	struct page *page;
342	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
343
344	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
345		return NULL;
346
347	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
348	if (!e) {
349		if (!(gfp_mask & __GFP_NOWARN))
350			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
351		return NULL;
352	}
353
354	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
355	if (!page)
356		goto fail;
357
358	INIT_HLIST_NODE(&e->colision);
359	e->epoch = NULL;
360	e->mdev = mdev;
361	e->pages = page;
362	atomic_set(&e->pending_bios, 0);
363	e->size = data_size;
364	e->flags = 0;
365	e->sector = sector;
367	e->block_id = id;
368
369	return e;
370
371 fail:
372	mempool_free(e, drbd_ee_mempool);
373	return NULL;
374}
375
376void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
377{
378	drbd_pp_free(mdev, e->pages);
379	D_ASSERT(atomic_read(&e->pending_bios) == 0);
380	D_ASSERT(hlist_unhashed(&e->colision));
381	mempool_free(e, drbd_ee_mempool);
382}
383
384int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
385{
386	LIST_HEAD(work_list);
387	struct drbd_epoch_entry *e, *t;
388	int count = 0;
389
390	spin_lock_irq(&mdev->req_lock);
391	list_splice_init(list, &work_list);
392	spin_unlock_irq(&mdev->req_lock);
393
394	list_for_each_entry_safe(e, t, &work_list, w.list) {
395		drbd_free_ee(mdev, e);
396		count++;
397	}
398	return count;
399}
400
401
402/*
403 * This function is called from _asender only_
404 * but see also comments in _req_mod(,barrier_acked)
405 * and receive_Barrier.
406 *
407 * Move entries from net_ee to done_ee, if ready.
408 * Grab done_ee, call all callbacks, free the entries.
409 * The callbacks typically send out ACKs.
410 */
411static int drbd_process_done_ee(struct drbd_conf *mdev)
412{
413	LIST_HEAD(work_list);
414	LIST_HEAD(reclaimed);
415	struct drbd_epoch_entry *e, *t;
416	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
417
418	spin_lock_irq(&mdev->req_lock);
419	reclaim_net_ee(mdev, &reclaimed);
420	list_splice_init(&mdev->done_ee, &work_list);
421	spin_unlock_irq(&mdev->req_lock);
422
423	list_for_each_entry_safe(e, t, &reclaimed, w.list)
424		drbd_free_ee(mdev, e);
425
426	/* possible callbacks here:
427	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
428	 * all ignore the last argument.
429	 */
430	list_for_each_entry_safe(e, t, &work_list, w.list) {
431		/* list_del not necessary, next/prev members not touched */
432		ok = e->w.cb(mdev, &e->w, !ok) && ok;
433		drbd_free_ee(mdev, e);
434	}
435	wake_up(&mdev->ee_wait);
436
437	return ok;
438}
439
440void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441{
442	DEFINE_WAIT(wait);
443
444	/* avoids spin_lock/unlock
445	 * and calling prepare_to_wait in the fast path */
446	while (!list_empty(head)) {
447		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
448		spin_unlock_irq(&mdev->req_lock);
449		drbd_kick_lo(mdev);
450		schedule();
451		finish_wait(&mdev->ee_wait, &wait);
452		spin_lock_irq(&mdev->req_lock);
453	}
454}
455
456void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
457{
458	spin_lock_irq(&mdev->req_lock);
459	_drbd_wait_ee_list_empty(mdev, head);
460	spin_unlock_irq(&mdev->req_lock);
461}
462
463/* see also kernel_accept(), which is only present since 2.6.18.
464 * We also want to log exactly which part of it failed. */
465static int drbd_accept(struct drbd_conf *mdev, const char **what,
466		struct socket *sock, struct socket **newsock)
467{
468	struct sock *sk = sock->sk;
469	int err = 0;
470
471	*what = "listen";
472	err = sock->ops->listen(sock, 5);
473	if (err < 0)
474		goto out;
475
476	*what = "sock_create_lite";
477	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478			       newsock);
479	if (err < 0)
480		goto out;
481
482	*what = "accept";
483	err = sock->ops->accept(sock, *newsock, 0);
484	if (err < 0) {
485		sock_release(*newsock);
486		*newsock = NULL;
487		goto out;
488	}
489	(*newsock)->ops  = sock->ops;
490
491out:
492	return err;
493}
494
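/* Receive up to @size bytes from an arbitrary socket (MSG_WAITALL unless the
 * caller passes its own @flags).  Unlike drbd_recv() below, a short read here
 * does not force the connection into C_BROKEN_PIPE. */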
495static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
496		    void *buf, size_t size, int flags)
497{
498	mm_segment_t oldfs;
499	struct kvec iov = {
500		.iov_base = buf,
501		.iov_len = size,
502	};
503	struct msghdr msg = {
504		.msg_iovlen = 1,
505		.msg_iov = (struct iovec *)&iov,
506		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
507	};
508	int rv;
509
510	oldfs = get_fs();
511	set_fs(KERNEL_DS);
512	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
513	set_fs(oldfs);
514
515	return rv;
516}
517
518static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
519{
520	mm_segment_t oldfs;
521	struct kvec iov = {
522		.iov_base = buf,
523		.iov_len = size,
524	};
525	struct msghdr msg = {
526		.msg_iovlen = 1,
527		.msg_iov = (struct iovec *)&iov,
528		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
529	};
530	int rv;
531
532	oldfs = get_fs();
533	set_fs(KERNEL_DS);
534
535	for (;;) {
536		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
537		if (rv == size)
538			break;
539
540		/* Note:
541		 * ECONNRESET	other side closed the connection
542		 * ERESTARTSYS	(on  sock) we got a signal
543		 */
544
545		if (rv < 0) {
546			if (rv == -ECONNRESET)
547				dev_info(DEV, "sock was reset by peer\n");
548			else if (rv != -ERESTARTSYS)
549				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
550			break;
551		} else if (rv == 0) {
552			dev_info(DEV, "sock was shut down by peer\n");
553			break;
554		} else	{
555			/* signal came in, or peer/link went down,
556			 * after we read a partial message
557			 */
558			/* D_ASSERT(signal_pending(current)); */
559			break;
560		}
561	}
562
563	set_fs(oldfs);
564
565	if (rv != size)
566		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
567
568	return rv;
569}
570
571/* quoting tcp(7):
572 *   On individual connections, the socket buffer size must be set prior to the
573 *   listen(2) or connect(2) calls in order to have it take effect.
574 * This is our wrapper to do so.
575 */
576static void drbd_setbufsize(struct socket *sock, unsigned int snd,
577		unsigned int rcv)
578{
579	/* open coded SO_SNDBUF, SO_RCVBUF */
580	if (snd) {
581		sock->sk->sk_sndbuf = snd;
582		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
583	}
584	if (rcv) {
585		sock->sk->sk_rcvbuf = rcv;
586		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
587	}
588}
589
590static struct socket *drbd_try_connect(struct drbd_conf *mdev)
591{
592	const char *what;
593	struct socket *sock;
594	struct sockaddr_in6 src_in6;
595	int err;
596	int disconnect_on_error = 1;
597
598	if (!get_net_conf(mdev))
599		return NULL;
600
601	what = "sock_create_kern";
602	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
603		SOCK_STREAM, IPPROTO_TCP, &sock);
604	if (err < 0) {
605		sock = NULL;
606		goto out;
607	}
608
609	sock->sk->sk_rcvtimeo =
610	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
611	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
612			mdev->net_conf->rcvbuf_size);
613
614       /* explicitly bind to the configured IP as source IP
615	*  for the outgoing connections.
616	*  This is needed for multihomed hosts and to be
617	*  able to use lo: interfaces for drbd.
618	* Make sure to use 0 as port number, so linux selects
619	*  a free one dynamically.
620	*/
621	memcpy(&src_in6, mdev->net_conf->my_addr,
622	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
623	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
624		src_in6.sin6_port = 0;
625	else
626		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
627
628	what = "bind before connect";
629	err = sock->ops->bind(sock,
630			      (struct sockaddr *) &src_in6,
631			      mdev->net_conf->my_addr_len);
632	if (err < 0)
633		goto out;
634
635	/* connect may fail, peer not yet available.
636	 * stay C_WF_CONNECTION, don't go Disconnecting! */
637	disconnect_on_error = 0;
638	what = "connect";
639	err = sock->ops->connect(sock,
640				 (struct sockaddr *)mdev->net_conf->peer_addr,
641				 mdev->net_conf->peer_addr_len, 0);
642
643out:
644	if (err < 0) {
645		if (sock) {
646			sock_release(sock);
647			sock = NULL;
648		}
649		switch (-err) {
650			/* timeout, busy, signal pending */
651		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
652		case EINTR: case ERESTARTSYS:
653			/* peer not (yet) available, network problem */
654		case ECONNREFUSED: case ENETUNREACH:
655		case EHOSTDOWN:    case EHOSTUNREACH:
656			disconnect_on_error = 0;
657			break;
658		default:
659			dev_err(DEV, "%s failed, err = %d\n", what, err);
660		}
661		if (disconnect_on_error)
662			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
663	}
664	put_net_conf(mdev);
665	return sock;
666}
667
668static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
669{
670	int timeo, err;
671	struct socket *s_estab = NULL, *s_listen;
672	const char *what;
673
674	if (!get_net_conf(mdev))
675		return NULL;
676
677	what = "sock_create_kern";
678	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
679		SOCK_STREAM, IPPROTO_TCP, &s_listen);
680	if (err) {
681		s_listen = NULL;
682		goto out;
683	}
684
685	timeo = mdev->net_conf->try_connect_int * HZ;
686	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
687
688	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
689	s_listen->sk->sk_rcvtimeo = timeo;
690	s_listen->sk->sk_sndtimeo = timeo;
691	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
692			mdev->net_conf->rcvbuf_size);
693
694	what = "bind before listen";
695	err = s_listen->ops->bind(s_listen,
696			      (struct sockaddr *) mdev->net_conf->my_addr,
697			      mdev->net_conf->my_addr_len);
698	if (err < 0)
699		goto out;
700
701	err = drbd_accept(mdev, &what, s_listen, &s_estab);
702
703out:
704	if (s_listen)
705		sock_release(s_listen);
706	if (err < 0) {
707		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
708			dev_err(DEV, "%s failed, err = %d\n", what, err);
709			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
710		}
711	}
712	put_net_conf(mdev);
713
714	return s_estab;
715}
716
717static int drbd_send_fp(struct drbd_conf *mdev,
718	struct socket *sock, enum drbd_packets cmd)
719{
720	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
721
722	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
723}
724
725static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
726{
727	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
728	int rr;
729
730	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
731
732	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
733		return be16_to_cpu(h->command);
734
735	return 0xffff;
736}
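/* 0xffff simply means "no valid initial packet"; drbd_connect() below only
 * compares this return value against P_HAND_SHAKE_S and P_HAND_SHAKE_M. */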
737
738/**
739 * drbd_socket_okay() - Free the socket if its connection is not okay
740 * @mdev:	DRBD device.
741 * @sock:	pointer to the pointer to the socket.
742 */
743static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
744{
745	int rr;
746	char tb[4];
747
748	if (!*sock)
749		return FALSE;
750
751	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
752
753	if (rr > 0 || rr == -EAGAIN) {
754		return TRUE;
755	} else {
756		sock_release(*sock);
757		*sock = NULL;
758		return FALSE;
759	}
760}
761
762/*
763 * return values:
764 *   1 yes, we have a valid connection
765 *   0 oops, did not work out, please try again
766 *  -1 peer talks different language,
767 *     no point in trying again, please go standalone.
768 *  -2 We do not have a network config...
769 */
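/* drbd_connect() establishes two TCP connections per peer: "sock" becomes
 * mdev->data.socket and carries the bulk data stream, "msock" becomes
 * mdev->meta.socket and carries ACKs and pings.  The P_HAND_SHAKE_S /
 * P_HAND_SHAKE_M first packets decide which accepted connection takes
 * which role. */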
770static int drbd_connect(struct drbd_conf *mdev)
771{
772	struct socket *s, *sock, *msock;
773	int try, h, ok;
774
775	D_ASSERT(!mdev->data.socket);
776
777	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
778		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
779
780	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
781		return -2;
782
783	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
784
785	sock  = NULL;
786	msock = NULL;
787
788	do {
789		for (try = 0;;) {
790			/* 3 tries, this should take less than a second! */
791			s = drbd_try_connect(mdev);
792			if (s || ++try >= 3)
793				break;
794			/* give the other side time to call bind() & listen() */
795			__set_current_state(TASK_INTERRUPTIBLE);
796			schedule_timeout(HZ / 10);
797		}
798
799		if (s) {
800			if (!sock) {
801				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
802				sock = s;
803				s = NULL;
804			} else if (!msock) {
805				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
806				msock = s;
807				s = NULL;
808			} else {
809				dev_err(DEV, "Logic error in drbd_connect()\n");
810				goto out_release_sockets;
811			}
812		}
813
814		if (sock && msock) {
815			__set_current_state(TASK_INTERRUPTIBLE);
816			schedule_timeout(HZ / 10);
817			ok = drbd_socket_okay(mdev, &sock);
818			ok = drbd_socket_okay(mdev, &msock) && ok;
819			if (ok)
820				break;
821		}
822
823retry:
824		s = drbd_wait_for_connect(mdev);
825		if (s) {
826			try = drbd_recv_fp(mdev, s);
827			drbd_socket_okay(mdev, &sock);
828			drbd_socket_okay(mdev, &msock);
829			switch (try) {
830			case P_HAND_SHAKE_S:
831				if (sock) {
832					dev_warn(DEV, "initial packet S crossed\n");
833					sock_release(sock);
834				}
835				sock = s;
836				break;
837			case P_HAND_SHAKE_M:
838				if (msock) {
839					dev_warn(DEV, "initial packet M crossed\n");
840					sock_release(msock);
841				}
842				msock = s;
843				set_bit(DISCARD_CONCURRENT, &mdev->flags);
844				break;
845			default:
846				dev_warn(DEV, "Error receiving initial packet\n");
847				sock_release(s);
848				if (random32() & 1)
849					goto retry;
850			}
851		}
852
853		if (mdev->state.conn <= C_DISCONNECTING)
854			goto out_release_sockets;
855		if (signal_pending(current)) {
856			flush_signals(current);
857			smp_rmb();
858			if (get_t_state(&mdev->receiver) == Exiting)
859				goto out_release_sockets;
860		}
861
862		if (sock && msock) {
863			ok = drbd_socket_okay(mdev, &sock);
864			ok = drbd_socket_okay(mdev, &msock) && ok;
865			if (ok)
866				break;
867		}
868	} while (1);
869
870	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
871	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
872
873	sock->sk->sk_allocation = GFP_NOIO;
874	msock->sk->sk_allocation = GFP_NOIO;
875
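	/* data socket: bulk priority; meta socket: interactive priority,
	 * presumably so that ACKs and pings are not queued behind bulk data. */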
876	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
877	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
878
879	/* NOT YET ...
880	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
881	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
882	 * first set it to the P_HAND_SHAKE timeout,
883	 * which we set to 4x the configured ping_timeout. */
884	sock->sk->sk_sndtimeo =
885	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
886
887	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
888	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
889
890	/* we don't want delays.
891	 * we use TCP_CORK where appropriate, though */
892	drbd_tcp_nodelay(sock);
893	drbd_tcp_nodelay(msock);
894
895	mdev->data.socket = sock;
896	mdev->meta.socket = msock;
897	mdev->last_received = jiffies;
898
899	D_ASSERT(mdev->asender.task == NULL);
900
901	h = drbd_do_handshake(mdev);
902	if (h <= 0)
903		return h;
904
905	if (mdev->cram_hmac_tfm) {
906		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
907		switch (drbd_do_auth(mdev)) {
908		case -1:
909			dev_err(DEV, "Authentication of peer failed\n");
910			return -1;
911		case 0:
912			dev_err(DEV, "Authentication of peer failed, trying again.\n");
913			return 0;
914		}
915	}
916
917	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
918		return 0;
919
920	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
921	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
922
923	atomic_set(&mdev->packet_seq, 0);
924	mdev->peer_seq = 0;
925
926	drbd_thread_start(&mdev->asender);
927
928	if (!drbd_send_protocol(mdev))
929		return -1;
930	drbd_send_sync_param(mdev, &mdev->sync_conf);
931	drbd_send_sizes(mdev, 0, 0);
932	drbd_send_uuids(mdev);
933	drbd_send_state(mdev);
934	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
935	clear_bit(RESIZE_PENDING, &mdev->flags);
936
937	return 1;
938
939out_release_sockets:
940	if (sock)
941		sock_release(sock);
942	if (msock)
943		sock_release(msock);
944	return -1;
945}
946
947static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
948{
949	int r;
950
951	r = drbd_recv(mdev, h, sizeof(*h));
952
953	if (unlikely(r != sizeof(*h))) {
954		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
955		return FALSE;
956	}
957	h->command = be16_to_cpu(h->command);
958	h->length  = be16_to_cpu(h->length);
959	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
960		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
961		    (long)be32_to_cpu(h->magic),
962		    h->command, h->length);
963		return FALSE;
964	}
965	mdev->last_received = jiffies;
966
967	return TRUE;
968}
969
970static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
971{
972	int rv;
973
974	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
975		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
976					NULL, BLKDEV_IFL_WAIT);
977		if (rv) {
978			dev_err(DEV, "local disk flush failed with status %d\n", rv);
979			/* would rather check on EOPNOTSUPP, but that is not reliable.
980			 * don't try again for ANY return value != 0
981			 * if (rv == -EOPNOTSUPP) */
982			drbd_bump_write_ordering(mdev, WO_drain_io);
983		}
984		put_ldev(mdev);
985	}
986
987	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
988}
989
990static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
991{
992	struct flush_work *fw = (struct flush_work *)w;
993	struct drbd_epoch *epoch = fw->epoch;
994
995	kfree(w);
996
997	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
998		drbd_flush_after_epoch(mdev, epoch);
999
1000	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1001			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1002
1003	return 1;
1004}
1005
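/* The return value of drbd_may_finish_epoch() is used by its callers as follows:
 *   FE_DESTROYED  - the epoch was finished and freed,
 *   FE_RECYCLED   - it was the current epoch and has been reset for reuse,
 *   FE_STILL_LIVE - nothing was finished (yet). */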
1006/**
1007 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1008 * @mdev:	DRBD device.
1009 * @epoch:	Epoch object.
1010 * @ev:		Epoch event.
1011 */
1012static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1013					       struct drbd_epoch *epoch,
1014					       enum epoch_event ev)
1015{
1016	int finish, epoch_size;
1017	struct drbd_epoch *next_epoch;
1018	int schedule_flush = 0;
1019	enum finish_epoch rv = FE_STILL_LIVE;
1020
1021	spin_lock(&mdev->epoch_lock);
1022	do {
1023		next_epoch = NULL;
1024		finish = 0;
1025
1026		epoch_size = atomic_read(&epoch->epoch_size);
1027
1028		switch (ev & ~EV_CLEANUP) {
1029		case EV_PUT:
1030			atomic_dec(&epoch->active);
1031			break;
1032		case EV_GOT_BARRIER_NR:
1033			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1034
1035			/* Special case: If we just switched from WO_bio_barrier to
1036			   WO_bdev_flush we should not finish the current epoch */
1037			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1038			    mdev->write_ordering != WO_bio_barrier &&
1039			    epoch == mdev->current_epoch)
1040				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1041			break;
1042		case EV_BARRIER_DONE:
1043			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1044			break;
1045		case EV_BECAME_LAST:
1046			/* nothing to do*/
1047			break;
1048		}
1049
1050		if (epoch_size != 0 &&
1051		    atomic_read(&epoch->active) == 0 &&
1052		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1053		    epoch->list.prev == &mdev->current_epoch->list &&
1054		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1055			/* Nearly all conditions are met to finish that epoch... */
1056			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1057			    mdev->write_ordering == WO_none ||
1058			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1059			    ev & EV_CLEANUP) {
1060				finish = 1;
1061				set_bit(DE_IS_FINISHING, &epoch->flags);
1062			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1063				 mdev->write_ordering == WO_bio_barrier) {
1064				atomic_inc(&epoch->active);
1065				schedule_flush = 1;
1066			}
1067		}
1068		if (finish) {
1069			if (!(ev & EV_CLEANUP)) {
1070				spin_unlock(&mdev->epoch_lock);
1071				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1072				spin_lock(&mdev->epoch_lock);
1073			}
1074			dec_unacked(mdev);
1075
1076			if (mdev->current_epoch != epoch) {
1077				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1078				list_del(&epoch->list);
1079				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1080				mdev->epochs--;
1081				kfree(epoch);
1082
1083				if (rv == FE_STILL_LIVE)
1084					rv = FE_DESTROYED;
1085			} else {
1086				epoch->flags = 0;
1087				atomic_set(&epoch->epoch_size, 0);
1088				/* atomic_set(&epoch->active, 0); is already zero */
1089				if (rv == FE_STILL_LIVE)
1090					rv = FE_RECYCLED;
1091			}
1092		}
1093
1094		if (!next_epoch)
1095			break;
1096
1097		epoch = next_epoch;
1098	} while (1);
1099
1100	spin_unlock(&mdev->epoch_lock);
1101
1102	if (schedule_flush) {
1103		struct flush_work *fw;
1104		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1105		if (fw) {
1106			fw->w.cb = w_flush;
1107			fw->epoch = epoch;
1108			drbd_queue_work(&mdev->data.work, &fw->w);
1109		} else {
1110			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1111			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1112			/* That is not a recursion, only one level */
1113			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1114			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1115		}
1116	}
1117
1118	return rv;
1119}
1120
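/* Despite the name, drbd_bump_write_ordering() only ever falls back to a
 * weaker ordering method (wo = min(pwo, wo)), possibly degraded further by
 * the no_disk_barrier / no_disk_flush / no_disk_drain settings of the
 * attached disk. */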
1121/**
1122 * drbd_bump_write_ordering() - Fall back to another write ordering method
1123 * @mdev:	DRBD device.
1124 * @wo:		Write ordering method to try.
1125 */
1126void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1127{
1128	enum write_ordering_e pwo;
1129	static char *write_ordering_str[] = {
1130		[WO_none] = "none",
1131		[WO_drain_io] = "drain",
1132		[WO_bdev_flush] = "flush",
1133		[WO_bio_barrier] = "barrier",
1134	};
1135
1136	pwo = mdev->write_ordering;
1137	wo = min(pwo, wo);
1138	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1139		wo = WO_bdev_flush;
1140	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1141		wo = WO_drain_io;
1142	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1143		wo = WO_none;
1144	mdev->write_ordering = wo;
1145	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1146		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1147}
1148
1149/**
1150 * drbd_submit_ee()
1151 * @mdev:	DRBD device.
1152 * @e:		epoch entry
1153 * @rw:		flag field, see bio->bi_rw
1154 */
1155/* TODO allocate from our own bio_set. */
1156int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1157		const unsigned rw, const int fault_type)
1158{
1159	struct bio *bios = NULL;
1160	struct bio *bio;
1161	struct page *page = e->pages;
1162	sector_t sector = e->sector;
1163	unsigned ds = e->size;
1164	unsigned n_bios = 0;
1165	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1166
1167	/* In most cases, we will only need one bio.  But in case the lower
1168	 * level restrictions happen to be different at this offset on this
1169	 * side than those of the sending peer, we may need to submit the
1170	 * request in more than one bio. */
1171next_bio:
1172	bio = bio_alloc(GFP_NOIO, nr_pages);
1173	if (!bio) {
1174		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1175		goto fail;
1176	}
1177	/* > e->sector, unless this is the first bio */
1178	bio->bi_sector = sector;
1179	bio->bi_bdev = mdev->ldev->backing_bdev;
1180	/* we special case some flags in the multi-bio case, see below
1181	 * (REQ_UNPLUG, REQ_HARDBARRIER) */
1182	bio->bi_rw = rw;
1183	bio->bi_private = e;
1184	bio->bi_end_io = drbd_endio_sec;
1185
1186	bio->bi_next = bios;
1187	bios = bio;
1188	++n_bios;
1189
1190	page_chain_for_each(page) {
1191		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1192		if (!bio_add_page(bio, page, len, 0)) {
1193			/* a single page must always be possible! */
1194			BUG_ON(bio->bi_vcnt == 0);
1195			goto next_bio;
1196		}
1197		ds -= len;
1198		sector += len >> 9;
1199		--nr_pages;
1200	}
1201	D_ASSERT(page == NULL);
1202	D_ASSERT(ds == 0);
1203
1204	atomic_set(&e->pending_bios, n_bios);
1205	do {
1206		bio = bios;
1207		bios = bios->bi_next;
1208		bio->bi_next = NULL;
1209
1210		/* strip off REQ_UNPLUG unless it is the last bio */
1211		if (bios)
1212			bio->bi_rw &= ~REQ_UNPLUG;
1213
1214		drbd_generic_make_request(mdev, fault_type, bio);
1215
1216		/* strip off REQ_HARDBARRIER,
1217		 * unless it is the first or last bio */
1218		if (bios && bios->bi_next)
1219			bios->bi_rw &= ~REQ_HARDBARRIER;
1220	} while (bios);
1221	maybe_kick_lo(mdev);
1222	return 0;
1223
1224fail:
1225	while (bios) {
1226		bio = bios;
1227		bios = bios->bi_next;
1228		bio_put(bio);
1229	}
1230	return -ENOMEM;
1231}
1232
1233/**
1234 * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
1235 * @mdev:	DRBD device.
1236 * @w:		work object.
1237 * @cancel:	The connection will be closed anyways (unused in this callback)
1238 */
1239int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1240{
1241	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1242	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1243	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1244	   so that we can finish that epoch in drbd_may_finish_epoch().
1245	   That is necessary if we already have a long chain of Epochs, before
1246	   we realize that REQ_HARDBARRIER is actually not supported */
1247
1248	/* As long as the -ENOTSUPP on the barrier is reported immediately
1249	   that will never trigger. If it is reported late, we will just
1250	   print that warning and continue correctly for all future requests
1251	   with WO_bdev_flush */
1252	if (previous_epoch(mdev, e->epoch))
1253		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1254
1255	/* we still have a local reference,
1256	 * get_ldev was done in receive_Data. */
1257
1258	e->w.cb = e_end_block;
1259	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1260		/* drbd_submit_ee fails for one reason only:
1261		 * if it was not able to allocate sufficient bios.
1262		 * requeue, try again later. */
1263		e->w.cb = w_e_reissue;
1264		drbd_queue_work(&mdev->data.work, &e->w);
1265	}
1266	return 1;
1267}
1268
1269static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1270{
1271	int rv, issue_flush;
1272	struct p_barrier *p = (struct p_barrier *)h;
1273	struct drbd_epoch *epoch;
1274
1275	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1276
1277	rv = drbd_recv(mdev, h->payload, h->length);
1278	ERR_IF(rv != h->length) return FALSE;
1279
1280	inc_unacked(mdev);
1281
1282	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1283		drbd_kick_lo(mdev);
1284
1285	mdev->current_epoch->barrier_nr = p->barrier;
1286	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1287
1288	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1289	 * the activity log, which means it would not be resynced in case the
1290	 * R_PRIMARY crashes now.
1291	 * Therefore we must send the barrier_ack after the barrier request was
1292	 * completed. */
1293	switch (mdev->write_ordering) {
1294	case WO_bio_barrier:
1295	case WO_none:
1296		if (rv == FE_RECYCLED)
1297			return TRUE;
1298		break;
1299
1300	case WO_bdev_flush:
1301	case WO_drain_io:
1302		if (rv == FE_STILL_LIVE) {
1303			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1304			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1305			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1306		}
1307		if (rv == FE_RECYCLED)
1308			return TRUE;
1309
1310		/* The asender will send all the ACKs and barrier ACKs out, since
1311		   all EEs moved from the active_ee to the done_ee. We need to
1312		   provide a new epoch object for the EEs that come in soon */
1313		break;
1314	}
1315
1316	/* receiver context, in the writeout path of the other node.
1317	 * avoid potential distributed deadlock */
1318	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1319	if (!epoch) {
1320		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1321		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1322		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1323		if (issue_flush) {
1324			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1325			if (rv == FE_RECYCLED)
1326				return TRUE;
1327		}
1328
1329		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1330
1331		return TRUE;
1332	}
1333
1334	epoch->flags = 0;
1335	atomic_set(&epoch->epoch_size, 0);
1336	atomic_set(&epoch->active, 0);
1337
1338	spin_lock(&mdev->epoch_lock);
1339	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1340		list_add(&epoch->list, &mdev->current_epoch->list);
1341		mdev->current_epoch = epoch;
1342		mdev->epochs++;
1343	} else {
1344		/* The current_epoch got recycled while we allocated this one... */
1345		kfree(epoch);
1346	}
1347	spin_unlock(&mdev->epoch_lock);
1348
1349	return TRUE;
1350}
1351
1352/* used from receive_RSDataReply (recv_resync_read)
1353 * and from receive_Data */
1354static struct drbd_epoch_entry *
1355read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1356{
1357	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1358	struct drbd_epoch_entry *e;
1359	struct page *page;
1360	int dgs, ds, rr;
1361	void *dig_in = mdev->int_dig_in;
1362	void *dig_vv = mdev->int_dig_vv;
1363	unsigned long *data;
1364
1365	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1366		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1367
1368	if (dgs) {
1369		rr = drbd_recv(mdev, dig_in, dgs);
1370		if (rr != dgs) {
1371			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1372			     rr, dgs);
1373			return NULL;
1374		}
1375	}
1376
1377	data_size -= dgs;
1378
1379	ERR_IF(data_size &  0x1ff) return NULL;
1380	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1381
1382	/* even though we trust our peer,
1383	 * we sometimes have to double check. */
1384	if (sector + (data_size>>9) > capacity) {
1385		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1386			(unsigned long long)capacity,
1387			(unsigned long long)sector, data_size);
1388		return NULL;
1389	}
1390
1391	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1392	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1393	 * which in turn might block on the other node at this very place.  */
1394	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1395	if (!e)
1396		return NULL;
1397
1398	ds = data_size;
1399	page = e->pages;
1400	page_chain_for_each(page) {
1401		unsigned len = min_t(int, ds, PAGE_SIZE);
1402		data = kmap(page);
1403		rr = drbd_recv(mdev, data, len);
1404		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1405			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1406			data[0] = data[0] ^ (unsigned long)-1;
1407		}
1408		kunmap(page);
1409		if (rr != len) {
1410			drbd_free_ee(mdev, e);
1411			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1412			     rr, len);
1413			return NULL;
1414		}
1415		ds -= rr;
1416	}
1417
1418	if (dgs) {
1419		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1420		if (memcmp(dig_in, dig_vv, dgs)) {
1421			dev_err(DEV, "Digest integrity check FAILED.\n");
1422			drbd_bcast_ee(mdev, "digest failed",
1423					dgs, dig_in, dig_vv, e);
1424			drbd_free_ee(mdev, e);
1425			return NULL;
1426		}
1427	}
1428	mdev->recv_cnt += data_size>>9;
1429	return e;
1430}
1431
1432/* drbd_drain_block() just takes a data block
1433 * out of the socket input buffer, and discards it.
1434 */
1435static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1436{
1437	struct page *page;
1438	int rr, rv = 1;
1439	void *data;
1440
1441	if (!data_size)
1442		return TRUE;
1443
1444	page = drbd_pp_alloc(mdev, 1, 1);
1445
1446	data = kmap(page);
1447	while (data_size) {
1448		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1449		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1450			rv = 0;
1451			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1452			     rr, min_t(int, data_size, PAGE_SIZE));
1453			break;
1454		}
1455		data_size -= rr;
1456	}
1457	kunmap(page);
1458	drbd_pp_free(mdev, page);
1459	return rv;
1460}
1461
1462static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1463			   sector_t sector, int data_size)
1464{
1465	struct bio_vec *bvec;
1466	struct bio *bio;
1467	int dgs, rr, i, expect;
1468	void *dig_in = mdev->int_dig_in;
1469	void *dig_vv = mdev->int_dig_vv;
1470
1471	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1472		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1473
1474	if (dgs) {
1475		rr = drbd_recv(mdev, dig_in, dgs);
1476		if (rr != dgs) {
1477			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1478			     rr, dgs);
1479			return 0;
1480		}
1481	}
1482
1483	data_size -= dgs;
1484
1485	/* optimistically update recv_cnt.  if receiving fails below,
1486	 * we disconnect anyways, and counters will be reset. */
1487	mdev->recv_cnt += data_size>>9;
1488
1489	bio = req->master_bio;
1490	D_ASSERT(sector == bio->bi_sector);
1491
1492	bio_for_each_segment(bvec, bio, i) {
1493		expect = min_t(int, data_size, bvec->bv_len);
1494		rr = drbd_recv(mdev,
1495			     kmap(bvec->bv_page)+bvec->bv_offset,
1496			     expect);
1497		kunmap(bvec->bv_page);
1498		if (rr != expect) {
1499			dev_warn(DEV, "short read receiving data reply: "
1500			     "read %d expected %d\n",
1501			     rr, expect);
1502			return 0;
1503		}
1504		data_size -= rr;
1505	}
1506
1507	if (dgs) {
1508		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1509		if (memcmp(dig_in, dig_vv, dgs)) {
1510			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1511			return 0;
1512		}
1513	}
1514
1515	D_ASSERT(data_size == 0);
1516	return 1;
1517}
1518
1519/* e_end_resync_block() is called via
1520 * drbd_process_done_ee() by asender only */
1521static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1522{
1523	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1524	sector_t sector = e->sector;
1525	int ok;
1526
1527	D_ASSERT(hlist_unhashed(&e->colision));
1528
1529	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1530		drbd_set_in_sync(mdev, sector, e->size);
1531		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1532	} else {
1533		/* Record failure to sync */
1534		drbd_rs_failed_io(mdev, sector, e->size);
1535
1536		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1537	}
1538	dec_unacked(mdev);
1539
1540	return ok;
1541}
1542
1543static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1544{
1545	struct drbd_epoch_entry *e;
1546
1547	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1548	if (!e)
1549		goto fail;
1550
1551	dec_rs_pending(mdev);
1552
1553	inc_unacked(mdev);
1554	/* corresponding dec_unacked() in e_end_resync_block()
1555	 * respective _drbd_clear_done_ee */
1556
1557	e->w.cb = e_end_resync_block;
1558
1559	spin_lock_irq(&mdev->req_lock);
1560	list_add(&e->w.list, &mdev->sync_ee);
1561	spin_unlock_irq(&mdev->req_lock);
1562
1563	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1564		return TRUE;
1565
1566	drbd_free_ee(mdev, e);
1567fail:
1568	put_ldev(mdev);
1569	return FALSE;
1570}
1571
1572static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1573{
1574	struct drbd_request *req;
1575	sector_t sector;
1576	unsigned int header_size, data_size;
1577	int ok;
1578	struct p_data *p = (struct p_data *)h;
1579
1580	header_size = sizeof(*p) - sizeof(*h);
1581	data_size   = h->length  - header_size;
1582
1583	ERR_IF(data_size == 0) return FALSE;
1584
1585	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1586		return FALSE;
1587
1588	sector = be64_to_cpu(p->sector);
1589
1590	spin_lock_irq(&mdev->req_lock);
1591	req = _ar_id_to_req(mdev, p->block_id, sector);
1592	spin_unlock_irq(&mdev->req_lock);
1593	if (unlikely(!req)) {
1594		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1595		return FALSE;
1596	}
1597
1598	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1599	 * special casing it there for the various failure cases.
1600	 * still no race with drbd_fail_pending_reads */
1601	ok = recv_dless_read(mdev, req, sector, data_size);
1602
1603	if (ok)
1604		req_mod(req, data_received);
1605	/* else: nothing. handled from drbd_disconnect...
1606	 * I don't think we may complete this just yet
1607	 * in case we are "on-disconnect: freeze" */
1608
1609	return ok;
1610}
1611
1612static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1613{
1614	sector_t sector;
1615	unsigned int header_size, data_size;
1616	int ok;
1617	struct p_data *p = (struct p_data *)h;
1618
1619	header_size = sizeof(*p) - sizeof(*h);
1620	data_size   = h->length  - header_size;
1621
1622	ERR_IF(data_size == 0) return FALSE;
1623
1624	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1625		return FALSE;
1626
1627	sector = be64_to_cpu(p->sector);
1628	D_ASSERT(p->block_id == ID_SYNCER);
1629
1630	if (get_ldev(mdev)) {
1631		/* data is submitted to disk within recv_resync_read.
1632		 * corresponding put_ldev done below on error,
1633		 * or in drbd_endio_write_sec. */
1634		ok = recv_resync_read(mdev, sector, data_size);
1635	} else {
1636		if (__ratelimit(&drbd_ratelimit_state))
1637			dev_err(DEV, "Can not write resync data to local disk.\n");
1638
1639		ok = drbd_drain_block(mdev, data_size);
1640
1641		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1642	}
1643
1644	return ok;
1645}
1646
1647/* e_end_block() is called via drbd_process_done_ee().
1648 * this means this function only runs in the asender thread
1649 */
1650static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1651{
1652	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1653	sector_t sector = e->sector;
1654	struct drbd_epoch *epoch;
1655	int ok = 1, pcmd;
1656
1657	if (e->flags & EE_IS_BARRIER) {
1658		epoch = previous_epoch(mdev, e->epoch);
1659		if (epoch)
1660			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1661	}
1662
1663	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1664		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1665			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1666				mdev->state.conn <= C_PAUSED_SYNC_T &&
1667				e->flags & EE_MAY_SET_IN_SYNC) ?
1668				P_RS_WRITE_ACK : P_WRITE_ACK;
1669			ok &= drbd_send_ack(mdev, pcmd, e);
1670			if (pcmd == P_RS_WRITE_ACK)
1671				drbd_set_in_sync(mdev, sector, e->size);
1672		} else {
1673			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1674			/* we expect it to be marked out of sync anyways...
1675			 * maybe assert this?  */
1676		}
1677		dec_unacked(mdev);
1678	}
1679	/* we delete from the conflict detection hash _after_ we sent out the
1680	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1681	if (mdev->net_conf->two_primaries) {
1682		spin_lock_irq(&mdev->req_lock);
1683		D_ASSERT(!hlist_unhashed(&e->colision));
1684		hlist_del_init(&e->colision);
1685		spin_unlock_irq(&mdev->req_lock);
1686	} else {
1687		D_ASSERT(hlist_unhashed(&e->colision));
1688	}
1689
1690	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1691
1692	return ok;
1693}
1694
1695static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1696{
1697	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1698	int ok = 1;
1699
1700	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1701	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1702
1703	spin_lock_irq(&mdev->req_lock);
1704	D_ASSERT(!hlist_unhashed(&e->colision));
1705	hlist_del_init(&e->colision);
1706	spin_unlock_irq(&mdev->req_lock);
1707
1708	dec_unacked(mdev);
1709
1710	return ok;
1711}
1712
1713/* Called from receive_Data.
1714 * Synchronize packets on sock with packets on msock.
1715 *
1716 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1717 * packet traveling on msock, they are still processed in the order they have
1718 * been sent.
1719 *
1720 * Note: we don't care for Ack packets overtaking P_DATA packets.
1721 *
1722 * In case packet_seq is larger than mdev->peer_seq number, there are
1723 * outstanding packets on the msock. We wait for them to arrive.
1724 * In case we are the logically next packet, we update mdev->peer_seq
1725 * ourselves. Correctly handles 32bit wrap around.
1726 *
1727 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1728 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1729 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1730 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1731 *
1732 * returns 0 if we may process the packet,
1733 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
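/* Example of the wrap handling: with mdev->peer_seq at 0xffffffff, an incoming
 * packet_seq of 0 still passes the wait condition, because mdev->peer_seq+1
 * wraps to 0 and seq_le(0, 0) holds; peer_seq is then advanced (wrapping along
 * with it) in the final check below. */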
1734static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1735{
1736	DEFINE_WAIT(wait);
1737	unsigned int p_seq;
1738	long timeout;
1739	int ret = 0;
1740	spin_lock(&mdev->peer_seq_lock);
1741	for (;;) {
1742		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1743		if (seq_le(packet_seq, mdev->peer_seq+1))
1744			break;
1745		if (signal_pending(current)) {
1746			ret = -ERESTARTSYS;
1747			break;
1748		}
1749		p_seq = mdev->peer_seq;
1750		spin_unlock(&mdev->peer_seq_lock);
1751		timeout = schedule_timeout(30*HZ);
1752		spin_lock(&mdev->peer_seq_lock);
1753		if (timeout == 0 && p_seq == mdev->peer_seq) {
1754			ret = -ETIMEDOUT;
1755			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1756			break;
1757		}
1758	}
1759	finish_wait(&mdev->seq_wait, &wait);
1760	if (mdev->peer_seq+1 == packet_seq)
1761		mdev->peer_seq++;
1762	spin_unlock(&mdev->peer_seq_lock);
1763	return ret;
1764}
1765
1766/* mirrored write */
1767static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1768{
1769	sector_t sector;
1770	struct drbd_epoch_entry *e;
1771	struct p_data *p = (struct p_data *)h;
1772	int header_size, data_size;
1773	int rw = WRITE;
1774	u32 dp_flags;
1775
1776	header_size = sizeof(*p) - sizeof(*h);
1777	data_size   = h->length  - header_size;
1778
1779	ERR_IF(data_size == 0) return FALSE;
1780
1781	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1782		return FALSE;
1783
1784	if (!get_ldev(mdev)) {
1785		if (__ratelimit(&drbd_ratelimit_state))
1786			dev_err(DEV, "Can not write mirrored data block "
1787			    "to local disk.\n");
1788		spin_lock(&mdev->peer_seq_lock);
1789		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1790			mdev->peer_seq++;
1791		spin_unlock(&mdev->peer_seq_lock);
1792
1793		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1794		atomic_inc(&mdev->current_epoch->epoch_size);
1795		return drbd_drain_block(mdev, data_size);
1796	}
1797
1798	/* get_ldev(mdev) successful.
1799	 * Corresponding put_ldev done either below (on various errors),
1800	 * or in drbd_endio_write_sec, if we successfully submit the data at
1801	 * the end of this function. */
1802
1803	sector = be64_to_cpu(p->sector);
1804	e = read_in_block(mdev, p->block_id, sector, data_size);
1805	if (!e) {
1806		put_ldev(mdev);
1807		return FALSE;
1808	}
1809
1810	e->w.cb = e_end_block;
1811
1812	spin_lock(&mdev->epoch_lock);
1813	e->epoch = mdev->current_epoch;
1814	atomic_inc(&e->epoch->epoch_size);
1815	atomic_inc(&e->epoch->active);
1816
1817	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1818		struct drbd_epoch *epoch;
1819		/* Issue a barrier if we start a new epoch, and the previous epoch
1820		   was not an epoch containing a single request that already was
1821		   a barrier. */
1822		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1823		if (epoch == e->epoch) {
1824			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1825			rw |= REQ_HARDBARRIER;
1826			e->flags |= EE_IS_BARRIER;
1827		} else {
1828			if (atomic_read(&epoch->epoch_size) > 1 ||
1829			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1830				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1831				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1832				rw |= REQ_HARDBARRIER;
1833				e->flags |= EE_IS_BARRIER;
1834			}
1835		}
1836	}
1837	spin_unlock(&mdev->epoch_lock);
1838
1839	dp_flags = be32_to_cpu(p->dp_flags);
1840	if (dp_flags & DP_HARDBARRIER) {
1841		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1842		/* rw |= REQ_HARDBARRIER; */
1843	}
1844	if (dp_flags & DP_RW_SYNC)
1845		rw |= REQ_SYNC | REQ_UNPLUG;
1846	if (dp_flags & DP_MAY_SET_IN_SYNC)
1847		e->flags |= EE_MAY_SET_IN_SYNC;
1848
1849	/* I'm the receiver, I do hold a net_cnt reference. */
1850	if (!mdev->net_conf->two_primaries) {
1851		spin_lock_irq(&mdev->req_lock);
1852	} else {
1853		/* don't get the req_lock yet,
1854		 * we may sleep in drbd_wait_peer_seq */
1855		const int size = e->size;
1856		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1857		DEFINE_WAIT(wait);
1858		struct drbd_request *i;
1859		struct hlist_node *n;
1860		struct hlist_head *slot;
1861		int first;
1862
1863		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1864		BUG_ON(mdev->ee_hash == NULL);
1865		BUG_ON(mdev->tl_hash == NULL);
1866
1867		/* conflict detection and handling:
1868		 * 1. wait on the sequence number,
1869		 *    in case this data packet overtook ACK packets.
1870		 * 2. check our hash tables for conflicting requests.
1871		 *    we only need to walk the tl_hash, since an ee can not
1872		 *    have a conflict with an other ee: on the submitting
1873		 *    node, the corresponding req had already been conflicting,
1874		 *    and a conflicting req is never sent.
1875		 *
1876		 * Note: for two_primaries, we are protocol C,
1877		 * so there cannot be any request that is DONE
1878		 * but still on the transfer log.
1879		 *
1880		 * unconditionally add to the ee_hash.
1881		 *
1882		 * if no conflicting request is found:
1883		 *    submit.
1884		 *
1885		 * if any conflicting request is found
1886		 * that has not yet been acked,
1887		 * AND I have the "discard concurrent writes" flag:
1888		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1889		 *
1890		 * if any conflicting request is found:
1891		 *	 block the receiver, waiting on misc_wait
1892		 *	 until no more conflicting requests are there,
1893		 *	 or we get interrupted (disconnect).
1894		 *
1895		 *	 we do not just write after local io completion of those
1896		 *	 requests, but only after req is done completely, i.e.
1897		 *	 we wait for the P_DISCARD_ACK to arrive!
1898		 *
1899		 *	 then proceed normally, i.e. submit.
1900		 */
1901		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1902			goto out_interrupted;
1903
1904		spin_lock_irq(&mdev->req_lock);
1905
1906		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1907
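		/* Scan the transfer-log hash slot for this sector and flag any
		 * local request whose sector range overlaps the incoming peer
		 * write; an unacked conflict may be answered with P_DISCARD_ACK
		 * below. */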
1908#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1909		slot = tl_hash_slot(mdev, sector);
1910		first = 1;
1911		for (;;) {
1912			int have_unacked = 0;
1913			int have_conflict = 0;
1914			prepare_to_wait(&mdev->misc_wait, &wait,
1915				TASK_INTERRUPTIBLE);
1916			hlist_for_each_entry(i, n, slot, colision) {
1917				if (OVERLAPS) {
1918					/* only ALERT on first iteration,
1919					 * we may be woken up early... */
1920					if (first)
1921						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1922						      "	new: %llus +%u; pending: %llus +%u\n",
1923						      current->comm, current->pid,
1924						      (unsigned long long)sector, size,
1925						      (unsigned long long)i->sector, i->size);
1926					if (i->rq_state & RQ_NET_PENDING)
1927						++have_unacked;
1928					++have_conflict;
1929				}
1930			}
1931#undef OVERLAPS
1932			if (!have_conflict)
1933				break;
1934
1935			/* Discard Ack only for the _first_ iteration */
1936			if (first && discard && have_unacked) {
1937				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1938				     (unsigned long long)sector);
1939				inc_unacked(mdev);
1940				e->w.cb = e_send_discard_ack;
1941				list_add_tail(&e->w.list, &mdev->done_ee);
1942
1943				spin_unlock_irq(&mdev->req_lock);
1944
1945				/* we could probably send that P_DISCARD_ACK ourselves,
1946				 * but I don't like the receiver using the msock */
1947
1948				put_ldev(mdev);
1949				wake_asender(mdev);
1950				finish_wait(&mdev->misc_wait, &wait);
1951				return TRUE;
1952			}
1953
1954			if (signal_pending(current)) {
1955				hlist_del_init(&e->colision);
1956
1957				spin_unlock_irq(&mdev->req_lock);
1958
1959				finish_wait(&mdev->misc_wait, &wait);
1960				goto out_interrupted;
1961			}
1962
1963			spin_unlock_irq(&mdev->req_lock);
1964			if (first) {
1965				first = 0;
1966				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1967				     "sec=%llus\n", (unsigned long long)sector);
1968			} else if (discard) {
1969				/* we had none on the first iteration.
1970				 * there must be none now. */
1971				D_ASSERT(have_unacked == 0);
1972			}
1973			schedule();
1974			spin_lock_irq(&mdev->req_lock);
1975		}
1976		finish_wait(&mdev->misc_wait, &wait);
1977	}
1978
1979	list_add(&e->w.list, &mdev->active_ee);
1980	spin_unlock_irq(&mdev->req_lock);
1981
1982	switch (mdev->net_conf->wire_protocol) {
1983	case DRBD_PROT_C:
1984		inc_unacked(mdev);
1985		/* corresponding dec_unacked() in e_end_block()
1986		 * respective _drbd_clear_done_ee */
1987		break;
1988	case DRBD_PROT_B:
1989		/* I really don't like it that the receiver thread
1990		 * sends on the msock, but anyway */
1991		drbd_send_ack(mdev, P_RECV_ACK, e);
1992		break;
1993	case DRBD_PROT_A:
1994		/* nothing to do */
1995		break;
1996	}
1997
1998	if (mdev->state.pdsk == D_DISKLESS) {
1999		/* In case we have the only disk of the cluster: mark the block
		 * out of sync and cover it by the activity log. */
2000		drbd_set_out_of_sync(mdev, e->sector, e->size);
2001		e->flags |= EE_CALL_AL_COMPLETE_IO;
2002		drbd_al_begin_io(mdev, e->sector);
2003	}
2004
2005	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2006		return TRUE;
2007
2008out_interrupted:
2009	/* yes, the epoch_size now is imbalanced.
2010	 * but we drop the connection anyways, so we don't have a chance to
2011	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2012	put_ldev(mdev);
2013	drbd_free_ee(mdev, e);
2014	return FALSE;
2015}
2016
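/* Serve a read request from the peer: plain application reads
 * (P_DATA_REQUEST) as well as the resync, checksum based resync and
 * online verify request types.  The block is read from the local disk
 * and answered asynchronously by the worker callback set up below. */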
2017static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2018{
2019	sector_t sector;
2020	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2021	struct drbd_epoch_entry *e;
2022	struct digest_info *di = NULL;
2023	int size, digest_size;
2024	unsigned int fault_type;
2025	struct p_block_req *p =
2026		(struct p_block_req *)h;
2027	const int brps = sizeof(*p)-sizeof(*h);
2028
2029	if (drbd_recv(mdev, h->payload, brps) != brps)
2030		return FALSE;
2031
2032	sector = be64_to_cpu(p->sector);
2033	size   = be32_to_cpu(p->blksize);
2034
2035	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2036		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2037				(unsigned long long)sector, size);
2038		return FALSE;
2039	}
2040	if (sector + (size>>9) > capacity) {
2041		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2042				(unsigned long long)sector, size);
2043		return FALSE;
2044	}
2045
2046	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2047		if (__ratelimit(&drbd_ratelimit_state))
2048			dev_err(DEV, "Can not satisfy peer's read request, "
2049			    "no local data.\n");
2050		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2051				 P_NEG_RS_DREPLY , p);
2052		return drbd_drain_block(mdev, h->length - brps);
2053	}
2054
2055	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2056	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2057	 * which in turn might block on the other node at this very place.  */
2058	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2059	if (!e) {
2060		put_ldev(mdev);
2061		return FALSE;
2062	}
2063
2064	switch (h->command) {
2065	case P_DATA_REQUEST:
2066		e->w.cb = w_e_end_data_req;
2067		fault_type = DRBD_FAULT_DT_RD;
2068		break;
2069	case P_RS_DATA_REQUEST:
2070		e->w.cb = w_e_end_rsdata_req;
2071		fault_type = DRBD_FAULT_RS_RD;
2072		/* Eventually this should become asynchronous. Currently it
2073		 * blocks the whole receiver just to delay the reading of a
2074		 * resync data block.
2075		 * the drbd_work_queue mechanism is made for this...
2076		 */
2077		if (!drbd_rs_begin_io(mdev, sector)) {
2078			/* we have been interrupted,
2079			 * probably connection lost! */
2080			D_ASSERT(signal_pending(current));
2081			goto out_free_e;
2082		}
2083		break;
2084
2085	case P_OV_REPLY:
2086	case P_CSUM_RS_REQUEST:
2087		fault_type = DRBD_FAULT_RS_RD;
2088		digest_size = h->length - brps;
2089		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2090		if (!di)
2091			goto out_free_e;
2092
2093		di->digest_size = digest_size;
2094		di->digest = (((char *)di)+sizeof(struct digest_info));
2095
2096		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2097			goto out_free_e;
2098
2099		e->block_id = (u64)(unsigned long)di;
2100		if (h->command == P_CSUM_RS_REQUEST) {
2101			D_ASSERT(mdev->agreed_pro_version >= 89);
2102			e->w.cb = w_e_end_csum_rs_req;
2103		} else if (h->command == P_OV_REPLY) {
2104			e->w.cb = w_e_end_ov_reply;
2105			dec_rs_pending(mdev);
2106			break;
2107		}
2108
2109		if (!drbd_rs_begin_io(mdev, sector)) {
2110			/* we have been interrupted, probably connection lost! */
2111			D_ASSERT(signal_pending(current));
2112			goto out_free_e;
2113		}
2114		break;
2115
2116	case P_OV_REQUEST:
2117		if (mdev->state.conn >= C_CONNECTED &&
2118		    mdev->state.conn != C_VERIFY_T)
2119			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2120				drbd_conn_str(mdev->state.conn));
2121		if (mdev->ov_start_sector == ~(sector_t)0 &&
2122		    mdev->agreed_pro_version >= 90) {
2123			mdev->ov_start_sector = sector;
2124			mdev->ov_position = sector;
2125			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2126			dev_info(DEV, "Online Verify start sector: %llu\n",
2127					(unsigned long long)sector);
2128		}
2129		e->w.cb = w_e_end_ov_req;
2130		fault_type = DRBD_FAULT_RS_RD;
2131		/* Eventually this should become asynchronous. Currently it
2132		 * blocks the whole receiver just to delay the reading of a
2133		 * resync data block.
2134		 * the drbd_work_queue mechanism is made for this...
2135		 */
2136		if (!drbd_rs_begin_io(mdev, sector)) {
2137			/* we have been interrupted,
2138			 * probably connection lost! */
2139			D_ASSERT(signal_pending(current));
2140			goto out_free_e;
2141		}
2142		break;
2143
2144
2145	default:
2146		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2147		    cmdname(h->command));
2148		fault_type = DRBD_FAULT_MAX;
2149	}
2150
2151	spin_lock_irq(&mdev->req_lock);
2152	list_add(&e->w.list, &mdev->read_ee);
2153	spin_unlock_irq(&mdev->req_lock);
2154
2155	inc_unacked(mdev);
2156
2157	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2158		return TRUE;
2159
2160out_free_e:
2161	kfree(di);
2162	put_ldev(mdev);
2163	drbd_free_ee(mdev, e);
2164	return FALSE;
2165}
2166
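/* After-split-brain auto recovery when currently neither node is primary.
 * The return value follows the drbd_uuid_compare() convention used by
 * drbd_sync_handshake(): -1 means this node becomes sync target (its
 * changes are discarded), 1 means the peer does, -100 means no automatic
 * decision could be reached. */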
2167static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2168{
2169	int self, peer, rv = -100;
2170	unsigned long ch_self, ch_peer;
2171
2172	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2173	peer = mdev->p_uuid[UI_BITMAP] & 1;
2174
2175	ch_peer = mdev->p_uuid[UI_SIZE];
2176	ch_self = mdev->comm_bm_set;
2177
2178	switch (mdev->net_conf->after_sb_0p) {
2179	case ASB_CONSENSUS:
2180	case ASB_DISCARD_SECONDARY:
2181	case ASB_CALL_HELPER:
2182		dev_err(DEV, "Configuration error.\n");
2183		break;
2184	case ASB_DISCONNECT:
2185		break;
2186	case ASB_DISCARD_YOUNGER_PRI:
2187		if (self == 0 && peer == 1) {
2188			rv = -1;
2189			break;
2190		}
2191		if (self == 1 && peer == 0) {
2192			rv =  1;
2193			break;
2194		}
2195		/* Else fall through to one of the other strategies... */
2196	case ASB_DISCARD_OLDER_PRI:
2197		if (self == 0 && peer == 1) {
2198			rv = 1;
2199			break;
2200		}
2201		if (self == 1 && peer == 0) {
2202			rv = -1;
2203			break;
2204		}
2205		/* Else fall through to one of the other strategies... */
2206		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2207		     "Using discard-least-changes instead\n");
2208	case ASB_DISCARD_ZERO_CHG:
2209		if (ch_peer == 0 && ch_self == 0) {
2210			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2211				? -1 : 1;
2212			break;
2213		} else {
2214			if (ch_peer == 0) { rv =  1; break; }
2215			if (ch_self == 0) { rv = -1; break; }
2216		}
2217		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2218			break;
2219	case ASB_DISCARD_LEAST_CHG:
2220		if	(ch_self < ch_peer)
2221			rv = -1;
2222		else if (ch_self > ch_peer)
2223			rv =  1;
2224		else /* ( ch_self == ch_peer ) */
2225		     /* Well, then use something else. */
2226			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2227				? -1 : 1;
2228		break;
2229	case ASB_DISCARD_LOCAL:
2230		rv = -1;
2231		break;
2232	case ASB_DISCARD_REMOTE:
2233		rv =  1;
2234	}
2235
2236	return rv;
2237}
2238
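/* After-split-brain auto recovery when exactly one node is primary; most
 * policies delegate to drbd_asb_recover_0p() and then check the local role. */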
2239static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2240{
2241	int self, peer, hg, rv = -100;
2242
2243	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2244	peer = mdev->p_uuid[UI_BITMAP] & 1;
2245
2246	switch (mdev->net_conf->after_sb_1p) {
2247	case ASB_DISCARD_YOUNGER_PRI:
2248	case ASB_DISCARD_OLDER_PRI:
2249	case ASB_DISCARD_LEAST_CHG:
2250	case ASB_DISCARD_LOCAL:
2251	case ASB_DISCARD_REMOTE:
2252		dev_err(DEV, "Configuration error.\n");
2253		break;
2254	case ASB_DISCONNECT:
2255		break;
2256	case ASB_CONSENSUS:
2257		hg = drbd_asb_recover_0p(mdev);
2258		if (hg == -1 && mdev->state.role == R_SECONDARY)
2259			rv = hg;
2260		if (hg == 1  && mdev->state.role == R_PRIMARY)
2261			rv = hg;
2262		break;
2263	case ASB_VIOLENTLY:
2264		rv = drbd_asb_recover_0p(mdev);
2265		break;
2266	case ASB_DISCARD_SECONDARY:
2267		return mdev->state.role == R_PRIMARY ? 1 : -1;
2268	case ASB_CALL_HELPER:
2269		hg = drbd_asb_recover_0p(mdev);
2270		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2271			self = drbd_set_role(mdev, R_SECONDARY, 0);
2272			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2273			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2274			  * we do not need to wait for the after state change work either. */
2275			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2276			if (self != SS_SUCCESS) {
2277				drbd_khelper(mdev, "pri-lost-after-sb");
2278			} else {
2279				dev_warn(DEV, "Successfully gave up primary role.\n");
2280				rv = hg;
2281			}
2282		} else
2283			rv = hg;
2284	}
2285
2286	return rv;
2287}
2288
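/* After-split-brain auto recovery when both nodes are primary. */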
2289static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2290{
2291	int self, peer, hg, rv = -100;
2292
2293	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2294	peer = mdev->p_uuid[UI_BITMAP] & 1;
2295
2296	switch (mdev->net_conf->after_sb_2p) {
2297	case ASB_DISCARD_YOUNGER_PRI:
2298	case ASB_DISCARD_OLDER_PRI:
2299	case ASB_DISCARD_LEAST_CHG:
2300	case ASB_DISCARD_LOCAL:
2301	case ASB_DISCARD_REMOTE:
2302	case ASB_CONSENSUS:
2303	case ASB_DISCARD_SECONDARY:
2304		dev_err(DEV, "Configuration error.\n");
2305		break;
2306	case ASB_VIOLENTLY:
2307		rv = drbd_asb_recover_0p(mdev);
2308		break;
2309	case ASB_DISCONNECT:
2310		break;
2311	case ASB_CALL_HELPER:
2312		hg = drbd_asb_recover_0p(mdev);
2313		if (hg == -1) {
2314			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2315			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2316			  * we do not need to wait for the after state change work either. */
2317			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2318			if (self != SS_SUCCESS) {
2319				drbd_khelper(mdev, "pri-lost-after-sb");
2320			} else {
2321				dev_warn(DEV, "Successfully gave up primary role.\n");
2322				rv = hg;
2323			}
2324		} else
2325			rv = hg;
2326	}
2327
2328	return rv;
2329}
2330
2331static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2332			   u64 bits, u64 flags)
2333{
2334	if (!uuid) {
2335		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2336		return;
2337	}
2338	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2339	     text,
2340	     (unsigned long long)uuid[UI_CURRENT],
2341	     (unsigned long long)uuid[UI_BITMAP],
2342	     (unsigned long long)uuid[UI_HISTORY_START],
2343	     (unsigned long long)uuid[UI_HISTORY_END],
2344	     (unsigned long long)bits,
2345	     (unsigned long long)flags);
2346}
2347
2348/*
2349  100	after split brain try auto recover
2350    2	C_SYNC_SOURCE set BitMap
2351    1	C_SYNC_SOURCE use BitMap
2352    0	no Sync
2353   -1	C_SYNC_TARGET use BitMap
2354   -2	C_SYNC_TARGET set BitMap
2355 -100	after split brain, disconnect
2356-1000	unrelated data
-1001	requires both sides to support at least protocol 91
2357 */
2358static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2359{
2360	u64 self, peer;
2361	int i, j;
2362
2363	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2364	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2365
2366	*rule_nr = 10;
2367	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2368		return 0;
2369
2370	*rule_nr = 20;
2371	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2372	     peer != UUID_JUST_CREATED)
2373		return -2;
2374
2375	*rule_nr = 30;
2376	if (self != UUID_JUST_CREATED &&
2377	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2378		return 2;
2379
2380	if (self == peer) {
2381		int rct, dc; /* roles at crash time */
2382
2383		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2384
2385			if (mdev->agreed_pro_version < 91)
2386				return -1001;
2387
2388			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2389			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2390				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2391				drbd_uuid_set_bm(mdev, 0UL);
2392
2393				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2394					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2395				*rule_nr = 34;
2396			} else {
2397				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2398				*rule_nr = 36;
2399			}
2400
2401			return 1;
2402		}
2403
2404		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2405
2406			if (mdev->agreed_pro_version < 91)
2407				return -1001;
2408
2409			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2410			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2411				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2412
2413				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2414				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2415				mdev->p_uuid[UI_BITMAP] = 0UL;
2416
2417				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2418				*rule_nr = 35;
2419			} else {
2420				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2421				*rule_nr = 37;
2422			}
2423
2424			return -1;
2425		}
2426
2427		/* Common power [off|failure] */
2428		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2429			(mdev->p_uuid[UI_FLAGS] & 2);
2430		/* lowest bit is set when we were primary,
2431		 * next bit (weight 2) is set when peer was primary */
2432		*rule_nr = 40;
2433
2434		switch (rct) {
2435		case 0: /* !self_pri && !peer_pri */ return 0;
2436		case 1: /*  self_pri && !peer_pri */ return 1;
2437		case 2: /* !self_pri &&  peer_pri */ return -1;
2438		case 3: /*  self_pri &&  peer_pri */
2439			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2440			return dc ? -1 : 1;
2441		}
2442	}
2443
2444	*rule_nr = 50;
2445	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2446	if (self == peer)
2447		return -1;
2448
2449	*rule_nr = 51;
2450	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2451	if (self == peer) {
2452		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2453		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2454		if (self == peer) {
2455			/* The last P_SYNC_UUID did not get through. Undo the modifications
2456			   the peer made to its UUIDs when it last started a resync as sync source. */
2457
2458			if (mdev->agreed_pro_version < 91)
2459				return -1001;
2460
2461			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2462			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2463			return -1;
2464		}
2465	}
2466
2467	*rule_nr = 60;
2468	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2469	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2470		peer = mdev->p_uuid[i] & ~((u64)1);
2471		if (self == peer)
2472			return -2;
2473	}
2474
2475	*rule_nr = 70;
2476	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2477	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2478	if (self == peer)
2479		return 1;
2480
2481	*rule_nr = 71;
2482	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2483	if (self == peer) {
2484		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2485		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2486		if (self == peer) {
2487			/* The last P_SYNC_UUID did not get through. Undo the modifications
2488			   we made to our own UUIDs when we last started a resync as sync source. */
2489
2490			if (mdev->agreed_pro_version < 91)
2491				return -1001;
2492
2493			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2494			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2495
2496			dev_info(DEV, "Undid last start of resync:\n");
2497
2498			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2499				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2500
2501			return 1;
2502		}
2503	}
2504
2505
2506	*rule_nr = 80;
2507	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2508	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2509		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2510		if (self == peer)
2511			return 2;
2512	}
2513
2514	*rule_nr = 90;
2515	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2516	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2517	if (self == peer && self != ((u64)0))
2518		return 100;
2519
2520	*rule_nr = 100;
2521	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2522		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2523		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2524			peer = mdev->p_uuid[j] & ~((u64)1);
2525			if (self == peer)
2526				return -100;
2527		}
2528	}
2529
2530	return -1000;
2531}
2532
2533/* drbd_sync_handshake() returns the new conn state on success, or
2534   C_MASK on failure.
2535 */
2536static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2537					   enum drbd_disk_state peer_disk) __must_hold(local)
2538{
2539	int hg, rule_nr;
2540	enum drbd_conns rv = C_MASK;
2541	enum drbd_disk_state mydisk;
2542
2543	mydisk = mdev->state.disk;
2544	if (mydisk == D_NEGOTIATING)
2545		mydisk = mdev->new_state_tmp.disk;
2546
2547	dev_info(DEV, "drbd_sync_handshake:\n");
2548	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2549	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2550		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2551
2552	hg = drbd_uuid_compare(mdev, &rule_nr);
2553
2554	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2555
2556	if (hg == -1000) {
2557		dev_alert(DEV, "Unrelated data, aborting!\n");
2558		return C_MASK;
2559	}
2560	if (hg == -1001) {
2561		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2562		return C_MASK;
2563	}
2564
2565	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2566	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2567		int f = (hg == -100) || abs(hg) == 2;
2568		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2569		if (f)
2570			hg = hg*2;
2571		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2572		     hg > 0 ? "source" : "target");
2573	}
2574
2575	if (abs(hg) == 100)
2576		drbd_khelper(mdev, "initial-split-brain");
2577
2578	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2579		int pcount = (mdev->state.role == R_PRIMARY)
2580			   + (peer_role == R_PRIMARY);
2581		int forced = (hg == -100);
2582
2583		switch (pcount) {
2584		case 0:
2585			hg = drbd_asb_recover_0p(mdev);
2586			break;
2587		case 1:
2588			hg = drbd_asb_recover_1p(mdev);
2589			break;
2590		case 2:
2591			hg = drbd_asb_recover_2p(mdev);
2592			break;
2593		}
2594		if (abs(hg) < 100) {
2595			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2596			     "automatically solved. Sync from %s node\n",
2597			     pcount, (hg < 0) ? "peer" : "this");
2598			if (forced) {
2599				dev_warn(DEV, "Doing a full sync, since"
2600				     " UUIDs were ambiguous.\n");
2601				hg = hg*2;
2602			}
2603		}
2604	}
2605
2606	if (hg == -100) {
2607		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2608			hg = -1;
2609		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2610			hg = 1;
2611
2612		if (abs(hg) < 100)
2613			dev_warn(DEV, "Split-Brain detected, manually solved. "
2614			     "Sync from %s node\n",
2615			     (hg < 0) ? "peer" : "this");
2616	}
2617
2618	if (hg == -100) {
2619		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2620		drbd_khelper(mdev, "split-brain");
2621		return C_MASK;
2622	}
2623
2624	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2625		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2626		return C_MASK;
2627	}
2628
2629	if (hg < 0 && /* by intention we do not use mydisk here. */
2630	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2631		switch (mdev->net_conf->rr_conflict) {
2632		case ASB_CALL_HELPER:
2633			drbd_khelper(mdev, "pri-lost");
2634			/* fall through */
2635		case ASB_DISCONNECT:
2636			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2637			return C_MASK;
2638		case ASB_VIOLENTLY:
2639			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2640			     " assumption\n");
2641		}
2642	}
2643
2644	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2645		if (hg == 0)
2646			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2647		else
2648			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2649				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2650				 abs(hg) >= 2 ? "full" : "bit-map based");
2651		return C_MASK;
2652	}
2653
2654	if (abs(hg) >= 2) {
2655		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2656		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2657			return C_MASK;
2658	}
2659
2660	if (hg > 0) { /* become sync source. */
2661		rv = C_WF_BITMAP_S;
2662	} else if (hg < 0) { /* become sync target */
2663		rv = C_WF_BITMAP_T;
2664	} else {
2665		rv = C_CONNECTED;
2666		if (drbd_bm_total_weight(mdev)) {
2667			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2668			     drbd_bm_total_weight(mdev));
2669		}
2670	}
2671
2672	return rv;
2673}
2674
2675/* returns 1 if invalid */
2676static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2677{
2678	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2679	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2680	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2681		return 0;
2682
2683	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
2684	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2685	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2686		return 1;
2687
2688	/* everything else is valid if they are equal on both sides. */
2689	if (peer == self)
2690		return 0;
2691
2692	/* everything else is invalid. */
2693	return 1;
2694}
2695
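/* Check that the peer's connection configuration (wire protocol,
 * after-split-brain policies, two-primaries, want-lose flag and, from
 * protocol version 87 on, the data-integrity-alg) is compatible with
 * ours; force C_DISCONNECTING on any mismatch. */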
2696static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2697{
2698	struct p_protocol *p = (struct p_protocol *)h;
2699	int header_size, data_size;
2700	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2701	int p_want_lose, p_two_primaries, cf;
2702	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2703
2704	header_size = sizeof(*p) - sizeof(*h);
2705	data_size   = h->length  - header_size;
2706
2707	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2708		return FALSE;
2709
2710	p_proto		= be32_to_cpu(p->protocol);
2711	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2712	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2713	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2714	p_two_primaries = be32_to_cpu(p->two_primaries);
2715	cf		= be32_to_cpu(p->conn_flags);
2716	p_want_lose = cf & CF_WANT_LOSE;
2717
2718	clear_bit(CONN_DRY_RUN, &mdev->flags);
2719
2720	if (cf & CF_DRY_RUN)
2721		set_bit(CONN_DRY_RUN, &mdev->flags);
2722
2723	if (p_proto != mdev->net_conf->wire_protocol) {
2724		dev_err(DEV, "incompatible communication protocols\n");
2725		goto disconnect;
2726	}
2727
2728	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2729		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2730		goto disconnect;
2731	}
2732
2733	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2734		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2735		goto disconnect;
2736	}
2737
2738	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2739		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2740		goto disconnect;
2741	}
2742
2743	if (p_want_lose && mdev->net_conf->want_lose) {
2744		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2745		goto disconnect;
2746	}
2747
2748	if (p_two_primaries != mdev->net_conf->two_primaries) {
2749		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2750		goto disconnect;
2751	}
2752
2753	if (mdev->agreed_pro_version >= 87) {
2754		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2755
2756		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2757			return FALSE;
2758
2759		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2760		if (strcmp(p_integrity_alg, my_alg)) {
2761			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2762			goto disconnect;
2763		}
2764		dev_info(DEV, "data-integrity-alg: %s\n",
2765		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2766	}
2767
2768	return TRUE;
2769
2770disconnect:
2771	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2772	return FALSE;
2773}
2774
2775/* helper function
2776 * input: alg name, feature name
2777 * return: NULL (alg name was "")
2778 *         ERR_PTR(error) if something goes wrong
2779 *         or the crypto hash ptr, if it worked out ok. */
2780struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2781		const char *alg, const char *name)
2782{
2783	struct crypto_hash *tfm;
2784
2785	if (!alg[0])
2786		return NULL;
2787
2788	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2789	if (IS_ERR(tfm)) {
2790		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2791			alg, name, PTR_ERR(tfm));
2792		return tfm;
2793	}
2794	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2795		crypto_free_hash(tfm);
2796		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2797		return ERR_PTR(-EINVAL);
2798	}
2799	return tfm;
2800}
2801
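/* Receive the peer's syncer parameters: the resync rate and, from
 * protocol version 88 on, the verify-alg name (plus csums-alg from
 * version 89 on).  New hash transforms are allocated first and only
 * then swapped in under peer_seq_lock. */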
2802static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2803{
2804	int ok = TRUE;
2805	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2806	unsigned int header_size, data_size, exp_max_sz;
2807	struct crypto_hash *verify_tfm = NULL;
2808	struct crypto_hash *csums_tfm = NULL;
2809	const int apv = mdev->agreed_pro_version;
2810
2811	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2812		    : apv == 88 ? sizeof(struct p_rs_param)
2813					+ SHARED_SECRET_MAX
2814		    : /* 89 */    sizeof(struct p_rs_param_89);
2815
2816	if (h->length > exp_max_sz) {
2817		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2818		    h->length, exp_max_sz);
2819		return FALSE;
2820	}
2821
2822	if (apv <= 88) {
2823		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2824		data_size   = h->length  - header_size;
2825	} else /* apv >= 89 */ {
2826		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2827		data_size   = h->length  - header_size;
2828		D_ASSERT(data_size == 0);
2829	}
2830
2831	/* initialize verify_alg and csums_alg */
2832	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2833
2834	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2835		return FALSE;
2836
2837	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2838
2839	if (apv >= 88) {
2840		if (apv == 88) {
2841			if (data_size > SHARED_SECRET_MAX) {
2842				dev_err(DEV, "verify-alg too long, "
2843				    "peer wants %u, accepting only %u bytes\n",
2844						data_size, SHARED_SECRET_MAX);
2845				return FALSE;
2846			}
2847
2848			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2849				return FALSE;
2850
2851			/* we expect NUL terminated string */
2852			/* but just in case someone tries to be evil */
2853			D_ASSERT(p->verify_alg[data_size-1] == 0);
2854			p->verify_alg[data_size-1] = 0;
2855
2856		} else /* apv >= 89 */ {
2857			/* we still expect NUL terminated strings */
2858			/* but just in case someone tries to be evil */
2859			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2860			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2861			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2862			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2863		}
2864
2865		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2866			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2867				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2868				    mdev->sync_conf.verify_alg, p->verify_alg);
2869				goto disconnect;
2870			}
2871			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2872					p->verify_alg, "verify-alg");
2873			if (IS_ERR(verify_tfm)) {
2874				verify_tfm = NULL;
2875				goto disconnect;
2876			}
2877		}
2878
2879		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2880			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2882				    mdev->sync_conf.csums_alg, p->csums_alg);
2883				goto disconnect;
2884			}
2885			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886					p->csums_alg, "csums-alg");
2887			if (IS_ERR(csums_tfm)) {
2888				csums_tfm = NULL;
2889				goto disconnect;
2890			}
2891		}
2892
2893
2894		spin_lock(&mdev->peer_seq_lock);
2895		/* lock against drbd_nl_syncer_conf() */
2896		if (verify_tfm) {
2897			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2898			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2899			crypto_free_hash(mdev->verify_tfm);
2900			mdev->verify_tfm = verify_tfm;
2901			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2902		}
2903		if (csums_tfm) {
2904			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2905			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2906			crypto_free_hash(mdev->csums_tfm);
2907			mdev->csums_tfm = csums_tfm;
2908			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2909		}
2910		spin_unlock(&mdev->peer_seq_lock);
2911	}
2912
2913	return ok;
2914disconnect:
2915	/* just for completeness: actually not needed,
2916	 * as this is not reached if csums_tfm was ok. */
2917	crypto_free_hash(csums_tfm);
2918	/* but free the verify_tfm again, if csums_tfm did not work out */
2919	crypto_free_hash(verify_tfm);
2920	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2921	return FALSE;
2922}
2923
2924static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2925{
2926	/* sorry, we currently have no working implementation
2927	 * of distributed TCQ */
2928}
2929
2930/* warn if the arguments differ by more than 12.5% */
2931static void warn_if_differ_considerably(struct drbd_conf *mdev,
2932	const char *s, sector_t a, sector_t b)
2933{
2934	sector_t d;
2935	if (a == 0 || b == 0)
2936		return;
2937	d = (a > b) ? (a - b) : (b - a);
2938	if (d > (a>>3) || d > (b>>3))
2939		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2940		     (unsigned long long)a, (unsigned long long)b);
2941}
2942
2943static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2944{
2945	struct p_sizes *p = (struct p_sizes *)h;
2946	enum determine_dev_size dd = unchanged;
2947	unsigned int max_seg_s;
2948	sector_t p_size, p_usize, my_usize;
2949	int ldsc = 0; /* local disk size changed */
2950	enum dds_flags ddsf;
2951
2952	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2953	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2954		return FALSE;
2955
2956	p_size = be64_to_cpu(p->d_size);
2957	p_usize = be64_to_cpu(p->u_size);
2958
2959	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2960		dev_err(DEV, "some backing storage is needed\n");
2961		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2962		return FALSE;
2963	}
2964
2965	/* just store the peer's disk size for now.
2966	 * we still need to figure out whether we accept that. */
2967	mdev->p_size = p_size;
2968
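/* Pick the smaller of two sizes, treating 0 as "not configured" rather
 * than as a real size of zero sectors. */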
2969#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2970	if (get_ldev(mdev)) {
2971		warn_if_differ_considerably(mdev, "lower level device sizes",
2972			   p_size, drbd_get_max_capacity(mdev->ldev));
2973		warn_if_differ_considerably(mdev, "user requested size",
2974					    p_usize, mdev->ldev->dc.disk_size);
2975
2976		/* if this is the first connect, or an otherwise expected
2977		 * param exchange, choose the minimum */
2978		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2979			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2980					     p_usize);
2981
2982		my_usize = mdev->ldev->dc.disk_size;
2983
2984		if (mdev->ldev->dc.disk_size != p_usize) {
2985			mdev->ldev->dc.disk_size = p_usize;
2986			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2987			     (unsigned long)mdev->ldev->dc.disk_size);
2988		}
2989
2990		/* Never shrink a device with usable data during connect.
2991		   But allow online shrinking if we are connected. */
2992		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2993		   drbd_get_capacity(mdev->this_bdev) &&
2994		   mdev->state.disk >= D_OUTDATED &&
2995		   mdev->state.conn < C_CONNECTED) {
2996			dev_err(DEV, "The peer's disk size is too small!\n");
2997			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2998			mdev->ldev->dc.disk_size = my_usize;
2999			put_ldev(mdev);
3000			return FALSE;
3001		}
3002		put_ldev(mdev);
3003	}
3004#undef min_not_zero
3005
3006	ddsf = be16_to_cpu(p->dds_flags);
3007	if (get_ldev(mdev)) {
3008		dd = drbd_determin_dev_size(mdev, ddsf);
3009		put_ldev(mdev);
3010		if (dd == dev_size_error)
3011			return FALSE;
3012		drbd_md_sync(mdev);
3013	} else {
3014		/* I am diskless, need to accept the peer's size. */
3015		drbd_set_my_capacity(mdev, p_size);
3016	}
3017
3018	if (get_ldev(mdev)) {
3019		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3020			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3021			ldsc = 1;
3022		}
3023
3024		if (mdev->agreed_pro_version < 94)
3025			max_seg_s = be32_to_cpu(p->max_segment_size);
3026		else /* drbd 8.3.8 onwards */
3027			max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3028
3029		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3030			drbd_setup_queue_param(mdev, max_seg_s);
3031
3032		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3033		put_ldev(mdev);
3034	}
3035
3036	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3037		if (be64_to_cpu(p->c_size) !=
3038		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3039			/* we have different sizes, probably peer
3040			 * needs to know my new size... */
3041			drbd_send_sizes(mdev, 0, ddsf);
3042		}
3043		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3044		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3045			if (mdev->state.pdsk >= D_INCONSISTENT &&
3046			    mdev->state.disk >= D_INCONSISTENT) {
3047				if (ddsf & DDSF_NO_RESYNC)
3048					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3049				else
3050					resync_after_online_grow(mdev);
3051			} else
3052				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3053		}
3054	}
3055
3056	return TRUE;
3057}
3058
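/* Store the peer's UUID set.  If our current UUID is UUID_JUST_CREATED
 * and the peer signals it via UI_FLAGS, skip_initial_sync clears the
 * bitmap and moves both disk states straight to UpToDate. */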
3059static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3060{
3061	struct p_uuids *p = (struct p_uuids *)h;
3062	u64 *p_uuid;
3063	int i;
3064
3065	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3066	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3067		return FALSE;
3068
3069	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid)
		return FALSE;
3070
3071	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3072		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3073
3074	kfree(mdev->p_uuid);
3075	mdev->p_uuid = p_uuid;
3076
3077	if (mdev->state.conn < C_CONNECTED &&
3078	    mdev->state.disk < D_INCONSISTENT &&
3079	    mdev->state.role == R_PRIMARY &&
3080	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3081		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3082		    (unsigned long long)mdev->ed_uuid);
3083		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3084		return FALSE;
3085	}
3086
3087	if (get_ldev(mdev)) {
3088		int skip_initial_sync =
3089			mdev->state.conn == C_CONNECTED &&
3090			mdev->agreed_pro_version >= 90 &&
3091			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3092			(p_uuid[UI_FLAGS] & 8);
3093		if (skip_initial_sync) {
3094			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3095			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3096					"clear_n_write from receive_uuids");
3097			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3098			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3099			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3100					CS_VERBOSE, NULL);
3101			drbd_md_sync(mdev);
3102		}
3103		put_ldev(mdev);
3104	}
3105
3106	/* Before we test the disk state, we should wait until any ongoing
3107	   cluster wide state change has finished. That is important if we are
3108	   primary and are detaching from our disk: we need to see the new
3109	   disk state... */
3110	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3111	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3112		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3113
3114	return TRUE;
3115}
3116
3117/**
3118 * convert_state() - Converts the peer's view of the cluster state to our point of view
3119 * @ps:		The state as seen by the peer.
3120 */
3121static union drbd_state convert_state(union drbd_state ps)
3122{
3123	union drbd_state ms;
3124
3125	static enum drbd_conns c_tab[] = {
3126		[C_CONNECTED] = C_CONNECTED,
3127
3128		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3129		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3130		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3131		[C_VERIFY_S]       = C_VERIFY_T,
3132		[C_MASK]   = C_MASK,
3133	};
3134
3135	ms.i = ps.i;
3136
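	/* Mirror the peer's view: the peer's role becomes our "peer", the
	 * peer's view of us becomes our "role", and disk/pdsk swap likewise.
	 * Example: peer reports role=Primary, peer=Secondary, disk=UpToDate,
	 * pdsk=Consistent; we read that as peer=Primary, role=Secondary,
	 * pdsk=UpToDate, disk=Consistent. */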
3137	ms.conn = c_tab[ps.conn];
3138	ms.peer = ps.role;
3139	ms.role = ps.peer;
3140	ms.pdsk = ps.disk;
3141	ms.disk = ps.pdsk;
3142	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3143
3144	return ms;
3145}
3146
3147static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3148{
3149	struct p_req_state *p = (struct p_req_state *)h;
3150	union drbd_state mask, val;
3151	int rv;
3152
3153	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3154	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3155		return FALSE;
3156
3157	mask.i = be32_to_cpu(p->mask);
3158	val.i = be32_to_cpu(p->val);
3159
3160	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3161	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3162		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3163		return TRUE;
3164	}
3165
3166	mask = convert_state(mask);
3167	val = convert_state(val);
3168
3169	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3170
3171	drbd_send_sr_reply(mdev, rv);
3172	drbd_md_sync(mdev);
3173
3174	return TRUE;
3175}
3176
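/* Process the peer's state packet.  If a new connection was established
 * or a disk was newly attached, run drbd_sync_handshake() to decide on a
 * resync direction, then fold the peer's role and disk state into our own
 * state under req_lock. */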
3177static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3178{
3179	struct p_state *p = (struct p_state *)h;
3180	enum drbd_conns nconn, oconn;
3181	union drbd_state ns, peer_state;
3182	enum drbd_disk_state real_peer_disk;
3183	int rv;
3184
3185	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3186		return FALSE;
3187
3188	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3189		return FALSE;
3190
3191	peer_state.i = be32_to_cpu(p->state);
3192
3193	real_peer_disk = peer_state.disk;
3194	if (peer_state.disk == D_NEGOTIATING) {
3195		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3196		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3197	}
3198
3199	spin_lock_irq(&mdev->req_lock);
3200 retry:
3201	oconn = nconn = mdev->state.conn;
3202	spin_unlock_irq(&mdev->req_lock);
3203
3204	if (nconn == C_WF_REPORT_PARAMS)
3205		nconn = C_CONNECTED;
3206
3207	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3208	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3209		int cr; /* consider resync */
3210
3211		/* if we established a new connection */
3212		cr  = (oconn < C_CONNECTED);
3213		/* if we had an established connection
3214		 * and one of the nodes newly attaches a disk */
3215		cr |= (oconn == C_CONNECTED &&
3216		       (peer_state.disk == D_NEGOTIATING ||
3217			mdev->state.disk == D_NEGOTIATING));
3218		/* if we have both been inconsistent, and the peer has been
3219		 * forced to be UpToDate with --overwrite-data */
3220		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3221		/* if we had been plain connected, and the admin requested to
3222		 * start a sync by "invalidate" or "invalidate-remote" */
3223		cr |= (oconn == C_CONNECTED &&
3224				(peer_state.conn >= C_STARTING_SYNC_S &&
3225				 peer_state.conn <= C_WF_BITMAP_T));
3226
3227		if (cr)
3228			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3229
3230		put_ldev(mdev);
3231		if (nconn == C_MASK) {
3232			nconn = C_CONNECTED;
3233			if (mdev->state.disk == D_NEGOTIATING) {
3234				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3235			} else if (peer_state.disk == D_NEGOTIATING) {
3236				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3237				peer_state.disk = D_DISKLESS;
3238				real_peer_disk = D_DISKLESS;
3239			} else {
3240				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3241					return FALSE;
3242				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3243				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3244				return FALSE;
3245			}
3246		}
3247	}
3248
3249	spin_lock_irq(&mdev->req_lock);
3250	if (mdev->state.conn != oconn)
3251		goto retry;
3252	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3253	ns.i = mdev->state.i;
3254	ns.conn = nconn;
3255	ns.peer = peer_state.role;
3256	ns.pdsk = real_peer_disk;
3257	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3258	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3259		ns.disk = mdev->new_state_tmp.disk;
3260
3261	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3262	ns = mdev->state;
3263	spin_unlock_irq(&mdev->req_lock);
3264
3265	if (rv < SS_SUCCESS) {
3266		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3267		return FALSE;
3268	}
3269
3270	if (oconn > C_WF_REPORT_PARAMS) {
3271		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3272		    peer_state.disk != D_NEGOTIATING ) {
3273			/* we want resync, peer has not yet decided to sync... */
3274			/* Nowadays only used when forcing a node into primary role and
3275			   setting its disk to UpToDate with that */
3276			drbd_send_uuids(mdev);
3277			drbd_send_state(mdev);
3278		}
3279	}
3280
3281	mdev->net_conf->want_lose = 0;
3282
3283	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3284
3285	return TRUE;
3286}
3287
3288static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3289{
3290	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3291
3292	wait_event(mdev->misc_wait,
3293		   mdev->state.conn == C_WF_SYNC_UUID ||
3294		   mdev->state.conn < C_CONNECTED ||
3295		   mdev->state.disk < D_NEGOTIATING);
3296
3297	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3298
3299	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3300	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3301		return FALSE;
3302
3303	/* Here the _drbd_uuid_ functions are right, current should
3304	   _not_ be rotated into the history */
3305	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3306		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3307		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3308
3309		drbd_start_resync(mdev, C_SYNC_TARGET);
3310
3311		put_ldev(mdev);
3312	} else
3313		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3314
3315	return TRUE;
3316}
3317
3318enum receive_bitmap_ret { OK, DONE, FAILED };
3319
3320static enum receive_bitmap_ret
3321receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3322	unsigned long *buffer, struct bm_xfer_ctx *c)
3323{
3324	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3325	unsigned want = num_words * sizeof(long);
3326
3327	if (want != h->length) {
3328		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3329		return FAILED;
3330	}
3331	if (want == 0)
3332		return DONE;
3333	if (drbd_recv(mdev, buffer, want) != want)
3334		return FAILED;
3335
3336	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3337
3338	c->word_offset += num_words;
3339	c->bit_offset = c->word_offset * BITS_PER_LONG;
3340	if (c->bit_offset > c->bm_bits)
3341		c->bit_offset = c->bm_bits;
3342
3343	return OK;
3344}
3345
3346static enum receive_bitmap_ret
3347recv_bm_rle_bits(struct drbd_conf *mdev,
3348		struct p_compressed_bm *p,
3349		struct bm_xfer_ctx *c)
3350{
3351	struct bitstream bs;
3352	u64 look_ahead;
3353	u64 rl;
3354	u64 tmp;
3355	unsigned long s = c->bit_offset;
3356	unsigned long e;
3357	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3358	int toggle = DCBP_get_start(p);
3359	int have;
3360	int bits;
3361
3362	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3363
3364	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3365	if (bits < 0)
3366		return FAILED;
3367
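	/* The bitmap comes as alternating runs of clear and set bits, each
	 * run length stored as a variable length integer (VLI).  "toggle"
	 * says whether the current run describes set bits; only those runs
	 * are written into the bitmap, starting at bit offset s. */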
3368	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3369		bits = vli_decode_bits(&rl, look_ahead);
3370		if (bits <= 0)
3371			return FAILED;
3372
3373		if (toggle) {
3374			e = s + rl -1;
3375			if (e >= c->bm_bits) {
3376				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3377				return FAILED;
3378			}
3379			_drbd_bm_set_bits(mdev, s, e);
3380		}
3381
3382		if (have < bits) {
3383			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3384				have, bits, look_ahead,
3385				(unsigned int)(bs.cur.b - p->code),
3386				(unsigned int)bs.buf_len);
3387			return FAILED;
3388		}
3389		look_ahead >>= bits;
3390		have -= bits;
3391
3392		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3393		if (bits < 0)
3394			return FAILED;
3395		look_ahead |= tmp << have;
3396		have += bits;
3397	}
3398
3399	c->bit_offset = s;
3400	bm_xfer_ctx_bit_to_word_offset(c);
3401
3402	return (s == c->bm_bits) ? DONE : OK;
3403}
3404
3405static enum receive_bitmap_ret
3406decode_bitmap_c(struct drbd_conf *mdev,
3407		struct p_compressed_bm *p,
3408		struct bm_xfer_ctx *c)
3409{
3410	if (DCBP_get_code(p) == RLE_VLI_Bits)
3411		return recv_bm_rle_bits(mdev, p, c);
3412
3413	/* other variants had been implemented for evaluation,
3414	 * but have been dropped as this one turned out to be "best"
3415	 * during all our tests. */
3416
3417	dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3418	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3419	return FAILED;
3420}
3421
3422void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3423		const char *direction, struct bm_xfer_ctx *c)
3424{
3425	/* what would it take to transfer it "plaintext" */
3426	unsigned plain = sizeof(struct p_header) *
3427		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3428		+ c->bm_words * sizeof(long);
3429	unsigned total = c->bytes[0] + c->bytes[1];
3430	unsigned r;
3431
3432	/* total can not be zero. but just in case: */
3433	if (total == 0)
3434		return;
3435
3436	/* don't report if not compressed */
3437	if (total >= plain)
3438		return;
3439
3440	/* total < plain. check for overflow, still */
3441	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3442		                    : (1000 * total / plain);
3443
3444	if (r > 1000)
3445		r = 1000;
3446
3447	r = 1000 - r;
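	/* Example: plain == 4096 and total == 512 give r = 1000 - 125 = 875,
	 * reported below as "compression: 87.5%". */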
3448	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3449	     "total %u; compression: %u.%u%%\n",
3450			direction,
3451			c->bytes[1], c->packets[1],
3452			c->bytes[0], c->packets[0],
3453			total, r/10, r % 10);
3454}
3455
3456/* Since we are processing the bitfield from lower addresses to higher,
3457   it does not matter whether we process it in 32 bit or 64 bit chunks,
3458   as long as it is little endian.  (Think of it as a byte stream,
3459   beginning with the lowest byte...)  If we used big endian, we would
3460   have to process it from the highest address to the lowest, in order
3461   to be agnostic to the 32 vs 64 bit issue.
3462
3463   returns 0 on failure, 1 if we successfully received it. */
3464static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3465{
3466	struct bm_xfer_ctx c;
3467	void *buffer;
3468	enum receive_bitmap_ret ret;
3469	int ok = FALSE;
3470
3471	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3472
3473	drbd_bm_lock(mdev, "receive bitmap");
3474
3475	/* maybe we should use some per thread scratch page,
3476	 * and allocate that during initial device creation? */
3477	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3478	if (!buffer) {
3479		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3480		goto out;
3481	}
3482
3483	c = (struct bm_xfer_ctx) {
3484		.bm_bits = drbd_bm_bits(mdev),
3485		.bm_words = drbd_bm_words(mdev),
3486	};
3487
3488	do {
3489		if (h->command == P_BITMAP) {
3490			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3491		} else if (h->command == P_COMPRESSED_BITMAP) {
3492			/* MAYBE: sanity check that we speak proto >= 90,
3493			 * and the feature is enabled! */
3494			struct p_compressed_bm *p;
3495
3496			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3497				dev_err(DEV, "ReportCBitmap packet too large\n");
3498				goto out;
3499			}
3500			/* use the page buff */
3501			p = buffer;
3502			memcpy(p, h, sizeof(*h));
3503			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3504				goto out;
3505			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3506				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3507				goto out;
3508			}
3509			ret = decode_bitmap_c(mdev, p, &c);
3510		} else {
3511			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3512			goto out;
3513		}
3514
3515		c.packets[h->command == P_BITMAP]++;
3516		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3517
3518		if (ret != OK)
3519			break;
3520
3521		if (!drbd_recv_header(mdev, h))
3522			goto out;
3523	} while (ret == OK);
3524	if (ret == FAILED)
3525		goto out;
3526
3527	INFO_bm_xfer_stats(mdev, "receive", &c);
3528
3529	if (mdev->state.conn == C_WF_BITMAP_T) {
3530		ok = !drbd_send_bitmap(mdev);
3531		if (!ok)
3532			goto out;
3533		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3534		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3535		D_ASSERT(ok == SS_SUCCESS);
3536	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3537		/* admin may have requested C_DISCONNECTING,
3538		 * other threads may have noticed network errors */
3539		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3540		    drbd_conn_str(mdev->state.conn));
3541	}
3542
3543	ok = TRUE;
3544 out:
3545	drbd_bm_unlock(mdev);
3546	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3547		drbd_start_resync(mdev, C_SYNC_SOURCE);
3548	free_page((unsigned long) buffer);
3549	return ok;
3550}
3551
3552static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
3553{
3554	/* TODO zero copy sink :) */
3555	static char sink[128];
3556	int size, want, r;
3557
3558	if (!silent)
3559		dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3560		     h->command, h->length);
3561
3562	size = h->length;
3563	while (size > 0) {
3564		want = min_t(int, size, sizeof(sink));
3565		r = drbd_recv(mdev, sink, want);
3566		ERR_IF(r <= 0) break;
3567		size -= r;
3568	}
3569	return size == 0;
3570}
3571
3572static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3573{
3574	return receive_skip_(mdev, h, 0);
3575}
3576
3577static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
3578{
3579	return receive_skip_(mdev, h, 1);
3580}
3581
3582static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3583{
3584	if (mdev->state.disk >= D_INCONSISTENT)
3585		drbd_kick_lo(mdev);
3586
3587	/* Make sure we've acked all the TCP data associated
3588	 * with the data requests being unplugged */
3589	drbd_tcp_quickack(mdev->data.socket);
3590
3591	return TRUE;
3592}
3593
3594typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3595
3596static drbd_cmd_handler_f drbd_default_handler[] = {
3597	[P_DATA]	    = receive_Data,
3598	[P_DATA_REPLY]	    = receive_DataReply,
3599	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3600	[P_BARRIER]	    = receive_Barrier,
3601	[P_BITMAP]	    = receive_bitmap,
3602	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3603	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3604	[P_DATA_REQUEST]    = receive_DataRequest,
3605	[P_RS_DATA_REQUEST] = receive_DataRequest,
3606	[P_SYNC_PARAM]	    = receive_SyncParam,
3607	[P_SYNC_PARAM89]	   = receive_SyncParam,
3608	[P_PROTOCOL]        = receive_protocol,
3609	[P_UUIDS]	    = receive_uuids,
3610	[P_SIZES]	    = receive_sizes,
3611	[P_STATE]	    = receive_state,
3612	[P_STATE_CHG_REQ]   = receive_req_state,
3613	[P_SYNC_UUID]       = receive_sync_uuid,
3614	[P_OV_REQUEST]      = receive_DataRequest,
3615	[P_OV_REPLY]        = receive_DataRequest,
3616	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3617	[P_DELAY_PROBE]     = receive_skip_silent,
3618	/* anything missing from this table is in
3619	 * the asender_tbl, see get_asender_cmd */
3620	[P_MAX_CMD]	    = NULL,
3621};
3622
3623static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3624static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3625
3626static void drbdd(struct drbd_conf *mdev)
3627{
3628	drbd_cmd_handler_f handler;
3629	struct p_header *header = &mdev->data.rbuf.header;
3630
3631	while (get_t_state(&mdev->receiver) == Running) {
3632		drbd_thread_current_set_cpu(mdev);
3633		if (!drbd_recv_header(mdev, header)) {
3634			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3635			break;
3636		}
3637
3638		if (header->command < P_MAX_CMD)
3639			handler = drbd_cmd_handler[header->command];
3640		else if (P_MAY_IGNORE < header->command
3641		     && header->command < P_MAX_OPT_CMD)
3642			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3643		else if (header->command > P_MAX_OPT_CMD)
3644			handler = receive_skip;
3645		else
3646			handler = NULL;
3647
3648		if (unlikely(!handler)) {
3649			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3650			    header->command, header->length);
3651			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3652			break;
3653		}
3654		if (unlikely(!handler(mdev, header))) {
3655			dev_err(DEV, "error receiving %s, l: %d!\n",
3656			    cmdname(header->command), header->length);
3657			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3658			break;
3659		}
3660	}
3661}
3662
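/* Called on disconnect: complete all application READ requests that were
 * still waiting for a reply from the peer with
 * connection_lost_while_pending. */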
3663static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3664{
3665	struct hlist_head *slot;
3666	struct hlist_node *pos;
3667	struct hlist_node *tmp;
3668	struct drbd_request *req;
3669	int i;
3670
3671	/*
3672	 * Application READ requests
3673	 */
3674	spin_lock_irq(&mdev->req_lock);
3675	for (i = 0; i < APP_R_HSIZE; i++) {
3676		slot = mdev->app_reads_hash+i;
3677		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3678			/* it may (but should not any longer!)
3679			 * be on the work queue; if that assert triggers,
3680			 * we need to also grab the
3681			 * spin_lock_irq(&mdev->data.work.q_lock);
3682			 * and list_del_init here. */
3683			D_ASSERT(list_empty(&req->w.list));
3684			/* It would be nice to complete outside of the spinlock,
3685			 * but this is easier for now. */
3686			_req_mod(req, connection_lost_while_pending);
3687		}
3688	}
3689	for (i = 0; i < APP_R_HSIZE; i++)
3690		if (!hlist_empty(mdev->app_reads_hash+i))
3691			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3692				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3693
3694	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3695	spin_unlock_irq(&mdev->req_lock);
3696}
3697
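/* Queue a barrier work item on the data work queue and wait for the worker
 * to process it; once it completes, everything queued before this call has
 * been handled by the worker thread. */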
3698void drbd_flush_workqueue(struct drbd_conf *mdev)
3699{
3700	struct drbd_wq_barrier barr;
3701
3702	barr.w.cb = w_prev_work_done;
3703	init_completion(&barr.done);
3704	drbd_queue_work(&mdev->data.work, &barr.w);
3705	wait_for_completion(&barr.done);
3706}
3707
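/* Tear down an established (or half-established) connection: stop the
 * asender, free the sockets, wait for and clean up pending epoch entries,
 * cancel resync bookkeeping, flush the work queue, and finally move to
 * C_UNCONNECTED, or, when disconnecting, free the per-connection state and
 * go C_STANDALONE. */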
3708static void drbd_disconnect(struct drbd_conf *mdev)
3709{
3710	enum drbd_fencing_p fp;
3711	union drbd_state os, ns;
3712	int rv = SS_UNKNOWN_ERROR;
3713	unsigned int i;
3714
3715	if (mdev->state.conn == C_STANDALONE)
3716		return;
3717	if (mdev->state.conn >= C_WF_CONNECTION)
3718		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3719				drbd_conn_str(mdev->state.conn));
3720
3721	/* The asender does not clean up anything; it must not interfere, either. */
3722	drbd_thread_stop(&mdev->asender);
3723	drbd_free_sock(mdev);
3724
3725	spin_lock_irq(&mdev->req_lock);
3726	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3727	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3728	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3729	spin_unlock_irq(&mdev->req_lock);
3730
3731	/* We do not have data structures that would allow us to
3732	 * get the rs_pending_cnt down to 0 again.
3733	 *  * On C_SYNC_TARGET we do not have any data structures describing
3734	 *    the pending RSDataRequest's we have sent.
3735	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3736	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3737	 *  And no, it is not the sum of the reference counts in the
3738	 *  resync_LRU. The resync_LRU tracks the whole operation including
3739	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3740	 *  on the fly. */
3741	drbd_rs_cancel_all(mdev);
3742	mdev->rs_total = 0;
3743	mdev->rs_failed = 0;
3744	atomic_set(&mdev->rs_pending_cnt, 0);
3745	wake_up(&mdev->misc_wait);
3746
3747	/* make sure syncer is stopped and w_resume_next_sg queued */
3748	del_timer_sync(&mdev->resync_timer);
3749	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3750	resync_timer_fn((unsigned long)mdev);
3751
3752	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3753	 * w_make_resync_request etc. which may still be on the worker queue
3754	 * to be "canceled" */
3755	drbd_flush_workqueue(mdev);
3756
3757	/* This also does reclaim_net_ee().  If we do this too early, we might
3758	 * miss some resync ee and pages. */
3759	drbd_process_done_ee(mdev);
3760
3761	kfree(mdev->p_uuid);
3762	mdev->p_uuid = NULL;
3763
3764	if (!mdev->state.susp)
3765		tl_clear(mdev);
3766
3767	drbd_fail_pending_reads(mdev);
3768
3769	dev_info(DEV, "Connection closed\n");
3770
3771	drbd_md_sync(mdev);
3772
3773	fp = FP_DONT_CARE;
3774	if (get_ldev(mdev)) {
3775		fp = mdev->ldev->dc.fencing;
3776		put_ldev(mdev);
3777	}
3778
3779	if (mdev->state.role == R_PRIMARY) {
3780		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3781			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3782			drbd_request_state(mdev, NS(pdsk, nps));
3783		}
3784	}
3785
3786	spin_lock_irq(&mdev->req_lock);
3787	os = mdev->state;
3788	if (os.conn >= C_UNCONNECTED) {
3789		/* Do not restart in case we are C_DISCONNECTING */
3790		ns = os;
3791		ns.conn = C_UNCONNECTED;
3792		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3793	}
3794	spin_unlock_irq(&mdev->req_lock);
3795
3796	if (os.conn == C_DISCONNECTING) {
3797		struct hlist_head *h;
3798		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3799
3800		/* we must not free the tl_hash
3801		 * while application io is still on the fly */
3802		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3803
3804		spin_lock_irq(&mdev->req_lock);
3805		/* paranoia code */
3806		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3807			if (h->first)
3808				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3809						(int)(h - mdev->ee_hash), h->first);
3810		kfree(mdev->ee_hash);
3811		mdev->ee_hash = NULL;
3812		mdev->ee_hash_s = 0;
3813
3814		/* paranoia code */
3815		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3816			if (h->first)
3817				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3818						(int)(h - mdev->tl_hash), h->first);
3819		kfree(mdev->tl_hash);
3820		mdev->tl_hash = NULL;
3821		mdev->tl_hash_s = 0;
3822		spin_unlock_irq(&mdev->req_lock);
3823
3824		crypto_free_hash(mdev->cram_hmac_tfm);
3825		mdev->cram_hmac_tfm = NULL;
3826
3827		kfree(mdev->net_conf);
3828		mdev->net_conf = NULL;
3829		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3830	}
3831
3832	/* tcp_close and release of sendpage pages can be deferred.  I don't
3833	 * want to use SO_LINGER, because apparently it can be deferred for
3834	 * more than 20 seconds (longest time I checked).
3835	 *
3836	 * Actually we don't care exactly when the network stack does its
3837	 * put_page(); we just release our reference on these pages right here.
3838	 */
3839	i = drbd_release_ee(mdev, &mdev->net_ee);
3840	if (i)
3841		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3842	i = atomic_read(&mdev->pp_in_use);
3843	if (i)
3844		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3845
3846	D_ASSERT(list_empty(&mdev->read_ee));
3847	D_ASSERT(list_empty(&mdev->active_ee));
3848	D_ASSERT(list_empty(&mdev->sync_ee));
3849	D_ASSERT(list_empty(&mdev->done_ee));
3850
3851	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3852	atomic_set(&mdev->current_epoch->epoch_size, 0);
3853	D_ASSERT(list_empty(&mdev->current_epoch->list));
3854}
3855
3856/*
3857 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3858 * we can agree on is stored in agreed_pro_version.
3859 *
3860 * Feature flags and the reserved array should provide enough room for
3861 * future enhancements of the handshake protocol, and possible plugins...
3862 *
3863 * For now they are expected to be zero, but are currently ignored.
3864 */
3865static int drbd_send_handshake(struct drbd_conf *mdev)
3866{
3867	/* ASSERT current == mdev->receiver ... */
3868	struct p_handshake *p = &mdev->data.sbuf.handshake;
3869	int ok;
3870
3871	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3872		dev_err(DEV, "interrupted during initial handshake\n");
3873		return 0; /* interrupted. not ok. */
3874	}
3875
3876	if (mdev->data.socket == NULL) {
3877		mutex_unlock(&mdev->data.mutex);
3878		return 0;
3879	}
3880
3881	memset(p, 0, sizeof(*p));
3882	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3883	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3884	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3885			    (struct p_header *)p, sizeof(*p), 0);
3886	mutex_unlock(&mdev->data.mutex);
3887	return ok;
3888}
3889
3890/*
3891 * return values:
3892 *   1 yes, we have a valid connection
3893 *   0 oops, did not work out, please try again
3894 *  -1 peer talks different language,
3895 *     no point in trying again, please go standalone.
3896 */
3897static int drbd_do_handshake(struct drbd_conf *mdev)
3898{
3899	/* ASSERT current == mdev->receiver ... */
3900	struct p_handshake *p = &mdev->data.rbuf.handshake;
3901	const int expect = sizeof(struct p_handshake)
3902			  -sizeof(struct p_header);
3903	int rv;
3904
3905	rv = drbd_send_handshake(mdev);
3906	if (!rv)
3907		return 0;
3908
3909	rv = drbd_recv_header(mdev, &p->head);
3910	if (!rv)
3911		return 0;
3912
3913	if (p->head.command != P_HAND_SHAKE) {
3914		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3915		     cmdname(p->head.command), p->head.command);
3916		return -1;
3917	}
3918
3919	if (p->head.length != expect) {
3920		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3921		     expect, p->head.length);
3922		return -1;
3923	}
3924
3925	rv = drbd_recv(mdev, &p->head.payload, expect);
3926
3927	if (rv != expect) {
3928		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3929		return 0;
3930	}
3931
3932	p->protocol_min = be32_to_cpu(p->protocol_min);
3933	p->protocol_max = be32_to_cpu(p->protocol_max);
3934	if (p->protocol_max == 0)
3935		p->protocol_max = p->protocol_min;
3936
3937	if (PRO_VERSION_MAX < p->protocol_min ||
3938	    PRO_VERSION_MIN > p->protocol_max)
3939		goto incompat;
3940
3941	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3942
3943	dev_info(DEV, "Handshake successful: "
3944	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3945
3946	return 1;
3947
3948 incompat:
3949	dev_err(DEV, "incompatible DRBD dialects: "
3950	    "I support %d-%d, peer supports %d-%d\n",
3951	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3952	    p->protocol_min, p->protocol_max);
3953	return -1;
3954}
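
/*
 * Illustrative sketch (not compiled into the driver): the negotiation above
 * boils down to a range-overlap check followed by picking the highest
 * protocol version both sides support.  The helper name is hypothetical.
 */
#if 0
static int example_agree_pro_version(int my_min, int my_max,
				     int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;	/* incompatible dialects, go standalone */
	/* highest protocol version supported by both peers */
	return min_t(int, my_max, peer_max);
}
#endif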
3955
3956#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3957static int drbd_do_auth(struct drbd_conf *mdev)
3958{
3959	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3960	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3961	return -1;
3962}
3963#else
3964#define CHALLENGE_LEN 64
3965
3966/* Return value:
3967	1 - auth succeeded,
3968	0 - failed, try again (network error),
3969	-1 - auth failed, don't try again.
3970*/
3971
3972static int drbd_do_auth(struct drbd_conf *mdev)
3973{
3974	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3975	struct scatterlist sg;
3976	char *response = NULL;
3977	char *right_response = NULL;
3978	char *peers_ch = NULL;
3979	struct p_header p;
3980	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3981	unsigned int resp_size;
3982	struct hash_desc desc;
3983	int rv;
3984
3985	desc.tfm = mdev->cram_hmac_tfm;
3986	desc.flags = 0;
3987
3988	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3989				(u8 *)mdev->net_conf->shared_secret, key_len);
3990	if (rv) {
3991		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3992		rv = -1;
3993		goto fail;
3994	}
3995
3996	get_random_bytes(my_challenge, CHALLENGE_LEN);
3997
3998	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3999	if (!rv)
4000		goto fail;
4001
4002	rv = drbd_recv_header(mdev, &p);
4003	if (!rv)
4004		goto fail;
4005
4006	if (p.command != P_AUTH_CHALLENGE) {
4007		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4008		    cmdname(p.command), p.command);
4009		rv = 0;
4010		goto fail;
4011	}
4012
4013	if (p.length > CHALLENGE_LEN*2) {
4014		dev_err(DEV, "AuthChallenge payload too big.\n");
4015		rv = -1;
4016		goto fail;
4017	}
4018
4019	peers_ch = kmalloc(p.length, GFP_NOIO);
4020	if (peers_ch == NULL) {
4021		dev_err(DEV, "kmalloc of peers_ch failed\n");
4022		rv = -1;
4023		goto fail;
4024	}
4025
4026	rv = drbd_recv(mdev, peers_ch, p.length);
4027
4028	if (rv != p.length) {
4029		dev_err(DEV, "short read receiving AuthChallenge: l=%u\n", rv);
4030		rv = 0;
4031		goto fail;
4032	}
4033
4034	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4035	response = kmalloc(resp_size, GFP_NOIO);
4036	if (response == NULL) {
4037		dev_err(DEV, "kmalloc of response failed\n");
4038		rv = -1;
4039		goto fail;
4040	}
4041
4042	sg_init_table(&sg, 1);
4043	sg_set_buf(&sg, peers_ch, p.length);
4044
4045	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4046	if (rv) {
4047		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4048		rv = -1;
4049		goto fail;
4050	}
4051
4052	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4053	if (!rv)
4054		goto fail;
4055
4056	rv = drbd_recv_header(mdev, &p);
4057	if (!rv)
4058		goto fail;
4059
4060	if (p.command != P_AUTH_RESPONSE) {
4061		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4062		    cmdname(p.command), p.command);
4063		rv = 0;
4064		goto fail;
4065	}
4066
4067	if (p.length != resp_size) {
4068		dev_err(DEV, "AuthResponse payload has the wrong size\n");
4069		rv = 0;
4070		goto fail;
4071	}
4072
4073	rv = drbd_recv(mdev, response, resp_size);
4074
4075	if (rv != resp_size) {
4076		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4077		rv = 0;
4078		goto fail;
4079	}
4080
4081	right_response = kmalloc(resp_size, GFP_NOIO);
4082	if (right_response == NULL) {
4083		dev_err(DEV, "kmalloc of right_response failed\n");
4084		rv = -1;
4085		goto fail;
4086	}
4087
4088	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4089
4090	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4091	if (rv) {
4092		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4093		rv = -1;
4094		goto fail;
4095	}
4096
4097	rv = !memcmp(response, right_response, resp_size);
4098
4099	if (rv)
4100		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4101		     resp_size, mdev->net_conf->cram_hmac_alg);
4102	else
4103		rv = -1;
4104
4105 fail:
4106	kfree(peers_ch);
4107	kfree(response);
4108	kfree(right_response);
4109
4110	return rv;
4111}
4112#endif
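
/*
 * Illustrative sketch (not compiled into the driver) of the symmetric
 * challenge/response scheme implemented by drbd_do_auth() above: each peer
 * sends a random challenge and expects back the keyed digest of that
 * challenge, computed with the shared secret.  compute_hmac(), send_pkt(),
 * recv_pkt(), secret and DIGEST_LEN are hypothetical placeholders for the
 * crypto_hash_digest()/drbd_send_cmd2()/drbd_recv() machinery used above.
 */
#if 0
static int example_cram_auth(void)
{
	char my_challenge[CHALLENGE_LEN], peers_challenge[CHALLENGE_LEN];
	char my_response[DIGEST_LEN], peer_response[DIGEST_LEN];
	char expected[DIGEST_LEN];

	get_random_bytes(my_challenge, CHALLENGE_LEN);
	send_pkt(P_AUTH_CHALLENGE, my_challenge);	/* 1. challenge the peer       */
	recv_pkt(P_AUTH_CHALLENGE, peers_challenge);	/* 2. receive its challenge    */

	compute_hmac(secret, peers_challenge, my_response);
	send_pkt(P_AUTH_RESPONSE, my_response);		/* 3. prove we know the secret */
	recv_pkt(P_AUTH_RESPONSE, peer_response);	/* 4. the peer proves the same */

	compute_hmac(secret, my_challenge, expected);
	return memcmp(peer_response, expected, DIGEST_LEN) == 0;
}
#endif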
4113
4114int drbdd_init(struct drbd_thread *thi)
4115{
4116	struct drbd_conf *mdev = thi->mdev;
4117	unsigned int minor = mdev_to_minor(mdev);
4118	int h;
4119
4120	sprintf(current->comm, "drbd%d_receiver", minor);
4121
4122	dev_info(DEV, "receiver (re)started\n");
4123
4124	do {
4125		h = drbd_connect(mdev);
4126		if (h == 0) {
4127			drbd_disconnect(mdev);
4128			__set_current_state(TASK_INTERRUPTIBLE);
4129			schedule_timeout(HZ);
4130		}
4131		if (h == -1) {
4132			dev_warn(DEV, "Discarding network configuration.\n");
4133			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4134		}
4135	} while (h == 0);
4136
4137	if (h > 0) {
4138		if (get_net_conf(mdev)) {
4139			drbdd(mdev);
4140			put_net_conf(mdev);
4141		}
4142	}
4143
4144	drbd_disconnect(mdev);
4145
4146	dev_info(DEV, "receiver terminated\n");
4147	return 0;
4148}
4149
4150/* ********* acknowledge sender ******** */
4151
4152static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4153{
4154	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4155
4156	int retcode = be32_to_cpu(p->retcode);
4157
4158	if (retcode >= SS_SUCCESS) {
4159		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4160	} else {
4161		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4162		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4163		    drbd_set_st_err_str(retcode), retcode);
4164	}
4165	wake_up(&mdev->state_wait);
4166
4167	return TRUE;
4168}
4169
4170static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4171{
4172	return drbd_send_ping_ack(mdev);
4173
4174}
4175
4176static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4177{
4178	/* restore idle timeout */
4179	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4180	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4181		wake_up(&mdev->misc_wait);
4182
4183	return TRUE;
4184}
4185
4186static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4187{
4188	struct p_block_ack *p = (struct p_block_ack *)h;
4189	sector_t sector = be64_to_cpu(p->sector);
4190	int blksize = be32_to_cpu(p->blksize);
4191
4192	D_ASSERT(mdev->agreed_pro_version >= 89);
4193
4194	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4195
4196	drbd_rs_complete_io(mdev, sector);
4197	drbd_set_in_sync(mdev, sector, blksize);
4198	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4199	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4200	dec_rs_pending(mdev);
4201
4202	return TRUE;
4203}
4204
4205/* when we receive the ACK for a write request,
4206 * verify that we actually know about it */
4207static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4208	u64 id, sector_t sector)
4209{
4210	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4211	struct hlist_node *n;
4212	struct drbd_request *req;
4213
4214	hlist_for_each_entry(req, n, slot, colision) {
4215		if ((unsigned long)req == (unsigned long)id) {
4216			if (req->sector != sector) {
4217				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4218				    "wrong sector (%llus versus %llus)\n", req,
4219				    (unsigned long long)req->sector,
4220				    (unsigned long long)sector);
4221				break;
4222			}
4223			return req;
4224		}
4225	}
4226	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4227		(void *)(unsigned long)id, (unsigned long long)sector);
4228	return NULL;
4229}
4230
4231typedef struct drbd_request *(req_validator_fn)
4232	(struct drbd_conf *mdev, u64 id, sector_t sector);
4233
4234static int validate_req_change_req_state(struct drbd_conf *mdev,
4235	u64 id, sector_t sector, req_validator_fn validator,
4236	const char *func, enum drbd_req_event what)
4237{
4238	struct drbd_request *req;
4239	struct bio_and_error m;
4240
4241	spin_lock_irq(&mdev->req_lock);
4242	req = validator(mdev, id, sector);
4243	if (unlikely(!req)) {
4244		spin_unlock_irq(&mdev->req_lock);
4245		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4246		return FALSE;
4247	}
4248	__req_mod(req, what, &m);
4249	spin_unlock_irq(&mdev->req_lock);
4250
4251	if (m.bio)
4252		complete_master_bio(mdev, &m);
4253	return TRUE;
4254}
4255
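/* A positive ACK for a data block: for syncer block ids just mark the block
 * in sync; for application writes, map the ACK type to the matching request
 * state machine event and advance the request. */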
4256static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4257{
4258	struct p_block_ack *p = (struct p_block_ack *)h;
4259	sector_t sector = be64_to_cpu(p->sector);
4260	int blksize = be32_to_cpu(p->blksize);
4261	enum drbd_req_event what;
4262
4263	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4264
4265	if (is_syncer_block_id(p->block_id)) {
4266		drbd_set_in_sync(mdev, sector, blksize);
4267		dec_rs_pending(mdev);
4268		return TRUE;
4269	}
4270	switch (be16_to_cpu(h->command)) {
4271	case P_RS_WRITE_ACK:
4272		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4273		what = write_acked_by_peer_and_sis;
4274		break;
4275	case P_WRITE_ACK:
4276		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4277		what = write_acked_by_peer;
4278		break;
4279	case P_RECV_ACK:
4280		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4281		what = recv_acked_by_peer;
4282		break;
4283	case P_DISCARD_ACK:
4284		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4285		what = conflict_discarded_by_peer;
4286		break;
4287	default:
4288		D_ASSERT(0);
4289		return FALSE;
4290	}
4291
4292	return validate_req_change_req_state(mdev, p->block_id, sector,
4293		_ack_id_to_req, __func__ , what);
4294}
4295
4296static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4297{
4298	struct p_block_ack *p = (struct p_block_ack *)h;
4299	sector_t sector = be64_to_cpu(p->sector);
4300
4301	if (__ratelimit(&drbd_ratelimit_state))
4302		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4303
4304	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4305
4306	if (is_syncer_block_id(p->block_id)) {
4307		int size = be32_to_cpu(p->blksize);
4308		dec_rs_pending(mdev);
4309		drbd_rs_failed_io(mdev, sector, size);
4310		return TRUE;
4311	}
4312	return validate_req_change_req_state(mdev, p->block_id, sector,
4313		_ack_id_to_req, __func__ , neg_acked);
4314}
4315
4316static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4317{
4318	struct p_block_ack *p = (struct p_block_ack *)h;
4319	sector_t sector = be64_to_cpu(p->sector);
4320
4321	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4322	dev_err(DEV, "Got NegDReply; sector %llus, len %u; failing the original request.\n",
4323	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4324
4325	return validate_req_change_req_state(mdev, p->block_id, sector,
4326		_ar_id_to_req, __func__ , neg_acked);
4327}
4328
4329static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4330{
4331	sector_t sector;
4332	int size;
4333	struct p_block_ack *p = (struct p_block_ack *)h;
4334
4335	sector = be64_to_cpu(p->sector);
4336	size = be32_to_cpu(p->blksize);
4337
4338	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4339
4340	dec_rs_pending(mdev);
4341
4342	if (get_ldev_if_state(mdev, D_FAILED)) {
4343		drbd_rs_complete_io(mdev, sector);
4344		drbd_rs_failed_io(mdev, sector, size);
4345		put_ldev(mdev);
4346	}
4347
4348	return TRUE;
4349}
4350
4351static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4352{
4353	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4354
4355	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4356
4357	return TRUE;
4358}
4359
4360static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4361{
4362	struct p_block_ack *p = (struct p_block_ack *)h;
4363	struct drbd_work *w;
4364	sector_t sector;
4365	int size;
4366
4367	sector = be64_to_cpu(p->sector);
4368	size = be32_to_cpu(p->blksize);
4369
4370	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4371
4372	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4373		drbd_ov_oos_found(mdev, sector, size);
4374	else
4375		ov_oos_print(mdev);
4376
4377	drbd_rs_complete_io(mdev, sector);
4378	dec_rs_pending(mdev);
4379
4380	if (--mdev->ov_left == 0) {
4381		w = kmalloc(sizeof(*w), GFP_NOIO);
4382		if (w) {
4383			w->cb = w_ov_finished;
4384			drbd_queue_work_front(&mdev->data.work, w);
4385		} else {
4386			dev_err(DEV, "kmalloc(w) failed.\n");
4387			ov_oos_print(mdev);
4388			drbd_resync_finished(mdev);
4389		}
4390	}
4391	return TRUE;
4392}
4393
4394static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
4395{
4396	/* IGNORE */
4397	return TRUE;
4398}
4399
4400struct asender_cmd {
4401	size_t pkt_size;
4402	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4403};
4404
4405static struct asender_cmd *get_asender_cmd(int cmd)
4406{
4407	static struct asender_cmd asender_tbl[] = {
4408		/* anything missing from this table is in
4409		 * the drbd_cmd_handler (drbd_default_handler) table,
4410		 * see the beginning of drbdd() */
4411	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4412	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4413	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4414	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4415	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4416	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4417	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4418	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4419	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4420	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4421	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4422	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4423	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4424	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
4425	[P_MAX_CMD]	    = { 0, NULL },
4426	};
4427	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4428		return NULL;
4429	return &asender_tbl[cmd];
4430}
4431
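/* The asender thread: sends pings when asked, flushes the done_ee list so
 * that ACKs go out, and receives and dispatches packets on the meta socket
 * via get_asender_cmd(). */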
4432int drbd_asender(struct drbd_thread *thi)
4433{
4434	struct drbd_conf *mdev = thi->mdev;
4435	struct p_header *h = &mdev->meta.rbuf.header;
4436	struct asender_cmd *cmd = NULL;
4437
4438	int rv, len;
4439	void *buf    = h;
4440	int received = 0;
4441	int expect   = sizeof(struct p_header);
4442	int empty;
4443
4444	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4445
4446	current->policy = SCHED_RR;  /* Make this a realtime task! */
4447	current->rt_priority = 2;    /* more important than all other tasks */
4448
4449	while (get_t_state(thi) == Running) {
4450		drbd_thread_current_set_cpu(mdev);
4451		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4452			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4453			mdev->meta.socket->sk->sk_rcvtimeo =
4454				mdev->net_conf->ping_timeo*HZ/10;
4455		}
4456
4457		/* conditionally cork;
4458		 * it may hurt latency if we cork without much to send */
4459		if (!mdev->net_conf->no_cork &&
4460			3 < atomic_read(&mdev->unacked_cnt))
4461			drbd_tcp_cork(mdev->meta.socket);
4462		while (1) {
4463			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4464			flush_signals(current);
4465			if (!drbd_process_done_ee(mdev)) {
4466				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4467				goto reconnect;
4468			}
4469			/* to avoid race with newly queued ACKs */
4470			set_bit(SIGNAL_ASENDER, &mdev->flags);
4471			spin_lock_irq(&mdev->req_lock);
4472			empty = list_empty(&mdev->done_ee);
4473			spin_unlock_irq(&mdev->req_lock);
4474			/* new ack may have been queued right here,
4475			 * but then there is also a signal pending,
4476			 * and we start over... */
4477			if (empty)
4478				break;
4479		}
4480		/* but uncork again, unless corking is disabled entirely */
4481		if (!mdev->net_conf->no_cork)
4482			drbd_tcp_uncork(mdev->meta.socket);
4483
4484		/* short circuit, recv_msg would return EINTR anyway. */
4485		if (signal_pending(current))
4486			continue;
4487
4488		rv = drbd_recv_short(mdev, mdev->meta.socket,
4489				     buf, expect-received, 0);
4490		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4491
4492		flush_signals(current);
4493
4494		/* Note:
4495		 * -EINTR	 (on meta) we got a signal
4496		 * -EAGAIN	 (on meta) rcvtimeo expired
4497		 * -ECONNRESET	 other side closed the connection
4498		 * -ERESTARTSYS  (on data) we got a signal
4499		 * rv <  0	 other than above: unexpected error!
4500		 * rv == expected: full header or command
4501		 * rv <  expected: "woken" by signal during receive
4502		 * rv == 0	 : "connection shut down by peer"
4503		 */
4504		if (likely(rv > 0)) {
4505			received += rv;
4506			buf	 += rv;
4507		} else if (rv == 0) {
4508			dev_err(DEV, "meta connection shut down by peer.\n");
4509			goto reconnect;
4510		} else if (rv == -EAGAIN) {
4511			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4512			    mdev->net_conf->ping_timeo*HZ/10) {
4513				dev_err(DEV, "PingAck did not arrive in time.\n");
4514				goto reconnect;
4515			}
4516			set_bit(SEND_PING, &mdev->flags);
4517			continue;
4518		} else if (rv == -EINTR) {
4519			continue;
4520		} else {
4521			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4522			goto reconnect;
4523		}
4524
4525		if (received == expect && cmd == NULL) {
4526			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4527				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4528				    (long)be32_to_cpu(h->magic),
4529				    h->command, h->length);
4530				goto reconnect;
4531			}
4532			cmd = get_asender_cmd(be16_to_cpu(h->command));
4533			len = be16_to_cpu(h->length);
4534			if (unlikely(cmd == NULL)) {
4535				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4536				    (long)be32_to_cpu(h->magic),
4537				    h->command, h->length);
4538				goto disconnect;
4539			}
4540			expect = cmd->pkt_size;
4541			ERR_IF(len != expect-sizeof(struct p_header))
4542				goto reconnect;
4543		}
4544		if (received == expect) {
4545			D_ASSERT(cmd != NULL);
4546			if (!cmd->process(mdev, h))
4547				goto reconnect;
4548
4549			buf	 = h;
4550			received = 0;
4551			expect	 = sizeof(struct p_header);
4552			cmd	 = NULL;
4553		}
4554	}
4555
4556	if (0) {
4557reconnect:
4558		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4559	}
4560	if (0) {
4561disconnect:
4562		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4563	}
4564	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4565
4566	D_ASSERT(mdev->state.conn < C_CONNECTED);
4567	dev_info(DEV, "asender terminated\n");
4568
4569	return 0;
4570}
4571