1/*-
2 * SPDX-License-Identifier: BSD-2-Clause
3 *
4 * Copyright (c) 2022-2024 Chelsio Communications, Inc.
5 * Written by: John Baldwin <jhb@FreeBSD.org>
6 */
7
8#include <sys/param.h>
9#include <sys/capsicum.h>
10#include <sys/condvar.h>
11#include <sys/file.h>
12#include <sys/gsb_crc32.h>
13#include <sys/kernel.h>
14#include <sys/kthread.h>
15#include <sys/limits.h>
16#include <sys/lock.h>
17#include <sys/malloc.h>
18#include <sys/mbuf.h>
19#include <sys/module.h>
20#include <sys/mutex.h>
21#include <sys/protosw.h>
22#include <sys/refcount.h>
23#include <sys/socket.h>
24#include <sys/socketvar.h>
25#include <sys/sysctl.h>
26#include <sys/uio.h>
27#include <netinet/in.h>
28#include <dev/nvme/nvme.h>
29#include <dev/nvmf/nvmf.h>
30#include <dev/nvmf/nvmf_proto.h>
31#include <dev/nvmf/nvmf_tcp.h>
32#include <dev/nvmf/nvmf_transport.h>
33#include <dev/nvmf/nvmf_transport_internal.h>
34
35struct nvmf_tcp_capsule;
36struct nvmf_tcp_qpair;
37
38struct nvmf_tcp_command_buffer {
39	struct nvmf_tcp_qpair *qp;
40
41	struct nvmf_io_request io;
42	size_t	data_len;
43	size_t	data_xfered;
44	uint32_t data_offset;
45
46	u_int	refs;
47	int	error;
48
49	uint16_t cid;
50	uint16_t ttag;
51
52	TAILQ_ENTRY(nvmf_tcp_command_buffer) link;
53
54	/* Controller only */
55	struct nvmf_tcp_capsule *tc;
56};
57
58struct nvmf_tcp_command_buffer_list {
59	TAILQ_HEAD(, nvmf_tcp_command_buffer) head;
60	struct mtx lock;
61};
62
63struct nvmf_tcp_qpair {
64	struct nvmf_qpair qp;
65
66	struct socket *so;
67
68	volatile u_int refs;	/* Every allocated capsule holds a reference */
69	uint8_t	txpda;
70	uint8_t rxpda;
71	bool header_digests;
72	bool data_digests;
73	uint32_t maxr2t;
74	uint32_t maxh2cdata;	/* Controller only */
75	uint32_t max_tx_data;
76	uint32_t max_icd;	/* Host only */
77	uint16_t next_ttag;	/* Controller only */
78	u_int num_ttags;	/* Controller only */
79	u_int active_ttags;	/* Controller only */
80	bool send_success;	/* Controller only */
81
82	/* Receive state. */
83	struct thread *rx_thread;
84	struct cv rx_cv;
85	bool	rx_shutdown;
86
87	/* Transmit state. */
88	struct thread *tx_thread;
89	struct cv tx_cv;
90	bool	tx_shutdown;
91	struct mbufq tx_pdus;
92	STAILQ_HEAD(, nvmf_tcp_capsule) tx_capsules;
93
94	struct nvmf_tcp_command_buffer_list tx_buffers;
95	struct nvmf_tcp_command_buffer_list rx_buffers;
96
	/*
	 * For the controller, an RX command buffer can be in one of
	 * two locations, both protected by rx_buffers.lock.  If a
	 * receive request is waiting for either an R2T slot for its
	 * command (due to exceeding MAXR2T) or a free transfer tag,
	 * it is placed on the rx_buffers list.  Once a request is
	 * allocated an active transfer tag, it moves to the
	 * open_ttags[] array (indexed by the tag) until it completes.
	 */
106	struct nvmf_tcp_command_buffer **open_ttags;	/* Controller only */
107};
108
109struct nvmf_tcp_rxpdu {
110	struct mbuf *m;
111	const struct nvme_tcp_common_pdu_hdr *hdr;
112	uint32_t data_len;
113	bool data_digest_mismatch;
114};
115
116struct nvmf_tcp_capsule {
117	struct nvmf_capsule nc;
118
119	volatile u_int refs;
120
121	struct nvmf_tcp_rxpdu rx_pdu;
122
123	uint32_t active_r2ts;		/* Controller only */
124#ifdef INVARIANTS
125	uint32_t tx_data_offset;	/* Controller only */
126	u_int pending_r2ts;		/* Controller only */
127#endif
128
129	STAILQ_ENTRY(nvmf_tcp_capsule) link;
130};
131
132#define	TCAP(nc)	((struct nvmf_tcp_capsule *)(nc))
133#define	TQP(qp)		((struct nvmf_tcp_qpair *)(qp))
134
135static void	tcp_release_capsule(struct nvmf_tcp_capsule *tc);
136static void	tcp_free_qpair(struct nvmf_qpair *nq);
137
138SYSCTL_NODE(_kern_nvmf, OID_AUTO, tcp, CTLFLAG_RD | CTLFLAG_MPSAFE, 0,
139    "TCP transport");
140static u_int tcp_max_transmit_data = 256 * 1024;
141SYSCTL_UINT(_kern_nvmf_tcp, OID_AUTO, max_c2hdata, CTLFLAG_RWTUN,
142    &tcp_max_transmit_data, 0,
143    "Maximum size of data payload in a transmitted PDU");
144
145static MALLOC_DEFINE(M_NVMF_TCP, "nvmf_tcp", "NVMe over TCP");
146
147static int
148mbuf_crc32c_helper(void *arg, void *data, u_int len)
149{
150	uint32_t *digestp = arg;
151
152	*digestp = calculate_crc32c(*digestp, data, len);
153	return (0);
154}
155
156static uint32_t
157mbuf_crc32c(struct mbuf *m, u_int offset, u_int len)
158{
159	uint32_t digest = 0xffffffff;
160
161	m_apply(m, offset, len, mbuf_crc32c_helper, &digest);
162	digest = digest ^ 0xffffffff;
163
164	return (digest);
165}
166
167static uint32_t
168compute_digest(const void *buf, size_t len)
169{
170	return (calculate_crc32c(0xffffffff, buf, len) ^ 0xffffffff);
171}
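
/*
 * Both helpers implement the digest used for the NVMe/TCP HDGST and
 * DDGST fields: CRC32C seeded with ~0 and inverted at the end.
 * mbuf_crc32c() walks an mbuf chain via m_apply(), while
 * compute_digest() handles a contiguous buffer; for the same bytes
 * they produce the same value.  For example, the header digest
 * appended in nvmf_tcp_construct_pdu() is compute_digest(ch, hlen)
 * and is checked on receive with mbuf_crc32c(m, 0, ch->hlen).
 */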
172
173static struct nvmf_tcp_command_buffer *
174tcp_alloc_command_buffer(struct nvmf_tcp_qpair *qp,
175    const struct nvmf_io_request *io, uint32_t data_offset, size_t data_len,
176    uint16_t cid)
177{
178	struct nvmf_tcp_command_buffer *cb;
179
180	cb = malloc(sizeof(*cb), M_NVMF_TCP, M_WAITOK);
181	cb->qp = qp;
182	cb->io = *io;
183	cb->data_offset = data_offset;
184	cb->data_len = data_len;
185	cb->data_xfered = 0;
186	refcount_init(&cb->refs, 1);
187	cb->error = 0;
188	cb->cid = cid;
189	cb->ttag = 0;
190	cb->tc = NULL;
191
192	return (cb);
193}
194
195static void
196tcp_hold_command_buffer(struct nvmf_tcp_command_buffer *cb)
197{
198	refcount_acquire(&cb->refs);
199}
200
201static void
202tcp_free_command_buffer(struct nvmf_tcp_command_buffer *cb)
203{
204	nvmf_complete_io_request(&cb->io, cb->data_xfered, cb->error);
205	if (cb->tc != NULL)
206		tcp_release_capsule(cb->tc);
207	free(cb, M_NVMF_TCP);
208}
209
210static void
211tcp_release_command_buffer(struct nvmf_tcp_command_buffer *cb)
212{
213	if (refcount_release(&cb->refs))
214		tcp_free_command_buffer(cb);
215}
216
217static void
218tcp_add_command_buffer(struct nvmf_tcp_command_buffer_list *list,
219    struct nvmf_tcp_command_buffer *cb)
220{
221	mtx_assert(&list->lock, MA_OWNED);
222	TAILQ_INSERT_HEAD(&list->head, cb, link);
223}
224
225static struct nvmf_tcp_command_buffer *
226tcp_find_command_buffer(struct nvmf_tcp_command_buffer_list *list,
227    uint16_t cid, uint16_t ttag)
228{
229	struct nvmf_tcp_command_buffer *cb;
230
231	mtx_assert(&list->lock, MA_OWNED);
232	TAILQ_FOREACH(cb, &list->head, link) {
233		if (cb->cid == cid && cb->ttag == ttag)
234			return (cb);
235	}
236	return (NULL);
237}
238
239static void
240tcp_remove_command_buffer(struct nvmf_tcp_command_buffer_list *list,
241    struct nvmf_tcp_command_buffer *cb)
242{
243	mtx_assert(&list->lock, MA_OWNED);
244	TAILQ_REMOVE(&list->head, cb, link);
245}
246
247static void
248tcp_purge_command_buffer(struct nvmf_tcp_command_buffer_list *list,
249    uint16_t cid, uint16_t ttag)
250{
251	struct nvmf_tcp_command_buffer *cb;
252
253	mtx_lock(&list->lock);
254	cb = tcp_find_command_buffer(list, cid, ttag);
255	if (cb != NULL) {
256		tcp_remove_command_buffer(list, cb);
257		mtx_unlock(&list->lock);
258		tcp_release_command_buffer(cb);
259	} else
260		mtx_unlock(&list->lock);
261}
262
263static void
264nvmf_tcp_write_pdu(struct nvmf_tcp_qpair *qp, struct mbuf *m)
265{
266	struct socket *so = qp->so;
267
268	SOCKBUF_LOCK(&so->so_snd);
269	mbufq_enqueue(&qp->tx_pdus, m);
270	/* XXX: Do we need to handle sb_hiwat being wrong? */
271	if (sowriteable(so))
272		cv_signal(&qp->tx_cv);
273	SOCKBUF_UNLOCK(&so->so_snd);
274}
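
/*
 * PDU transmission is asynchronous: nvmf_tcp_write_pdu() just queues
 * the fully constructed PDU on tx_pdus and pokes the TX kthread
 * (nvmf_tcp_send) if the socket is currently writable.  The kthread
 * is responsible for pacing writes against the socket buffer.
 */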
275
276static void
277nvmf_tcp_report_error(struct nvmf_tcp_qpair *qp, uint16_t fes, uint32_t fei,
278    struct mbuf *rx_pdu, u_int hlen)
279{
280	struct nvme_tcp_term_req_hdr *hdr;
281	struct mbuf *m;
282
283	if (hlen != 0) {
284		hlen = min(hlen, NVME_TCP_TERM_REQ_ERROR_DATA_MAX_SIZE);
285		hlen = min(hlen, m_length(rx_pdu, NULL));
286	}
287
288	m = m_get2(sizeof(*hdr) + hlen, M_WAITOK, MT_DATA, 0);
289	m->m_len = sizeof(*hdr) + hlen;
290	hdr = mtod(m, void *);
291	memset(hdr, 0, sizeof(*hdr));
292	hdr->common.pdu_type = qp->qp.nq_controller ?
293	    NVME_TCP_PDU_TYPE_C2H_TERM_REQ : NVME_TCP_PDU_TYPE_H2C_TERM_REQ;
294	hdr->common.hlen = sizeof(*hdr);
295	hdr->common.plen = sizeof(*hdr) + hlen;
296	hdr->fes = htole16(fes);
297	le32enc(hdr->fei, fei);
298	if (hlen != 0)
299		m_copydata(rx_pdu, 0, hlen, (caddr_t)(hdr + 1));
300
301	nvmf_tcp_write_pdu(qp, m);
302}
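
/*
 * A fatal protocol error is reported to the peer with a termination
 * request PDU (C2H or H2C depending on our role) carrying the Fatal
 * Error Status (fes) and Fatal Error Information (fei) fields, plus
 * up to NVME_TCP_TERM_REQ_ERROR_DATA_MAX_SIZE bytes of the offending
 * PDU header copied from the received mbuf chain.
 */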
303
304static int
305nvmf_tcp_validate_pdu(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_rxpdu *pdu)
306{
307	const struct nvme_tcp_common_pdu_hdr *ch;
308	struct mbuf *m = pdu->m;
309	uint32_t data_len, fei, plen;
310	uint32_t digest, rx_digest;
311	u_int hlen;
312	int error;
313	uint16_t fes;
314
315	/* Determine how large of a PDU header to return for errors. */
316	ch = pdu->hdr;
317	hlen = ch->hlen;
318	plen = le32toh(ch->plen);
319	if (hlen < sizeof(*ch) || hlen > plen)
320		hlen = sizeof(*ch);
321
322	error = nvmf_tcp_validate_pdu_header(ch, qp->qp.nq_controller,
323	    qp->header_digests, qp->data_digests, qp->rxpda, &data_len, &fes,
324	    &fei);
325	if (error != 0) {
326		if (error != ECONNRESET)
327			nvmf_tcp_report_error(qp, fes, fei, m, hlen);
328		return (error);
329	}
330
331	/* Check header digest if present. */
332	if ((ch->flags & NVME_TCP_CH_FLAGS_HDGSTF) != 0) {
333		digest = mbuf_crc32c(m, 0, ch->hlen);
334		m_copydata(m, ch->hlen, sizeof(rx_digest), (caddr_t)&rx_digest);
335		if (digest != rx_digest) {
336			printf("NVMe/TCP: Header digest mismatch\n");
337			nvmf_tcp_report_error(qp,
338			    NVME_TCP_TERM_REQ_FES_HDGST_ERROR, rx_digest, m,
339			    hlen);
340			return (EBADMSG);
341		}
342	}
343
344	/* Check data digest if present. */
345	pdu->data_digest_mismatch = false;
346	if ((ch->flags & NVME_TCP_CH_FLAGS_DDGSTF) != 0) {
347		digest = mbuf_crc32c(m, ch->pdo, data_len);
348		m_copydata(m, plen - sizeof(rx_digest), sizeof(rx_digest),
349		    (caddr_t)&rx_digest);
350		if (digest != rx_digest) {
351			printf("NVMe/TCP: Data digest mismatch\n");
352			pdu->data_digest_mismatch = true;
353		}
354	}
355
356	pdu->data_len = data_len;
357	return (0);
358}
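
/*
 * Note the asymmetry in digest handling above: a header digest
 * mismatch is fatal to the connection (an HDGST_ERROR termination
 * request is sent and EBADMSG is returned), while a data digest
 * mismatch only sets data_digest_mismatch so that the PDU handlers
 * can fail the affected I/O request with EINTEGRITY without tearing
 * down the queue pair.
 */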
359
360static void
361nvmf_tcp_free_pdu(struct nvmf_tcp_rxpdu *pdu)
362{
363	m_freem(pdu->m);
364	pdu->m = NULL;
365	pdu->hdr = NULL;
366}
367
368static int
369nvmf_tcp_handle_term_req(struct nvmf_tcp_rxpdu *pdu)
370{
371	const struct nvme_tcp_term_req_hdr *hdr;
372
373	hdr = (const void *)pdu->hdr;
374
375	printf("NVMe/TCP: Received termination request: fes %#x fei %#x\n",
376	    le16toh(hdr->fes), le32dec(hdr->fei));
377	nvmf_tcp_free_pdu(pdu);
378	return (ECONNRESET);
379}
380
381static int
382nvmf_tcp_save_command_capsule(struct nvmf_tcp_qpair *qp,
383    struct nvmf_tcp_rxpdu *pdu)
384{
385	const struct nvme_tcp_cmd *cmd;
386	struct nvmf_capsule *nc;
387	struct nvmf_tcp_capsule *tc;
388
389	cmd = (const void *)pdu->hdr;
390
391	nc = nvmf_allocate_command(&qp->qp, &cmd->ccsqe, M_WAITOK);
392
393	tc = TCAP(nc);
394	tc->rx_pdu = *pdu;
395
396	nvmf_capsule_received(&qp->qp, nc);
397	return (0);
398}
399
400static int
401nvmf_tcp_save_response_capsule(struct nvmf_tcp_qpair *qp,
402    struct nvmf_tcp_rxpdu *pdu)
403{
404	const struct nvme_tcp_rsp *rsp;
405	struct nvmf_capsule *nc;
406	struct nvmf_tcp_capsule *tc;
407
408	rsp = (const void *)pdu->hdr;
409
410	nc = nvmf_allocate_response(&qp->qp, &rsp->rccqe, M_WAITOK);
411
412	nc->nc_sqhd_valid = true;
413	tc = TCAP(nc);
414	tc->rx_pdu = *pdu;
415
416	/*
417	 * Once the CQE has been received, no further transfers to the
418	 * command buffer for the associated CID can occur.
419	 */
420	tcp_purge_command_buffer(&qp->rx_buffers, rsp->rccqe.cid, 0);
421	tcp_purge_command_buffer(&qp->tx_buffers, rsp->rccqe.cid, 0);
422
423	nvmf_capsule_received(&qp->qp, nc);
424	return (0);
425}
426
427/*
428 * Construct a PDU that contains an optional data payload.  This
429 * includes dealing with digests and the length fields in the common
430 * header.
431 */
432static struct mbuf *
433nvmf_tcp_construct_pdu(struct nvmf_tcp_qpair *qp, void *hdr, size_t hlen,
434    struct mbuf *data, uint32_t data_len)
435{
436	struct nvme_tcp_common_pdu_hdr *ch;
437	struct mbuf *top;
438	uint32_t digest, pad, pdo, plen, mlen;
439
440	plen = hlen;
441	if (qp->header_digests)
442		plen += sizeof(digest);
443	if (data_len != 0) {
444		KASSERT(m_length(data, NULL) == data_len, ("length mismatch"));
445		pdo = roundup2(plen, qp->txpda);
446		pad = pdo - plen;
447		plen = pdo + data_len;
448		if (qp->data_digests)
449			plen += sizeof(digest);
450		mlen = pdo;
451	} else {
452		KASSERT(data == NULL, ("payload mbuf with zero length"));
453		pdo = 0;
454		pad = 0;
455		mlen = plen;
456	}
457
458	top = m_get2(mlen, M_WAITOK, MT_DATA, 0);
459	top->m_len = mlen;
460	ch = mtod(top, void *);
461	memcpy(ch, hdr, hlen);
462	ch->hlen = hlen;
463	if (qp->header_digests)
464		ch->flags |= NVME_TCP_CH_FLAGS_HDGSTF;
465	if (qp->data_digests && data_len != 0)
466		ch->flags |= NVME_TCP_CH_FLAGS_DDGSTF;
467	ch->pdo = pdo;
468	ch->plen = htole32(plen);
469
470	/* HDGST */
471	if (qp->header_digests) {
472		digest = compute_digest(ch, hlen);
473		memcpy((char *)ch + hlen, &digest, sizeof(digest));
474	}
475
476	if (pad != 0) {
477		/* PAD */
478		memset((char *)ch + pdo - pad, 0, pad);
479	}
480
481	if (data_len != 0) {
482		/* DATA */
483		top->m_next = data;
484
485		/* DDGST */
486		if (qp->data_digests) {
487			digest = mbuf_crc32c(data, 0, data_len);
488
489			/* XXX: Can't use m_append as it uses M_NOWAIT. */
490			while (data->m_next != NULL)
491				data = data->m_next;
492
493			data->m_next = m_get(M_WAITOK, MT_DATA);
494			data->m_next->m_len = sizeof(digest);
495			memcpy(mtod(data->m_next, void *), &digest,
496			    sizeof(digest));
497		}
498	}
499
500	return (top);
501}
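
/*
 * Illustrative layout of a PDU built by nvmf_tcp_construct_pdu(),
 * assuming a 24-byte data PDU header (e.g. C2H_DATA), both digests
 * enabled, and a peer PDA of 8; the exact sizes depend on the
 * negotiated connection parameters:
 *
 *   CH/PSH (24) | HDGST (4) | PAD (4) | DATA (datal) | DDGST (4)
 *
 * giving pdo = roundup2(24 + 4, 8) = 32 and plen = 32 + datal + 4.
 * The header, header digest, and pad live in 'top'; the payload
 * mbufs are chained after it with the data digest in a trailing
 * mbuf.
 */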
502
503/* Find the next command buffer eligible to schedule for R2T. */
504static struct nvmf_tcp_command_buffer *
505nvmf_tcp_next_r2t(struct nvmf_tcp_qpair *qp)
506{
507	struct nvmf_tcp_command_buffer *cb;
508
509	mtx_assert(&qp->rx_buffers.lock, MA_OWNED);
510	MPASS(qp->active_ttags < qp->num_ttags);
511
512	TAILQ_FOREACH(cb, &qp->rx_buffers.head, link) {
513		/* NB: maxr2t is 0's based. */
514		if (cb->tc->active_r2ts > qp->maxr2t)
515			continue;
516#ifdef INVARIANTS
517		cb->tc->pending_r2ts--;
518#endif
519		TAILQ_REMOVE(&qp->rx_buffers.head, cb, link);
520		return (cb);
521	}
522	return (NULL);
523}
524
525/* Allocate the next free transfer tag and assign it to cb. */
526static void
527nvmf_tcp_allocate_ttag(struct nvmf_tcp_qpair *qp,
528    struct nvmf_tcp_command_buffer *cb)
529{
530	uint16_t ttag;
531
532	mtx_assert(&qp->rx_buffers.lock, MA_OWNED);
533
534	ttag = qp->next_ttag;
535	for (;;) {
536		if (qp->open_ttags[ttag] == NULL)
537			break;
538		if (ttag == qp->num_ttags - 1)
539			ttag = 0;
540		else
541			ttag++;
542		MPASS(ttag != qp->next_ttag);
543	}
544	if (ttag == qp->num_ttags - 1)
545		qp->next_ttag = 0;
546	else
547		qp->next_ttag = ttag + 1;
548
549	cb->tc->active_r2ts++;
550	qp->active_ttags++;
551	qp->open_ttags[ttag] = cb;
552
553	/*
554	 * Don't bother byte-swapping ttag as it is just a cookie
555	 * value returned by the other end as-is.
556	 */
557	cb->ttag = ttag;
558}
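
/*
 * The scan above is a simple round-robin search starting at
 * next_ttag.  Callers only invoke it when a free tag is known to
 * exist (active_ttags < num_ttags), so the loop always finds a free
 * slot before wrapping back to its starting point, as the MPASS in
 * the loop asserts.
 */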
559
560/* NB: cid and ttag are both little-endian already. */
561static void
562tcp_send_r2t(struct nvmf_tcp_qpair *qp, uint16_t cid, uint16_t ttag,
563    uint32_t data_offset, uint32_t data_len)
564{
565	struct nvme_tcp_r2t_hdr r2t;
566	struct mbuf *m;
567
568	memset(&r2t, 0, sizeof(r2t));
569	r2t.common.pdu_type = NVME_TCP_PDU_TYPE_R2T;
570	r2t.cccid = cid;
571	r2t.ttag = ttag;
572	r2t.r2to = htole32(data_offset);
573	r2t.r2tl = htole32(data_len);
574
575	m = nvmf_tcp_construct_pdu(qp, &r2t, sizeof(r2t), NULL, 0);
576	nvmf_tcp_write_pdu(qp, m);
577}
578
579/*
580 * Release a transfer tag and schedule another R2T.
581 *
582 * NB: This drops the rx_buffers.lock mutex.
583 */
584static void
585nvmf_tcp_send_next_r2t(struct nvmf_tcp_qpair *qp,
586    struct nvmf_tcp_command_buffer *cb)
587{
588	struct nvmf_tcp_command_buffer *ncb;
589
590	mtx_assert(&qp->rx_buffers.lock, MA_OWNED);
591	MPASS(qp->open_ttags[cb->ttag] == cb);
592
593	/* Release this transfer tag. */
594	qp->open_ttags[cb->ttag] = NULL;
595	qp->active_ttags--;
596	cb->tc->active_r2ts--;
597
598	/* Schedule another R2T. */
599	ncb = nvmf_tcp_next_r2t(qp);
600	if (ncb != NULL) {
601		nvmf_tcp_allocate_ttag(qp, ncb);
602		mtx_unlock(&qp->rx_buffers.lock);
603		tcp_send_r2t(qp, ncb->cid, ncb->ttag, ncb->data_offset,
604		    ncb->data_len);
605	} else
606		mtx_unlock(&qp->rx_buffers.lock);
607}
608
609/*
610 * Copy len bytes starting at offset skip from an mbuf chain into an
611 * I/O buffer at destination offset io_offset.
612 */
613static void
614mbuf_copyto_io(struct mbuf *m, u_int skip, u_int len,
615    struct nvmf_io_request *io, u_int io_offset)
616{
617	u_int todo;
618
619	while (m->m_len <= skip) {
620		skip -= m->m_len;
621		m = m->m_next;
622	}
623	while (len != 0) {
624		MPASS((m->m_flags & M_EXTPG) == 0);
625
626		todo = m->m_len - skip;
627		if (todo > len)
628			todo = len;
629
630		memdesc_copyback(&io->io_mem, io_offset, todo, mtodo(m, skip));
631		skip = 0;
632		io_offset += todo;
633		len -= todo;
634		m = m->m_next;
635	}
636}
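
/*
 * This is the data-placement primitive for received H2C_DATA,
 * C2H_DATA, and in-capsule data: payload bytes are copied out of the
 * receive mbuf chain into the request's memory descriptor with
 * memdesc_copyback().  The M_EXTPG assertion documents the
 * assumption that mbufs on the receive path are always mapped, which
 * mtodo() requires.
 */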
637
638static int
639nvmf_tcp_handle_h2c_data(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_rxpdu *pdu)
640{
641	const struct nvme_tcp_h2c_data_hdr *h2c;
642	struct nvmf_tcp_command_buffer *cb;
643	uint32_t data_len, data_offset;
644	uint16_t ttag;
645
646	h2c = (const void *)pdu->hdr;
647	if (le32toh(h2c->datal) > qp->maxh2cdata) {
648		nvmf_tcp_report_error(qp,
649		    NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_LIMIT_EXCEEDED, 0,
650		    pdu->m, pdu->hdr->hlen);
651		nvmf_tcp_free_pdu(pdu);
652		return (EBADMSG);
653	}
654
655	/*
656	 * NB: Don't bother byte-swapping ttag as we don't byte-swap
657	 * it when sending.
658	 */
659	ttag = h2c->ttag;
660	if (ttag >= qp->num_ttags) {
661		nvmf_tcp_report_error(qp,
662		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
663		    offsetof(struct nvme_tcp_h2c_data_hdr, ttag), pdu->m,
664		    pdu->hdr->hlen);
665		nvmf_tcp_free_pdu(pdu);
666		return (EBADMSG);
667	}
668
669	mtx_lock(&qp->rx_buffers.lock);
670	cb = qp->open_ttags[ttag];
671	if (cb == NULL) {
672		mtx_unlock(&qp->rx_buffers.lock);
673		nvmf_tcp_report_error(qp,
674		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
675		    offsetof(struct nvme_tcp_h2c_data_hdr, ttag), pdu->m,
676		    pdu->hdr->hlen);
677		nvmf_tcp_free_pdu(pdu);
678		return (EBADMSG);
679	}
680	MPASS(cb->ttag == ttag);
681
682	/* For a data digest mismatch, fail the I/O request. */
683	if (pdu->data_digest_mismatch) {
684		nvmf_tcp_send_next_r2t(qp, cb);
685		cb->error = EINTEGRITY;
686		tcp_release_command_buffer(cb);
687		nvmf_tcp_free_pdu(pdu);
688		return (0);
689	}
690
691	data_len = le32toh(h2c->datal);
692	if (data_len != pdu->data_len) {
693		mtx_unlock(&qp->rx_buffers.lock);
694		nvmf_tcp_report_error(qp,
695		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
696		    offsetof(struct nvme_tcp_h2c_data_hdr, datal), pdu->m,
697		    pdu->hdr->hlen);
698		nvmf_tcp_free_pdu(pdu);
699		return (EBADMSG);
700	}
701
702	data_offset = le32toh(h2c->datao);
703	if (data_offset < cb->data_offset ||
704	    data_offset + data_len > cb->data_offset + cb->data_len) {
705		mtx_unlock(&qp->rx_buffers.lock);
706		nvmf_tcp_report_error(qp,
707		    NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE, 0, pdu->m,
708		    pdu->hdr->hlen);
709		nvmf_tcp_free_pdu(pdu);
710		return (EBADMSG);
711	}
712
713	if (data_offset != cb->data_offset + cb->data_xfered) {
714		mtx_unlock(&qp->rx_buffers.lock);
715		nvmf_tcp_report_error(qp,
716		    NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR, 0, pdu->m,
717		    pdu->hdr->hlen);
718		nvmf_tcp_free_pdu(pdu);
719		return (EBADMSG);
720	}
721
722	if ((cb->data_xfered + data_len == cb->data_len) !=
723	    ((pdu->hdr->flags & NVME_TCP_H2C_DATA_FLAGS_LAST_PDU) != 0)) {
724		mtx_unlock(&qp->rx_buffers.lock);
725		nvmf_tcp_report_error(qp,
726		    NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR, 0, pdu->m,
727		    pdu->hdr->hlen);
728		nvmf_tcp_free_pdu(pdu);
729		return (EBADMSG);
730	}
731
732	cb->data_xfered += data_len;
733	data_offset -= cb->data_offset;
734	if (cb->data_xfered == cb->data_len) {
735		nvmf_tcp_send_next_r2t(qp, cb);
736	} else {
737		tcp_hold_command_buffer(cb);
738		mtx_unlock(&qp->rx_buffers.lock);
739	}
740
741	mbuf_copyto_io(pdu->m, pdu->hdr->pdo, data_len, &cb->io, data_offset);
742
743	tcp_release_command_buffer(cb);
744	nvmf_tcp_free_pdu(pdu);
745	return (0);
746}
747
748static int
749nvmf_tcp_handle_c2h_data(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_rxpdu *pdu)
750{
751	const struct nvme_tcp_c2h_data_hdr *c2h;
752	struct nvmf_tcp_command_buffer *cb;
753	uint32_t data_len, data_offset;
754
755	c2h = (const void *)pdu->hdr;
756
757	mtx_lock(&qp->rx_buffers.lock);
758	cb = tcp_find_command_buffer(&qp->rx_buffers, c2h->cccid, 0);
759	if (cb == NULL) {
760		mtx_unlock(&qp->rx_buffers.lock);
761		/*
762		 * XXX: Could be PDU sequence error if cccid is for a
763		 * command that doesn't use a command buffer.
764		 */
765		nvmf_tcp_report_error(qp,
766		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
767		    offsetof(struct nvme_tcp_c2h_data_hdr, cccid), pdu->m,
768		    pdu->hdr->hlen);
769		nvmf_tcp_free_pdu(pdu);
770		return (EBADMSG);
771	}
772
773	/* For a data digest mismatch, fail the I/O request. */
774	if (pdu->data_digest_mismatch) {
775		cb->error = EINTEGRITY;
776		tcp_remove_command_buffer(&qp->rx_buffers, cb);
777		mtx_unlock(&qp->rx_buffers.lock);
778		tcp_release_command_buffer(cb);
779		nvmf_tcp_free_pdu(pdu);
780		return (0);
781	}
782
783	data_len = le32toh(c2h->datal);
784	if (data_len != pdu->data_len) {
785		mtx_unlock(&qp->rx_buffers.lock);
786		nvmf_tcp_report_error(qp,
787		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
788		    offsetof(struct nvme_tcp_c2h_data_hdr, datal), pdu->m,
789		    pdu->hdr->hlen);
790		nvmf_tcp_free_pdu(pdu);
791		return (EBADMSG);
792	}
793
794	data_offset = le32toh(c2h->datao);
795	if (data_offset < cb->data_offset ||
796	    data_offset + data_len > cb->data_offset + cb->data_len) {
797		mtx_unlock(&qp->rx_buffers.lock);
798		nvmf_tcp_report_error(qp,
799		    NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE, 0,
800		    pdu->m, pdu->hdr->hlen);
801		nvmf_tcp_free_pdu(pdu);
802		return (EBADMSG);
803	}
804
805	if (data_offset != cb->data_offset + cb->data_xfered) {
806		mtx_unlock(&qp->rx_buffers.lock);
807		nvmf_tcp_report_error(qp,
808		    NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR, 0, pdu->m,
809		    pdu->hdr->hlen);
810		nvmf_tcp_free_pdu(pdu);
811		return (EBADMSG);
812	}
813
814	if ((cb->data_xfered + data_len == cb->data_len) !=
815	    ((pdu->hdr->flags & NVME_TCP_C2H_DATA_FLAGS_LAST_PDU) != 0)) {
816		mtx_unlock(&qp->rx_buffers.lock);
817		nvmf_tcp_report_error(qp,
818		    NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR, 0, pdu->m,
819		    pdu->hdr->hlen);
820		nvmf_tcp_free_pdu(pdu);
821		return (EBADMSG);
822	}
823
824	cb->data_xfered += data_len;
825	data_offset -= cb->data_offset;
826	if (cb->data_xfered == cb->data_len)
827		tcp_remove_command_buffer(&qp->rx_buffers, cb);
828	else
829		tcp_hold_command_buffer(cb);
830	mtx_unlock(&qp->rx_buffers.lock);
831
832	mbuf_copyto_io(pdu->m, pdu->hdr->pdo, data_len, &cb->io, data_offset);
833
834	tcp_release_command_buffer(cb);
835
836	if ((pdu->hdr->flags & NVME_TCP_C2H_DATA_FLAGS_SUCCESS) != 0) {
837		struct nvme_completion cqe;
838		struct nvmf_capsule *nc;
839
840		memset(&cqe, 0, sizeof(cqe));
841		cqe.cid = c2h->cccid;
842
843		nc = nvmf_allocate_response(&qp->qp, &cqe, M_WAITOK);
844		nc->nc_sqhd_valid = false;
845
846		nvmf_capsule_received(&qp->qp, nc);
847	}
848
849	nvmf_tcp_free_pdu(pdu);
850	return (0);
851}
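
/*
 * When the controller sets the SUCCESS flag on the final C2H_DATA
 * PDU it elides the response capsule for the command, so a synthetic
 * successful completion is manufactured above; nc_sqhd_valid is
 * false since no CQE (and thus no SQHD update) was actually
 * received.
 */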
852
853/* Called when m_free drops refcount to 0. */
854static void
855nvmf_tcp_mbuf_done(struct mbuf *m)
856{
857	struct nvmf_tcp_command_buffer *cb = m->m_ext.ext_arg1;
858
859	tcp_free_command_buffer(cb);
860}
861
862static struct mbuf *
863nvmf_tcp_mbuf(void *arg, int how, void *data, size_t len)
864{
865	struct nvmf_tcp_command_buffer *cb = arg;
866	struct mbuf *m;
867
868	m = m_get(how, MT_DATA);
869	m->m_flags |= M_RDONLY;
870	m_extaddref(m, data, len, &cb->refs, nvmf_tcp_mbuf_done, cb, NULL);
871	m->m_len = len;
872	return (m);
873}
874
875static void
876nvmf_tcp_free_mext_pg(struct mbuf *m)
877{
878	struct nvmf_tcp_command_buffer *cb = m->m_ext.ext_arg1;
879
880	M_ASSERTEXTPG(m);
881	tcp_release_command_buffer(cb);
882}
883
884static struct mbuf *
885nvmf_tcp_mext_pg(void *arg, int how)
886{
887	struct nvmf_tcp_command_buffer *cb = arg;
888	struct mbuf *m;
889
890	m = mb_alloc_ext_pgs(how, nvmf_tcp_free_mext_pg);
891	m->m_ext.ext_arg1 = cb;
892	tcp_hold_command_buffer(cb);
893	return (m);
894}
895
/*
 * Return an mbuf chain for a range of data belonging to a command
 * buffer.
 *
 * The mbuf chain uses M_EXT mbufs which hold references on the
 * command buffer so that it remains "alive" until the data has been
 * fully transmitted.  If can_truncate is true, the returned chain
 * may be shorter than the requested range to avoid gratuitously
 * splitting up a page.
 */
906static struct mbuf *
907nvmf_tcp_command_buffer_mbuf(struct nvmf_tcp_command_buffer *cb,
908    uint32_t data_offset, uint32_t data_len, uint32_t *actual_len,
909    bool can_truncate)
910{
911	struct mbuf *m;
912	size_t len;
913
914	m = memdesc_alloc_ext_mbufs(&cb->io.io_mem, nvmf_tcp_mbuf,
915	    nvmf_tcp_mext_pg, cb, M_WAITOK, data_offset, data_len, &len,
916	    can_truncate);
917	if (actual_len != NULL)
918		*actual_len = len;
919	return (m);
920}
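
/*
 * memdesc_alloc_ext_mbufs() builds the chain using the two
 * constructors above: nvmf_tcp_mbuf() wraps already-mapped buffers
 * in read-only M_EXT mbufs whose external refcount is the command
 * buffer's own 'refs', while nvmf_tcp_mext_pg() creates unmapped
 * M_EXTPG mbufs that take an explicit hold on the command buffer.
 * Either way the buffer cannot be freed until every mbuf referencing
 * its data has been transmitted and released.
 */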
921
/* NB: cid and ttag are both little-endian already. */
923static void
924tcp_send_h2c_pdu(struct nvmf_tcp_qpair *qp, uint16_t cid, uint16_t ttag,
925    uint32_t data_offset, struct mbuf *m, size_t len, bool last_pdu)
926{
927	struct nvme_tcp_h2c_data_hdr h2c;
928	struct mbuf *top;
929
930	memset(&h2c, 0, sizeof(h2c));
931	h2c.common.pdu_type = NVME_TCP_PDU_TYPE_H2C_DATA;
932	if (last_pdu)
933		h2c.common.flags |= NVME_TCP_H2C_DATA_FLAGS_LAST_PDU;
934	h2c.cccid = cid;
935	h2c.ttag = ttag;
936	h2c.datao = htole32(data_offset);
937	h2c.datal = htole32(len);
938
939	top = nvmf_tcp_construct_pdu(qp, &h2c, sizeof(h2c), m, len);
940	nvmf_tcp_write_pdu(qp, top);
941}
942
943static int
944nvmf_tcp_handle_r2t(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_rxpdu *pdu)
945{
946	const struct nvme_tcp_r2t_hdr *r2t;
947	struct nvmf_tcp_command_buffer *cb;
948	uint32_t data_len, data_offset;
949
950	r2t = (const void *)pdu->hdr;
951
952	mtx_lock(&qp->tx_buffers.lock);
953	cb = tcp_find_command_buffer(&qp->tx_buffers, r2t->cccid, 0);
954	if (cb == NULL) {
955		mtx_unlock(&qp->tx_buffers.lock);
956		nvmf_tcp_report_error(qp,
957		    NVME_TCP_TERM_REQ_FES_INVALID_HEADER_FIELD,
958		    offsetof(struct nvme_tcp_r2t_hdr, cccid), pdu->m,
959		    pdu->hdr->hlen);
960		nvmf_tcp_free_pdu(pdu);
961		return (EBADMSG);
962	}
963
964	data_offset = le32toh(r2t->r2to);
965	if (data_offset != cb->data_xfered) {
966		mtx_unlock(&qp->tx_buffers.lock);
967		nvmf_tcp_report_error(qp,
968		    NVME_TCP_TERM_REQ_FES_PDU_SEQUENCE_ERROR, 0, pdu->m,
969		    pdu->hdr->hlen);
970		nvmf_tcp_free_pdu(pdu);
971		return (EBADMSG);
972	}
973
974	/*
	 * XXX: The spec does not specify how to handle R2T transfers
976	 * out of range of the original command.
977	 */
978	data_len = le32toh(r2t->r2tl);
979	if (data_offset + data_len > cb->data_len) {
980		mtx_unlock(&qp->tx_buffers.lock);
981		nvmf_tcp_report_error(qp,
982		    NVME_TCP_TERM_REQ_FES_DATA_TRANSFER_OUT_OF_RANGE, 0,
983		    pdu->m, pdu->hdr->hlen);
984		nvmf_tcp_free_pdu(pdu);
985		return (EBADMSG);
986	}
987
988	cb->data_xfered += data_len;
989	if (cb->data_xfered == cb->data_len)
990		tcp_remove_command_buffer(&qp->tx_buffers, cb);
991	else
992		tcp_hold_command_buffer(cb);
993	mtx_unlock(&qp->tx_buffers.lock);
994
995	/*
996	 * Queue one or more H2C_DATA PDUs containing the requested
997	 * data.
998	 */
999	while (data_len > 0) {
1000		struct mbuf *m;
1001		uint32_t sent, todo;
1002
1003		todo = data_len;
1004		if (todo > qp->max_tx_data)
1005			todo = qp->max_tx_data;
1006		m = nvmf_tcp_command_buffer_mbuf(cb, data_offset, todo, &sent,
1007		    todo < data_len);
1008		tcp_send_h2c_pdu(qp, r2t->cccid, r2t->ttag, data_offset, m,
1009		    sent, sent == data_len);
1010
1011		data_offset += sent;
1012		data_len -= sent;
1013	}
1014
1015	tcp_release_command_buffer(cb);
1016	nvmf_tcp_free_pdu(pdu);
1017	return (0);
1018}
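
/*
 * Rough example of the loop above: if the controller requests 1MB of
 * data in a single R2T and max_tx_data is the default of 256KB, at
 * least four H2C_DATA PDUs are emitted, with the LAST_PDU flag set
 * only on the one that completes the requested range.  Intermediate
 * chunks may be shorter than max_tx_data since
 * nvmf_tcp_command_buffer_mbuf() is allowed to truncate them at a
 * convenient page boundary.
 */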
1019
1020/*
1021 * A variant of m_pullup that uses M_WAITOK instead of failing.  It
1022 * also doesn't do anything if enough bytes are already present in the
1023 * first mbuf.
1024 */
1025static struct mbuf *
1026pullup_pdu_hdr(struct mbuf *m, int len)
1027{
1028	struct mbuf *n, *p;
1029
1030	KASSERT(len <= MCLBYTES, ("%s: len too large", __func__));
1031	if (m->m_len >= len)
1032		return (m);
1033
1034	n = m_get2(len, M_WAITOK, MT_DATA, 0);
1035	n->m_len = len;
1036	m_copydata(m, 0, len, mtod(n, void *));
1037
1038	while (m != NULL && m->m_len <= len) {
1039		p = m->m_next;
1040		len -= m->m_len;
1041		m_free(m);
1042		m = p;
1043	}
1044	if (len > 0) {
1045		m->m_data += len;
1046		m->m_len -= len;
1047	}
1048	n->m_next = m;
1049	return (n);
1050}
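
/*
 * After copying the first 'len' bytes into the new mbuf, the loop
 * above frees any leading mbufs whose data was consumed entirely and
 * advances past the consumed bytes in the first remaining mbuf, so
 * the returned chain still covers the original PDU exactly once.
 */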
1051
1052static int
1053nvmf_tcp_dispatch_pdu(struct nvmf_tcp_qpair *qp,
1054    const struct nvme_tcp_common_pdu_hdr *ch, struct nvmf_tcp_rxpdu *pdu)
1055{
1056	/* Ensure the PDU header is contiguous. */
1057	pdu->m = pullup_pdu_hdr(pdu->m, ch->hlen);
1058	pdu->hdr = mtod(pdu->m, const void *);
1059
1060	switch (ch->pdu_type) {
1061	default:
1062		__assert_unreachable();
1063		break;
1064	case NVME_TCP_PDU_TYPE_H2C_TERM_REQ:
1065	case NVME_TCP_PDU_TYPE_C2H_TERM_REQ:
1066		return (nvmf_tcp_handle_term_req(pdu));
1067	case NVME_TCP_PDU_TYPE_CAPSULE_CMD:
1068		return (nvmf_tcp_save_command_capsule(qp, pdu));
1069	case NVME_TCP_PDU_TYPE_CAPSULE_RESP:
1070		return (nvmf_tcp_save_response_capsule(qp, pdu));
1071	case NVME_TCP_PDU_TYPE_H2C_DATA:
1072		return (nvmf_tcp_handle_h2c_data(qp, pdu));
1073	case NVME_TCP_PDU_TYPE_C2H_DATA:
1074		return (nvmf_tcp_handle_c2h_data(qp, pdu));
1075	case NVME_TCP_PDU_TYPE_R2T:
1076		return (nvmf_tcp_handle_r2t(qp, pdu));
1077	}
1078}
1079
1080static void
1081nvmf_tcp_receive(void *arg)
1082{
1083	struct nvmf_tcp_qpair *qp = arg;
1084	struct socket *so = qp->so;
1085	struct nvmf_tcp_rxpdu pdu;
1086	struct nvme_tcp_common_pdu_hdr ch;
1087	struct uio uio;
1088	struct iovec iov[1];
1089	struct mbuf *m, *n, *tail;
1090	u_int avail, needed;
1091	int error, flags, terror;
1092	bool have_header;
1093
1094	m = tail = NULL;
1095	have_header = false;
1096	SOCKBUF_LOCK(&so->so_rcv);
1097	while (!qp->rx_shutdown) {
1098		/* Wait until there is enough data for the next step. */
1099		if (so->so_error != 0 || so->so_rerror != 0) {
1100			if (so->so_error != 0)
1101				error = so->so_error;
1102			else
1103				error = so->so_rerror;
1104			SOCKBUF_UNLOCK(&so->so_rcv);
1105		error:
1106			m_freem(m);
1107			nvmf_qpair_error(&qp->qp, error);
1108			SOCKBUF_LOCK(&so->so_rcv);
1109			while (!qp->rx_shutdown)
1110				cv_wait(&qp->rx_cv, SOCKBUF_MTX(&so->so_rcv));
1111			break;
1112		}
1113		avail = sbavail(&so->so_rcv);
1114		if ((so->so_rcv.sb_state & SBS_CANTRCVMORE) != 0) {
1115			if (!have_header && avail == 0)
1116				error = 0;
1117			else
1118				error = ECONNRESET;
1119			SOCKBUF_UNLOCK(&so->so_rcv);
1120			goto error;
1121		}
1122		if (avail == 0 || (!have_header && avail < sizeof(ch))) {
1123			cv_wait(&qp->rx_cv, SOCKBUF_MTX(&so->so_rcv));
1124			continue;
1125		}
1126		SOCKBUF_UNLOCK(&so->so_rcv);
1127
1128		if (!have_header) {
1129			KASSERT(m == NULL, ("%s: m != NULL but no header",
1130			    __func__));
1131			memset(&uio, 0, sizeof(uio));
1132			iov[0].iov_base = &ch;
1133			iov[0].iov_len = sizeof(ch);
1134			uio.uio_iov = iov;
1135			uio.uio_iovcnt = 1;
1136			uio.uio_resid = sizeof(ch);
1137			uio.uio_segflg = UIO_SYSSPACE;
1138			uio.uio_rw = UIO_READ;
1139			flags = MSG_DONTWAIT | MSG_PEEK;
1140
1141			error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
1142			if (error != 0)
1143				goto error;
1144			KASSERT(uio.uio_resid == 0, ("%s: short CH read",
1145			    __func__));
1146
1147			have_header = true;
1148			needed = le32toh(ch.plen);
1149
1150			/*
1151			 * Malformed PDUs will be reported as errors
1152			 * by nvmf_tcp_validate_pdu.  Just pass along
1153			 * garbage headers if the lengths mismatch.
1154			 */
1155			if (needed < sizeof(ch) || ch.hlen > needed)
1156				needed = sizeof(ch);
1157
1158			memset(&uio, 0, sizeof(uio));
1159			uio.uio_resid = needed;
1160		}
1161
1162		flags = MSG_DONTWAIT;
1163		error = soreceive(so, NULL, &uio, &n, NULL, &flags);
1164		if (error != 0)
1165			goto error;
1166
1167		if (m == NULL)
1168			m = n;
1169		else
1170			tail->m_next = n;
1171
1172		if (uio.uio_resid != 0) {
1173			tail = n;
1174			while (tail->m_next != NULL)
1175				tail = tail->m_next;
1176
1177			SOCKBUF_LOCK(&so->so_rcv);
1178			continue;
1179		}
1180#ifdef INVARIANTS
1181		tail = NULL;
1182#endif
1183
1184		pdu.m = m;
1185		m = NULL;
1186		pdu.hdr = &ch;
1187		error = nvmf_tcp_validate_pdu(qp, &pdu);
1188		if (error != 0)
1189			m_freem(pdu.m);
1190		else
1191			error = nvmf_tcp_dispatch_pdu(qp, &ch, &pdu);
1192		if (error != 0) {
1193			/*
1194			 * If we received a termination request, close
1195			 * the connection immediately.
1196			 */
1197			if (error == ECONNRESET)
1198				goto error;
1199
1200			/*
1201			 * Wait for up to 30 seconds for the socket to
1202			 * be closed by the other end.
1203			 */
1204			SOCKBUF_LOCK(&so->so_rcv);
1205			if ((so->so_rcv.sb_state & SBS_CANTRCVMORE) == 0) {
1206				terror = cv_timedwait(&qp->rx_cv,
1207				    SOCKBUF_MTX(&so->so_rcv), 30 * hz);
1208				if (terror == ETIMEDOUT)
1209					printf("NVMe/TCP: Timed out after sending terminate request\n");
1210			}
1211			SOCKBUF_UNLOCK(&so->so_rcv);
1212			goto error;
1213		}
1214
1215		have_header = false;
1216		SOCKBUF_LOCK(&so->so_rcv);
1217	}
1218	SOCKBUF_UNLOCK(&so->so_rcv);
1219	kthread_exit();
1220}
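
/*
 * Summary of the receive kthread above: it peeks at the common
 * header (MSG_PEEK) to learn the PDU length, then reads the entire
 * PDU, header included, into an mbuf chain, possibly across several
 * wakeups if the data arrives piecemeal.  Each complete PDU is
 * validated and dispatched; on a protocol error a termination
 * request has already been queued, so the thread waits up to 30
 * seconds for the peer to close the connection before reporting the
 * error and parking until tcp_free_qpair() shuts it down.
 */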
1221
1222static struct mbuf *
1223tcp_command_pdu(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_capsule *tc)
1224{
1225	struct nvmf_capsule *nc = &tc->nc;
1226	struct nvmf_tcp_command_buffer *cb;
1227	struct nvme_sgl_descriptor *sgl;
1228	struct nvme_tcp_cmd cmd;
1229	struct mbuf *top, *m;
1230	bool use_icd;
1231
1232	use_icd = false;
1233	cb = NULL;
1234	m = NULL;
1235
1236	if (nc->nc_data.io_len != 0) {
1237		cb = tcp_alloc_command_buffer(qp, &nc->nc_data, 0,
1238		    nc->nc_data.io_len, nc->nc_sqe.cid);
1239
1240		if (nc->nc_send_data && nc->nc_data.io_len <= qp->max_icd) {
1241			use_icd = true;
1242			m = nvmf_tcp_command_buffer_mbuf(cb, 0,
1243			    nc->nc_data.io_len, NULL, false);
1244			cb->data_xfered = nc->nc_data.io_len;
1245			tcp_release_command_buffer(cb);
1246		} else if (nc->nc_send_data) {
1247			mtx_lock(&qp->tx_buffers.lock);
1248			tcp_add_command_buffer(&qp->tx_buffers, cb);
1249			mtx_unlock(&qp->tx_buffers.lock);
1250		} else {
1251			mtx_lock(&qp->rx_buffers.lock);
1252			tcp_add_command_buffer(&qp->rx_buffers, cb);
1253			mtx_unlock(&qp->rx_buffers.lock);
1254		}
1255	}
1256
1257	memset(&cmd, 0, sizeof(cmd));
1258	cmd.common.pdu_type = NVME_TCP_PDU_TYPE_CAPSULE_CMD;
1259	cmd.ccsqe = nc->nc_sqe;
1260
1261	/* Populate SGL in SQE. */
1262	sgl = &cmd.ccsqe.sgl;
1263	memset(sgl, 0, sizeof(*sgl));
1264	sgl->address = 0;
1265	sgl->length = htole32(nc->nc_data.io_len);
1266	if (use_icd) {
1267		/* Use in-capsule data. */
1268		sgl->type = NVME_SGL_TYPE_ICD;
1269	} else {
1270		/* Use a command buffer. */
1271		sgl->type = NVME_SGL_TYPE_COMMAND_BUFFER;
1272	}
1273
1274	top = nvmf_tcp_construct_pdu(qp, &cmd, sizeof(cmd), m, m != NULL ?
1275	    nc->nc_data.io_len : 0);
1276	return (top);
1277}
1278
1279static struct mbuf *
1280tcp_response_pdu(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_capsule *tc)
1281{
1282	struct nvmf_capsule *nc = &tc->nc;
1283	struct nvme_tcp_rsp rsp;
1284
1285	memset(&rsp, 0, sizeof(rsp));
1286	rsp.common.pdu_type = NVME_TCP_PDU_TYPE_CAPSULE_RESP;
1287	rsp.rccqe = nc->nc_cqe;
1288
1289	return (nvmf_tcp_construct_pdu(qp, &rsp, sizeof(rsp), NULL, 0));
1290}
1291
1292static struct mbuf *
1293capsule_to_pdu(struct nvmf_tcp_qpair *qp, struct nvmf_tcp_capsule *tc)
1294{
1295	if (tc->nc.nc_qe_len == sizeof(struct nvme_command))
1296		return (tcp_command_pdu(qp, tc));
1297	else
1298		return (tcp_response_pdu(qp, tc));
1299}
1300
1301static void
1302nvmf_tcp_send(void *arg)
1303{
1304	struct nvmf_tcp_qpair *qp = arg;
1305	struct nvmf_tcp_capsule *tc;
1306	struct socket *so = qp->so;
1307	struct mbuf *m, *n, *p;
1308	u_long space, tosend;
1309	int error;
1310
1311	m = NULL;
1312	SOCKBUF_LOCK(&so->so_snd);
1313	while (!qp->tx_shutdown) {
1314		if (so->so_error != 0) {
1315			error = so->so_error;
1316			SOCKBUF_UNLOCK(&so->so_snd);
1317		error:
1318			m_freem(m);
1319			nvmf_qpair_error(&qp->qp, error);
1320			SOCKBUF_LOCK(&so->so_snd);
1321			while (!qp->tx_shutdown)
1322				cv_wait(&qp->tx_cv, SOCKBUF_MTX(&so->so_snd));
1323			break;
1324		}
1325
1326		if (m == NULL) {
1327			/* Next PDU to send. */
1328			m = mbufq_dequeue(&qp->tx_pdus);
1329		}
1330		if (m == NULL) {
1331			if (STAILQ_EMPTY(&qp->tx_capsules)) {
1332				cv_wait(&qp->tx_cv, SOCKBUF_MTX(&so->so_snd));
1333				continue;
1334			}
1335
1336			/* Convert a capsule into a PDU. */
1337			tc = STAILQ_FIRST(&qp->tx_capsules);
1338			STAILQ_REMOVE_HEAD(&qp->tx_capsules, link);
1339			SOCKBUF_UNLOCK(&so->so_snd);
1340
1341			n = capsule_to_pdu(qp, tc);
1342			tcp_release_capsule(tc);
1343
1344			SOCKBUF_LOCK(&so->so_snd);
1345			mbufq_enqueue(&qp->tx_pdus, n);
1346			continue;
1347		}
1348
1349		/*
1350		 * Wait until there is enough room to send some data.
1351		 * If the socket buffer is empty, always send at least
1352		 * something.
1353		 */
1354		space = sbspace(&so->so_snd);
1355		if (space < m->m_len && sbused(&so->so_snd) != 0) {
1356			cv_wait(&qp->tx_cv, SOCKBUF_MTX(&so->so_snd));
1357			continue;
1358		}
1359		SOCKBUF_UNLOCK(&so->so_snd);
1360
		/*
		 * If 'm' is too big, then the socket buffer must be
		 * empty.  Split 'm' to make at least some forward
		 * progress.
		 *
		 * Otherwise, chain up as many pending mbufs from 'm'
		 * as will fit.
		 */
1369		if (m->m_len > space) {
1370			n = m_split(m, space, M_WAITOK);
1371		} else {
1372			tosend = m->m_len;
1373			n = m->m_next;
1374			p = m;
1375			while (n != NULL && tosend + n->m_len <= space) {
1376				tosend += n->m_len;
1377				p = n;
1378				n = n->m_next;
1379			}
1380			KASSERT(p->m_next == n, ("%s: p not before n",
1381			    __func__));
1382			p->m_next = NULL;
1383
1384			KASSERT(m_length(m, NULL) == tosend,
1385			    ("%s: length mismatch", __func__));
1386		}
1387		error = sosend(so, NULL, NULL, m, NULL, MSG_DONTWAIT, NULL);
1388		if (error != 0) {
1389			m = NULL;
1390			m_freem(n);
1391			goto error;
1392		}
1393		m = n;
1394		SOCKBUF_LOCK(&so->so_snd);
1395	}
1396	SOCKBUF_UNLOCK(&so->so_snd);
1397	kthread_exit();
1398}
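
/*
 * Summary of the transmit kthread above: capsules queued by
 * tcp_transmit_capsule() are converted into PDUs and appended to
 * tx_pdus, and each PDU is then written in pieces no larger than the
 * currently available socket-buffer space.  A single mbuf that is
 * larger than an empty socket buffer is split with m_split() so that
 * some forward progress is always possible.
 */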
1399
1400static int
1401nvmf_soupcall_receive(struct socket *so, void *arg, int waitflag)
1402{
1403	struct nvmf_tcp_qpair *qp = arg;
1404
1405	if (soreadable(so))
1406		cv_signal(&qp->rx_cv);
1407	return (SU_OK);
1408}
1409
1410static int
1411nvmf_soupcall_send(struct socket *so, void *arg, int waitflag)
1412{
1413	struct nvmf_tcp_qpair *qp = arg;
1414
1415	if (sowriteable(so))
1416		cv_signal(&qp->tx_cv);
1417	return (SU_OK);
1418}
1419
1420static struct nvmf_qpair *
1421tcp_allocate_qpair(bool controller,
1422    const struct nvmf_handoff_qpair_params *params)
1423{
1424	struct nvmf_tcp_qpair *qp;
1425	struct socket *so;
1426	struct file *fp;
1427	cap_rights_t rights;
1428	int error;
1429
1430	error = fget(curthread, params->tcp.fd, cap_rights_init_one(&rights,
1431	    CAP_SOCK_CLIENT), &fp);
1432	if (error != 0)
1433		return (NULL);
1434	if (fp->f_type != DTYPE_SOCKET) {
1435		fdrop(fp, curthread);
1436		return (NULL);
1437	}
1438	so = fp->f_data;
1439	if (so->so_type != SOCK_STREAM ||
1440	    so->so_proto->pr_protocol != IPPROTO_TCP) {
1441		fdrop(fp, curthread);
1442		return (NULL);
1443	}
1444
1445	/* Claim socket from file descriptor. */
1446	fp->f_ops = &badfileops;
1447	fp->f_data = NULL;
1448	fdrop(fp, curthread);
1449
1450	qp = malloc(sizeof(*qp), M_NVMF_TCP, M_WAITOK | M_ZERO);
1451	qp->so = so;
1452	refcount_init(&qp->refs, 1);
1453	qp->txpda = params->tcp.txpda;
1454	qp->rxpda = params->tcp.rxpda;
1455	qp->header_digests = params->tcp.header_digests;
1456	qp->data_digests = params->tcp.data_digests;
1457	qp->maxr2t = params->tcp.maxr2t;
1458	qp->maxh2cdata = params->tcp.maxh2cdata;
1459	qp->max_tx_data = tcp_max_transmit_data;
1460	if (!controller) {
1461		if (qp->max_tx_data > params->tcp.maxh2cdata)
1462			qp->max_tx_data = params->tcp.maxh2cdata;
1463	}
1464	qp->max_icd = params->tcp.max_icd;
1465
1466	if (controller) {
1467		/* Use the SUCCESS flag if SQ flow control is disabled. */
1468		qp->send_success = !params->sq_flow_control;
1469
1470		/* NB: maxr2t is 0's based. */
1471		qp->num_ttags = MIN((u_int)UINT16_MAX + 1,
1472		    (uint64_t)params->qsize * (uint64_t)qp->maxr2t + 1);
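		/*
		 * The transfer tag is a 16-bit field in R2T and
		 * H2C_DATA PDUs, so no more than 65536 tags can ever
		 * be outstanding regardless of queue depth, hence
		 * the cap above.
		 */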
1473		qp->open_ttags = mallocarray(qp->num_ttags,
1474		    sizeof(*qp->open_ttags), M_NVMF_TCP, M_WAITOK | M_ZERO);
1475	}
1476
1477	TAILQ_INIT(&qp->rx_buffers.head);
1478	TAILQ_INIT(&qp->tx_buffers.head);
1479	mtx_init(&qp->rx_buffers.lock, "nvmf/tcp rx buffers", NULL, MTX_DEF);
1480	mtx_init(&qp->tx_buffers.lock, "nvmf/tcp tx buffers", NULL, MTX_DEF);
1481
1482	cv_init(&qp->rx_cv, "-");
1483	cv_init(&qp->tx_cv, "-");
1484	mbufq_init(&qp->tx_pdus, 0);
1485	STAILQ_INIT(&qp->tx_capsules);
1486
1487	/* Register socket upcalls. */
1488	SOCKBUF_LOCK(&so->so_rcv);
1489	soupcall_set(so, SO_RCV, nvmf_soupcall_receive, qp);
1490	SOCKBUF_UNLOCK(&so->so_rcv);
1491	SOCKBUF_LOCK(&so->so_snd);
1492	soupcall_set(so, SO_SND, nvmf_soupcall_send, qp);
1493	SOCKBUF_UNLOCK(&so->so_snd);
1494
1495	/* Spin up kthreads. */
1496	error = kthread_add(nvmf_tcp_receive, qp, NULL, &qp->rx_thread, 0, 0,
1497	    "nvmef tcp rx");
1498	if (error != 0) {
1499		tcp_free_qpair(&qp->qp);
1500		return (NULL);
1501	}
1502	error = kthread_add(nvmf_tcp_send, qp, NULL, &qp->tx_thread, 0, 0,
1503	    "nvmef tcp tx");
1504	if (error != 0) {
1505		tcp_free_qpair(&qp->qp);
1506		return (NULL);
1507	}
1508
1509	return (&qp->qp);
1510}
1511
1512static void
1513tcp_release_qpair(struct nvmf_tcp_qpair *qp)
1514{
1515	if (refcount_release(&qp->refs))
1516		free(qp, M_NVMF_TCP);
1517}
1518
1519static void
1520tcp_free_qpair(struct nvmf_qpair *nq)
1521{
1522	struct nvmf_tcp_qpair *qp = TQP(nq);
1523	struct nvmf_tcp_command_buffer *ncb, *cb;
1524	struct nvmf_tcp_capsule *ntc, *tc;
1525	struct socket *so = qp->so;
1526
1527	/* Shut down kthreads and clear upcalls */
1528	SOCKBUF_LOCK(&so->so_snd);
1529	qp->tx_shutdown = true;
1530	if (qp->tx_thread != NULL) {
1531		cv_signal(&qp->tx_cv);
1532		mtx_sleep(qp->tx_thread, SOCKBUF_MTX(&so->so_snd), 0,
1533		    "nvtcptx", 0);
1534	}
1535	soupcall_clear(so, SO_SND);
1536	SOCKBUF_UNLOCK(&so->so_snd);
1537
1538	SOCKBUF_LOCK(&so->so_rcv);
1539	qp->rx_shutdown = true;
1540	if (qp->rx_thread != NULL) {
1541		cv_signal(&qp->rx_cv);
1542		mtx_sleep(qp->rx_thread, SOCKBUF_MTX(&so->so_rcv), 0,
1543		    "nvtcprx", 0);
1544	}
1545	soupcall_clear(so, SO_RCV);
1546	SOCKBUF_UNLOCK(&so->so_rcv);
1547
1548	STAILQ_FOREACH_SAFE(tc, &qp->tx_capsules, link, ntc) {
1549		nvmf_abort_capsule_data(&tc->nc, ECONNABORTED);
1550		tcp_release_capsule(tc);
1551	}
1552	mbufq_drain(&qp->tx_pdus);
1553
1554	cv_destroy(&qp->tx_cv);
1555	cv_destroy(&qp->rx_cv);
1556
1557	if (qp->open_ttags != NULL) {
1558		for (u_int i = 0; i < qp->num_ttags; i++) {
1559			cb = qp->open_ttags[i];
1560			if (cb != NULL) {
1561				cb->error = ECONNABORTED;
1562				tcp_release_command_buffer(cb);
1563			}
1564		}
1565		free(qp->open_ttags, M_NVMF_TCP);
1566	}
1567
1568	mtx_lock(&qp->rx_buffers.lock);
1569	TAILQ_FOREACH_SAFE(cb, &qp->rx_buffers.head, link, ncb) {
1570		tcp_remove_command_buffer(&qp->rx_buffers, cb);
1571		mtx_unlock(&qp->rx_buffers.lock);
1572		cb->error = ECONNABORTED;
1573		tcp_release_command_buffer(cb);
1574		mtx_lock(&qp->rx_buffers.lock);
1575	}
1576	mtx_destroy(&qp->rx_buffers.lock);
1577
1578	mtx_lock(&qp->tx_buffers.lock);
1579	TAILQ_FOREACH_SAFE(cb, &qp->tx_buffers.head, link, ncb) {
1580		tcp_remove_command_buffer(&qp->tx_buffers, cb);
1581		mtx_unlock(&qp->tx_buffers.lock);
1582		cb->error = ECONNABORTED;
1583		tcp_release_command_buffer(cb);
1584		mtx_lock(&qp->tx_buffers.lock);
1585	}
1586	mtx_destroy(&qp->tx_buffers.lock);
1587
1588	soclose(so);
1589
1590	tcp_release_qpair(qp);
1591}
1592
1593static struct nvmf_capsule *
1594tcp_allocate_capsule(struct nvmf_qpair *nq, int how)
1595{
1596	struct nvmf_tcp_qpair *qp = TQP(nq);
1597	struct nvmf_tcp_capsule *tc;
1598
1599	tc = malloc(sizeof(*tc), M_NVMF_TCP, how | M_ZERO);
1600	if (tc == NULL)
1601		return (NULL);
1602	refcount_init(&tc->refs, 1);
1603	refcount_acquire(&qp->refs);
1604	return (&tc->nc);
1605}
1606
1607static void
1608tcp_release_capsule(struct nvmf_tcp_capsule *tc)
1609{
1610	struct nvmf_tcp_qpair *qp = TQP(tc->nc.nc_qpair);
1611
1612	if (!refcount_release(&tc->refs))
1613		return;
1614
1615	MPASS(tc->active_r2ts == 0);
1616	MPASS(tc->pending_r2ts == 0);
1617
1618	nvmf_tcp_free_pdu(&tc->rx_pdu);
1619	free(tc, M_NVMF_TCP);
1620	tcp_release_qpair(qp);
1621}
1622
1623static void
1624tcp_free_capsule(struct nvmf_capsule *nc)
1625{
1626	struct nvmf_tcp_capsule *tc = TCAP(nc);
1627
1628	tcp_release_capsule(tc);
1629}
1630
1631static int
1632tcp_transmit_capsule(struct nvmf_capsule *nc)
1633{
1634	struct nvmf_tcp_qpair *qp = TQP(nc->nc_qpair);
1635	struct nvmf_tcp_capsule *tc = TCAP(nc);
1636	struct socket *so = qp->so;
1637
1638	refcount_acquire(&tc->refs);
1639	SOCKBUF_LOCK(&so->so_snd);
1640	STAILQ_INSERT_TAIL(&qp->tx_capsules, tc, link);
1641	if (sowriteable(so))
1642		cv_signal(&qp->tx_cv);
1643	SOCKBUF_UNLOCK(&so->so_snd);
1644	return (0);
1645}
1646
1647static uint8_t
1648tcp_validate_command_capsule(struct nvmf_capsule *nc)
1649{
1650	struct nvmf_tcp_capsule *tc = TCAP(nc);
1651	struct nvme_sgl_descriptor *sgl;
1652
1653	KASSERT(tc->rx_pdu.hdr != NULL, ("capsule wasn't received"));
1654
1655	sgl = &nc->nc_sqe.sgl;
1656	switch (sgl->type) {
1657	case NVME_SGL_TYPE_ICD:
1658		if (tc->rx_pdu.data_len != le32toh(sgl->length)) {
1659			printf("NVMe/TCP: Command Capsule with mismatched ICD length\n");
1660			return (NVME_SC_DATA_SGL_LENGTH_INVALID);
1661		}
1662		break;
1663	case NVME_SGL_TYPE_COMMAND_BUFFER:
1664		if (tc->rx_pdu.data_len != 0) {
1665			printf("NVMe/TCP: Command Buffer SGL with ICD\n");
1666			return (NVME_SC_INVALID_FIELD);
1667		}
1668		break;
1669	default:
1670		printf("NVMe/TCP: Invalid SGL type in Command Capsule\n");
1671		return (NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID);
1672	}
1673
1674	if (sgl->address != 0) {
1675		printf("NVMe/TCP: Invalid SGL offset in Command Capsule\n");
1676		return (NVME_SC_SGL_OFFSET_INVALID);
1677	}
1678
1679	return (NVME_SC_SUCCESS);
1680}
1681
1682static size_t
1683tcp_capsule_data_len(const struct nvmf_capsule *nc)
1684{
1685	MPASS(nc->nc_qe_len == sizeof(struct nvme_command));
1686	return (le32toh(nc->nc_sqe.sgl.length));
1687}
1688
1689static void
1690tcp_receive_r2t_data(struct nvmf_capsule *nc, uint32_t data_offset,
1691    struct nvmf_io_request *io)
1692{
1693	struct nvmf_tcp_qpair *qp = TQP(nc->nc_qpair);
1694	struct nvmf_tcp_capsule *tc = TCAP(nc);
1695	struct nvmf_tcp_command_buffer *cb;
1696
1697	cb = tcp_alloc_command_buffer(qp, io, data_offset, io->io_len,
1698	    nc->nc_sqe.cid);
1699
1700	cb->tc = tc;
1701	refcount_acquire(&tc->refs);
1702
1703	/*
1704	 * If this command has too many active R2Ts or there are no
1705	 * available transfer tags, queue the request for later.
1706	 *
1707	 * NB: maxr2t is 0's based.
1708	 */
1709	mtx_lock(&qp->rx_buffers.lock);
1710	if (tc->active_r2ts > qp->maxr2t || qp->active_ttags == qp->num_ttags) {
1711#ifdef INVARIANTS
1712		tc->pending_r2ts++;
1713#endif
1714		TAILQ_INSERT_TAIL(&qp->rx_buffers.head, cb, link);
1715		mtx_unlock(&qp->rx_buffers.lock);
1716		return;
1717	}
1718
1719	nvmf_tcp_allocate_ttag(qp, cb);
1720	mtx_unlock(&qp->rx_buffers.lock);
1721
1722	tcp_send_r2t(qp, nc->nc_sqe.cid, cb->ttag, data_offset, io->io_len);
1723}
1724
1725static void
1726tcp_receive_icd_data(struct nvmf_capsule *nc, uint32_t data_offset,
1727    struct nvmf_io_request *io)
1728{
1729	struct nvmf_tcp_capsule *tc = TCAP(nc);
1730
1731	mbuf_copyto_io(tc->rx_pdu.m, tc->rx_pdu.hdr->pdo + data_offset,
1732	    io->io_len, io, 0);
1733	nvmf_complete_io_request(io, io->io_len, 0);
1734}
1735
1736static int
1737tcp_receive_controller_data(struct nvmf_capsule *nc, uint32_t data_offset,
1738    struct nvmf_io_request *io)
1739{
1740	struct nvme_sgl_descriptor *sgl;
1741	size_t data_len;
1742
1743	if (nc->nc_qe_len != sizeof(struct nvme_command) ||
1744	    !nc->nc_qpair->nq_controller)
1745		return (EINVAL);
1746
1747	sgl = &nc->nc_sqe.sgl;
1748	data_len = le32toh(sgl->length);
1749	if (data_offset + io->io_len > data_len)
1750		return (EFBIG);
1751
1752	if (sgl->type == NVME_SGL_TYPE_ICD)
1753		tcp_receive_icd_data(nc, data_offset, io);
1754	else
1755		tcp_receive_r2t_data(nc, data_offset, io);
1756	return (0);
1757}
1758
1759/* NB: cid is little-endian already. */
1760static void
1761tcp_send_c2h_pdu(struct nvmf_tcp_qpair *qp, uint16_t cid, uint32_t data_offset,
1762    struct mbuf *m, size_t len, bool last_pdu, bool success)
1763{
1764	struct nvme_tcp_c2h_data_hdr c2h;
1765	struct mbuf *top;
1766
1767	memset(&c2h, 0, sizeof(c2h));
1768	c2h.common.pdu_type = NVME_TCP_PDU_TYPE_C2H_DATA;
1769	if (last_pdu)
1770		c2h.common.flags |= NVME_TCP_C2H_DATA_FLAGS_LAST_PDU;
1771	if (success)
1772		c2h.common.flags |= NVME_TCP_C2H_DATA_FLAGS_SUCCESS;
1773	c2h.cccid = cid;
1774	c2h.datao = htole32(data_offset);
1775	c2h.datal = htole32(len);
1776
1777	top = nvmf_tcp_construct_pdu(qp, &c2h, sizeof(c2h), m, len);
1778	nvmf_tcp_write_pdu(qp, top);
1779}
1780
1781static u_int
1782tcp_send_controller_data(struct nvmf_capsule *nc, uint32_t data_offset,
1783    struct mbuf *m, size_t len)
1784{
1785	struct nvmf_tcp_qpair *qp = TQP(nc->nc_qpair);
1786	struct nvme_sgl_descriptor *sgl;
1787	struct mbuf *n, *p;
1788	uint32_t data_len;
1789	bool last_pdu, last_xfer;
1790
1791	if (nc->nc_qe_len != sizeof(struct nvme_command) ||
1792	    !qp->qp.nq_controller) {
1793		m_freem(m);
1794		return (NVME_SC_INVALID_FIELD);
1795	}
1796
1797	sgl = &nc->nc_sqe.sgl;
1798	data_len = le32toh(sgl->length);
1799	if (data_offset + len > data_len) {
1800		m_freem(m);
1801		return (NVME_SC_INVALID_FIELD);
1802	}
1803	last_xfer = (data_offset + len == data_len);
1804
1805	if (sgl->type != NVME_SGL_TYPE_COMMAND_BUFFER) {
1806		m_freem(m);
1807		return (NVME_SC_INVALID_FIELD);
1808	}
1809
1810	KASSERT(data_offset == TCAP(nc)->tx_data_offset,
1811	    ("%s: starting data_offset %u doesn't match end of previous xfer %u",
1812	    __func__, data_offset, TCAP(nc)->tx_data_offset));
1813
	/* Queue one or more C2H_DATA PDUs containing the data from 'm'. */
1815	while (m != NULL) {
1816		uint32_t todo;
1817
1818		todo = m->m_len;
1819		p = m;
1820		n = p->m_next;
1821		while (n != NULL) {
1822			if (todo + n->m_len > qp->max_tx_data) {
1823				p->m_next = NULL;
1824				break;
1825			}
1826			todo += n->m_len;
1827			p = n;
1828			n = p->m_next;
1829		}
1830		MPASS(m_length(m, NULL) == todo);
1831
1832		last_pdu = (n == NULL && last_xfer);
1833		tcp_send_c2h_pdu(qp, nc->nc_sqe.cid, data_offset, m, todo,
1834		    last_pdu, last_pdu && qp->send_success);
1835
1836		data_offset += todo;
1837		data_len -= todo;
1838		m = n;
1839	}
1840	MPASS(data_len == 0);
1841
1842#ifdef INVARIANTS
1843	TCAP(nc)->tx_data_offset = data_offset;
1844#endif
1845	if (!last_xfer)
1846		return (NVMF_MORE);
1847	else if (qp->send_success)
1848		return (NVMF_SUCCESS_SENT);
1849	else
1850		return (NVME_SC_SUCCESS);
1851}
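
/*
 * The return value above distinguishes three cases for the caller:
 * NVMF_MORE when only part of the command's data has been sent so
 * far, NVMF_SUCCESS_SENT when the final C2H_DATA PDU carried the
 * SUCCESS flag (so no response capsule should follow), and
 * NVME_SC_SUCCESS when the transfer is complete but a normal
 * response capsule is still expected.
 */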
1852
1853struct nvmf_transport_ops tcp_ops = {
1854	.allocate_qpair = tcp_allocate_qpair,
1855	.free_qpair = tcp_free_qpair,
1856	.allocate_capsule = tcp_allocate_capsule,
1857	.free_capsule = tcp_free_capsule,
1858	.transmit_capsule = tcp_transmit_capsule,
1859	.validate_command_capsule = tcp_validate_command_capsule,
1860	.capsule_data_len = tcp_capsule_data_len,
1861	.receive_controller_data = tcp_receive_controller_data,
1862	.send_controller_data = tcp_send_controller_data,
1863	.trtype = NVMF_TRTYPE_TCP,
1864	.priority = 0,
1865};
1866
1867NVMF_TRANSPORT(tcp, tcp_ops);
1868