1/*-
2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3 *
4 * Copyright (c) 2012 Chelsio Communications, Inc.
5 * All rights reserved.
6 * Written by: Navdeep Parhar <np@FreeBSD.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30#include <sys/cdefs.h>
31__FBSDID("$FreeBSD$");
32
33#include "opt_inet.h"
34
35#include <sys/param.h>
36#include <sys/aio.h>
37#include <sys/file.h>
38#include <sys/systm.h>
39#include <sys/kernel.h>
40#include <sys/ktr.h>
41#include <sys/module.h>
42#include <sys/protosw.h>
43#include <sys/proc.h>
44#include <sys/domain.h>
45#include <sys/socket.h>
46#include <sys/socketvar.h>
47#include <sys/taskqueue.h>
48#include <sys/uio.h>
49#include <netinet/in.h>
50#include <netinet/in_pcb.h>
51#include <netinet/ip.h>
52#include <netinet/tcp_var.h>
53#define TCPSTATES
54#include <netinet/tcp_fsm.h>
55#include <netinet/toecore.h>
56
57#include <vm/vm.h>
58#include <vm/vm_extern.h>
59#include <vm/vm_param.h>
60#include <vm/pmap.h>
61#include <vm/vm_map.h>
62#include <vm/vm_page.h>
63#include <vm/vm_object.h>
64
65#ifdef TCP_OFFLOAD
66#include "common/common.h"
67#include "common/t4_msg.h"
68#include "common/t4_regs.h"
69#include "common/t4_tcb.h"
70#include "tom/t4_tom.h"
71
72/*
73 * Use the 'backend3' field in AIO jobs to store the amount of data
74 * received by the AIO job so far.
75 */
76#define	aio_received	backend3
77
78static void aio_ddp_requeue_task(void *context, int pending);
79static void ddp_complete_all(struct toepcb *toep, int error);
80static void t4_aio_cancel_active(struct kaiocb *job);
81static void t4_aio_cancel_queued(struct kaiocb *job);
82
83static TAILQ_HEAD(, pageset) ddp_orphan_pagesets;
84static struct mtx ddp_orphan_pagesets_lock;
85static struct task ddp_orphan_task;
86
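/*
 * M_TCB_RX_DDP_BUF0_LEN is the mask (i.e. the largest value) of the TCB's
 * DDP buffer length field, so it also bounds how much of a single
 * aio_read(2) buffer can be handed to the hardware at once (see the
 * truncation in hold_aio()).
 */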
87#define MAX_DDP_BUFFER_SIZE		(M_TCB_RX_DDP_BUF0_LEN)
88
89/*
90 * A page set holds information about a buffer used for DDP.  The page
91 * set holds resources such as the VM pages backing the buffer (either
92 * held or wired) and the page pods associated with the buffer.
93 * Recently used page sets are cached to allow for efficient reuse of
94 * buffers (avoiding the need to re-fault in pages, hold them, etc.).
95 * Note that cached page sets keep the backing pages wired.  The
96 * number of wired pages is capped by only allowing for two wired
97 * pagesets per connection.  This is not a perfect cap, but is a
98 * trade-off for performance.
99 *
100 * If an application ping-pongs two buffers for a connection via
101 * aio_read(2) then those buffers should remain wired and expensive VM
102 * fault lookups should be avoided after each buffer has been used
103 * once.  If an application uses more than two buffers then this will
104 * fall back to doing expensive VM fault lookups for each operation.
105 */
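/*
 * Purely for illustration (userland sketch, not part of this driver):
 * the cache above is sized for a reader that ping-pongs two fixed
 * buffers, roughly
 *
 *	struct aiocb a = { .aio_fildes = s, .aio_buf = buf0, .aio_nbytes = sz };
 *	struct aiocb b = { .aio_fildes = s, .aio_buf = buf1, .aio_nbytes = sz };
 *	for (;;) {
 *		aio_read(&a);
 *		aio_read(&b);
 *		... wait for both, consume the data, resubmit ...
 *	}
 *
 * Each buffer's pageset is built on first use and recycled from the
 * cache afterwards, so its pages stay wired and are not re-faulted on
 * every request.
 */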
106static void
107free_pageset(struct tom_data *td, struct pageset *ps)
108{
109	vm_page_t p;
110	int i;
111
112	if (ps->prsv.prsv_nppods > 0)
113		t4_free_page_pods(&ps->prsv);
114
115	for (i = 0; i < ps->npages; i++) {
116		p = ps->pages[i];
117		vm_page_unwire(p, PQ_INACTIVE);
118	}
119	mtx_lock(&ddp_orphan_pagesets_lock);
120	TAILQ_INSERT_TAIL(&ddp_orphan_pagesets, ps, link);
121	taskqueue_enqueue(taskqueue_thread, &ddp_orphan_task);
122	mtx_unlock(&ddp_orphan_pagesets_lock);
123}
124
125static void
126ddp_free_orphan_pagesets(void *context, int pending)
127{
128	struct pageset *ps;
129
130	mtx_lock(&ddp_orphan_pagesets_lock);
131	while (!TAILQ_EMPTY(&ddp_orphan_pagesets)) {
132		ps = TAILQ_FIRST(&ddp_orphan_pagesets);
133		TAILQ_REMOVE(&ddp_orphan_pagesets, ps, link);
134		mtx_unlock(&ddp_orphan_pagesets_lock);
135		if (ps->vm)
136			vmspace_free(ps->vm);
137		free(ps, M_CXGBE);
138		mtx_lock(&ddp_orphan_pagesets_lock);
139	}
140	mtx_unlock(&ddp_orphan_pagesets_lock);
141}
142
143static void
144recycle_pageset(struct toepcb *toep, struct pageset *ps)
145{
146
147	DDP_ASSERT_LOCKED(toep);
148	if (!(toep->ddp.flags & DDP_DEAD)) {
149		KASSERT(toep->ddp.cached_count + toep->ddp.active_count <
150		    nitems(toep->ddp.db), ("too many wired pagesets"));
151		TAILQ_INSERT_HEAD(&toep->ddp.cached_pagesets, ps, link);
152		toep->ddp.cached_count++;
153	} else
154		free_pageset(toep->td, ps);
155}
156
157static void
158ddp_complete_one(struct kaiocb *job, int error)
159{
160	long copied;
161
162	/*
163	 * If this job had copied data out of the socket buffer before
164	 * it was cancelled, report it as a short read rather than an
165	 * error.
166	 */
167	copied = job->aio_received;
168	if (copied != 0 || error == 0)
169		aio_complete(job, copied, 0);
170	else
171		aio_complete(job, -1, error);
172}
173
174static void
175free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db)
176{
177
178	if (db->job) {
179		/*
180		 * XXX: If we are un-offloading the socket then we
181		 * should requeue these on the socket somehow.  If we
182		 * got a FIN from the remote end, then this completes
183		 * any remaining requests with an EOF read.
184		 */
185		if (!aio_clear_cancel_function(db->job))
186			ddp_complete_one(db->job, 0);
187	}
188
189	if (db->ps)
190		free_pageset(td, db->ps);
191}
192
193void
194ddp_init_toep(struct toepcb *toep)
195{
196
197	TAILQ_INIT(&toep->ddp.aiojobq);
198	TASK_INIT(&toep->ddp.requeue_task, 0, aio_ddp_requeue_task, toep);
199	toep->ddp.flags = DDP_OK;
200	toep->ddp.active_id = -1;
201	mtx_init(&toep->ddp.lock, "t4 ddp", NULL, MTX_DEF);
202}
203
204void
205ddp_uninit_toep(struct toepcb *toep)
206{
207
208	mtx_destroy(&toep->ddp.lock);
209}
210
211void
212release_ddp_resources(struct toepcb *toep)
213{
214	struct pageset *ps;
215	int i;
216
217	DDP_LOCK(toep);
218	toep->ddp.flags |= DDP_DEAD;
219	for (i = 0; i < nitems(toep->ddp.db); i++) {
220		free_ddp_buffer(toep->td, &toep->ddp.db[i]);
221	}
222	while ((ps = TAILQ_FIRST(&toep->ddp.cached_pagesets)) != NULL) {
223		TAILQ_REMOVE(&toep->ddp.cached_pagesets, ps, link);
224		free_pageset(toep->td, ps);
225	}
226	ddp_complete_all(toep, 0);
227	DDP_UNLOCK(toep);
228}
229
230#ifdef INVARIANTS
231void
232ddp_assert_empty(struct toepcb *toep)
233{
234	int i;
235
236	MPASS(!(toep->ddp.flags & DDP_TASK_ACTIVE));
237	for (i = 0; i < nitems(toep->ddp.db); i++) {
238		MPASS(toep->ddp.db[i].job == NULL);
239		MPASS(toep->ddp.db[i].ps == NULL);
240	}
241	MPASS(TAILQ_EMPTY(&toep->ddp.cached_pagesets));
242	MPASS(TAILQ_EMPTY(&toep->ddp.aiojobq));
243}
244#endif
245
246static void
247complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db,
248    unsigned int db_idx)
249{
250	unsigned int db_flag;
251
252	toep->ddp.active_count--;
253	if (toep->ddp.active_id == db_idx) {
254		if (toep->ddp.active_count == 0) {
255			KASSERT(toep->ddp.db[db_idx ^ 1].job == NULL,
256			    ("%s: active_count mismatch", __func__));
257			toep->ddp.active_id = -1;
258		} else
259			toep->ddp.active_id ^= 1;
260#ifdef VERBOSE_TRACES
261		CTR3(KTR_CXGBE, "%s: tid %u, ddp_active_id = %d", __func__,
262		    toep->tid, toep->ddp.active_id);
263#endif
264	} else {
265		KASSERT(toep->ddp.active_count != 0 &&
266		    toep->ddp.active_id != -1,
267		    ("%s: active count mismatch", __func__));
268	}
269
270	db->cancel_pending = 0;
271	db->job = NULL;
272	recycle_pageset(toep, db->ps);
273	db->ps = NULL;
274
275	db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
276	KASSERT(toep->ddp.flags & db_flag,
277	    ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x",
278	    __func__, toep, toep->ddp.flags));
279	toep->ddp.flags &= ~db_flag;
280}
281
282/* XXX: handle_ddp_data code duplication */
283void
284insert_ddp_data(struct toepcb *toep, uint32_t n)
285{
286	struct inpcb *inp = toep->inp;
287	struct tcpcb *tp = intotcpcb(inp);
288	struct ddp_buffer *db;
289	struct kaiocb *job;
290	size_t placed;
291	long copied;
292	unsigned int db_flag, db_idx;
293
294	INP_WLOCK_ASSERT(inp);
295	DDP_ASSERT_LOCKED(toep);
296
297	tp->rcv_nxt += n;
298#ifndef USE_DDP_RX_FLOW_CONTROL
299	KASSERT(tp->rcv_wnd >= n, ("%s: negative window size", __func__));
300	tp->rcv_wnd -= n;
301#endif
302	CTR2(KTR_CXGBE, "%s: placed %u bytes before falling out of DDP",
303	    __func__, n);
304	while (toep->ddp.active_count > 0) {
305		MPASS(toep->ddp.active_id != -1);
306		db_idx = toep->ddp.active_id;
307		db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
308		MPASS((toep->ddp.flags & db_flag) != 0);
309		db = &toep->ddp.db[db_idx];
310		job = db->job;
311		copied = job->aio_received;
312		placed = n;
313		if (placed > job->uaiocb.aio_nbytes - copied)
314			placed = job->uaiocb.aio_nbytes - copied;
315		if (placed > 0)
316			job->msgrcv = 1;
317		if (!aio_clear_cancel_function(job)) {
318			/*
319			 * Update the copied length for when
320			 * t4_aio_cancel_active() completes this
321			 * request.
322			 */
323			job->aio_received += placed;
324		} else if (copied + placed != 0) {
325			CTR4(KTR_CXGBE,
326			    "%s: completing %p (copied %ld, placed %lu)",
327			    __func__, job, copied, placed);
328			/* XXX: This always completes if there is some data. */
329			aio_complete(job, copied + placed, 0);
330		} else if (aio_set_cancel_function(job, t4_aio_cancel_queued)) {
331			TAILQ_INSERT_HEAD(&toep->ddp.aiojobq, job, list);
332			toep->ddp.waiting_count++;
333		} else
334			aio_cancel(job);
335		n -= placed;
336		complete_ddp_buffer(toep, db, db_idx);
337	}
338
339	MPASS(n == 0);
340}
341
342/* SET_TCB_FIELD sent as a ULP command looks like this */
343#define LEN__SET_TCB_FIELD_ULP (sizeof(struct ulp_txpkt) + \
344    sizeof(struct ulptx_idata) + sizeof(struct cpl_set_tcb_field_core))
345
346/* RX_DATA_ACK sent as a ULP command looks like this */
347#define LEN__RX_DATA_ACK_ULP (sizeof(struct ulp_txpkt) + \
348    sizeof(struct ulptx_idata) + sizeof(struct cpl_rx_data_ack_core))
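/*
 * Both lengths count only the command itself; the mk_*_ulp() helpers
 * below append an 8-byte ULP_TX_SC_NOOP whenever the length is not a
 * multiple of 16.  For example, if LEN__SET_TCB_FIELD_ULP works out to
 * 40 bytes (40 % 16 == 8), a NOOP follows it so that the next ULP
 * command starts on a 16-byte boundary, matching the roundup2(..., 16)
 * used when sizing the work request in mk_update_tcb_for_ddp().
 */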
349
350static inline void *
351mk_set_tcb_field_ulp(struct ulp_txpkt *ulpmc, struct toepcb *toep,
352    uint64_t word, uint64_t mask, uint64_t val)
353{
354	struct ulptx_idata *ulpsc;
355	struct cpl_set_tcb_field_core *req;
356
357	ulpmc->cmd_dest = htonl(V_ULPTX_CMD(ULP_TX_PKT) | V_ULP_TXPKT_DEST(0));
358	ulpmc->len = htobe32(howmany(LEN__SET_TCB_FIELD_ULP, 16));
359
360	ulpsc = (struct ulptx_idata *)(ulpmc + 1);
361	ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_IMM));
362	ulpsc->len = htobe32(sizeof(*req));
363
364	req = (struct cpl_set_tcb_field_core *)(ulpsc + 1);
365	OPCODE_TID(req) = htobe32(MK_OPCODE_TID(CPL_SET_TCB_FIELD, toep->tid));
366	req->reply_ctrl = htobe16(V_NO_REPLY(1) |
367	    V_QUEUENO(toep->ofld_rxq->iq.abs_id));
368	req->word_cookie = htobe16(V_WORD(word) | V_COOKIE(0));
	req->mask = htobe64(mask);
	req->val = htobe64(val);
371
372	ulpsc = (struct ulptx_idata *)(req + 1);
373	if (LEN__SET_TCB_FIELD_ULP % 16) {
374		ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_NOOP));
375		ulpsc->len = htobe32(0);
376		return (ulpsc + 1);
377	}
378	return (ulpsc);
379}
380
381static inline void *
382mk_rx_data_ack_ulp(struct ulp_txpkt *ulpmc, struct toepcb *toep)
383{
384	struct ulptx_idata *ulpsc;
385	struct cpl_rx_data_ack_core *req;
386
387	ulpmc->cmd_dest = htonl(V_ULPTX_CMD(ULP_TX_PKT) | V_ULP_TXPKT_DEST(0));
388	ulpmc->len = htobe32(howmany(LEN__RX_DATA_ACK_ULP, 16));
389
390	ulpsc = (struct ulptx_idata *)(ulpmc + 1);
391	ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_IMM));
392	ulpsc->len = htobe32(sizeof(*req));
393
394	req = (struct cpl_rx_data_ack_core *)(ulpsc + 1);
395	OPCODE_TID(req) = htobe32(MK_OPCODE_TID(CPL_RX_DATA_ACK, toep->tid));
396	req->credit_dack = htobe32(F_RX_MODULATE_RX);
397
398	ulpsc = (struct ulptx_idata *)(req + 1);
399	if (LEN__RX_DATA_ACK_ULP % 16) {
400		ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_NOOP));
401		ulpsc->len = htobe32(0);
402		return (ulpsc + 1);
403	}
404	return (ulpsc);
405}
406
407static struct wrqe *
408mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
409    struct pageset *ps, int offset, uint64_t ddp_flags, uint64_t ddp_flags_mask)
410{
411	struct wrqe *wr;
412	struct work_request_hdr *wrh;
413	struct ulp_txpkt *ulpmc;
414	int len;
415
416	KASSERT(db_idx == 0 || db_idx == 1,
417	    ("%s: bad DDP buffer index %d", __func__, db_idx));
418
419	/*
420	 * We'll send a compound work request that has 3 SET_TCB_FIELDs and an
421	 * RX_DATA_ACK (with RX_MODULATE to speed up delivery).
422	 *
423	 * The work request header is 16B and always ends at a 16B boundary.
424	 * The ULPTX master commands that follow must all end at 16B boundaries
425	 * too so we round up the size to 16.
426	 */
427	len = sizeof(*wrh) + 3 * roundup2(LEN__SET_TCB_FIELD_ULP, 16) +
428	    roundup2(LEN__RX_DATA_ACK_ULP, 16);
429
430	wr = alloc_wrqe(len, toep->ctrlq);
431	if (wr == NULL)
432		return (NULL);
433	wrh = wrtod(wr);
434	INIT_ULPTX_WRH(wrh, len, 1, 0);	/* atomic */
435	ulpmc = (struct ulp_txpkt *)(wrh + 1);
436
437	/* Write the buffer's tag */
438	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
439	    W_TCB_RX_DDP_BUF0_TAG + db_idx,
440	    V_TCB_RX_DDP_BUF0_TAG(M_TCB_RX_DDP_BUF0_TAG),
441	    V_TCB_RX_DDP_BUF0_TAG(ps->prsv.prsv_tag));
442
443	/* Update the current offset in the DDP buffer and its total length */
444	if (db_idx == 0)
445		ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
446		    W_TCB_RX_DDP_BUF0_OFFSET,
447		    V_TCB_RX_DDP_BUF0_OFFSET(M_TCB_RX_DDP_BUF0_OFFSET) |
448		    V_TCB_RX_DDP_BUF0_LEN(M_TCB_RX_DDP_BUF0_LEN),
449		    V_TCB_RX_DDP_BUF0_OFFSET(offset) |
450		    V_TCB_RX_DDP_BUF0_LEN(ps->len));
451	else
452		ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
453		    W_TCB_RX_DDP_BUF1_OFFSET,
454		    V_TCB_RX_DDP_BUF1_OFFSET(M_TCB_RX_DDP_BUF1_OFFSET) |
455		    V_TCB_RX_DDP_BUF1_LEN((u64)M_TCB_RX_DDP_BUF1_LEN << 32),
456		    V_TCB_RX_DDP_BUF1_OFFSET(offset) |
457		    V_TCB_RX_DDP_BUF1_LEN((u64)ps->len << 32));
458
459	/* Update DDP flags */
460	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_FLAGS,
461	    ddp_flags_mask, ddp_flags);
462
463	/* Gratuitous RX_DATA_ACK with RX_MODULATE set to speed up delivery. */
464	ulpmc = mk_rx_data_ack_ulp(ulpmc, toep);
465
466	return (wr);
467}
468
469static int
470handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len)
471{
472	uint32_t report = be32toh(ddp_report);
473	unsigned int db_idx;
474	struct inpcb *inp = toep->inp;
475	struct ddp_buffer *db;
476	struct tcpcb *tp;
477	struct socket *so;
478	struct sockbuf *sb;
479	struct kaiocb *job;
480	long copied;
481
482	db_idx = report & F_DDP_BUF_IDX ? 1 : 0;
483
484	if (__predict_false(!(report & F_DDP_INV)))
485		CXGBE_UNIMPLEMENTED("DDP buffer still valid");
486
487	INP_WLOCK(inp);
488	so = inp_inpcbtosocket(inp);
489	sb = &so->so_rcv;
490	DDP_LOCK(toep);
491
492	KASSERT(toep->ddp.active_id == db_idx,
493	    ("completed DDP buffer (%d) != active_id (%d) for tid %d", db_idx,
494	    toep->ddp.active_id, toep->tid));
495	db = &toep->ddp.db[db_idx];
496	job = db->job;
497
498	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT))) {
499		/*
500		 * This can happen due to an administrative tcpdrop(8).
501		 * Just fail the request with ECONNRESET.
502		 */
503		CTR5(KTR_CXGBE, "%s: tid %u, seq 0x%x, len %d, inp_flags 0x%x",
504		    __func__, toep->tid, be32toh(rcv_nxt), len, inp->inp_flags);
505		if (aio_clear_cancel_function(job))
506			ddp_complete_one(job, ECONNRESET);
507		goto completed;
508	}
509
510	tp = intotcpcb(inp);
511
512	/*
513	 * For RX_DDP_COMPLETE, len will be zero and rcv_nxt is the
514	 * sequence number of the next byte to receive.  The length of
515	 * the data received for this message must be computed by
516	 * comparing the new and old values of rcv_nxt.
517	 *
518	 * For RX_DATA_DDP, len might be non-zero, but it is only the
519	 * length of the most recent DMA.  It does not include the
520	 * total length of the data received since the previous update
521	 * for this DDP buffer.  rcv_nxt is the sequence number of the
522	 * first received byte from the most recent DMA.
523	 */
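	/*
	 * Worked example: if tp->rcv_nxt is 1000 and an RX_DATA_DDP
	 * arrives with rcv_nxt 1600 and len 200, then 600 bytes were
	 * placed by earlier DMAs and 200 by this one, so len becomes
	 * 800 below.
	 */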
524	len += be32toh(rcv_nxt) - tp->rcv_nxt;
525	tp->rcv_nxt += len;
526	tp->t_rcvtime = ticks;
527#ifndef USE_DDP_RX_FLOW_CONTROL
528	KASSERT(tp->rcv_wnd >= len, ("%s: negative window size", __func__));
529	tp->rcv_wnd -= len;
530#endif
531#ifdef VERBOSE_TRACES
532	CTR5(KTR_CXGBE, "%s: tid %u, DDP[%d] placed %d bytes (%#x)", __func__,
533	    toep->tid, db_idx, len, report);
534#endif
535
536	/* receive buffer autosize */
537	MPASS(toep->vnet == so->so_vnet);
538	CURVNET_SET(toep->vnet);
539	SOCKBUF_LOCK(sb);
540	if (sb->sb_flags & SB_AUTOSIZE &&
541	    V_tcp_do_autorcvbuf &&
542	    sb->sb_hiwat < V_tcp_autorcvbuf_max &&
543	    len > (sbspace(sb) / 8 * 7)) {
544		struct adapter *sc = td_adapter(toep->td);
545		unsigned int hiwat = sb->sb_hiwat;
546		unsigned int newsize = min(hiwat + sc->tt.autorcvbuf_inc,
547		    V_tcp_autorcvbuf_max);
548
549		if (!sbreserve_locked(sb, newsize, so, NULL))
550			sb->sb_flags &= ~SB_AUTOSIZE;
551	}
552	SOCKBUF_UNLOCK(sb);
553	CURVNET_RESTORE();
554
555	job->msgrcv = 1;
556	if (db->cancel_pending) {
557		/*
558		 * Update the job's length but defer completion to the
559		 * TCB_RPL callback.
560		 */
561		job->aio_received += len;
562		goto out;
563	} else if (!aio_clear_cancel_function(job)) {
564		/*
565		 * Update the copied length for when
566		 * t4_aio_cancel_active() completes this request.
567		 */
568		job->aio_received += len;
569	} else {
570		copied = job->aio_received;
571#ifdef VERBOSE_TRACES
572		CTR5(KTR_CXGBE,
573		    "%s: tid %u, completing %p (copied %ld, placed %d)",
574		    __func__, toep->tid, job, copied, len);
575#endif
576		aio_complete(job, copied + len, 0);
577		t4_rcvd(&toep->td->tod, tp);
578	}
579
580completed:
581	complete_ddp_buffer(toep, db, db_idx);
582	if (toep->ddp.waiting_count > 0)
583		ddp_queue_toep(toep);
584out:
585	DDP_UNLOCK(toep);
586	INP_WUNLOCK(inp);
587
588	return (0);
589}
590
591void
592handle_ddp_indicate(struct toepcb *toep)
593{
594
595	DDP_ASSERT_LOCKED(toep);
596	MPASS(toep->ddp.active_count == 0);
597	MPASS((toep->ddp.flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0);
598	if (toep->ddp.waiting_count == 0) {
599		/*
600		 * The pending requests that triggered the request for an
		 * indicate were cancelled.  Those cancels should have
602		 * already disabled DDP.  Just ignore this as the data is
603		 * going into the socket buffer anyway.
604		 */
605		return;
606	}
607	CTR3(KTR_CXGBE, "%s: tid %d indicated (%d waiting)", __func__,
608	    toep->tid, toep->ddp.waiting_count);
609	ddp_queue_toep(toep);
610}
611
612CTASSERT(CPL_COOKIE_DDP0 + 1 == CPL_COOKIE_DDP1);
613
614static int
615do_ddp_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
616{
617	struct adapter *sc = iq->adapter;
618	const struct cpl_set_tcb_rpl *cpl = (const void *)(rss + 1);
619	unsigned int tid = GET_TID(cpl);
620	unsigned int db_idx;
621	struct toepcb *toep;
622	struct inpcb *inp;
623	struct ddp_buffer *db;
624	struct kaiocb *job;
625	long copied;
626
627	if (cpl->status != CPL_ERR_NONE)
628		panic("XXX: tcp_rpl failed: %d", cpl->status);
629
630	toep = lookup_tid(sc, tid);
631	inp = toep->inp;
632	switch (cpl->cookie) {
633	case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(CPL_COOKIE_DDP0):
634	case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(CPL_COOKIE_DDP1):
635		/*
636		 * XXX: This duplicates a lot of code with handle_ddp_data().
637		 */
638		db_idx = G_COOKIE(cpl->cookie) - CPL_COOKIE_DDP0;
639		MPASS(db_idx < nitems(toep->ddp.db));
640		INP_WLOCK(inp);
641		DDP_LOCK(toep);
642		db = &toep->ddp.db[db_idx];
643
644		/*
645		 * handle_ddp_data() should leave the job around until
646		 * this callback runs once a cancel is pending.
647		 */
648		MPASS(db != NULL);
649		MPASS(db->job != NULL);
650		MPASS(db->cancel_pending);
651
652		/*
653		 * XXX: It's not clear what happens if there is data
654		 * placed when the buffer is invalidated.  I suspect we
655		 * need to read the TCB to see how much data was placed.
656		 *
657		 * For now this just pretends like nothing was placed.
658		 *
659		 * XXX: Note that if we did check the PCB we would need to
660		 * also take care of updating the tp, etc.
661		 */
662		job = db->job;
663		copied = job->aio_received;
664		if (copied == 0) {
665			CTR2(KTR_CXGBE, "%s: cancelling %p", __func__, job);
666			aio_cancel(job);
667		} else {
668			CTR3(KTR_CXGBE, "%s: completing %p (copied %ld)",
669			    __func__, job, copied);
670			aio_complete(job, copied, 0);
671			t4_rcvd(&toep->td->tod, intotcpcb(inp));
672		}
673
674		complete_ddp_buffer(toep, db, db_idx);
675		if (toep->ddp.waiting_count > 0)
676			ddp_queue_toep(toep);
677		DDP_UNLOCK(toep);
678		INP_WUNLOCK(inp);
679		break;
680	default:
681		panic("XXX: unknown tcb_rpl offset %#x, cookie %#x",
682		    G_WORD(cpl->cookie), G_COOKIE(cpl->cookie));
683	}
684
685	return (0);
686}
687
688void
689handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
690{
691	struct ddp_buffer *db;
692	struct kaiocb *job;
693	long copied;
694	unsigned int db_flag, db_idx;
695	int len, placed;
696
697	INP_WLOCK_ASSERT(toep->inp);
698	DDP_ASSERT_LOCKED(toep);
699
700	len = be32toh(rcv_nxt) - tp->rcv_nxt;
701	tp->rcv_nxt += len;
702
703	while (toep->ddp.active_count > 0) {
704		MPASS(toep->ddp.active_id != -1);
705		db_idx = toep->ddp.active_id;
706		db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
707		MPASS((toep->ddp.flags & db_flag) != 0);
708		db = &toep->ddp.db[db_idx];
709		job = db->job;
710		copied = job->aio_received;
711		placed = len;
712		if (placed > job->uaiocb.aio_nbytes - copied)
713			placed = job->uaiocb.aio_nbytes - copied;
714		if (placed > 0)
715			job->msgrcv = 1;
716		if (!aio_clear_cancel_function(job)) {
717			/*
718			 * Update the copied length for when
719			 * t4_aio_cancel_active() completes this
720			 * request.
721			 */
722			job->aio_received += placed;
723		} else {
724			CTR4(KTR_CXGBE, "%s: tid %d completed buf %d len %d",
725			    __func__, toep->tid, db_idx, placed);
726			aio_complete(job, copied + placed, 0);
727		}
728		len -= placed;
729		complete_ddp_buffer(toep, db, db_idx);
730	}
731
732	MPASS(len == 0);
733	ddp_complete_all(toep, 0);
734}
735
736#define DDP_ERR (F_DDP_PPOD_MISMATCH | F_DDP_LLIMIT_ERR | F_DDP_ULIMIT_ERR |\
737	 F_DDP_PPOD_PARITY_ERR | F_DDP_PADDING_ERR | F_DDP_OFFSET_ERR |\
738	 F_DDP_INVALID_TAG | F_DDP_COLOR_ERR | F_DDP_TID_MISMATCH |\
739	 F_DDP_INVALID_PPOD | F_DDP_HDRCRC_ERR | F_DDP_DATACRC_ERR)
740
741extern cpl_handler_t t4_cpl_handler[];
742
743static int
744do_rx_data_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
745{
746	struct adapter *sc = iq->adapter;
747	const struct cpl_rx_data_ddp *cpl = (const void *)(rss + 1);
748	unsigned int tid = GET_TID(cpl);
749	uint32_t vld;
750	struct toepcb *toep = lookup_tid(sc, tid);
751
752	KASSERT(m == NULL, ("%s: wasn't expecting payload", __func__));
753	KASSERT(toep->tid == tid, ("%s: toep tid/atid mismatch", __func__));
754	KASSERT(!(toep->flags & TPF_SYNQE),
755	    ("%s: toep %p claims to be a synq entry", __func__, toep));
756
757	vld = be32toh(cpl->ddpvld);
758	if (__predict_false(vld & DDP_ERR)) {
759		panic("%s: DDP error 0x%x (tid %d, toep %p)",
760		    __func__, vld, tid, toep);
761	}
762
763	if (ulp_mode(toep) == ULP_MODE_ISCSI) {
764		t4_cpl_handler[CPL_RX_ISCSI_DDP](iq, rss, m);
765		return (0);
766	}
767
768	handle_ddp_data(toep, cpl->u.ddp_report, cpl->seq, be16toh(cpl->len));
769
770	return (0);
771}
772
773static int
774do_rx_ddp_complete(struct sge_iq *iq, const struct rss_header *rss,
775    struct mbuf *m)
776{
777	struct adapter *sc = iq->adapter;
778	const struct cpl_rx_ddp_complete *cpl = (const void *)(rss + 1);
779	unsigned int tid = GET_TID(cpl);
780	struct toepcb *toep = lookup_tid(sc, tid);
781
782	KASSERT(m == NULL, ("%s: wasn't expecting payload", __func__));
783	KASSERT(toep->tid == tid, ("%s: toep tid/atid mismatch", __func__));
784	KASSERT(!(toep->flags & TPF_SYNQE),
785	    ("%s: toep %p claims to be a synq entry", __func__, toep));
786
787	handle_ddp_data(toep, cpl->ddp_report, cpl->rcv_nxt, 0);
788
789	return (0);
790}
791
792static void
793enable_ddp(struct adapter *sc, struct toepcb *toep)
794{
795
796	KASSERT((toep->ddp.flags & (DDP_ON | DDP_OK | DDP_SC_REQ)) == DDP_OK,
797	    ("%s: toep %p has bad ddp_flags 0x%x",
798	    __func__, toep, toep->ddp.flags));
799
800	CTR3(KTR_CXGBE, "%s: tid %u (time %u)",
801	    __func__, toep->tid, time_uptime);
802
803	DDP_ASSERT_LOCKED(toep);
804	toep->ddp.flags |= DDP_SC_REQ;
805	t4_set_tcb_field(sc, toep->ctrlq, toep, W_TCB_RX_DDP_FLAGS,
806	    V_TF_DDP_OFF(1) | V_TF_DDP_INDICATE_OUT(1) |
807	    V_TF_DDP_BUF0_INDICATE(1) | V_TF_DDP_BUF1_INDICATE(1) |
808	    V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1),
809	    V_TF_DDP_BUF0_INDICATE(1) | V_TF_DDP_BUF1_INDICATE(1), 0, 0);
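	/*
	 * Receive coalescing is switched off so that incoming payload is
	 * considered for direct placement as it arrives instead of being
	 * coalesced into larger CPL_RX_DATA messages (this rationale is
	 * inferred from the flag name rather than documented here).
	 */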
810	t4_set_tcb_field(sc, toep->ctrlq, toep, W_TCB_T_FLAGS,
811	    V_TF_RCV_COALESCE_ENABLE(1), 0, 0, 0);
812}
813
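/*
 * Euclid's algorithm on the two segment sizes.  E.g.
 * calculate_hcf(16384, 24576) == 8192, and calculate_hcf(0, x) == x,
 * which is why the callers below can seed their running HCF with 0.
 */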
814static int
815calculate_hcf(int n1, int n2)
816{
817	int a, b, t;
818
819	if (n1 <= n2) {
820		a = n1;
821		b = n2;
822	} else {
823		a = n2;
824		b = n1;
825	}
826
827	while (a != 0) {
828		t = a;
829		a = b % a;
830		b = t;
831	}
832
833	return (b);
834}
835
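/*
 * Convert a VM page count to a page pod count for a given DDP page
 * size.  For example, with 4KB VM pages, a 64KB DDP page size and a
 * 256-page (1MB) buffer: 256 >> (16 - 12) == 16 DDP pages, which
 * (assuming PPOD_PAGES is 4) rounds up to 4 page pods.
 */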
836static inline int
837pages_to_nppods(int npages, int ddp_page_shift)
838{
839
840	MPASS(ddp_page_shift >= PAGE_SHIFT);
841
842	return (howmany(npages >> (ddp_page_shift - PAGE_SHIFT), PPOD_PAGES));
843}
844
845static int
846alloc_page_pods(struct ppod_region *pr, u_int nppods, u_int pgsz_idx,
847    struct ppod_reservation *prsv)
848{
849	vmem_addr_t addr;       /* relative to start of region */
850
851	if (vmem_alloc(pr->pr_arena, PPOD_SZ(nppods), M_NOWAIT | M_FIRSTFIT,
852	    &addr) != 0)
853		return (ENOMEM);
854
855	CTR5(KTR_CXGBE, "%-17s arena %p, addr 0x%08x, nppods %d, pgsz %d",
856	    __func__, pr->pr_arena, (uint32_t)addr & pr->pr_tag_mask,
857	    nppods, 1 << pr->pr_page_shift[pgsz_idx]);
858
859	/*
860	 * The hardware tagmask includes an extra invalid bit but the arena was
861	 * seeded with valid values only.  An allocation out of this arena will
862	 * fit inside the tagmask but won't have the invalid bit set.
863	 */
864	MPASS((addr & pr->pr_tag_mask) == addr);
865	MPASS((addr & pr->pr_invalid_bit) == 0);
866
867	prsv->prsv_pr = pr;
868	prsv->prsv_tag = V_PPOD_PGSZ(pgsz_idx) | addr;
869	prsv->prsv_nppods = nppods;
870
871	return (0);
872}
873
874int
875t4_alloc_page_pods_for_ps(struct ppod_region *pr, struct pageset *ps)
876{
877	int i, hcf, seglen, idx, nppods;
878	struct ppod_reservation *prsv = &ps->prsv;
879
880	KASSERT(prsv->prsv_nppods == 0,
881	    ("%s: page pods already allocated", __func__));
882
883	/*
884	 * The DDP page size is unrelated to the VM page size.  We combine
885	 * contiguous physical pages into larger segments to get the best DDP
886	 * page size possible.  This is the largest of the four sizes in
887	 * A_ULP_RX_TDDP_PSZ that evenly divides the HCF of the segment sizes in
888	 * the page list.
889	 */
890	hcf = 0;
891	for (i = 0; i < ps->npages; i++) {
892		seglen = PAGE_SIZE;
893		while (i < ps->npages - 1 &&
894		    ps->pages[i]->phys_addr + PAGE_SIZE ==
895		    ps->pages[i + 1]->phys_addr) {
896			seglen += PAGE_SIZE;
897			i++;
898		}
899
900		hcf = calculate_hcf(hcf, seglen);
901		if (hcf < (1 << pr->pr_page_shift[1])) {
902			idx = 0;
903			goto have_pgsz;	/* give up, short circuit */
904		}
905	}
906
907#define PR_PAGE_MASK(x) ((1 << pr->pr_page_shift[(x)]) - 1)
908	MPASS((hcf & PR_PAGE_MASK(0)) == 0); /* PAGE_SIZE is >= 4K everywhere */
909	for (idx = nitems(pr->pr_page_shift) - 1; idx > 0; idx--) {
910		if ((hcf & PR_PAGE_MASK(idx)) == 0)
911			break;
912	}
913#undef PR_PAGE_MASK
914
915have_pgsz:
916	MPASS(idx <= M_PPOD_PGSZ);
917
918	nppods = pages_to_nppods(ps->npages, pr->pr_page_shift[idx]);
919	if (alloc_page_pods(pr, nppods, idx, prsv) != 0)
920		return (0);
921	MPASS(prsv->prsv_nppods > 0);
922
923	return (1);
924}
925
926int
927t4_alloc_page_pods_for_buf(struct ppod_region *pr, vm_offset_t buf, int len,
928    struct ppod_reservation *prsv)
929{
930	int hcf, seglen, idx, npages, nppods;
931	uintptr_t start_pva, end_pva, pva, p1;
932
933	MPASS(buf > 0);
934	MPASS(len > 0);
935
936	/*
937	 * The DDP page size is unrelated to the VM page size.  We combine
938	 * contiguous physical pages into larger segments to get the best DDP
939	 * page size possible.  This is the largest of the four sizes in
940	 * A_ULP_RX_ISCSI_PSZ that evenly divides the HCF of the segment sizes
941	 * in the page list.
942	 */
943	hcf = 0;
944	start_pva = trunc_page(buf);
945	end_pva = trunc_page(buf + len - 1);
946	pva = start_pva;
947	while (pva <= end_pva) {
948		seglen = PAGE_SIZE;
949		p1 = pmap_kextract(pva);
950		pva += PAGE_SIZE;
951		while (pva <= end_pva && p1 + seglen == pmap_kextract(pva)) {
952			seglen += PAGE_SIZE;
953			pva += PAGE_SIZE;
954		}
955
956		hcf = calculate_hcf(hcf, seglen);
957		if (hcf < (1 << pr->pr_page_shift[1])) {
958			idx = 0;
959			goto have_pgsz;	/* give up, short circuit */
960		}
961	}
962
963#define PR_PAGE_MASK(x) ((1 << pr->pr_page_shift[(x)]) - 1)
964	MPASS((hcf & PR_PAGE_MASK(0)) == 0); /* PAGE_SIZE is >= 4K everywhere */
965	for (idx = nitems(pr->pr_page_shift) - 1; idx > 0; idx--) {
966		if ((hcf & PR_PAGE_MASK(idx)) == 0)
967			break;
968	}
969#undef PR_PAGE_MASK
970
971have_pgsz:
972	MPASS(idx <= M_PPOD_PGSZ);
973
974	npages = 1;
975	npages += (end_pva - start_pva) >> pr->pr_page_shift[idx];
976	nppods = howmany(npages, PPOD_PAGES);
977	if (alloc_page_pods(pr, nppods, idx, prsv) != 0)
978		return (ENOMEM);
979	MPASS(prsv->prsv_nppods > 0);
980
981	return (0);
982}
983
984void
985t4_free_page_pods(struct ppod_reservation *prsv)
986{
987	struct ppod_region *pr = prsv->prsv_pr;
988	vmem_addr_t addr;
989
990	MPASS(prsv != NULL);
991	MPASS(prsv->prsv_nppods != 0);
992
993	addr = prsv->prsv_tag & pr->pr_tag_mask;
994	MPASS((addr & pr->pr_invalid_bit) == 0);
995
996	CTR4(KTR_CXGBE, "%-17s arena %p, addr 0x%08x, nppods %d", __func__,
997	    pr->pr_arena, addr, prsv->prsv_nppods);
998
999	vmem_free(pr->pr_arena, addr, PPOD_SZ(prsv->prsv_nppods));
1000	prsv->prsv_nppods = 0;
1001}
1002
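/*
 * Page pods are written with at most 256 bytes of immediate data per
 * ULP_TX_MEM_WRITE below, i.e. 4 pods per work request if PPOD_SIZE is
 * the usual 64 bytes.
 */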
1003#define NUM_ULP_TX_SC_IMM_PPODS (256 / PPOD_SIZE)
1004
1005int
1006t4_write_page_pods_for_ps(struct adapter *sc, struct sge_wrq *wrq, int tid,
1007    struct pageset *ps)
1008{
1009	struct wrqe *wr;
1010	struct ulp_mem_io *ulpmc;
1011	struct ulptx_idata *ulpsc;
1012	struct pagepod *ppod;
1013	int i, j, k, n, chunk, len, ddp_pgsz, idx;
1014	u_int ppod_addr;
1015	uint32_t cmd;
1016	struct ppod_reservation *prsv = &ps->prsv;
1017	struct ppod_region *pr = prsv->prsv_pr;
1018
1019	KASSERT(!(ps->flags & PS_PPODS_WRITTEN),
1020	    ("%s: page pods already written", __func__));
1021	MPASS(prsv->prsv_nppods > 0);
1022
1023	cmd = htobe32(V_ULPTX_CMD(ULP_TX_MEM_WRITE));
1024	if (is_t4(sc))
1025		cmd |= htobe32(F_ULP_MEMIO_ORDER);
1026	else
1027		cmd |= htobe32(F_T5_ULP_MEMIO_IMM);
1028	ddp_pgsz = 1 << pr->pr_page_shift[G_PPOD_PGSZ(prsv->prsv_tag)];
1029	ppod_addr = pr->pr_start + (prsv->prsv_tag & pr->pr_tag_mask);
1030	for (i = 0; i < prsv->prsv_nppods; ppod_addr += chunk) {
1031
1032		/* How many page pods are we writing in this cycle */
1033		n = min(prsv->prsv_nppods - i, NUM_ULP_TX_SC_IMM_PPODS);
1034		chunk = PPOD_SZ(n);
1035		len = roundup2(sizeof(*ulpmc) + sizeof(*ulpsc) + chunk, 16);
1036
1037		wr = alloc_wrqe(len, wrq);
1038		if (wr == NULL)
1039			return (ENOMEM);	/* ok to just bail out */
1040		ulpmc = wrtod(wr);
1041
1042		INIT_ULPTX_WR(ulpmc, len, 0, 0);
1043		ulpmc->cmd = cmd;
1044		ulpmc->dlen = htobe32(V_ULP_MEMIO_DATA_LEN(chunk / 32));
1045		ulpmc->len16 = htobe32(howmany(len - sizeof(ulpmc->wr), 16));
1046		ulpmc->lock_addr = htobe32(V_ULP_MEMIO_ADDR(ppod_addr >> 5));
1047
1048		ulpsc = (struct ulptx_idata *)(ulpmc + 1);
1049		ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_IMM));
1050		ulpsc->len = htobe32(chunk);
1051
1052		ppod = (struct pagepod *)(ulpsc + 1);
1053		for (j = 0; j < n; i++, j++, ppod++) {
1054			ppod->vld_tid_pgsz_tag_color = htobe64(F_PPOD_VALID |
1055			    V_PPOD_TID(tid) | prsv->prsv_tag);
1056			ppod->len_offset = htobe64(V_PPOD_LEN(ps->len) |
1057			    V_PPOD_OFST(ps->offset));
1058			ppod->rsvd = 0;
1059			idx = i * PPOD_PAGES * (ddp_pgsz / PAGE_SIZE);
1060			for (k = 0; k < nitems(ppod->addr); k++) {
1061				if (idx < ps->npages) {
1062					ppod->addr[k] =
1063					    htobe64(ps->pages[idx]->phys_addr);
1064					idx += ddp_pgsz / PAGE_SIZE;
1065				} else
1066					ppod->addr[k] = 0;
1067#if 0
1068				CTR5(KTR_CXGBE,
1069				    "%s: tid %d ppod[%d]->addr[%d] = %p",
1070				    __func__, toep->tid, i, k,
1071				    htobe64(ppod->addr[k]));
1072#endif
1073			}
1074
1075		}
1076
1077		t4_wrq_tx(sc, wr);
1078	}
1079	ps->flags |= PS_PPODS_WRITTEN;
1080
1081	return (0);
1082}
1083
1084int
1085t4_write_page_pods_for_buf(struct adapter *sc, struct sge_wrq *wrq, int tid,
1086    struct ppod_reservation *prsv, vm_offset_t buf, int buflen)
1087{
1088	struct wrqe *wr;
1089	struct ulp_mem_io *ulpmc;
1090	struct ulptx_idata *ulpsc;
1091	struct pagepod *ppod;
1092	int i, j, k, n, chunk, len, ddp_pgsz;
1093	u_int ppod_addr, offset;
1094	uint32_t cmd;
1095	struct ppod_region *pr = prsv->prsv_pr;
1096	uintptr_t end_pva, pva, pa;
1097
1098	cmd = htobe32(V_ULPTX_CMD(ULP_TX_MEM_WRITE));
1099	if (is_t4(sc))
1100		cmd |= htobe32(F_ULP_MEMIO_ORDER);
1101	else
1102		cmd |= htobe32(F_T5_ULP_MEMIO_IMM);
1103	ddp_pgsz = 1 << pr->pr_page_shift[G_PPOD_PGSZ(prsv->prsv_tag)];
1104	offset = buf & PAGE_MASK;
1105	ppod_addr = pr->pr_start + (prsv->prsv_tag & pr->pr_tag_mask);
1106	pva = trunc_page(buf);
1107	end_pva = trunc_page(buf + buflen - 1);
1108	for (i = 0; i < prsv->prsv_nppods; ppod_addr += chunk) {
1109
1110		/* How many page pods are we writing in this cycle */
1111		n = min(prsv->prsv_nppods - i, NUM_ULP_TX_SC_IMM_PPODS);
1112		MPASS(n > 0);
1113		chunk = PPOD_SZ(n);
1114		len = roundup2(sizeof(*ulpmc) + sizeof(*ulpsc) + chunk, 16);
1115
1116		wr = alloc_wrqe(len, wrq);
1117		if (wr == NULL)
1118			return (ENOMEM);	/* ok to just bail out */
1119		ulpmc = wrtod(wr);
1120
1121		INIT_ULPTX_WR(ulpmc, len, 0, 0);
1122		ulpmc->cmd = cmd;
1123		ulpmc->dlen = htobe32(V_ULP_MEMIO_DATA_LEN(chunk / 32));
1124		ulpmc->len16 = htobe32(howmany(len - sizeof(ulpmc->wr), 16));
1125		ulpmc->lock_addr = htobe32(V_ULP_MEMIO_ADDR(ppod_addr >> 5));
1126
1127		ulpsc = (struct ulptx_idata *)(ulpmc + 1);
1128		ulpsc->cmd_more = htobe32(V_ULPTX_CMD(ULP_TX_SC_IMM));
1129		ulpsc->len = htobe32(chunk);
1130
1131		ppod = (struct pagepod *)(ulpsc + 1);
1132		for (j = 0; j < n; i++, j++, ppod++) {
1133			ppod->vld_tid_pgsz_tag_color = htobe64(F_PPOD_VALID |
1134			    V_PPOD_TID(tid) |
1135			    (prsv->prsv_tag & ~V_PPOD_PGSZ(M_PPOD_PGSZ)));
1136			ppod->len_offset = htobe64(V_PPOD_LEN(buflen) |
1137			    V_PPOD_OFST(offset));
1138			ppod->rsvd = 0;
1139
1140			for (k = 0; k < nitems(ppod->addr); k++) {
1141				if (pva > end_pva)
1142					ppod->addr[k] = 0;
1143				else {
1144					pa = pmap_kextract(pva);
1145					ppod->addr[k] = htobe64(pa);
1146					pva += ddp_pgsz;
1147				}
1148#if 0
1149				CTR5(KTR_CXGBE,
1150				    "%s: tid %d ppod[%d]->addr[%d] = %p",
1151				    __func__, tid, i, k,
1152				    htobe64(ppod->addr[k]));
1153#endif
1154			}
1155
1156			/*
1157			 * Walk back 1 segment so that the first address in the
1158			 * next pod is the same as the last one in the current
1159			 * pod.
1160			 */
1161			pva -= ddp_pgsz;
1162		}
1163
1164		t4_wrq_tx(sc, wr);
1165	}
1166
1167	MPASS(pva <= end_pva);
1168
1169	return (0);
1170}
1171
1172/*
1173 * Prepare a pageset for DDP.  This sets up page pods.
1174 */
1175static int
1176prep_pageset(struct adapter *sc, struct toepcb *toep, struct pageset *ps)
1177{
1178	struct tom_data *td = sc->tom_softc;
1179
1180	if (ps->prsv.prsv_nppods == 0 &&
1181	    !t4_alloc_page_pods_for_ps(&td->pr, ps)) {
1182		return (0);
1183	}
1184	if (!(ps->flags & PS_PPODS_WRITTEN) &&
1185	    t4_write_page_pods_for_ps(sc, toep->ctrlq, toep->tid, ps) != 0) {
1186		return (0);
1187	}
1188
1189	return (1);
1190}
1191
1192int
1193t4_init_ppod_region(struct ppod_region *pr, struct t4_range *r, u_int psz,
1194    const char *name)
1195{
1196	int i;
1197
1198	MPASS(pr != NULL);
1199	MPASS(r->size > 0);
1200
1201	pr->pr_start = r->start;
1202	pr->pr_len = r->size;
1203	pr->pr_page_shift[0] = 12 + G_HPZ0(psz);
1204	pr->pr_page_shift[1] = 12 + G_HPZ1(psz);
1205	pr->pr_page_shift[2] = 12 + G_HPZ2(psz);
1206	pr->pr_page_shift[3] = 12 + G_HPZ3(psz);
1207
1208	/* The SGL -> page pod algorithm requires the sizes to be in order. */
1209	for (i = 1; i < nitems(pr->pr_page_shift); i++) {
1210		if (pr->pr_page_shift[i] <= pr->pr_page_shift[i - 1])
1211			return (ENXIO);
1212	}
1213
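	/*
	 * pr_tag_mask is sized from the region length, pr_alias_mask is
	 * whatever remains of the PPOD_TAG field above it, and the top
	 * bit of pr_tag_mask doubles as pr_invalid_bit, which
	 * alloc_page_pods() asserts is never set in a real allocation.
	 */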
1214	pr->pr_tag_mask = ((1 << fls(r->size)) - 1) & V_PPOD_TAG(M_PPOD_TAG);
1215	pr->pr_alias_mask = V_PPOD_TAG(M_PPOD_TAG) & ~pr->pr_tag_mask;
1216	if (pr->pr_tag_mask == 0 || pr->pr_alias_mask == 0)
1217		return (ENXIO);
1218	pr->pr_alias_shift = fls(pr->pr_tag_mask);
1219	pr->pr_invalid_bit = 1 << (pr->pr_alias_shift - 1);
1220
1221	pr->pr_arena = vmem_create(name, 0, pr->pr_len, PPOD_SIZE, 0,
1222	    M_FIRSTFIT | M_NOWAIT);
1223	if (pr->pr_arena == NULL)
1224		return (ENOMEM);
1225
1226	return (0);
1227}
1228
1229void
1230t4_free_ppod_region(struct ppod_region *pr)
1231{
1232
1233	MPASS(pr != NULL);
1234
1235	if (pr->pr_arena)
1236		vmem_destroy(pr->pr_arena);
1237	bzero(pr, sizeof(*pr));
1238}
1239
1240static int
1241pscmp(struct pageset *ps, struct vmspace *vm, vm_offset_t start, int npages,
1242    int pgoff, int len)
1243{
1244
1245	if (ps->start != start || ps->npages != npages ||
1246	    ps->offset != pgoff || ps->len != len)
1247		return (1);
1248
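	/*
	 * A cached pageset is only reusable if it was built against the
	 * same vmspace and the vm_map has not been modified since; a
	 * changed timestamp means the wired pages may no longer back
	 * this address range.
	 */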
1249	return (ps->vm != vm || ps->vm_timestamp != vm->vm_map.timestamp);
1250}
1251
1252static int
1253hold_aio(struct toepcb *toep, struct kaiocb *job, struct pageset **pps)
1254{
1255	struct vmspace *vm;
1256	vm_map_t map;
1257	vm_offset_t start, end, pgoff;
1258	struct pageset *ps;
1259	int n;
1260
1261	DDP_ASSERT_LOCKED(toep);
1262
1263	/*
1264	 * The AIO subsystem will cancel and drain all requests before
1265	 * permitting a process to exit or exec, so p_vmspace should
1266	 * be stable here.
1267	 */
1268	vm = job->userproc->p_vmspace;
1269	map = &vm->vm_map;
1270	start = (uintptr_t)job->uaiocb.aio_buf;
1271	pgoff = start & PAGE_MASK;
1272	end = round_page(start + job->uaiocb.aio_nbytes);
1273	start = trunc_page(start);
1274
1275	if (end - start > MAX_DDP_BUFFER_SIZE) {
1276		/*
1277		 * Truncate the request to a short read.
1278		 * Alternatively, we could DDP in chunks to the larger
1279		 * buffer, but that would be quite a bit more work.
1280		 *
1281		 * When truncating, round the request down to avoid
1282		 * crossing a cache line on the final transaction.
1283		 */
1284		end = rounddown2(start + MAX_DDP_BUFFER_SIZE, CACHE_LINE_SIZE);
1285#ifdef VERBOSE_TRACES
1286		CTR4(KTR_CXGBE, "%s: tid %d, truncating size from %lu to %lu",
1287		    __func__, toep->tid, (unsigned long)job->uaiocb.aio_nbytes,
1288		    (unsigned long)(end - (start + pgoff)));
#endif
		job->uaiocb.aio_nbytes = end - (start + pgoff);
1291		end = round_page(end);
1292	}
1293
1294	n = atop(end - start);
1295
1296	/*
1297	 * Try to reuse a cached pageset.
1298	 */
1299	TAILQ_FOREACH(ps, &toep->ddp.cached_pagesets, link) {
1300		if (pscmp(ps, vm, start, n, pgoff,
1301		    job->uaiocb.aio_nbytes) == 0) {
1302			TAILQ_REMOVE(&toep->ddp.cached_pagesets, ps, link);
1303			toep->ddp.cached_count--;
1304			*pps = ps;
1305			return (0);
1306		}
1307	}
1308
1309	/*
1310	 * If there are too many cached pagesets to create a new one,
1311	 * free a pageset before creating a new one.
1312	 */
1313	KASSERT(toep->ddp.active_count + toep->ddp.cached_count <=
1314	    nitems(toep->ddp.db), ("%s: too many wired pagesets", __func__));
1315	if (toep->ddp.active_count + toep->ddp.cached_count ==
1316	    nitems(toep->ddp.db)) {
1317		KASSERT(toep->ddp.cached_count > 0,
1318		    ("no cached pageset to free"));
1319		ps = TAILQ_LAST(&toep->ddp.cached_pagesets, pagesetq);
1320		TAILQ_REMOVE(&toep->ddp.cached_pagesets, ps, link);
1321		toep->ddp.cached_count--;
1322		free_pageset(toep->td, ps);
1323	}
1324	DDP_UNLOCK(toep);
1325
1326	/* Create a new pageset. */
1327	ps = malloc(sizeof(*ps) + n * sizeof(vm_page_t), M_CXGBE, M_WAITOK |
1328	    M_ZERO);
1329	ps->pages = (vm_page_t *)(ps + 1);
1330	ps->vm_timestamp = map->timestamp;
1331	ps->npages = vm_fault_quick_hold_pages(map, start, end - start,
1332	    VM_PROT_WRITE, ps->pages, n);
1333
1334	DDP_LOCK(toep);
1335	if (ps->npages < 0) {
1336		free(ps, M_CXGBE);
1337		return (EFAULT);
1338	}
1339
1340	KASSERT(ps->npages == n, ("hold_aio: page count mismatch: %d vs %d",
1341	    ps->npages, n));
1342
1343	ps->offset = pgoff;
1344	ps->len = job->uaiocb.aio_nbytes;
1345	refcount_acquire(&vm->vm_refcnt);
1346	ps->vm = vm;
1347	ps->start = start;
1348
1349	CTR5(KTR_CXGBE, "%s: tid %d, new pageset %p for job %p, npages %d",
1350	    __func__, toep->tid, ps, job, ps->npages);
1351	*pps = ps;
1352	return (0);
1353}
1354
1355static void
1356ddp_complete_all(struct toepcb *toep, int error)
1357{
1358	struct kaiocb *job;
1359
1360	DDP_ASSERT_LOCKED(toep);
1361	while (!TAILQ_EMPTY(&toep->ddp.aiojobq)) {
1362		job = TAILQ_FIRST(&toep->ddp.aiojobq);
1363		TAILQ_REMOVE(&toep->ddp.aiojobq, job, list);
1364		toep->ddp.waiting_count--;
1365		if (aio_clear_cancel_function(job))
1366			ddp_complete_one(job, error);
1367	}
1368}
1369
1370static void
1371aio_ddp_cancel_one(struct kaiocb *job)
1372{
1373	long copied;
1374
1375	/*
1376	 * If this job had copied data out of the socket buffer before
1377	 * it was cancelled, report it as a short read rather than an
1378	 * error.
1379	 */
1380	copied = job->aio_received;
1381	if (copied != 0)
1382		aio_complete(job, copied, 0);
1383	else
1384		aio_cancel(job);
1385}
1386
1387/*
1388 * Called when the main loop wants to requeue a job to retry it later.
1389 * Deals with the race of the job being cancelled while it was being
1390 * examined.
1391 */
1392static void
1393aio_ddp_requeue_one(struct toepcb *toep, struct kaiocb *job)
1394{
1395
1396	DDP_ASSERT_LOCKED(toep);
1397	if (!(toep->ddp.flags & DDP_DEAD) &&
1398	    aio_set_cancel_function(job, t4_aio_cancel_queued)) {
1399		TAILQ_INSERT_HEAD(&toep->ddp.aiojobq, job, list);
1400		toep->ddp.waiting_count++;
1401	} else
1402		aio_ddp_cancel_one(job);
1403}
1404
1405static void
1406aio_ddp_requeue(struct toepcb *toep)
1407{
1408	struct adapter *sc = td_adapter(toep->td);
1409	struct socket *so;
1410	struct sockbuf *sb;
1411	struct inpcb *inp;
1412	struct kaiocb *job;
1413	struct ddp_buffer *db;
1414	size_t copied, offset, resid;
1415	struct pageset *ps;
1416	struct mbuf *m;
1417	uint64_t ddp_flags, ddp_flags_mask;
1418	struct wrqe *wr;
1419	int buf_flag, db_idx, error;
1420
1421	DDP_ASSERT_LOCKED(toep);
1422
1423restart:
1424	if (toep->ddp.flags & DDP_DEAD) {
1425		MPASS(toep->ddp.waiting_count == 0);
1426		MPASS(toep->ddp.active_count == 0);
1427		return;
1428	}
1429
1430	if (toep->ddp.waiting_count == 0 ||
1431	    toep->ddp.active_count == nitems(toep->ddp.db)) {
1432		return;
1433	}
1434
1435	job = TAILQ_FIRST(&toep->ddp.aiojobq);
1436	so = job->fd_file->f_data;
1437	sb = &so->so_rcv;
1438	SOCKBUF_LOCK(sb);
1439
1440	/* We will never get anything unless we are or were connected. */
1441	if (!(so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED))) {
1442		SOCKBUF_UNLOCK(sb);
1443		ddp_complete_all(toep, ENOTCONN);
1444		return;
1445	}
1446
1447	KASSERT(toep->ddp.active_count == 0 || sbavail(sb) == 0,
1448	    ("%s: pending sockbuf data and DDP is active", __func__));
1449
1450	/* Abort if socket has reported problems. */
1451	/* XXX: Wait for any queued DDP's to finish and/or flush them? */
1452	if (so->so_error && sbavail(sb) == 0) {
1453		toep->ddp.waiting_count--;
1454		TAILQ_REMOVE(&toep->ddp.aiojobq, job, list);
1455		if (!aio_clear_cancel_function(job)) {
1456			SOCKBUF_UNLOCK(sb);
1457			goto restart;
1458		}
1459
1460		/*
1461		 * If this job has previously copied some data, report
1462		 * a short read and leave the error to be reported by
1463		 * a future request.
1464		 */
1465		copied = job->aio_received;
1466		if (copied != 0) {
1467			SOCKBUF_UNLOCK(sb);
1468			aio_complete(job, copied, 0);
1469			goto restart;
1470		}
1471		error = so->so_error;
1472		so->so_error = 0;
1473		SOCKBUF_UNLOCK(sb);
1474		aio_complete(job, -1, error);
1475		goto restart;
1476	}
1477
1478	/*
1479	 * Door is closed.  If there is pending data in the socket buffer,
1480	 * deliver it.  If there are pending DDP requests, wait for those
1481	 * to complete.  Once they have completed, return EOF reads.
1482	 */
1483	if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) {
1484		SOCKBUF_UNLOCK(sb);
1485		if (toep->ddp.active_count != 0)
1486			return;
1487		ddp_complete_all(toep, 0);
1488		return;
1489	}
1490
1491	/*
1492	 * If DDP is not enabled and there is no pending socket buffer
1493	 * data, try to enable DDP.
1494	 */
1495	if (sbavail(sb) == 0 && (toep->ddp.flags & DDP_ON) == 0) {
1496		SOCKBUF_UNLOCK(sb);
1497
1498		/*
1499		 * Wait for the card to ACK that DDP is enabled before
1500		 * queueing any buffers.  Currently this waits for an
1501		 * indicate to arrive.  This could use a TCB_SET_FIELD_RPL
1502		 * message to know that DDP was enabled instead of waiting
1503		 * for the indicate which would avoid copying the indicate
1504		 * if no data is pending.
1505		 *
1506		 * XXX: Might want to limit the indicate size to the size
1507		 * of the first queued request.
1508		 */
1509		if ((toep->ddp.flags & DDP_SC_REQ) == 0)
1510			enable_ddp(sc, toep);
1511		return;
1512	}
1513	SOCKBUF_UNLOCK(sb);
1514
1515	/*
1516	 * If another thread is queueing a buffer for DDP, let it
1517	 * drain any work and return.
1518	 */
1519	if (toep->ddp.queueing != NULL)
1520		return;
1521
1522	/* Take the next job to prep it for DDP. */
1523	toep->ddp.waiting_count--;
1524	TAILQ_REMOVE(&toep->ddp.aiojobq, job, list);
1525	if (!aio_clear_cancel_function(job))
1526		goto restart;
1527	toep->ddp.queueing = job;
1528
1529	/* NB: This drops DDP_LOCK while it holds the backing VM pages. */
1530	error = hold_aio(toep, job, &ps);
1531	if (error != 0) {
1532		ddp_complete_one(job, error);
1533		toep->ddp.queueing = NULL;
1534		goto restart;
1535	}
1536
1537	SOCKBUF_LOCK(sb);
1538	if (so->so_error && sbavail(sb) == 0) {
1539		copied = job->aio_received;
1540		if (copied != 0) {
1541			SOCKBUF_UNLOCK(sb);
1542			recycle_pageset(toep, ps);
1543			aio_complete(job, copied, 0);
1544			toep->ddp.queueing = NULL;
1545			goto restart;
1546		}
1547
1548		error = so->so_error;
1549		so->so_error = 0;
1550		SOCKBUF_UNLOCK(sb);
1551		recycle_pageset(toep, ps);
1552		aio_complete(job, -1, error);
1553		toep->ddp.queueing = NULL;
1554		goto restart;
1555	}
1556
1557	if (sb->sb_state & SBS_CANTRCVMORE && sbavail(sb) == 0) {
1558		SOCKBUF_UNLOCK(sb);
1559		recycle_pageset(toep, ps);
1560		if (toep->ddp.active_count != 0) {
1561			/*
1562			 * The door is closed, but there are still pending
1563			 * DDP buffers.  Requeue.  These jobs will all be
1564			 * completed once those buffers drain.
1565			 */
1566			aio_ddp_requeue_one(toep, job);
1567			toep->ddp.queueing = NULL;
1568			return;
1569		}
1570		ddp_complete_one(job, 0);
1571		ddp_complete_all(toep, 0);
1572		toep->ddp.queueing = NULL;
1573		return;
1574	}
1575
1576sbcopy:
1577	/*
1578	 * If the toep is dead, there shouldn't be any data in the socket
1579	 * buffer, so the above case should have handled this.
1580	 */
1581	MPASS(!(toep->ddp.flags & DDP_DEAD));
1582
1583	/*
1584	 * If there is pending data in the socket buffer (either
1585	 * from before the requests were queued or a DDP indicate),
1586	 * copy those mbufs out directly.
1587	 */
1588	copied = 0;
1589	offset = ps->offset + job->aio_received;
1590	MPASS(job->aio_received <= job->uaiocb.aio_nbytes);
1591	resid = job->uaiocb.aio_nbytes - job->aio_received;
1592	m = sb->sb_mb;
1593	KASSERT(m == NULL || toep->ddp.active_count == 0,
1594	    ("%s: sockbuf data with active DDP", __func__));
1595	while (m != NULL && resid > 0) {
1596		struct iovec iov[1];
1597		struct uio uio;
1598		int error;
1599
1600		iov[0].iov_base = mtod(m, void *);
1601		iov[0].iov_len = m->m_len;
1602		if (iov[0].iov_len > resid)
1603			iov[0].iov_len = resid;
1604		uio.uio_iov = iov;
1605		uio.uio_iovcnt = 1;
1606		uio.uio_offset = 0;
1607		uio.uio_resid = iov[0].iov_len;
1608		uio.uio_segflg = UIO_SYSSPACE;
1609		uio.uio_rw = UIO_WRITE;
1610		error = uiomove_fromphys(ps->pages, offset + copied,
1611		    uio.uio_resid, &uio);
1612		MPASS(error == 0 && uio.uio_resid == 0);
1613		copied += uio.uio_offset;
1614		resid -= uio.uio_offset;
1615		m = m->m_next;
1616	}
1617	if (copied != 0) {
1618		sbdrop_locked(sb, copied);
1619		job->aio_received += copied;
1620		job->msgrcv = 1;
1621		copied = job->aio_received;
1622		inp = sotoinpcb(so);
1623		if (!INP_TRY_WLOCK(inp)) {
1624			/*
1625			 * The reference on the socket file descriptor in
1626			 * the AIO job should keep 'sb' and 'inp' stable.
1627			 * Our caller has a reference on the 'toep' that
1628			 * keeps it stable.
1629			 */
1630			SOCKBUF_UNLOCK(sb);
1631			DDP_UNLOCK(toep);
1632			INP_WLOCK(inp);
1633			DDP_LOCK(toep);
1634			SOCKBUF_LOCK(sb);
1635
1636			/*
1637			 * If the socket has been closed, we should detect
1638			 * that and complete this request if needed on
1639			 * the next trip around the loop.
1640			 */
1641		}
1642		t4_rcvd_locked(&toep->td->tod, intotcpcb(inp));
1643		INP_WUNLOCK(inp);
1644		if (resid == 0 || toep->ddp.flags & DDP_DEAD) {
1645			/*
1646			 * We filled the entire buffer with socket
1647			 * data, DDP is not being used, or the socket
1648			 * is being shut down, so complete the
1649			 * request.
1650			 */
1651			SOCKBUF_UNLOCK(sb);
1652			recycle_pageset(toep, ps);
1653			aio_complete(job, copied, 0);
1654			toep->ddp.queueing = NULL;
1655			goto restart;
1656		}
1657
1658		/*
1659		 * If DDP is not enabled, requeue this request and restart.
1660		 * This will either enable DDP or wait for more data to
1661		 * arrive on the socket buffer.
1662		 */
1663		if ((toep->ddp.flags & (DDP_ON | DDP_SC_REQ)) != DDP_ON) {
1664			SOCKBUF_UNLOCK(sb);
1665			recycle_pageset(toep, ps);
1666			aio_ddp_requeue_one(toep, job);
1667			toep->ddp.queueing = NULL;
1668			goto restart;
1669		}
1670
1671		/*
1672		 * An indicate might have arrived and been added to
1673		 * the socket buffer while it was unlocked after the
1674		 * copy to lock the INP.  If so, restart the copy.
1675		 */
1676		if (sbavail(sb) != 0)
1677			goto sbcopy;
1678	}
1679	SOCKBUF_UNLOCK(sb);
1680
1681	if (prep_pageset(sc, toep, ps) == 0) {
1682		recycle_pageset(toep, ps);
1683		aio_ddp_requeue_one(toep, job);
1684		toep->ddp.queueing = NULL;
1685
1686		/*
1687		 * XXX: Need to retry this later.  Mostly need a trigger
1688		 * when page pods are freed up.
1689		 */
1690		printf("%s: prep_pageset failed\n", __func__);
1691		return;
1692	}
1693
1694	/* Determine which DDP buffer to use. */
1695	if (toep->ddp.db[0].job == NULL) {
1696		db_idx = 0;
1697	} else {
1698		MPASS(toep->ddp.db[1].job == NULL);
1699		db_idx = 1;
1700	}
1701
1702	ddp_flags = 0;
1703	ddp_flags_mask = 0;
1704	if (db_idx == 0) {
1705		ddp_flags |= V_TF_DDP_BUF0_VALID(1);
1706		if (so->so_state & SS_NBIO)
1707			ddp_flags |= V_TF_DDP_BUF0_FLUSH(1);
1708		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE0(1) |
1709		    V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PSHF_ENABLE_0(1) |
1710		    V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF0_VALID(1);
1711		buf_flag = DDP_BUF0_ACTIVE;
1712	} else {
1713		ddp_flags |= V_TF_DDP_BUF1_VALID(1);
1714		if (so->so_state & SS_NBIO)
1715			ddp_flags |= V_TF_DDP_BUF1_FLUSH(1);
1716		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE1(1) |
1717		    V_TF_DDP_PUSH_DISABLE_1(1) | V_TF_DDP_PSHF_ENABLE_1(1) |
1718		    V_TF_DDP_BUF1_FLUSH(1) | V_TF_DDP_BUF1_VALID(1);
1719		buf_flag = DDP_BUF1_ACTIVE;
1720	}
1721	MPASS((toep->ddp.flags & buf_flag) == 0);
1722	if ((toep->ddp.flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0) {
1723		MPASS(db_idx == 0);
1724		MPASS(toep->ddp.active_id == -1);
1725		MPASS(toep->ddp.active_count == 0);
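		/*
		 * No buffer is active, so this update also clears
		 * TF_DDP_ACTIVE_BUF (the mask bit is set but the value is
		 * not), which should leave buffer 0 (the one being armed
		 * here) as the chip's active buffer.
		 */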
1726		ddp_flags_mask |= V_TF_DDP_ACTIVE_BUF(1);
1727	}
1728
1729	/*
1730	 * The TID for this connection should still be valid.  If DDP_DEAD
1731	 * is set, SBS_CANTRCVMORE should be set, so we shouldn't be
1732	 * this far anyway.  Even if the socket is closing on the other
1733	 * end, the AIO job holds a reference on this end of the socket
1734	 * which will keep it open and keep the TCP PCB attached until
1735	 * after the job is completed.
1736	 */
1737	wr = mk_update_tcb_for_ddp(sc, toep, db_idx, ps, job->aio_received,
1738	    ddp_flags, ddp_flags_mask);
1739	if (wr == NULL) {
1740		recycle_pageset(toep, ps);
1741		aio_ddp_requeue_one(toep, job);
1742		toep->ddp.queueing = NULL;
1743
1744		/*
1745		 * XXX: Need a way to kick a retry here.
1746		 *
1747		 * XXX: We know the fixed size needed and could
1748		 * preallocate this using a blocking request at the
1749		 * start of the task to avoid having to handle this
1750		 * edge case.
1751		 */
1752		printf("%s: mk_update_tcb_for_ddp failed\n", __func__);
1753		return;
1754	}
1755
1756	if (!aio_set_cancel_function(job, t4_aio_cancel_active)) {
1757		free_wrqe(wr);
1758		recycle_pageset(toep, ps);
1759		aio_ddp_cancel_one(job);
1760		toep->ddp.queueing = NULL;
1761		goto restart;
1762	}
1763
1764#ifdef VERBOSE_TRACES
1765	CTR6(KTR_CXGBE,
1766	    "%s: tid %u, scheduling %p for DDP[%d] (flags %#lx/%#lx)", __func__,
1767	    toep->tid, job, db_idx, ddp_flags, ddp_flags_mask);
1768#endif
1769	/* Give the chip the go-ahead. */
1770	t4_wrq_tx(sc, wr);
1771	db = &toep->ddp.db[db_idx];
1772	db->cancel_pending = 0;
1773	db->job = job;
1774	db->ps = ps;
1775	toep->ddp.queueing = NULL;
1776	toep->ddp.flags |= buf_flag;
1777	toep->ddp.active_count++;
1778	if (toep->ddp.active_count == 1) {
1779		MPASS(toep->ddp.active_id == -1);
1780		toep->ddp.active_id = db_idx;
1781		CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__,
1782		    toep->ddp.active_id);
1783	}
1784	goto restart;
1785}
1786
1787void
1788ddp_queue_toep(struct toepcb *toep)
1789{
1790
1791	DDP_ASSERT_LOCKED(toep);
1792	if (toep->ddp.flags & DDP_TASK_ACTIVE)
1793		return;
1794	toep->ddp.flags |= DDP_TASK_ACTIVE;
1795	hold_toepcb(toep);
1796	soaio_enqueue(&toep->ddp.requeue_task);
1797}
1798
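/*
 * The requeue task runs with a reference on the toepcb taken in
 * ddp_queue_toep(), so the connection cannot go away while the task is
 * pending; the reference is dropped below once the task has run.
 */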
1799static void
1800aio_ddp_requeue_task(void *context, int pending)
1801{
1802	struct toepcb *toep = context;
1803
1804	DDP_LOCK(toep);
1805	aio_ddp_requeue(toep);
1806	toep->ddp.flags &= ~DDP_TASK_ACTIVE;
1807	DDP_UNLOCK(toep);
1808
1809	free_toepcb(toep);
1810}
1811
1812static void
1813t4_aio_cancel_active(struct kaiocb *job)
1814{
1815	struct socket *so = job->fd_file->f_data;
1816	struct tcpcb *tp = so_sototcpcb(so);
1817	struct toepcb *toep = tp->t_toe;
1818	struct adapter *sc = td_adapter(toep->td);
1819	uint64_t valid_flag;
1820	int i;
1821
1822	DDP_LOCK(toep);
1823	if (aio_cancel_cleared(job)) {
1824		DDP_UNLOCK(toep);
1825		aio_ddp_cancel_one(job);
1826		return;
1827	}
1828
1829	for (i = 0; i < nitems(toep->ddp.db); i++) {
1830		if (toep->ddp.db[i].job == job) {
1831			/* Should only ever get one cancel request for a job. */
1832			MPASS(toep->ddp.db[i].cancel_pending == 0);
1833
1834			/*
1835			 * Invalidate this buffer.  It will be
1836			 * cancelled or partially completed once the
1837			 * card ACKs the invalidate.
1838			 */
1839			valid_flag = i == 0 ? V_TF_DDP_BUF0_VALID(1) :
1840			    V_TF_DDP_BUF1_VALID(1);
1841			t4_set_tcb_field(sc, toep->ctrlq, toep,
1842			    W_TCB_RX_DDP_FLAGS, valid_flag, 0, 1,
1843			    CPL_COOKIE_DDP0 + i);
1844			toep->ddp.db[i].cancel_pending = 1;
1845			CTR2(KTR_CXGBE, "%s: request %p marked pending",
1846			    __func__, job);
1847			break;
1848		}
1849	}
1850	DDP_UNLOCK(toep);
1851}
1852
1853static void
1854t4_aio_cancel_queued(struct kaiocb *job)
1855{
1856	struct socket *so = job->fd_file->f_data;
1857	struct tcpcb *tp = so_sototcpcb(so);
1858	struct toepcb *toep = tp->t_toe;
1859
1860	DDP_LOCK(toep);
1861	if (!aio_cancel_cleared(job)) {
1862		TAILQ_REMOVE(&toep->ddp.aiojobq, job, list);
1863		toep->ddp.waiting_count--;
1864		if (toep->ddp.waiting_count == 0)
1865			ddp_queue_toep(toep);
1866	}
1867	CTR2(KTR_CXGBE, "%s: request %p cancelled", __func__, job);
1868	DDP_UNLOCK(toep);
1869
1870	aio_ddp_cancel_one(job);
1871}
1872
1873int
1874t4_aio_queue_ddp(struct socket *so, struct kaiocb *job)
1875{
1876	struct tcpcb *tp = so_sototcpcb(so);
1877	struct toepcb *toep = tp->t_toe;
1878
1880	/* Ignore writes. */
1881	if (job->uaiocb.aio_lio_opcode != LIO_READ)
1882		return (EOPNOTSUPP);
1883
1884	DDP_LOCK(toep);
1885
1886	/*
1887	 * XXX: Think about possibly returning errors for ENOTCONN,
1888	 * etc.  Perhaps the caller would only queue the request
1889	 * if it failed with EOPNOTSUPP?
1890	 */
1891
1892#ifdef VERBOSE_TRACES
1893	CTR3(KTR_CXGBE, "%s: queueing %p for tid %u", __func__, job, toep->tid);
1894#endif
1895	if (!aio_set_cancel_function(job, t4_aio_cancel_queued))
1896		panic("new job was cancelled");
1897	TAILQ_INSERT_TAIL(&toep->ddp.aiojobq, job, list);
1898	toep->ddp.waiting_count++;
1899	toep->ddp.flags |= DDP_OK;
1900
1901	/*
1902	 * Try to handle this request synchronously.  If this has
1903	 * to block because the task is running, it will just bail
1904	 * and let the task handle it instead.
1905	 */
1906	aio_ddp_requeue(toep);
1907	DDP_UNLOCK(toep);
1908	return (0);
1909}
1910
1911void
1912t4_ddp_mod_load(void)
1913{
1914
1915	t4_register_shared_cpl_handler(CPL_SET_TCB_RPL, do_ddp_tcb_rpl,
1916	    CPL_COOKIE_DDP0);
1917	t4_register_shared_cpl_handler(CPL_SET_TCB_RPL, do_ddp_tcb_rpl,
1918	    CPL_COOKIE_DDP1);
1919	t4_register_cpl_handler(CPL_RX_DATA_DDP, do_rx_data_ddp);
1920	t4_register_cpl_handler(CPL_RX_DDP_COMPLETE, do_rx_ddp_complete);
1921	TAILQ_INIT(&ddp_orphan_pagesets);
1922	mtx_init(&ddp_orphan_pagesets_lock, "ddp orphans", NULL, MTX_DEF);
1923	TASK_INIT(&ddp_orphan_task, 0, ddp_free_orphan_pagesets, NULL);
1924}
1925
1926void
1927t4_ddp_mod_unload(void)
1928{
1929
1930	taskqueue_drain(taskqueue_thread, &ddp_orphan_task);
1931	MPASS(TAILQ_EMPTY(&ddp_orphan_pagesets));
1932	mtx_destroy(&ddp_orphan_pagesets_lock);
1933	t4_register_shared_cpl_handler(CPL_SET_TCB_RPL, NULL, CPL_COOKIE_DDP0);
1934	t4_register_shared_cpl_handler(CPL_SET_TCB_RPL, NULL, CPL_COOKIE_DDP1);
1935	t4_register_cpl_handler(CPL_RX_DATA_DDP, NULL);
1936	t4_register_cpl_handler(CPL_RX_DDP_COMPLETE, NULL);
1937}
1938#endif
1939