/*
 *   BSD LICENSE
 *
 *   Copyright(c) 2017 Cavium, Inc.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Cavium, Inc. nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*$FreeBSD: stable/11/sys/dev/liquidio/base/lio_droq.c 325618 2017-11-09 19:52:56Z sbruno $*/

#include "lio_bsd.h"
#include "lio_common.h"
#include "lio_droq.h"
#include "lio_iq.h"
#include "lio_response_manager.h"
#include "lio_device.h"
#include "lio_main.h"
#include "cn23xx_pf_device.h"
#include "lio_network.h"

struct __dispatch {
	struct lio_stailq_node	node;
	struct lio_recv_info	*rinfo;
	lio_dispatch_fn_t	disp_fn;
};

void	*lio_get_dispatch_arg(struct octeon_device *oct,
			      uint16_t opcode, uint16_t subcode);

/*
 *  Get the argument that the user set when registering a dispatch
 *  function for a given opcode/subcode.
 *  @param  octeon_dev - the octeon device pointer.
 *  @param  opcode     - the opcode for which the dispatch argument
 *                       is to be checked.
 *  @param  subcode    - the subcode for which the dispatch argument
 *                       is to be checked.
 *  @return  Success: void * (argument to the dispatch function)
 *  @return  Failure: NULL
 */
void   *
lio_get_dispatch_arg(struct octeon_device *octeon_dev,
		     uint16_t opcode, uint16_t subcode)
{
	struct lio_stailq_node	*dispatch;
	void			*fn_arg = NULL;
	int			idx;
	uint16_t		combined_opcode;

	combined_opcode = LIO_OPCODE_SUBCODE(opcode, subcode);

	idx = combined_opcode & LIO_OPCODE_MASK;
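
	/*
	 * LIO_OPCODE_MASK selects a bucket in dlist[]; combined opcodes
	 * that hash to the same bucket are chained off the bucket's list
	 * and resolved by the exact-match walk below.
	 */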

	mtx_lock(&octeon_dev->dispatch.lock);

	if (octeon_dev->dispatch.count == 0) {
		mtx_unlock(&octeon_dev->dispatch.lock);
		return (NULL);
	}

	if (octeon_dev->dispatch.dlist[idx].opcode == combined_opcode) {
		fn_arg = octeon_dev->dispatch.dlist[idx].arg;
	} else {
		STAILQ_FOREACH(dispatch,
			       &octeon_dev->dispatch.dlist[idx].head, entries) {
			if (((struct lio_dispatch *)dispatch)->opcode ==
			    combined_opcode) {
				fn_arg = ((struct lio_dispatch *)dispatch)->arg;
				break;
			}
		}
	}

	mtx_unlock(&octeon_dev->dispatch.lock);
	return (fn_arg);
}

/*
 *  Check for packets on Droq. This function should be called with the
 *  droq lock held.
 *  @param  droq - Droq on which count is checked.
 *  @return Returns packet count.
 */
uint32_t
lio_droq_check_hw_for_pkts(struct lio_droq *droq)
{
	struct octeon_device	*oct = droq->oct_dev;
	uint32_t		last_count;
	uint32_t		pkt_count = 0;

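	/*
	 * pkts_sent_reg is a running counter.  Both values are uint32_t,
	 * so the subtraction below yields the correct delta even when
	 * the hardware counter wraps.
	 */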
	pkt_count = lio_read_csr32(oct, droq->pkts_sent_reg);

	last_count = pkt_count - droq->pkt_count;
	droq->pkt_count = pkt_count;

	/* We write back to the counts CSR at the end of processing. */
	if (last_count)
		atomic_add_int(&droq->pkts_pending, last_count);

	return (last_count);
}

static void
lio_droq_compute_max_packet_bufs(struct lio_droq *droq)
{
	uint32_t	count = 0;

	/*
	 * max_empty_descs is the maximum number of descriptors that can
	 * have no buffers.  If the empty descriptor count exceeds this
	 * value, we cannot safely read in a 64K packet sent by Octeon
	 * (64K is the maximum packet size from Octeon).
	 */
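	/*
	 * Illustrative numbers (not taken from the driver config): with
	 * 2 KB buffers the loop below runs 32 times to cover 64 KB, so a
	 * 1024-entry ring would get max_empty_descs = 1024 - 32 = 992.
	 */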
	droq->max_empty_descs = 0;

	do {
		droq->max_empty_descs++;
		count += droq->buffer_size;
	} while (count < (64 * 1024));

	droq->max_empty_descs = droq->max_count - droq->max_empty_descs;
}

static void
lio_droq_reset_indices(struct lio_droq *droq)
{

	droq->read_idx = 0;
	droq->refill_idx = 0;
	droq->refill_count = 0;
	atomic_store_rel_int(&droq->pkts_pending, 0);
}

static void
lio_droq_destroy_ring_buffers(struct octeon_device *oct,
			      struct lio_droq *droq)
{
	uint32_t	i;

	for (i = 0; i < droq->max_count; i++) {
		if (droq->recv_buf_list[i].buffer != NULL) {
			lio_recv_buffer_free(droq->recv_buf_list[i].buffer);
			droq->recv_buf_list[i].buffer = NULL;
		}
	}

	lio_droq_reset_indices(droq);
}

static int
lio_droq_setup_ring_buffers(struct octeon_device *oct,
			    struct lio_droq *droq)
{
	struct lio_droq_desc	*desc_ring = droq->desc_ring;
	void			*buf;
	uint32_t		i;

	for (i = 0; i < droq->max_count; i++) {
		buf = lio_recv_buffer_alloc(droq->buffer_size);

		if (buf == NULL) {
			lio_dev_err(oct, "%s buffer alloc failed\n",
				    __func__);
			droq->stats.rx_alloc_failure++;
			return (-ENOMEM);
		}

		droq->recv_buf_list[i].buffer = buf;
		droq->recv_buf_list[i].data = ((struct mbuf *)buf)->m_data;
		desc_ring[i].info_ptr = 0;
		desc_ring[i].buffer_ptr =
			lio_map_ring(oct->device, droq->recv_buf_list[i].buffer,
				     droq->buffer_size);
	}

	lio_droq_reset_indices(droq);

	lio_droq_compute_max_packet_bufs(droq);

	return (0);
}

int
lio_delete_droq(struct octeon_device *oct, uint32_t q_no)
{
	struct lio_droq	*droq = oct->droq[q_no];

	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);

	while (taskqueue_cancel(droq->droq_taskqueue, &droq->droq_task, NULL))
		taskqueue_drain(droq->droq_taskqueue, &droq->droq_task);

	taskqueue_free(droq->droq_taskqueue);
	droq->droq_taskqueue = NULL;

	lio_droq_destroy_ring_buffers(oct, droq);
	free(droq->recv_buf_list, M_DEVBUF);

	if (droq->desc_ring != NULL)
		lio_dma_free((droq->max_count * LIO_DROQ_DESC_SIZE),
			     droq->desc_ring);

	oct->io_qmask.oq &= ~(1ULL << q_no);
	bzero(oct->droq[q_no], sizeof(struct lio_droq));
	oct->num_oqs--;

	return (0);
}

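/*
 * Taskqueue handler (bottom half) for a DROQ.  Processes up to
 * oct->rx_budget received packets, flushes the companion instruction
 * queue if transmits are pending, and then either reschedules itself
 * or re-enables the queue-pair interrupt.
 */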
void
lio_droq_bh(void *ptr, int pending __unused)
{
	struct lio_droq		*droq = ptr;
	struct octeon_device	*oct = droq->oct_dev;
	struct lio_instr_queue	*iq = oct->instr_queue[droq->q_no];
	int	reschedule, tx_done = 1;

	reschedule = lio_droq_process_packets(oct, droq, oct->rx_budget);

	if (atomic_load_acq_int(&iq->instr_pending))
		tx_done = lio_flush_iq(oct, iq, oct->tx_budget);

	if (reschedule || !tx_done)
		taskqueue_enqueue(droq->droq_taskqueue, &droq->droq_task);
	else
		lio_enable_irq(droq, iq);
}

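/*
 * Initialize output queue q_no: allocates the descriptor ring and the
 * receive buffer list, seeds the ring with buffers, programs the OQ
 * registers, and creates the per-queue taskqueue that runs
 * lio_droq_bh().  Returns 0 on success, 1 on failure.  Only CN23XX PF
 * devices are configured here; other chips fail the config lookup.
 */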
int
lio_init_droq(struct octeon_device *oct, uint32_t q_no,
	      uint32_t num_descs, uint32_t desc_size, void *app_ctx)
{
	struct lio_droq	*droq;
	unsigned long	size;
	uint32_t	c_buf_size = 0, c_num_descs = 0, c_pkts_per_intr = 0;
	uint32_t	c_refill_threshold = 0, desc_ring_size = 0;

	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);

	droq = oct->droq[q_no];
	bzero(droq, LIO_DROQ_SIZE);

	droq->oct_dev = oct;
	droq->q_no = q_no;
	if (app_ctx != NULL)
		droq->app_ctx = app_ctx;
	else
		droq->app_ctx = (void *)(size_t)q_no;

	c_num_descs = num_descs;
	c_buf_size = desc_size;
	if (LIO_CN23XX_PF(oct)) {
		struct lio_config *conf23 = LIO_CHIP_CONF(oct, cn23xx_pf);

		c_pkts_per_intr =
			(uint32_t)LIO_GET_OQ_PKTS_PER_INTR_CFG(conf23);
		c_refill_threshold =
			(uint32_t)LIO_GET_OQ_REFILL_THRESHOLD_CFG(conf23);
	} else {
		return (1);
	}

	droq->max_count = c_num_descs;
	droq->buffer_size = c_buf_size;

	desc_ring_size = droq->max_count * LIO_DROQ_DESC_SIZE;
	droq->desc_ring = lio_dma_alloc(desc_ring_size, &droq->desc_ring_dma);
	if (droq->desc_ring == NULL) {
		lio_dev_err(oct, "Output queue %d ring alloc failed\n", q_no);
		return (1);
	}

	lio_dev_dbg(oct, "droq[%d]: desc_ring: virt: 0x%p, dma: %llx\n", q_no,
		    droq->desc_ring, LIO_CAST64(droq->desc_ring_dma));
	lio_dev_dbg(oct, "droq[%d]: num_desc: %d\n", q_no, droq->max_count);

	size = droq->max_count * LIO_DROQ_RECVBUF_SIZE;
	droq->recv_buf_list =
		(struct lio_recv_buffer *)malloc(size, M_DEVBUF,
						 M_NOWAIT | M_ZERO);
	if (droq->recv_buf_list == NULL) {
		lio_dev_err(oct, "Output queue recv buf list alloc failed\n");
		goto init_droq_fail;
	}

	if (lio_droq_setup_ring_buffers(oct, droq))
		goto init_droq_fail;

	droq->pkts_per_intr = c_pkts_per_intr;
	droq->refill_threshold = c_refill_threshold;

	lio_dev_dbg(oct, "DROQ INIT: max_empty_descs: %d\n",
		    droq->max_empty_descs);

	mtx_init(&droq->lock, "droq_lock", NULL, MTX_DEF);

	STAILQ_INIT(&droq->dispatch_stq_head);

	oct->fn_list.setup_oq_regs(oct, q_no);

	oct->io_qmask.oq |= BIT_ULL(q_no);

	/*
	 * Initialize the taskqueue that handles
	 * output queue packet processing.
	 */
	lio_dev_dbg(oct, "Initializing droq%d taskqueue\n", q_no);
	TASK_INIT(&droq->droq_task, 0, lio_droq_bh, (void *)droq);

	droq->droq_taskqueue = taskqueue_create_fast("lio_droq_task", M_NOWAIT,
						     taskqueue_thread_enqueue,
						     &droq->droq_taskqueue);
	taskqueue_start_threads_cpuset(&droq->droq_taskqueue, 1, PI_NET,
				       &oct->ioq_vector[q_no].affinity_mask,
				       "lio%d_droq%d_task", oct->octeon_id,
				       q_no);

	return (0);

init_droq_fail:
	lio_delete_droq(oct, q_no);
	return (1);
}

/*
 * lio_create_recv_info
 * Parameters:
 *  octeon_dev - pointer to the octeon device structure
 *  droq       - droq in which the packet arrived.
 *  buf_cnt    - no. of buffers used by the packet.
 *  idx        - index in the descriptor ring for the first buffer in the
 *               packet.
 * Description:
 *  Allocates a struct lio_recv_info and copies the buffer addresses for
 *  packet data into the recv_pkt space, which starts at an 8B offset from
 *  the start of the lio_recv_info.
 *  Flags the descriptors for refill later.  If available descriptors go
 *  below the threshold to receive a 64K packet, new buffers are first
 *  allocated before the recv_pkt is created.
 *  This routine will be called in interrupt context.
 * Returns:
 *  Success: Pointer to struct lio_recv_info
 *  Failure: NULL.
 * Locks:
 *  The droq->lock is held when this routine is called.
 */
static inline struct lio_recv_info *
lio_create_recv_info(struct octeon_device *octeon_dev, struct lio_droq *droq,
		     uint32_t buf_cnt, uint32_t idx)
{
	struct lio_droq_info	*info;
	struct lio_recv_pkt	*recv_pkt;
	struct lio_recv_info	*recv_info;
	uint32_t		bytes_left, i;

	info = (struct lio_droq_info *)droq->recv_buf_list[idx].data;

	recv_info = lio_alloc_recv_info(sizeof(struct __dispatch));
	if (recv_info == NULL)
		return (NULL);

	recv_pkt = recv_info->recv_pkt;
	recv_pkt->rh = info->rh;
	recv_pkt->length = (uint32_t)info->length;
	recv_pkt->buffer_count = (uint16_t)buf_cnt;
	recv_pkt->octeon_id = (uint16_t)octeon_dev->octeon_id;

	i = 0;
	bytes_left = (uint32_t)info->length;

	while (buf_cnt) {
		recv_pkt->buffer_size[i] = (bytes_left >= droq->buffer_size) ?
			droq->buffer_size : bytes_left;

		recv_pkt->buffer_ptr[i] = droq->recv_buf_list[idx].buffer;
		droq->recv_buf_list[idx].buffer = NULL;

		idx = lio_incr_index(idx, 1, droq->max_count);
		bytes_left -= droq->buffer_size;
		i++;
		buf_cnt--;
	}

	return (recv_info);
}

/*
 * If we were not able to refill all buffers, try to move around
 * the buffers that were not dispatched.
 */
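/*
 * Concretely: undispatched buffers sitting between refill_idx and
 * read_idx are slid back to the refill position so the refilled region
 * stays contiguous, and their old descriptor slots are cleared.
 */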
static inline uint32_t
lio_droq_refill_pullup_descs(struct lio_droq *droq,
			     struct lio_droq_desc *desc_ring)
{
	uint32_t	desc_refilled = 0;
	uint32_t	refill_index = droq->refill_idx;

	while (refill_index != droq->read_idx) {
		if (droq->recv_buf_list[refill_index].buffer != NULL) {
			droq->recv_buf_list[droq->refill_idx].buffer =
				droq->recv_buf_list[refill_index].buffer;
			droq->recv_buf_list[droq->refill_idx].data =
				droq->recv_buf_list[refill_index].data;
			desc_ring[droq->refill_idx].buffer_ptr =
				desc_ring[refill_index].buffer_ptr;
			droq->recv_buf_list[refill_index].buffer = NULL;
			desc_ring[refill_index].buffer_ptr = 0;
			do {
				droq->refill_idx =
					lio_incr_index(droq->refill_idx, 1,
						       droq->max_count);
				desc_refilled++;
				droq->refill_count--;
			} while (droq->recv_buf_list[droq->refill_idx].buffer !=
				 NULL);
		}
		refill_index = lio_incr_index(refill_index, 1, droq->max_count);
	}	/* while */
	return (desc_refilled);
}

/*
 * lio_droq_refill
 * Parameters:
 *  droq       - droq in which descriptors require new buffers.
 * Description:
 *  Called during normal DROQ processing in interrupt mode or by the poll
 *  thread to refill the descriptors from which buffers were dispatched
 *  to upper layers. Attempts to allocate new buffers. If that fails, moves
 *  up buffers (that were not dispatched) to form a contiguous ring.
 * Returns:
 *  Number of descriptors refilled.
 * Locks:
 *  This routine is called with droq->lock held.
 */
uint32_t
lio_droq_refill(struct octeon_device *octeon_dev, struct lio_droq *droq)
{
	struct lio_droq_desc	*desc_ring;
	void			*buf = NULL;
	uint32_t		desc_refilled = 0;
	uint8_t			*data;

	desc_ring = droq->desc_ring;

	while (droq->refill_count && (desc_refilled < droq->max_count)) {
		/*
		 * If a valid buffer exists (happens if there is no
		 * dispatch), reuse the buffer; else allocate.
		 */
		if (droq->recv_buf_list[droq->refill_idx].buffer == NULL) {
			buf = lio_recv_buffer_alloc(droq->buffer_size);
			/*
			 * If a buffer could not be allocated, there is no
			 * point in continuing.
			 */
			if (buf == NULL) {
				droq->stats.rx_alloc_failure++;
				break;
			}

			droq->recv_buf_list[droq->refill_idx].buffer = buf;
			data = ((struct mbuf *)buf)->m_data;
		} else {
			data = ((struct mbuf *)droq->recv_buf_list
				[droq->refill_idx].buffer)->m_data;
		}

		droq->recv_buf_list[droq->refill_idx].data = data;

		desc_ring[droq->refill_idx].buffer_ptr =
		    lio_map_ring(octeon_dev->device,
				 droq->recv_buf_list[droq->refill_idx].buffer,
				 droq->buffer_size);

		droq->refill_idx = lio_incr_index(droq->refill_idx, 1,
						  droq->max_count);
		desc_refilled++;
		droq->refill_count--;
	}

	if (droq->refill_count)
		desc_refilled += lio_droq_refill_pullup_descs(droq, desc_ring);

	/*
	 * If droq->refill_count is still non-zero here, the pullup pass
	 * did not change it: buffers were only moved to close gaps in
	 * the ring, so the number of buffers left to refill is the same.
	 */
	return (desc_refilled);
}

static inline uint32_t
lio_droq_get_bufcount(uint32_t buf_size, uint32_t total_len)
{

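	/* Ceiling division: buffers needed to hold total_len bytes. */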
	return ((total_len + buf_size - 1) / buf_size);
}

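/*
 * Queue a slow-path packet for its registered dispatch function.  The
 * packet's buffers are detached into a lio_recv_info and the dispatch
 * node is appended to droq->dispatch_stq_head; the handler itself runs
 * later from lio_droq_process_packets(), after the droq lock is
 * dropped.  Returns the number of ring buffers the packet occupied,
 * whether or not it could be dispatched.
 */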
static int
lio_droq_dispatch_pkt(struct octeon_device *oct, struct lio_droq *droq,
		      union octeon_rh *rh, struct lio_droq_info *info)
{
	struct lio_recv_info	*rinfo;
	lio_dispatch_fn_t	disp_fn;
	uint32_t		cnt;

	cnt = lio_droq_get_bufcount(droq->buffer_size, (uint32_t)info->length);

	disp_fn = lio_get_dispatch(oct, (uint16_t)rh->r.opcode,
				   (uint16_t)rh->r.subcode);
	if (disp_fn) {
		rinfo = lio_create_recv_info(oct, droq, cnt, droq->read_idx);
		if (rinfo != NULL) {
			struct __dispatch *rdisp = rinfo->rsvd;

			rdisp->rinfo = rinfo;
			rdisp->disp_fn = disp_fn;
			rinfo->recv_pkt->rh = *rh;
			STAILQ_INSERT_TAIL(&droq->dispatch_stq_head,
					   &rdisp->node, entries);
		} else {
			droq->stats.dropped_nomem++;
		}
	} else {
		lio_dev_err(oct, "DROQ: No dispatch function (opcode %u/%u)\n",
			    (unsigned int)rh->r.opcode,
			    (unsigned int)rh->r.subcode);
		droq->stats.dropped_nodispatch++;
	}

	return (cnt);
}

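/*
 * Drop cnt packets that exceeded the processing budget: account their
 * lengths, advance the read index past their buffers, and mark those
 * descriptors for refill.  Called with the droq lock held.
 */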
static inline void
lio_droq_drop_packets(struct octeon_device *oct, struct lio_droq *droq,
		      uint32_t cnt)
{
	struct lio_droq_info	*info;
	uint32_t		i, buf_cnt;

	for (i = 0; i < cnt; i++) {
		info = (struct lio_droq_info *)
			droq->recv_buf_list[droq->read_idx].data;

		lio_swap_8B_data((uint64_t *)info, 2);

		if (info->length) {
			info->length += 8;
			droq->stats.bytes_received += info->length;
			buf_cnt = lio_droq_get_bufcount(droq->buffer_size,
							(uint32_t)info->length);
		} else {
			lio_dev_err(oct, "DROQ: In drop: pkt with len 0\n");
			buf_cnt = 1;
		}

		droq->read_idx = lio_incr_index(droq->read_idx, buf_cnt,
						droq->max_count);
		droq->refill_count += buf_cnt;
	}
}

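/*
 * Core receive loop; called with the droq lock held.  Each packet is
 * either queued for slow-path dispatch or assembled into an mbuf
 * (chaining fragments with m_cat() when it spans several buffers) and
 * handed to droq->ops.fptr.  Descriptors are refilled and credits
 * returned to the hardware whenever refill_count crosses the
 * threshold.  Returns the number of packets processed or dropped.
 */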
static uint32_t
lio_droq_fast_process_packets(struct octeon_device *oct, struct lio_droq *droq,
			      uint32_t pkts_to_process)
{
	struct lio_droq_info	*info;
	union octeon_rh		*rh;
	uint32_t		pkt, pkt_count, total_len = 0;

	pkt_count = pkts_to_process;

	for (pkt = 0; pkt < pkt_count; pkt++) {
		struct mbuf	*nicbuf = NULL;
		uint32_t	pkt_len = 0;

		info = (struct lio_droq_info *)
		    droq->recv_buf_list[droq->read_idx].data;

		lio_swap_8B_data((uint64_t *)info, 2);

		if (!info->length) {
			lio_dev_err(oct,
				    "DROQ[%d] idx: %d len:0, pkt_cnt: %d\n",
				    droq->q_no, droq->read_idx, pkt_count);
			hexdump((uint8_t *)info, LIO_DROQ_INFO_SIZE, NULL,
				HD_OMIT_CHARS);
			pkt++;
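			/*
			 * lio_incr_index() returns the advanced index; the
			 * result must be stored, otherwise the ring would
			 * never move past the zero-length descriptor.
			 */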
			droq->read_idx = lio_incr_index(droq->read_idx, 1,
							droq->max_count);
			droq->refill_count++;
			break;
		}

		rh = &info->rh;

		info->length += 8;
		rh->r_dh.len += (LIO_DROQ_INFO_SIZE + 7) / 8;

		total_len += (uint32_t)info->length;
		if (lio_opcode_slow_path(rh)) {
			uint32_t	buf_cnt;

			buf_cnt = lio_droq_dispatch_pkt(oct, droq, rh, info);
			droq->read_idx = lio_incr_index(droq->read_idx,
							buf_cnt,
							droq->max_count);
			droq->refill_count += buf_cnt;
		} else {
			if (info->length <= droq->buffer_size) {
				pkt_len = (uint32_t)info->length;
				nicbuf = droq->recv_buf_list[
						       droq->read_idx].buffer;
				nicbuf->m_len = pkt_len;
				droq->recv_buf_list[droq->read_idx].buffer =
					NULL;

				droq->read_idx =
					lio_incr_index(droq->read_idx,
						       1, droq->max_count);
				droq->refill_count++;
			} else {
				bool	secondary_frag = false;

				pkt_len = 0;

				while (pkt_len < info->length) {
					int	frag_len, idx = droq->read_idx;
					struct mbuf	*buffer;

					frag_len =
						((pkt_len + droq->buffer_size) >
						 info->length) ?
						((uint32_t)info->length -
						 pkt_len) : droq->buffer_size;

					buffer = ((struct mbuf *)
						  droq->recv_buf_list[idx].
						  buffer);
					buffer->m_len = frag_len;
					if (__predict_true(secondary_frag)) {
						m_cat(nicbuf, buffer);
					} else {
						nicbuf = buffer;
						secondary_frag = true;
					}

					droq->recv_buf_list[droq->read_idx].
						buffer = NULL;

					pkt_len += frag_len;
					droq->read_idx =
						lio_incr_index(droq->read_idx,
							       1,
							       droq->max_count);
					droq->refill_count++;
				}
			}

			if (nicbuf != NULL) {
				if (droq->ops.fptr != NULL) {
					droq->ops.fptr(nicbuf, pkt_len, rh,
						       droq, droq->ops.farg);
				} else {
					lio_recv_buffer_free(nicbuf);
				}
			}
		}

		if (droq->refill_count >= droq->refill_threshold) {
			int desc_refilled = lio_droq_refill(oct, droq);

			/*
			 * Flush the droq descriptor data to memory to be sure
			 * that when we update the credits the data in memory
			 * is accurate.
			 */
			wmb();
			lio_write_csr32(oct, droq->pkts_credit_reg,
					desc_refilled);
			/* make sure the mmio write completes */
			__compiler_membar();
		}
	}	/* for (each packet)... */
	/* Update stats with the packets and bytes processed in this pass. */
	droq->stats.pkts_received += pkt;
	droq->stats.bytes_received += total_len;

	tcp_lro_flush_all(&droq->lro);

	if ((droq->ops.drop_on_max) && (pkts_to_process - pkt)) {
		lio_droq_drop_packets(oct, droq, (pkts_to_process - pkt));

		droq->stats.dropped_toomany += (pkts_to_process - pkt);
		return (pkts_to_process);
	}

	return (pkt);
}

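/*
 * Process up to budget packets on the queue, then run any slow-path
 * dispatch handlers that lio_droq_fast_process_packets() queued up
 * (handlers are invoked without the droq lock held).  Returns 1 if
 * packets are still pending and the task should be rescheduled,
 * else 0.
 */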
int
lio_droq_process_packets(struct octeon_device *oct, struct lio_droq *droq,
			 uint32_t budget)
{
	struct lio_stailq_node	*tmp, *tmp2;
	uint32_t		pkt_count = 0, pkts_processed = 0;

	/* Grab the droq lock */
	mtx_lock(&droq->lock);

	lio_droq_check_hw_for_pkts(droq);
	pkt_count = atomic_load_acq_int(&droq->pkts_pending);

	if (!pkt_count) {
		mtx_unlock(&droq->lock);
		return (0);
	}
	if (pkt_count > budget)
		pkt_count = budget;

	pkts_processed = lio_droq_fast_process_packets(oct, droq, pkt_count);

	atomic_subtract_int(&droq->pkts_pending, pkts_processed);

	/* Release the lock */
	mtx_unlock(&droq->lock);

	STAILQ_FOREACH_SAFE(tmp, &droq->dispatch_stq_head, entries, tmp2) {
		struct __dispatch *rdisp = (struct __dispatch *)tmp;

		STAILQ_REMOVE_HEAD(&droq->dispatch_stq_head, entries);
		rdisp->disp_fn(rdisp->rinfo, lio_get_dispatch_arg(oct,
			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.opcode,
			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.subcode));
	}

	/* If there are packets pending, schedule the task again. */
	if (atomic_load_acq_int(&droq->pkts_pending))
		return (1);

	return (0);
}

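/*
 * Install the per-queue receive callbacks.  The ops structure is copied
 * under the droq lock, so the fast path sees a consistent fptr/farg
 * pair.
 *
 * Typical usage from the NIC setup path (a sketch; lio_push_packet and
 * ifp are illustrative names, not definitions from this file):
 *
 *	struct lio_droq_ops droq_ops;
 *
 *	bzero(&droq_ops, sizeof(droq_ops));
 *	droq_ops.fptr = lio_push_packet;   (called as (mbuf, len, rh,
 *	droq_ops.farg = ifp;                droq, farg) by the fast path)
 *	droq_ops.drop_on_max = 1;
 *	lio_register_droq_ops(oct, q_no, &droq_ops);
 */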
int
lio_register_droq_ops(struct octeon_device *oct, uint32_t q_no,
		      struct lio_droq_ops *ops)
{
	struct lio_droq		*droq;
	struct lio_config	*lio_cfg = NULL;

	lio_cfg = lio_get_conf(oct);

	if (lio_cfg == NULL)
		return (-EINVAL);

	if (ops == NULL) {
		lio_dev_err(oct, "%s: droq_ops pointer is NULL\n", __func__);
		return (-EINVAL);
	}

	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
			    __func__, q_no,
			    LIO_GET_OQ_MAX_Q_CFG(lio_cfg) - 1);
		return (-EINVAL);
	}
	droq = oct->droq[q_no];

	mtx_lock(&droq->lock);

	memcpy(&droq->ops, ops, sizeof(struct lio_droq_ops));

	mtx_unlock(&droq->lock);

	return (0);
}

int
lio_unregister_droq_ops(struct octeon_device *oct, uint32_t q_no)
{
	struct lio_droq		*droq;
	struct lio_config	*lio_cfg = NULL;

	lio_cfg = lio_get_conf(oct);

	if (lio_cfg == NULL)
		return (-EINVAL);

	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
			    __func__, q_no,
			    LIO_GET_OQ_MAX_Q_CFG(lio_cfg) - 1);
		return (-EINVAL);
	}

	droq = oct->droq[q_no];

	if (droq == NULL) {
		lio_dev_info(oct, "Droq id (%d) not available.\n", q_no);
		return (0);
	}

	mtx_lock(&droq->lock);

	droq->ops.fptr = NULL;
	droq->ops.farg = NULL;
	droq->ops.drop_on_max = 0;

	mtx_unlock(&droq->lock);

	return (0);
}

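/*
 * Wrapper around lio_init_droq() that guards against re-creating a
 * queue that is already in use and keeps oct->num_oqs in sync.
 */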
int
lio_create_droq(struct octeon_device *oct, uint32_t q_no, uint32_t num_descs,
		uint32_t desc_size, void *app_ctx)
{

	if (oct->droq[q_no]->oct_dev != NULL) {
		lio_dev_dbg(oct, "Droq already in use. Cannot create droq %d again\n",
			    q_no);
		return (1);
	}

	/* Initialize the Droq */
	if (lio_init_droq(oct, q_no, num_descs, desc_size, app_ctx)) {
		bzero(oct->droq[q_no], sizeof(struct lio_droq));
		goto create_droq_fail;
	}

	oct->num_oqs++;

	lio_dev_dbg(oct, "%s: Total number of OQs: %d\n", __func__,
		    oct->num_oqs);

	/* Global Droq register settings */

	/*
	 * Not required as of now, since the settings are done for all 32
	 * Droqs at the same time.
	 */
	return (0);

create_droq_fail:
	return (-ENOMEM);
}