1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/bvec.h>
5#include <linux/crc32c.h>
6#include <linux/net.h>
7#include <linux/socket.h>
8#include <net/sock.h>
9
10#include <linux/ceph/ceph_features.h>
11#include <linux/ceph/decode.h>
12#include <linux/ceph/libceph.h>
13#include <linux/ceph/messenger.h>
14
15/* static tag bytes (protocol control messages) */
16static char tag_msg = CEPH_MSGR_TAG_MSG;
17static char tag_ack = CEPH_MSGR_TAG_ACK;
18static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
19static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
20
21/*
22 * If @buf is NULL, discard up to @len bytes.
23 */
24static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
25{
26	struct kvec iov = {buf, len};
27	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
28	int r;
29
30	if (!buf)
31		msg.msg_flags |= MSG_TRUNC;
32
33	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len);
34	r = sock_recvmsg(sock, &msg, msg.msg_flags);
35	if (r == -EAGAIN)
36		r = 0;
37	return r;
38}
39
40static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
41		     int page_offset, size_t length)
42{
43	struct bio_vec bvec;
44	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
45	int r;
46
47	BUG_ON(page_offset + length > PAGE_SIZE);
48	bvec_set_page(&bvec, page, length, page_offset);
49	iov_iter_bvec(&msg.msg_iter, ITER_DEST, &bvec, 1, length);
50	r = sock_recvmsg(sock, &msg, msg.msg_flags);
51	if (r == -EAGAIN)
52		r = 0;
53	return r;
54}
55
56/*
57 * write something.  @more is true if caller will be sending more data
58 * shortly.
59 */
60static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
61			    size_t kvlen, size_t len, bool more)
62{
63	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
64	int r;
65
66	if (more)
67		msg.msg_flags |= MSG_MORE;
68	else
69		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
70
71	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
72	if (r == -EAGAIN)
73		r = 0;
74	return r;
75}
76
77/*
78 * @more: MSG_MORE or 0.
79 */
80static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
81			     int offset, size_t size, int more)
82{
83	struct msghdr msg = {
84		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
85	};
86	struct bio_vec bvec;
87	int ret;
88
89	/*
90	 * MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
91	 * we need to fall back to sendmsg if that's the case.
92	 *
93	 * Same goes for slab pages: skb_can_coalesce() allows
94	 * coalescing neighboring slab objects into a single frag which
95	 * triggers one of hardened usercopy checks.
96	 */
97	if (sendpage_ok(page))
98		msg.msg_flags |= MSG_SPLICE_PAGES;
99
100	bvec_set_page(&bvec, page, size, offset);
101	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
102
103	ret = sock_sendmsg(sock, &msg);
104	if (ret == -EAGAIN)
105		ret = 0;
106
107	return ret;
108}
109
110static void con_out_kvec_reset(struct ceph_connection *con)
111{
112	BUG_ON(con->v1.out_skip);
113
114	con->v1.out_kvec_left = 0;
115	con->v1.out_kvec_bytes = 0;
116	con->v1.out_kvec_cur = &con->v1.out_kvec[0];
117}
118
119static void con_out_kvec_add(struct ceph_connection *con,
120				size_t size, void *data)
121{
122	int index = con->v1.out_kvec_left;
123
124	BUG_ON(con->v1.out_skip);
125	BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
126
127	con->v1.out_kvec[index].iov_len = size;
128	con->v1.out_kvec[index].iov_base = data;
129	con->v1.out_kvec_left++;
130	con->v1.out_kvec_bytes += size;
131}
132
133/*
134 * Chop off a kvec from the end.  Return residual number of bytes for
135 * that kvec, i.e. how many bytes would have been written if the kvec
136 * hadn't been nuked.
137 */
138static int con_out_kvec_skip(struct ceph_connection *con)
139{
140	int skip = 0;
141
142	if (con->v1.out_kvec_bytes > 0) {
143		skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
144		BUG_ON(con->v1.out_kvec_bytes < skip);
145		BUG_ON(!con->v1.out_kvec_left);
146		con->v1.out_kvec_bytes -= skip;
147		con->v1.out_kvec_left--;
148	}
149
150	return skip;
151}
152
153static size_t sizeof_footer(struct ceph_connection *con)
154{
155	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
156	    sizeof(struct ceph_msg_footer) :
157	    sizeof(struct ceph_msg_footer_old);
158}
159
160static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
161{
162	/* Initialize data cursor if it's not a sparse read */
163	u64 len = msg->sparse_read_total ? : data_len;
164
165	ceph_msg_data_cursor_init(&msg->cursor, msg, len);
166}
167
168/*
169 * Prepare footer for currently outgoing message, and finish things
170 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
171 */
172static void prepare_write_message_footer(struct ceph_connection *con)
173{
174	struct ceph_msg *m = con->out_msg;
175
176	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
177
178	dout("prepare_write_message_footer %p\n", con);
179	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
180	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
181		if (con->ops->sign_message)
182			con->ops->sign_message(m);
183		else
184			m->footer.sig = 0;
185	} else {
186		m->old_footer.flags = m->footer.flags;
187	}
188	con->v1.out_more = m->more_to_follow;
189	con->v1.out_msg_done = true;
190}
191
192/*
193 * Prepare headers for the next outgoing message.
194 */
195static void prepare_write_message(struct ceph_connection *con)
196{
197	struct ceph_msg *m;
198	u32 crc;
199
200	con_out_kvec_reset(con);
201	con->v1.out_msg_done = false;
202
203	/* Sneak an ack in there first?  If we can get it into the same
204	 * TCP packet that's a good thing. */
205	if (con->in_seq > con->in_seq_acked) {
206		con->in_seq_acked = con->in_seq;
207		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
208		con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
209		con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
210			&con->v1.out_temp_ack);
211	}
212
213	ceph_con_get_out_msg(con);
214	m = con->out_msg;
215
216	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
217	     m, con->out_seq, le16_to_cpu(m->hdr.type),
218	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
219	     m->data_length);
220	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
221	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
222
223	/* tag + hdr + front + middle */
224	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
225	con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
226	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
227
228	if (m->middle)
229		con_out_kvec_add(con, m->middle->vec.iov_len,
230			m->middle->vec.iov_base);
231
232	/* fill in hdr crc and finalize hdr */
233	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
234	con->out_msg->hdr.crc = cpu_to_le32(crc);
235	memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
236
237	/* fill in front and middle crc, footer */
238	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
239	con->out_msg->footer.front_crc = cpu_to_le32(crc);
240	if (m->middle) {
241		crc = crc32c(0, m->middle->vec.iov_base,
242				m->middle->vec.iov_len);
243		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
244	} else
245		con->out_msg->footer.middle_crc = 0;
246	dout("%s front_crc %u middle_crc %u\n", __func__,
247	     le32_to_cpu(con->out_msg->footer.front_crc),
248	     le32_to_cpu(con->out_msg->footer.middle_crc));
249	con->out_msg->footer.flags = 0;
250
251	/* is there a data payload? */
252	con->out_msg->footer.data_crc = 0;
253	if (m->data_length) {
254		prepare_message_data(con->out_msg, m->data_length);
255		con->v1.out_more = 1;  /* data + footer will follow */
256	} else {
257		/* no, queue up footer too and be done */
258		prepare_write_message_footer(con);
259	}
260
261	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
262}
263
264/*
265 * Prepare an ack.
266 */
267static void prepare_write_ack(struct ceph_connection *con)
268{
269	dout("prepare_write_ack %p %llu -> %llu\n", con,
270	     con->in_seq_acked, con->in_seq);
271	con->in_seq_acked = con->in_seq;
272
273	con_out_kvec_reset(con);
274
275	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
276
277	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
278	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
279			 &con->v1.out_temp_ack);
280
281	con->v1.out_more = 1;  /* more will follow.. eventually.. */
282	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
283}
284
285/*
286 * Prepare to share the seq during handshake
287 */
288static void prepare_write_seq(struct ceph_connection *con)
289{
290	dout("prepare_write_seq %p %llu -> %llu\n", con,
291	     con->in_seq_acked, con->in_seq);
292	con->in_seq_acked = con->in_seq;
293
294	con_out_kvec_reset(con);
295
296	con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
297	con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
298			 &con->v1.out_temp_ack);
299
300	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
301}
302
303/*
304 * Prepare to write keepalive byte.
305 */
306static void prepare_write_keepalive(struct ceph_connection *con)
307{
308	dout("prepare_write_keepalive %p\n", con);
309	con_out_kvec_reset(con);
310	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
311		struct timespec64 now;
312
313		ktime_get_real_ts64(&now);
314		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
315		ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
316		con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
317				 &con->v1.out_temp_keepalive2);
318	} else {
319		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
320	}
321	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
322}
323
324/*
325 * Connection negotiation.
326 */
327
328static int get_connect_authorizer(struct ceph_connection *con)
329{
330	struct ceph_auth_handshake *auth;
331	int auth_proto;
332
333	if (!con->ops->get_authorizer) {
334		con->v1.auth = NULL;
335		con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
336		con->v1.out_connect.authorizer_len = 0;
337		return 0;
338	}
339
340	auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
341	if (IS_ERR(auth))
342		return PTR_ERR(auth);
343
344	con->v1.auth = auth;
345	con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
346	con->v1.out_connect.authorizer_len =
347		cpu_to_le32(auth->authorizer_buf_len);
348	return 0;
349}
350
351/*
352 * We connected to a peer and are saying hello.
353 */
354static void prepare_write_banner(struct ceph_connection *con)
355{
356	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
357	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
358					&con->msgr->my_enc_addr);
359
360	con->v1.out_more = 0;
361	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
362}
363
364static void __prepare_write_connect(struct ceph_connection *con)
365{
366	con_out_kvec_add(con, sizeof(con->v1.out_connect),
367			 &con->v1.out_connect);
368	if (con->v1.auth)
369		con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
370				 con->v1.auth->authorizer_buf);
371
372	con->v1.out_more = 0;
373	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
374}
375
376static int prepare_write_connect(struct ceph_connection *con)
377{
378	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
379	int proto;
380	int ret;
381
382	switch (con->peer_name.type) {
383	case CEPH_ENTITY_TYPE_MON:
384		proto = CEPH_MONC_PROTOCOL;
385		break;
386	case CEPH_ENTITY_TYPE_OSD:
387		proto = CEPH_OSDC_PROTOCOL;
388		break;
389	case CEPH_ENTITY_TYPE_MDS:
390		proto = CEPH_MDSC_PROTOCOL;
391		break;
392	default:
393		BUG();
394	}
395
396	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
397	     con->v1.connect_seq, global_seq, proto);
398
399	con->v1.out_connect.features =
400		cpu_to_le64(from_msgr(con->msgr)->supported_features);
401	con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
402	con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
403	con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
404	con->v1.out_connect.protocol_version = cpu_to_le32(proto);
405	con->v1.out_connect.flags = 0;
406
407	ret = get_connect_authorizer(con);
408	if (ret)
409		return ret;
410
411	__prepare_write_connect(con);
412	return 0;
413}
414
415/*
416 * write as much of pending kvecs to the socket as we can.
417 *  1 -> done
418 *  0 -> socket full, but more to do
419 * <0 -> error
420 */
421static int write_partial_kvec(struct ceph_connection *con)
422{
423	int ret;
424
425	dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
426	while (con->v1.out_kvec_bytes > 0) {
427		ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
428				       con->v1.out_kvec_left,
429				       con->v1.out_kvec_bytes,
430				       con->v1.out_more);
431		if (ret <= 0)
432			goto out;
433		con->v1.out_kvec_bytes -= ret;
434		if (!con->v1.out_kvec_bytes)
435			break;            /* done */
436
437		/* account for full iov entries consumed */
438		while (ret >= con->v1.out_kvec_cur->iov_len) {
439			BUG_ON(!con->v1.out_kvec_left);
440			ret -= con->v1.out_kvec_cur->iov_len;
441			con->v1.out_kvec_cur++;
442			con->v1.out_kvec_left--;
443		}
444		/* and for a partially-consumed entry */
445		if (ret) {
446			con->v1.out_kvec_cur->iov_len -= ret;
447			con->v1.out_kvec_cur->iov_base += ret;
448		}
449	}
450	con->v1.out_kvec_left = 0;
451	ret = 1;
452out:
453	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
454	     con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
455	return ret;  /* done! */
456}
457
458/*
459 * Write as much message data payload as we can.  If we finish, queue
460 * up the footer.
461 *  1 -> done, footer is now queued in out_kvec[].
462 *  0 -> socket full, but more to do
463 * <0 -> error
464 */
465static int write_partial_message_data(struct ceph_connection *con)
466{
467	struct ceph_msg *msg = con->out_msg;
468	struct ceph_msg_data_cursor *cursor = &msg->cursor;
469	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
470	u32 crc;
471
472	dout("%s %p msg %p\n", __func__, con, msg);
473
474	if (!msg->num_data_items)
475		return -EINVAL;
476
477	/*
478	 * Iterate through each page that contains data to be
479	 * written, and send as much as possible for each.
480	 *
481	 * If we are calculating the data crc (the default), we will
482	 * need to map the page.  If we have no pages, they have
483	 * been revoked, so use the zero page.
484	 */
485	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
486	while (cursor->total_resid) {
487		struct page *page;
488		size_t page_offset;
489		size_t length;
490		int ret;
491
492		if (!cursor->resid) {
493			ceph_msg_data_advance(cursor, 0);
494			continue;
495		}
496
497		page = ceph_msg_data_next(cursor, &page_offset, &length);
498		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
499					MSG_MORE);
500		if (ret <= 0) {
501			if (do_datacrc)
502				msg->footer.data_crc = cpu_to_le32(crc);
503
504			return ret;
505		}
506		if (do_datacrc && cursor->need_crc)
507			crc = ceph_crc32c_page(crc, page, page_offset, length);
508		ceph_msg_data_advance(cursor, (size_t)ret);
509	}
510
511	dout("%s %p msg %p done\n", __func__, con, msg);
512
513	/* prepare and queue up footer, too */
514	if (do_datacrc)
515		msg->footer.data_crc = cpu_to_le32(crc);
516	else
517		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
518	con_out_kvec_reset(con);
519	prepare_write_message_footer(con);
520
521	return 1;	/* must return > 0 to indicate success */
522}
523
524/*
525 * write some zeros
526 */
527static int write_partial_skip(struct ceph_connection *con)
528{
529	int ret;
530
531	dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
532	while (con->v1.out_skip > 0) {
533		size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
534
535		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
536					MSG_MORE);
537		if (ret <= 0)
538			goto out;
539		con->v1.out_skip -= ret;
540	}
541	ret = 1;
542out:
543	return ret;
544}
545
546/*
547 * Prepare to read connection handshake, or an ack.
548 */
549static void prepare_read_banner(struct ceph_connection *con)
550{
551	dout("prepare_read_banner %p\n", con);
552	con->v1.in_base_pos = 0;
553}
554
555static void prepare_read_connect(struct ceph_connection *con)
556{
557	dout("prepare_read_connect %p\n", con);
558	con->v1.in_base_pos = 0;
559}
560
561static void prepare_read_ack(struct ceph_connection *con)
562{
563	dout("prepare_read_ack %p\n", con);
564	con->v1.in_base_pos = 0;
565}
566
567static void prepare_read_seq(struct ceph_connection *con)
568{
569	dout("prepare_read_seq %p\n", con);
570	con->v1.in_base_pos = 0;
571	con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
572}
573
574static void prepare_read_tag(struct ceph_connection *con)
575{
576	dout("prepare_read_tag %p\n", con);
577	con->v1.in_base_pos = 0;
578	con->v1.in_tag = CEPH_MSGR_TAG_READY;
579}
580
581static void prepare_read_keepalive_ack(struct ceph_connection *con)
582{
583	dout("prepare_read_keepalive_ack %p\n", con);
584	con->v1.in_base_pos = 0;
585}
586
587/*
588 * Prepare to read a message.
589 */
590static int prepare_read_message(struct ceph_connection *con)
591{
592	dout("prepare_read_message %p\n", con);
593	BUG_ON(con->in_msg != NULL);
594	con->v1.in_base_pos = 0;
595	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
596	return 0;
597}
598
599static int read_partial(struct ceph_connection *con,
600			int end, int size, void *object)
601{
602	while (con->v1.in_base_pos < end) {
603		int left = end - con->v1.in_base_pos;
604		int have = size - left;
605		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
606		if (ret <= 0)
607			return ret;
608		con->v1.in_base_pos += ret;
609	}
610	return 1;
611}
612
613/*
614 * Read all or part of the connect-side handshake on a new connection
615 */
616static int read_partial_banner(struct ceph_connection *con)
617{
618	int size;
619	int end;
620	int ret;
621
622	dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
623
624	/* peer's banner */
625	size = strlen(CEPH_BANNER);
626	end = size;
627	ret = read_partial(con, end, size, con->v1.in_banner);
628	if (ret <= 0)
629		goto out;
630
631	size = sizeof(con->v1.actual_peer_addr);
632	end += size;
633	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
634	if (ret <= 0)
635		goto out;
636	ceph_decode_banner_addr(&con->v1.actual_peer_addr);
637
638	size = sizeof(con->v1.peer_addr_for_me);
639	end += size;
640	ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
641	if (ret <= 0)
642		goto out;
643	ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
644
645out:
646	return ret;
647}
648
649static int read_partial_connect(struct ceph_connection *con)
650{
651	int size;
652	int end;
653	int ret;
654
655	dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
656
657	size = sizeof(con->v1.in_reply);
658	end = size;
659	ret = read_partial(con, end, size, &con->v1.in_reply);
660	if (ret <= 0)
661		goto out;
662
663	if (con->v1.auth) {
664		size = le32_to_cpu(con->v1.in_reply.authorizer_len);
665		if (size > con->v1.auth->authorizer_reply_buf_len) {
666			pr_err("authorizer reply too big: %d > %zu\n", size,
667			       con->v1.auth->authorizer_reply_buf_len);
668			ret = -EINVAL;
669			goto out;
670		}
671
672		end += size;
673		ret = read_partial(con, end, size,
674				   con->v1.auth->authorizer_reply_buf);
675		if (ret <= 0)
676			goto out;
677	}
678
679	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
680	     con, con->v1.in_reply.tag,
681	     le32_to_cpu(con->v1.in_reply.connect_seq),
682	     le32_to_cpu(con->v1.in_reply.global_seq));
683out:
684	return ret;
685}
686
687/*
688 * Verify the hello banner looks okay.
689 */
690static int verify_hello(struct ceph_connection *con)
691{
692	if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
693		pr_err("connect to %s got bad banner\n",
694		       ceph_pr_addr(&con->peer_addr));
695		con->error_msg = "protocol error, bad banner";
696		return -1;
697	}
698	return 0;
699}
700
701static int process_banner(struct ceph_connection *con)
702{
703	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
704
705	dout("process_banner on %p\n", con);
706
707	if (verify_hello(con) < 0)
708		return -1;
709
710	/*
711	 * Make sure the other end is who we wanted.  note that the other
712	 * end may not yet know their ip address, so if it's 0.0.0.0, give
713	 * them the benefit of the doubt.
714	 */
715	if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
716		   sizeof(con->peer_addr)) != 0 &&
717	    !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
718	      con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
719		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
720			ceph_pr_addr(&con->peer_addr),
721			le32_to_cpu(con->peer_addr.nonce),
722			ceph_pr_addr(&con->v1.actual_peer_addr),
723			le32_to_cpu(con->v1.actual_peer_addr.nonce));
724		con->error_msg = "wrong peer at address";
725		return -1;
726	}
727
728	/*
729	 * did we learn our address?
730	 */
731	if (ceph_addr_is_blank(my_addr)) {
732		memcpy(&my_addr->in_addr,
733		       &con->v1.peer_addr_for_me.in_addr,
734		       sizeof(con->v1.peer_addr_for_me.in_addr));
735		ceph_addr_set_port(my_addr, 0);
736		ceph_encode_my_addr(con->msgr);
737		dout("process_banner learned my addr is %s\n",
738		     ceph_pr_addr(my_addr));
739	}
740
741	return 0;
742}
743
744static int process_connect(struct ceph_connection *con)
745{
746	u64 sup_feat = from_msgr(con->msgr)->supported_features;
747	u64 req_feat = from_msgr(con->msgr)->required_features;
748	u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
749	int ret;
750
751	dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
752
753	if (con->v1.auth) {
754		int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
755
756		/*
757		 * Any connection that defines ->get_authorizer()
758		 * should also define ->add_authorizer_challenge() and
759		 * ->verify_authorizer_reply().
760		 *
761		 * See get_connect_authorizer().
762		 */
763		if (con->v1.in_reply.tag ==
764				CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
765			ret = con->ops->add_authorizer_challenge(
766				con, con->v1.auth->authorizer_reply_buf, len);
767			if (ret < 0)
768				return ret;
769
770			con_out_kvec_reset(con);
771			__prepare_write_connect(con);
772			prepare_read_connect(con);
773			return 0;
774		}
775
776		if (len) {
777			ret = con->ops->verify_authorizer_reply(con);
778			if (ret < 0) {
779				con->error_msg = "bad authorize reply";
780				return ret;
781			}
782		}
783	}
784
785	switch (con->v1.in_reply.tag) {
786	case CEPH_MSGR_TAG_FEATURES:
787		pr_err("%s%lld %s feature set mismatch,"
788		       " my %llx < server's %llx, missing %llx\n",
789		       ENTITY_NAME(con->peer_name),
790		       ceph_pr_addr(&con->peer_addr),
791		       sup_feat, server_feat, server_feat & ~sup_feat);
792		con->error_msg = "missing required protocol features";
793		return -1;
794
795	case CEPH_MSGR_TAG_BADPROTOVER:
796		pr_err("%s%lld %s protocol version mismatch,"
797		       " my %d != server's %d\n",
798		       ENTITY_NAME(con->peer_name),
799		       ceph_pr_addr(&con->peer_addr),
800		       le32_to_cpu(con->v1.out_connect.protocol_version),
801		       le32_to_cpu(con->v1.in_reply.protocol_version));
802		con->error_msg = "protocol version mismatch";
803		return -1;
804
805	case CEPH_MSGR_TAG_BADAUTHORIZER:
806		con->v1.auth_retry++;
807		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
808		     con->v1.auth_retry);
809		if (con->v1.auth_retry == 2) {
810			con->error_msg = "connect authorization failure";
811			return -1;
812		}
813		con_out_kvec_reset(con);
814		ret = prepare_write_connect(con);
815		if (ret < 0)
816			return ret;
817		prepare_read_connect(con);
818		break;
819
820	case CEPH_MSGR_TAG_RESETSESSION:
821		/*
822		 * If we connected with a large connect_seq but the peer
823		 * has no record of a session with us (no connection, or
824		 * connect_seq == 0), they will send RESETSESION to indicate
825		 * that they must have reset their session, and may have
826		 * dropped messages.
827		 */
828		dout("process_connect got RESET peer seq %u\n",
829		     le32_to_cpu(con->v1.in_reply.connect_seq));
830		pr_info("%s%lld %s session reset\n",
831			ENTITY_NAME(con->peer_name),
832			ceph_pr_addr(&con->peer_addr));
833		ceph_con_reset_session(con);
834		con_out_kvec_reset(con);
835		ret = prepare_write_connect(con);
836		if (ret < 0)
837			return ret;
838		prepare_read_connect(con);
839
840		/* Tell ceph about it. */
841		mutex_unlock(&con->mutex);
842		if (con->ops->peer_reset)
843			con->ops->peer_reset(con);
844		mutex_lock(&con->mutex);
845		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
846			return -EAGAIN;
847		break;
848
849	case CEPH_MSGR_TAG_RETRY_SESSION:
850		/*
851		 * If we sent a smaller connect_seq than the peer has, try
852		 * again with a larger value.
853		 */
854		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
855		     le32_to_cpu(con->v1.out_connect.connect_seq),
856		     le32_to_cpu(con->v1.in_reply.connect_seq));
857		con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
858		con_out_kvec_reset(con);
859		ret = prepare_write_connect(con);
860		if (ret < 0)
861			return ret;
862		prepare_read_connect(con);
863		break;
864
865	case CEPH_MSGR_TAG_RETRY_GLOBAL:
866		/*
867		 * If we sent a smaller global_seq than the peer has, try
868		 * again with a larger value.
869		 */
870		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
871		     con->v1.peer_global_seq,
872		     le32_to_cpu(con->v1.in_reply.global_seq));
873		ceph_get_global_seq(con->msgr,
874				    le32_to_cpu(con->v1.in_reply.global_seq));
875		con_out_kvec_reset(con);
876		ret = prepare_write_connect(con);
877		if (ret < 0)
878			return ret;
879		prepare_read_connect(con);
880		break;
881
882	case CEPH_MSGR_TAG_SEQ:
883	case CEPH_MSGR_TAG_READY:
884		if (req_feat & ~server_feat) {
885			pr_err("%s%lld %s protocol feature mismatch,"
886			       " my required %llx > server's %llx, need %llx\n",
887			       ENTITY_NAME(con->peer_name),
888			       ceph_pr_addr(&con->peer_addr),
889			       req_feat, server_feat, req_feat & ~server_feat);
890			con->error_msg = "missing required protocol features";
891			return -1;
892		}
893
894		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
895		con->state = CEPH_CON_S_OPEN;
896		con->v1.auth_retry = 0;    /* we authenticated; clear flag */
897		con->v1.peer_global_seq =
898			le32_to_cpu(con->v1.in_reply.global_seq);
899		con->v1.connect_seq++;
900		con->peer_features = server_feat;
901		dout("process_connect got READY gseq %d cseq %d (%d)\n",
902		     con->v1.peer_global_seq,
903		     le32_to_cpu(con->v1.in_reply.connect_seq),
904		     con->v1.connect_seq);
905		WARN_ON(con->v1.connect_seq !=
906			le32_to_cpu(con->v1.in_reply.connect_seq));
907
908		if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
909			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
910
911		con->delay = 0;      /* reset backoff memory */
912
913		if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
914			prepare_write_seq(con);
915			prepare_read_seq(con);
916		} else {
917			prepare_read_tag(con);
918		}
919		break;
920
921	case CEPH_MSGR_TAG_WAIT:
922		/*
923		 * If there is a connection race (we are opening
924		 * connections to each other), one of us may just have
925		 * to WAIT.  This shouldn't happen if we are the
926		 * client.
927		 */
928		con->error_msg = "protocol error, got WAIT as client";
929		return -1;
930
931	default:
932		con->error_msg = "protocol error, garbage tag during connect";
933		return -1;
934	}
935	return 0;
936}
937
938/*
939 * read (part of) an ack
940 */
941static int read_partial_ack(struct ceph_connection *con)
942{
943	int size = sizeof(con->v1.in_temp_ack);
944	int end = size;
945
946	return read_partial(con, end, size, &con->v1.in_temp_ack);
947}
948
949/*
950 * We can finally discard anything that's been acked.
951 */
952static void process_ack(struct ceph_connection *con)
953{
954	u64 ack = le64_to_cpu(con->v1.in_temp_ack);
955
956	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
957		ceph_con_discard_sent(con, ack);
958	else
959		ceph_con_discard_requeued(con, ack);
960
961	prepare_read_tag(con);
962}
963
964static int read_partial_message_chunk(struct ceph_connection *con,
965				      struct kvec *section,
966				      unsigned int sec_len, u32 *crc)
967{
968	int ret, left;
969
970	BUG_ON(!section);
971
972	while (section->iov_len < sec_len) {
973		BUG_ON(section->iov_base == NULL);
974		left = sec_len - section->iov_len;
975		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
976				       section->iov_len, left);
977		if (ret <= 0)
978			return ret;
979		section->iov_len += ret;
980	}
981	if (section->iov_len == sec_len)
982		*crc = crc32c(*crc, section->iov_base, section->iov_len);
983
984	return 1;
985}
986
987static inline int read_partial_message_section(struct ceph_connection *con,
988					       struct kvec *section,
989					       unsigned int sec_len, u32 *crc)
990{
991	*crc = 0;
992	return read_partial_message_chunk(con, section, sec_len, crc);
993}
994
995static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
996{
997	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
998	bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
999
1000	if (do_bounce && unlikely(!con->bounce_page)) {
1001		con->bounce_page = alloc_page(GFP_NOIO);
1002		if (!con->bounce_page) {
1003			pr_err("failed to allocate bounce page\n");
1004			return -ENOMEM;
1005		}
1006	}
1007
1008	while (cursor->sr_resid > 0) {
1009		struct page *page, *rpage;
1010		size_t off, len;
1011		int ret;
1012
1013		page = ceph_msg_data_next(cursor, &off, &len);
1014		rpage = do_bounce ? con->bounce_page : page;
1015
1016		/* clamp to what remains in extent */
1017		len = min_t(int, len, cursor->sr_resid);
1018		ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
1019		if (ret <= 0)
1020			return ret;
1021		*crc = ceph_crc32c_page(*crc, rpage, off, ret);
1022		ceph_msg_data_advance(cursor, (size_t)ret);
1023		cursor->sr_resid -= ret;
1024		if (do_bounce)
1025			memcpy_page(page, off, rpage, off, ret);
1026	}
1027	return 1;
1028}
1029
1030static int read_partial_sparse_msg_data(struct ceph_connection *con)
1031{
1032	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1034	u32 crc = 0;
1035	int ret = 1;
1036
1037	if (do_datacrc)
1038		crc = con->in_data_crc;
1039
1040	while (cursor->total_resid) {
1041		if (con->v1.in_sr_kvec.iov_base)
1042			ret = read_partial_message_chunk(con,
1043							 &con->v1.in_sr_kvec,
1044							 con->v1.in_sr_len,
1045							 &crc);
1046		else if (cursor->sr_resid > 0)
1047			ret = read_partial_sparse_msg_extent(con, &crc);
1048		if (ret <= 0)
1049			break;
1050
1051		memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
1052		ret = con->ops->sparse_read(con, cursor,
1053				(char **)&con->v1.in_sr_kvec.iov_base);
1054		if (ret <= 0) {
1055			ret = ret ? ret : 1;  /* must return > 0 to indicate success */
1056			break;
1057		}
1058		con->v1.in_sr_len = ret;
1059	}
1060
1061	if (do_datacrc)
1062		con->in_data_crc = crc;
1063
1064	return ret;
1065}
1066
1067static int read_partial_msg_data(struct ceph_connection *con)
1068{
1069	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1070	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1071	struct page *page;
1072	size_t page_offset;
1073	size_t length;
1074	u32 crc = 0;
1075	int ret;
1076
1077	if (do_datacrc)
1078		crc = con->in_data_crc;
1079	while (cursor->total_resid) {
1080		if (!cursor->resid) {
1081			ceph_msg_data_advance(cursor, 0);
1082			continue;
1083		}
1084
1085		page = ceph_msg_data_next(cursor, &page_offset, &length);
1086		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1087		if (ret <= 0) {
1088			if (do_datacrc)
1089				con->in_data_crc = crc;
1090
1091			return ret;
1092		}
1093
1094		if (do_datacrc)
1095			crc = ceph_crc32c_page(crc, page, page_offset, ret);
1096		ceph_msg_data_advance(cursor, (size_t)ret);
1097	}
1098	if (do_datacrc)
1099		con->in_data_crc = crc;
1100
1101	return 1;	/* must return > 0 to indicate success */
1102}
1103
1104static int read_partial_msg_data_bounce(struct ceph_connection *con)
1105{
1106	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1107	struct page *page;
1108	size_t off, len;
1109	u32 crc;
1110	int ret;
1111
1112	if (unlikely(!con->bounce_page)) {
1113		con->bounce_page = alloc_page(GFP_NOIO);
1114		if (!con->bounce_page) {
1115			pr_err("failed to allocate bounce page\n");
1116			return -ENOMEM;
1117		}
1118	}
1119
1120	crc = con->in_data_crc;
1121	while (cursor->total_resid) {
1122		if (!cursor->resid) {
1123			ceph_msg_data_advance(cursor, 0);
1124			continue;
1125		}
1126
1127		page = ceph_msg_data_next(cursor, &off, &len);
1128		ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1129		if (ret <= 0) {
1130			con->in_data_crc = crc;
1131			return ret;
1132		}
1133
1134		crc = crc32c(crc, page_address(con->bounce_page), ret);
1135		memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1136
1137		ceph_msg_data_advance(cursor, ret);
1138	}
1139	con->in_data_crc = crc;
1140
1141	return 1;	/* must return > 0 to indicate success */
1142}
1143
1144/*
1145 * read (part of) a message.
1146 */
1147static int read_partial_message(struct ceph_connection *con)
1148{
1149	struct ceph_msg *m = con->in_msg;
1150	int size;
1151	int end;
1152	int ret;
1153	unsigned int front_len, middle_len, data_len;
1154	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1155	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1156	u64 seq;
1157	u32 crc;
1158
1159	dout("read_partial_message con %p msg %p\n", con, m);
1160
1161	/* header */
1162	size = sizeof(con->v1.in_hdr);
1163	end = size;
1164	ret = read_partial(con, end, size, &con->v1.in_hdr);
1165	if (ret <= 0)
1166		return ret;
1167
1168	crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1169	if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1170		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1171		       crc, con->v1.in_hdr.crc);
1172		return -EBADMSG;
1173	}
1174
1175	front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1176	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1177		return -EIO;
1178	middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1179	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1180		return -EIO;
1181	data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1182	if (data_len > CEPH_MSG_MAX_DATA_LEN)
1183		return -EIO;
1184
1185	/* verify seq# */
1186	seq = le64_to_cpu(con->v1.in_hdr.seq);
1187	if ((s64)seq - (s64)con->in_seq < 1) {
1188		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1189			ENTITY_NAME(con->peer_name),
1190			ceph_pr_addr(&con->peer_addr),
1191			seq, con->in_seq + 1);
1192		con->v1.in_base_pos = -front_len - middle_len - data_len -
1193				      sizeof_footer(con);
1194		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1195		return 1;
1196	} else if ((s64)seq - (s64)con->in_seq > 1) {
1197		pr_err("read_partial_message bad seq %lld expected %lld\n",
1198		       seq, con->in_seq + 1);
1199		con->error_msg = "bad message sequence # for incoming message";
1200		return -EBADE;
1201	}
1202
1203	/* allocate message? */
1204	if (!con->in_msg) {
1205		int skip = 0;
1206
1207		dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1208		     front_len, data_len);
1209		ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1210		if (ret < 0)
1211			return ret;
1212
1213		BUG_ON((!con->in_msg) ^ skip);
1214		if (skip) {
1215			/* skip this message */
1216			dout("alloc_msg said skip message\n");
1217			con->v1.in_base_pos = -front_len - middle_len -
1218					      data_len - sizeof_footer(con);
1219			con->v1.in_tag = CEPH_MSGR_TAG_READY;
1220			con->in_seq++;
1221			return 1;
1222		}
1223
1224		BUG_ON(!con->in_msg);
1225		BUG_ON(con->in_msg->con != con);
1226		m = con->in_msg;
1227		m->front.iov_len = 0;    /* haven't read it yet */
1228		if (m->middle)
1229			m->middle->vec.iov_len = 0;
1230
1231		/* prepare for data payload, if any */
1232
1233		if (data_len)
1234			prepare_message_data(con->in_msg, data_len);
1235	}
1236
1237	/* front */
1238	ret = read_partial_message_section(con, &m->front, front_len,
1239					   &con->in_front_crc);
1240	if (ret <= 0)
1241		return ret;
1242
1243	/* middle */
1244	if (m->middle) {
1245		ret = read_partial_message_section(con, &m->middle->vec,
1246						   middle_len,
1247						   &con->in_middle_crc);
1248		if (ret <= 0)
1249			return ret;
1250	}
1251
1252	/* (page) data */
1253	if (data_len) {
1254		if (!m->num_data_items)
1255			return -EIO;
1256
1257		if (m->sparse_read_total)
1258			ret = read_partial_sparse_msg_data(con);
1259		else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1260			ret = read_partial_msg_data_bounce(con);
1261		else
1262			ret = read_partial_msg_data(con);
1263		if (ret <= 0)
1264			return ret;
1265	}
1266
1267	/* footer */
1268	size = sizeof_footer(con);
1269	end += size;
1270	ret = read_partial(con, end, size, &m->footer);
1271	if (ret <= 0)
1272		return ret;
1273
1274	if (!need_sign) {
1275		m->footer.flags = m->old_footer.flags;
1276		m->footer.sig = 0;
1277	}
1278
1279	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1280	     m, front_len, m->footer.front_crc, middle_len,
1281	     m->footer.middle_crc, data_len, m->footer.data_crc);
1282
1283	/* crc ok? */
1284	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1285		pr_err("read_partial_message %p front crc %u != exp. %u\n",
1286		       m, con->in_front_crc, m->footer.front_crc);
1287		return -EBADMSG;
1288	}
1289	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1290		pr_err("read_partial_message %p middle crc %u != exp %u\n",
1291		       m, con->in_middle_crc, m->footer.middle_crc);
1292		return -EBADMSG;
1293	}
1294	if (do_datacrc &&
1295	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1296	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1297		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1298		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1299		return -EBADMSG;
1300	}
1301
1302	if (need_sign && con->ops->check_message_signature &&
1303	    con->ops->check_message_signature(m)) {
1304		pr_err("read_partial_message %p signature check failed\n", m);
1305		return -EBADMSG;
1306	}
1307
1308	return 1; /* done! */
1309}
1310
1311static int read_keepalive_ack(struct ceph_connection *con)
1312{
1313	struct ceph_timespec ceph_ts;
1314	size_t size = sizeof(ceph_ts);
1315	int ret = read_partial(con, size, size, &ceph_ts);
1316	if (ret <= 0)
1317		return ret;
1318	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1319	prepare_read_tag(con);
1320	return 1;
1321}
1322
1323/*
1324 * Read what we can from the socket.
1325 */
1326int ceph_con_v1_try_read(struct ceph_connection *con)
1327{
1328	int ret = -1;
1329
1330more:
1331	dout("try_read start %p state %d\n", con, con->state);
1332	if (con->state != CEPH_CON_S_V1_BANNER &&
1333	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1334	    con->state != CEPH_CON_S_OPEN)
1335		return 0;
1336
1337	BUG_ON(!con->sock);
1338
1339	dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1340	     con->v1.in_base_pos);
1341
1342	if (con->state == CEPH_CON_S_V1_BANNER) {
1343		ret = read_partial_banner(con);
1344		if (ret <= 0)
1345			goto out;
1346		ret = process_banner(con);
1347		if (ret < 0)
1348			goto out;
1349
1350		con->state = CEPH_CON_S_V1_CONNECT_MSG;
1351
1352		/*
1353		 * Received banner is good, exchange connection info.
1354		 * Do not reset out_kvec, as sending our banner raced
1355		 * with receiving peer banner after connect completed.
1356		 */
1357		ret = prepare_write_connect(con);
1358		if (ret < 0)
1359			goto out;
1360		prepare_read_connect(con);
1361
1362		/* Send connection info before awaiting response */
1363		goto out;
1364	}
1365
1366	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1367		ret = read_partial_connect(con);
1368		if (ret <= 0)
1369			goto out;
1370		ret = process_connect(con);
1371		if (ret < 0)
1372			goto out;
1373		goto more;
1374	}
1375
1376	WARN_ON(con->state != CEPH_CON_S_OPEN);
1377
1378	if (con->v1.in_base_pos < 0) {
1379		/*
1380		 * skipping + discarding content.
1381		 */
1382		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1383		if (ret <= 0)
1384			goto out;
1385		dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1386		con->v1.in_base_pos += ret;
1387		if (con->v1.in_base_pos)
1388			goto more;
1389	}
1390	if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1391		/*
1392		 * what's next?
1393		 */
1394		ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1395		if (ret <= 0)
1396			goto out;
1397		dout("try_read got tag %d\n", con->v1.in_tag);
1398		switch (con->v1.in_tag) {
1399		case CEPH_MSGR_TAG_MSG:
1400			prepare_read_message(con);
1401			break;
1402		case CEPH_MSGR_TAG_ACK:
1403			prepare_read_ack(con);
1404			break;
1405		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1406			prepare_read_keepalive_ack(con);
1407			break;
1408		case CEPH_MSGR_TAG_CLOSE:
1409			ceph_con_close_socket(con);
1410			con->state = CEPH_CON_S_CLOSED;
1411			goto out;
1412		default:
1413			goto bad_tag;
1414		}
1415	}
1416	if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1417		ret = read_partial_message(con);
1418		if (ret <= 0) {
1419			switch (ret) {
1420			case -EBADMSG:
1421				con->error_msg = "bad crc/signature";
1422				fallthrough;
1423			case -EBADE:
1424				ret = -EIO;
1425				break;
1426			case -EIO:
1427				con->error_msg = "io error";
1428				break;
1429			}
1430			goto out;
1431		}
1432		if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1433			goto more;
1434		ceph_con_process_message(con);
1435		if (con->state == CEPH_CON_S_OPEN)
1436			prepare_read_tag(con);
1437		goto more;
1438	}
1439	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1440	    con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1441		/*
1442		 * the final handshake seq exchange is semantically
1443		 * equivalent to an ACK
1444		 */
1445		ret = read_partial_ack(con);
1446		if (ret <= 0)
1447			goto out;
1448		process_ack(con);
1449		goto more;
1450	}
1451	if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1452		ret = read_keepalive_ack(con);
1453		if (ret <= 0)
1454			goto out;
1455		goto more;
1456	}
1457
1458out:
1459	dout("try_read done on %p ret %d\n", con, ret);
1460	return ret;
1461
1462bad_tag:
1463	pr_err("try_read bad tag %d\n", con->v1.in_tag);
1464	con->error_msg = "protocol error, garbage tag";
1465	ret = -1;
1466	goto out;
1467}
1468
1469/*
1470 * Write something to the socket.  Called in a worker thread when the
1471 * socket appears to be writeable and we have something ready to send.
1472 */
1473int ceph_con_v1_try_write(struct ceph_connection *con)
1474{
1475	int ret = 1;
1476
1477	dout("try_write start %p state %d\n", con, con->state);
1478	if (con->state != CEPH_CON_S_PREOPEN &&
1479	    con->state != CEPH_CON_S_V1_BANNER &&
1480	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1481	    con->state != CEPH_CON_S_OPEN)
1482		return 0;
1483
1484	/* open the socket first? */
1485	if (con->state == CEPH_CON_S_PREOPEN) {
1486		BUG_ON(con->sock);
1487		con->state = CEPH_CON_S_V1_BANNER;
1488
1489		con_out_kvec_reset(con);
1490		prepare_write_banner(con);
1491		prepare_read_banner(con);
1492
1493		BUG_ON(con->in_msg);
1494		con->v1.in_tag = CEPH_MSGR_TAG_READY;
1495		dout("try_write initiating connect on %p new state %d\n",
1496		     con, con->state);
1497		ret = ceph_tcp_connect(con);
1498		if (ret < 0) {
1499			con->error_msg = "connect error";
1500			goto out;
1501		}
1502	}
1503
1504more:
1505	dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1506	BUG_ON(!con->sock);
1507
1508	/* kvec data queued? */
1509	if (con->v1.out_kvec_left) {
1510		ret = write_partial_kvec(con);
1511		if (ret <= 0)
1512			goto out;
1513	}
1514	if (con->v1.out_skip) {
1515		ret = write_partial_skip(con);
1516		if (ret <= 0)
1517			goto out;
1518	}
1519
1520	/* msg pages? */
1521	if (con->out_msg) {
1522		if (con->v1.out_msg_done) {
1523			ceph_msg_put(con->out_msg);
1524			con->out_msg = NULL;   /* we're done with this one */
1525			goto do_next;
1526		}
1527
1528		ret = write_partial_message_data(con);
1529		if (ret == 1)
1530			goto more;  /* we need to send the footer, too! */
1531		if (ret == 0)
1532			goto out;
1533		if (ret < 0) {
1534			dout("try_write write_partial_message_data err %d\n",
1535			     ret);
1536			goto out;
1537		}
1538	}
1539
1540do_next:
1541	if (con->state == CEPH_CON_S_OPEN) {
1542		if (ceph_con_flag_test_and_clear(con,
1543				CEPH_CON_F_KEEPALIVE_PENDING)) {
1544			prepare_write_keepalive(con);
1545			goto more;
1546		}
1547		/* is anything else pending? */
1548		if (!list_empty(&con->out_queue)) {
1549			prepare_write_message(con);
1550			goto more;
1551		}
1552		if (con->in_seq > con->in_seq_acked) {
1553			prepare_write_ack(con);
1554			goto more;
1555		}
1556	}
1557
1558	/* Nothing to do! */
1559	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1560	dout("try_write nothing else to write.\n");
1561	ret = 0;
1562out:
1563	dout("try_write done on %p ret %d\n", con, ret);
1564	return ret;
1565}
1566
1567void ceph_con_v1_revoke(struct ceph_connection *con)
1568{
1569	struct ceph_msg *msg = con->out_msg;
1570
1571	WARN_ON(con->v1.out_skip);
1572	/* footer */
1573	if (con->v1.out_msg_done) {
1574		con->v1.out_skip += con_out_kvec_skip(con);
1575	} else {
1576		WARN_ON(!msg->data_length);
1577		con->v1.out_skip += sizeof_footer(con);
1578	}
1579	/* data, middle, front */
1580	if (msg->data_length)
1581		con->v1.out_skip += msg->cursor.total_resid;
1582	if (msg->middle)
1583		con->v1.out_skip += con_out_kvec_skip(con);
1584	con->v1.out_skip += con_out_kvec_skip(con);
1585
1586	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1587	     con->v1.out_kvec_bytes, con->v1.out_skip);
1588}
1589
1590void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1591{
1592	unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1593	unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1594	unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1595
1596	/* skip rest of message */
1597	con->v1.in_base_pos = con->v1.in_base_pos -
1598			sizeof(struct ceph_msg_header) -
1599			front_len -
1600			middle_len -
1601			data_len -
1602			sizeof(struct ceph_msg_footer);
1603
1604	con->v1.in_tag = CEPH_MSGR_TAG_READY;
1605	con->in_seq++;
1606
1607	dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1608}
1609
1610bool ceph_con_v1_opened(struct ceph_connection *con)
1611{
1612	return con->v1.connect_seq;
1613}
1614
1615void ceph_con_v1_reset_session(struct ceph_connection *con)
1616{
1617	con->v1.connect_seq = 0;
1618	con->v1.peer_global_seq = 0;
1619}
1620
1621void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1622{
1623	con->v1.out_skip = 0;
1624}
1625