1// SPDX-License-Identifier: GPL-2.0-or-later
2/* RxRPC Tx data buffering.
3 *
4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5 * Written by David Howells (dhowells@redhat.com)
6 */
7
8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10#include <linux/slab.h>
11#include "ar-internal.h"
12
13static atomic_t rxrpc_txbuf_debug_ids;
14atomic_t rxrpc_nr_txbuf;
15
16/*
17 * Allocate and partially initialise a data transmission buffer.
18 */
19struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
20					   size_t data_align, gfp_t gfp)
21{
22	struct rxrpc_wire_header *whdr;
23	struct rxrpc_txbuf *txb;
24	size_t total, hoff = 0;
25	void *buf;
26
27	txb = kmalloc(sizeof(*txb), gfp);
28	if (!txb)
29		return NULL;
30
31	if (data_align)
32		hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr);
33	total = hoff + sizeof(*whdr) + data_size;
34
35	mutex_lock(&call->conn->tx_data_alloc_lock);
36	buf = __page_frag_alloc_align(&call->conn->tx_data_alloc, total, gfp,
37				      ~(data_align - 1) & ~(L1_CACHE_BYTES - 1));
38	mutex_unlock(&call->conn->tx_data_alloc_lock);
39	if (!buf) {
40		kfree(txb);
41		return NULL;
42	}
43
44	whdr = buf + hoff;
45
46	INIT_LIST_HEAD(&txb->call_link);
47	INIT_LIST_HEAD(&txb->tx_link);
48	refcount_set(&txb->ref, 1);
49	txb->last_sent		= KTIME_MIN;
50	txb->call_debug_id	= call->debug_id;
51	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
52	txb->space		= data_size;
53	txb->len		= 0;
54	txb->offset		= sizeof(*whdr);
55	txb->flags		= call->conn->out_clientflag;
56	txb->ack_why		= 0;
57	txb->seq		= call->tx_prepared + 1;
58	txb->serial		= 0;
59	txb->cksum		= 0;
60	txb->nr_kvec		= 1;
61	txb->kvec[0].iov_base	= whdr;
62	txb->kvec[0].iov_len	= sizeof(*whdr);
63
64	whdr->epoch		= htonl(call->conn->proto.epoch);
65	whdr->cid		= htonl(call->cid);
66	whdr->callNumber	= htonl(call->call_id);
67	whdr->seq		= htonl(txb->seq);
68	whdr->type		= RXRPC_PACKET_TYPE_DATA;
69	whdr->flags		= 0;
70	whdr->userStatus	= 0;
71	whdr->securityIndex	= call->security_ix;
72	whdr->_rsvd		= 0;
73	whdr->serviceId		= htons(call->dest_srx.srx_service);
74
75	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
76			  rxrpc_txbuf_alloc_data);
77
78	atomic_inc(&rxrpc_nr_txbuf);
79	return txb;
80}
81
82/*
83 * Allocate and partially initialise an ACK packet.
84 */
85struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size)
86{
87	struct rxrpc_wire_header *whdr;
88	struct rxrpc_acktrailer *trailer;
89	struct rxrpc_ackpacket *ack;
90	struct rxrpc_txbuf *txb;
91	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
92	void *buf, *buf2 = NULL;
93	u8 *filler;
94
95	txb = kmalloc(sizeof(*txb), gfp);
96	if (!txb)
97		return NULL;
98
99	buf = page_frag_alloc(&call->local->tx_alloc,
100			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
101	if (!buf) {
102		kfree(txb);
103		return NULL;
104	}
105
106	if (sack_size) {
107		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
108		if (!buf2) {
109			page_frag_free(buf);
110			kfree(txb);
111			return NULL;
112		}
113	}
114
115	whdr	= buf;
116	ack	= buf + sizeof(*whdr);
117	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
118	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;
119
120	INIT_LIST_HEAD(&txb->call_link);
121	INIT_LIST_HEAD(&txb->tx_link);
122	refcount_set(&txb->ref, 1);
123	txb->call_debug_id	= call->debug_id;
124	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
125	txb->space		= 0;
126	txb->len		= sizeof(*whdr) + sizeof(*ack) + 3 + sizeof(*trailer);
127	txb->offset		= 0;
128	txb->flags		= call->conn->out_clientflag;
129	txb->ack_rwind		= 0;
130	txb->seq		= 0;
131	txb->serial		= 0;
132	txb->cksum		= 0;
133	txb->nr_kvec		= 3;
134	txb->kvec[0].iov_base	= whdr;
135	txb->kvec[0].iov_len	= sizeof(*whdr) + sizeof(*ack);
136	txb->kvec[1].iov_base	= buf2;
137	txb->kvec[1].iov_len	= sack_size;
138	txb->kvec[2].iov_base	= filler;
139	txb->kvec[2].iov_len	= 3 + sizeof(*trailer);
140
141	whdr->epoch		= htonl(call->conn->proto.epoch);
142	whdr->cid		= htonl(call->cid);
143	whdr->callNumber	= htonl(call->call_id);
144	whdr->seq		= 0;
145	whdr->type		= RXRPC_PACKET_TYPE_ACK;
146	whdr->flags		= 0;
147	whdr->userStatus	= 0;
148	whdr->securityIndex	= call->security_ix;
149	whdr->_rsvd		= 0;
150	whdr->serviceId		= htons(call->dest_srx.srx_service);
151
152	get_page(virt_to_head_page(trailer));
153
154	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
155			  rxrpc_txbuf_alloc_ack);
156	atomic_inc(&rxrpc_nr_txbuf);
157	return txb;
158}
159
160void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
161{
162	int r;
163
164	__refcount_inc(&txb->ref, &r);
165	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
166}
167
168void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
169{
170	int r = refcount_read(&txb->ref);
171
172	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
173}
174
175static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb)
176{
177	int i;
178
179	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
180			  rxrpc_txbuf_free);
181	for (i = 0; i < txb->nr_kvec; i++)
182		if (txb->kvec[i].iov_base)
183			page_frag_free(txb->kvec[i].iov_base);
184	kfree(txb);
185	atomic_dec(&rxrpc_nr_txbuf);
186}
187
188void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
189{
190	unsigned int debug_id, call_debug_id;
191	rxrpc_seq_t seq;
192	bool dead;
193	int r;
194
195	if (txb) {
196		debug_id = txb->debug_id;
197		call_debug_id = txb->call_debug_id;
198		seq = txb->seq;
199		dead = __refcount_dec_and_test(&txb->ref, &r);
200		trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
201		if (dead)
202			rxrpc_free_txbuf(txb);
203	}
204}
205
206/*
207 * Shrink the transmit buffer.
208 */
209void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
210{
211	struct rxrpc_txbuf *txb;
212	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
213	bool wake = false;
214
215	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
216
217	while ((txb = list_first_entry_or_null(&call->tx_buffer,
218					       struct rxrpc_txbuf, call_link))) {
219		hard_ack = smp_load_acquire(&call->acks_hard_ack);
220		if (before(hard_ack, txb->seq))
221			break;
222
223		if (txb->seq != call->tx_bottom + 1)
224			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
225		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
226		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
227		list_del_rcu(&txb->call_link);
228
229		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
230
231		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
232		if (after(call->acks_hard_ack, call->tx_bottom + 128))
233			wake = true;
234	}
235
236	if (wake)
237		wake_up(&call->waitq);
238}
239