// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2020 Oracle. All rights reserved.
 */

#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
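
/*
 * A parsed chunk list (pcl) is the in-memory form of the chunk lists
 * carried in an RPC-over-RDMA transport header: the Read list, the
 * Write list, and the Reply chunk. Each svc_rdma_chunk gathers the
 * RDMA segments that make up one chunk; Read segments are grouped
 * into chunks by their XDR Position field.
 */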

/**
 * pcl_free - Release all memory associated with a parsed chunk list
 * @pcl: parsed chunk list
 *
 */
void pcl_free(struct svc_rdma_pcl *pcl)
{
	while (!list_empty(&pcl->cl_chunks)) {
		struct svc_rdma_chunk *chunk;

		chunk = pcl_first_chunk(pcl);
		list_del(&chunk->ch_list);
		kfree(chunk);
	}
}

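
/*
 * Allocate an svc_rdma_chunk with room for @segcount segments and
 * record its XDR @position. The length and segment-count fields start
 * at zero and are filled in as segments are added. Returns NULL on
 * allocation failure.
 */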
static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
	struct svc_rdma_chunk *chunk;

	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
	if (!chunk)
		return NULL;

	chunk->ch_position = position;
	chunk->ch_length = 0;
	chunk->ch_payload_length = 0;
	chunk->ch_segcount = 0;
	return chunk;
}

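
/*
 * Return the chunk in @pcl whose ch_position matches @position, or
 * NULL if no such chunk has been added yet.
 */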
static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position == position)
			return pos;
	}
	return NULL;
}

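
/*
 * Link @chunk into @pcl so that the chunk list stays sorted in
 * ascending ch_position order, and bump the list's chunk count.
 */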
static void pcl_insert_position(struct svc_rdma_pcl *pcl,
				struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position > chunk->ch_position)
			break;
	}
	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
	pcl->cl_count++;
}

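
/*
 * Append one decoded Read segment to @chunk and update the chunk's
 * running length and segment count. The caller sized the chunk's
 * segment array for the whole Read list, so the array cannot
 * overflow here.
 */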
static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
				 struct svc_rdma_chunk *chunk,
				 u32 handle, u32 length, u64 offset)
{
	struct svc_rdma_segment *segment;

	segment = &chunk->ch_segments[chunk->ch_segcount];
	segment->rs_handle = handle;
	segment->rs_length = length;
	segment->rs_offset = offset;

	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);

	chunk->ch_length += length;
	chunk->ch_segcount++;
}

/**
 * pcl_alloc_call - Construct a parsed chunk list for the Call body
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed, and
 *              cl_count is updated to be the number of chunks (i.e.
 *              unique positions) in the Read list.
 *      %false: Memory allocation failed.
 */
bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position != 0)
			continue;

		if (pcl_is_empty(pcl)) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		} else {
			chunk = list_first_entry(&pcl->cl_chunks,
						 struct svc_rdma_chunk,
						 ch_list);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}

/**
 * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed, and
 *              cl_count is updated to be the number of chunks (i.e.
 *              unique position values) in the Read list.
 *      %false: Memory allocation failed.
 *
 * TODO:
 * - Check for chunk range overlaps
 */
bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position == 0)
			continue;

		chunk = pcl_lookup_position(pcl, position);
		if (!chunk) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}

/**
 * pcl_alloc_write - Construct a parsed chunk list from a Write list
 * @rctxt: Ingress receive context
 * @pcl: Parsed chunk list to populate
 * @p: Start of an un-decoded Write list
 *
 * Assumptions:
 * - The incoming Write list has already been sanity checked, and
 * - cl_count is set to the number of chunks in the un-decoded list.
 *
 * Return values:
 *       %true: Parsed chunk list was successfully constructed.
 *      %false: Memory allocation failed.
 */
bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
		     struct svc_rdma_pcl *pcl, __be32 *p)
{
	struct svc_rdma_segment *segment;
	struct svc_rdma_chunk *chunk;
	unsigned int i, j;
	u32 segcount;

	for (i = 0; i < pcl->cl_count; i++) {
		p++;	/* skip the list discriminator */
		segcount = be32_to_cpup(p++);

		chunk = pcl_alloc_chunk(segcount, 0);
		if (!chunk)
			return false;
		list_add_tail(&chunk->ch_list, &pcl->cl_chunks);

		for (j = 0; j < segcount; j++) {
			segment = &chunk->ch_segments[j];
			p = xdr_decode_rdma_segment(p, &segment->rs_handle,
						    &segment->rs_length,
						    &segment->rs_offset);
			trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);

			chunk->ch_length += segment->rs_length;
			chunk->ch_segcount++;
		}
	}
	return true;
}

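
/*
 * Carve the byte range [@offset, @offset + @length) out of @xdr and
 * hand it to @actor. An empty range is a no-op; a range that does not
 * fit within @xdr yields -EMSGSIZE.
 */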
static int pcl_process_region(const struct xdr_buf *xdr,
			      unsigned int offset, unsigned int length,
			      int (*actor)(const struct xdr_buf *, void *),
			      void *data)
{
	struct xdr_buf subbuf;

	if (!length)
		return 0;
	if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
		return -EMSGSIZE;
	return actor(&subbuf, data);
}

/**
 * pcl_process_nonpayloads - Process non-payload regions inside @xdr
 * @pcl: Chunk list to process
 * @xdr: xdr_buf to process
 * @actor: Function to invoke on each non-payload region
 * @data: Arguments for @actor
 *
 * This mechanism must ignore not only result payloads that were already
 * sent via RDMA Write, but also XDR padding for those payloads that
 * the upper layer has added.
 *
 * Assumptions:
 *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
 *
 * Returns:
 *   On success, zero,
 *   %-EMSGSIZE on XDR buffer overflow, or
 *   The return value of @actor
 */
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
			    const struct xdr_buf *xdr,
			    int (*actor)(const struct xdr_buf *, void *),
			    void *data)
{
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start;
	int ret;

	chunk = pcl_first_chunk(pcl);

	/* No result payloads were generated */
	if (!chunk || !chunk->ch_payload_length)
		return actor(xdr, data);

	/* Process the region before the first result payload */
	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
	if (ret < 0)
		return ret;

	/* Process the regions between each middle result payload */
	while ((next = pcl_next_chunk(pcl, chunk))) {
		if (!next->ch_payload_length)
			break;

		start = pcl_chunk_end_offset(chunk);
		ret = pcl_process_region(xdr, start, next->ch_position - start,
					 actor, data);
		if (ret < 0)
			return ret;

		chunk = next;
	}

	/* Process the region after the last result payload */
	start = pcl_chunk_end_offset(chunk);
	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
	if (ret < 0)
		return ret;

	return 0;
}