1/*	$NetBSD: bufferevent_pair.c,v 1.6 2020/05/25 20:47:33 christos Exp $	*/
2
3/*
4 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 *    derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "event2/event-config.h"
29#include "evconfig-private.h"
30
31#include <sys/types.h>
32
33#ifdef _WIN32
34#include <winsock2.h>
35#endif
36
37#include "event2/util.h"
38#include "event2/buffer.h"
39#include "event2/bufferevent.h"
40#include "event2/bufferevent_struct.h"
41#include "event2/event.h"
42#include "defer-internal.h"
43#include "bufferevent-internal.h"
44#include "mm-internal.h"
45#include "util-internal.h"
46
47struct bufferevent_pair {
48	struct bufferevent_private bev;
49	struct bufferevent_pair *partner;
50	/* For ->destruct() lock checking */
51	struct bufferevent_pair *unlinked_partner;
52};
53
54
55/* Given a bufferevent that's really a bev part of a bufferevent_pair,
56 * return that bufferevent_filtered. Returns NULL otherwise.*/
57static inline struct bufferevent_pair *
58upcast(struct bufferevent *bev)
59{
60	struct bufferevent_pair *bev_p;
61	if (bev->be_ops != &bufferevent_ops_pair)
62		return NULL;
63	bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
64	EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair);
65	return bev_p;
66}
67
/* Inverse of upcast(): yield the public struct bufferevent embedded in a
 * struct bufferevent_pair.  The argument is evaluated exactly once. */
#define downcast(bev_pair) (&((bev_pair)->bev.bev))
69
70static inline void
71incref_and_lock(struct bufferevent *b)
72{
73	struct bufferevent_pair *bevp;
74	bufferevent_incref_and_lock_(b);
75	bevp = upcast(b);
76	if (bevp->partner)
77		bufferevent_incref_and_lock_(downcast(bevp->partner));
78}
79
80static inline void
81decref_and_unlock(struct bufferevent *b)
82{
83	struct bufferevent_pair *bevp = upcast(b);
84	if (bevp->partner)
85		bufferevent_decref_and_unlock_(downcast(bevp->partner));
86	bufferevent_decref_and_unlock_(b);
87}
88
89/* XXX Handle close */
90
/* Forward declaration: evbuffer callback attached to each element's output
 * buffer by bufferevent_pair_elt_new(). */
static void be_pair_outbuf_cb(struct evbuffer *outbuf,
    const struct evbuffer_cb_info *info, void *arg);
93
94static struct bufferevent_pair *
95bufferevent_pair_elt_new(struct event_base *base,
96    int options)
97{
98	struct bufferevent_pair *bufev;
99	if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
100		return NULL;
101	if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair,
102		options)) {
103		mm_free(bufev);
104		return NULL;
105	}
106	if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
107		bufferevent_free(downcast(bufev));
108		return NULL;
109	}
110
111	bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev);
112
113	return bufev;
114}
115
116int
117bufferevent_pair_new(struct event_base *base, int options,
118    struct bufferevent *pair[2])
119{
120	struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
121	int tmp_options;
122
123	options |= BEV_OPT_DEFER_CALLBACKS;
124	tmp_options = options & ~BEV_OPT_THREADSAFE;
125
126	bufev1 = bufferevent_pair_elt_new(base, options);
127	if (!bufev1)
128		return -1;
129	bufev2 = bufferevent_pair_elt_new(base, tmp_options);
130	if (!bufev2) {
131		bufferevent_free(downcast(bufev1));
132		return -1;
133	}
134
135	if (options & BEV_OPT_THREADSAFE) {
136		/*XXXX check return */
137		bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock);
138	}
139
140	bufev1->partner = bufev2;
141	bufev2->partner = bufev1;
142
143	evbuffer_freeze(downcast(bufev1)->input, 0);
144	evbuffer_freeze(downcast(bufev1)->output, 1);
145	evbuffer_freeze(downcast(bufev2)->input, 0);
146	evbuffer_freeze(downcast(bufev2)->output, 1);
147
148	pair[0] = downcast(bufev1);
149	pair[1] = downcast(bufev2);
150
151	return 0;
152}
153
154static void
155be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
156    int ignore_wm)
157{
158	size_t dst_size;
159	size_t n;
160
161	evbuffer_unfreeze(src->output, 1);
162	evbuffer_unfreeze(dst->input, 0);
163
164	if (dst->wm_read.high) {
165		dst_size = evbuffer_get_length(dst->input);
166		if (dst_size < dst->wm_read.high) {
167			n = dst->wm_read.high - dst_size;
168			evbuffer_remove_buffer(src->output, dst->input, n);
169		} else {
170			if (!ignore_wm)
171				goto done;
172			n = evbuffer_get_length(src->output);
173			evbuffer_add_buffer(dst->input, src->output);
174		}
175	} else {
176		n = evbuffer_get_length(src->output);
177		evbuffer_add_buffer(dst->input, src->output);
178	}
179
180	if (n) {
181		BEV_RESET_GENERIC_READ_TIMEOUT(dst);
182
183		if (evbuffer_get_length(dst->output))
184			BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
185		else
186			BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
187	}
188
189	bufferevent_trigger_nolock_(dst, EV_READ, 0);
190	bufferevent_trigger_nolock_(src, EV_WRITE, 0);
191done:
192	evbuffer_freeze(src->output, 1);
193	evbuffer_freeze(dst->input, 0);
194}
195
196static inline int
197be_pair_wants_to_talk(struct bufferevent_pair *src,
198    struct bufferevent_pair *dst)
199{
200	return (downcast(src)->enabled & EV_WRITE) &&
201	    (downcast(dst)->enabled & EV_READ) &&
202	    !dst->bev.read_suspended &&
203	    evbuffer_get_length(downcast(src)->output);
204}
205
206static void
207be_pair_outbuf_cb(struct evbuffer *outbuf,
208    const struct evbuffer_cb_info *info, void *arg)
209{
210	struct bufferevent_pair *bev_pair = arg;
211	struct bufferevent_pair *partner = bev_pair->partner;
212
213	incref_and_lock(downcast(bev_pair));
214
215	if (info->n_added > info->n_deleted && partner) {
216		/* We got more data.  If the other side's reading, then
217		   hand it over. */
218		if (be_pair_wants_to_talk(bev_pair, partner)) {
219			be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
220		}
221	}
222
223	decref_and_unlock(downcast(bev_pair));
224}
225
226static int
227be_pair_enable(struct bufferevent *bufev, short events)
228{
229	struct bufferevent_pair *bev_p = upcast(bufev);
230	struct bufferevent_pair *partner = bev_p->partner;
231
232	incref_and_lock(bufev);
233
234	if (events & EV_READ) {
235		BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
236	}
237	if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
238		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
239
240	/* We're starting to read! Does the other side have anything to write?*/
241	if ((events & EV_READ) && partner &&
242	    be_pair_wants_to_talk(partner, bev_p)) {
243		be_pair_transfer(downcast(partner), bufev, 0);
244	}
245	/* We're starting to write! Does the other side want to read? */
246	if ((events & EV_WRITE) && partner &&
247	    be_pair_wants_to_talk(bev_p, partner)) {
248		be_pair_transfer(bufev, downcast(partner), 0);
249	}
250	decref_and_unlock(bufev);
251	return 0;
252}
253
254static int
255be_pair_disable(struct bufferevent *bev, short events)
256{
257	if (events & EV_READ) {
258		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
259	}
260	if (events & EV_WRITE) {
261		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
262	}
263	return 0;
264}
265
266static void
267be_pair_unlink(struct bufferevent *bev)
268{
269	struct bufferevent_pair *bev_p = upcast(bev);
270
271	if (bev_p->partner) {
272		bev_p->unlinked_partner = bev_p->partner;
273		bev_p->partner->partner = NULL;
274		bev_p->partner = NULL;
275	}
276}
277
278/* Free *shared* lock in the latest be (since we share it between two of them). */
279static void
280be_pair_destruct(struct bufferevent *bev)
281{
282	struct bufferevent_pair *bev_p = upcast(bev);
283
284	/* Transfer ownership of the lock into partner, otherwise we will use
285	 * already free'd lock during freeing second bev, see next example:
286	 *
287	 * bev1->own_lock = 1
288	 * bev2->own_lock = 0
289	 * bev2->lock = bev1->lock
290	 *
291	 * bufferevent_free(bev1) # refcnt == 0 -> unlink
292	 * bufferevent_free(bev2) # refcnt == 0 -> unlink
293	 *
294	 * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock)
295	 *                                 -> BEV_LOCK(bev2->lock) <-- already freed
296	 *
297	 * Where bev1 == pair[0], bev2 == pair[1].
298	 */
299	if (bev_p->unlinked_partner && bev_p->bev.own_lock) {
300		bev_p->unlinked_partner->bev.own_lock = 1;
301		bev_p->bev.own_lock = 0;
302	}
303	bev_p->unlinked_partner = NULL;
304}
305
306static int
307be_pair_flush(struct bufferevent *bev, short iotype,
308    enum bufferevent_flush_mode mode)
309{
310	struct bufferevent_pair *bev_p = upcast(bev);
311	struct bufferevent *partner;
312	incref_and_lock(bev);
313	if (!bev_p->partner)
314		return -1;
315
316	partner = downcast(bev_p->partner);
317
318	if (mode == BEV_NORMAL)
319		return 0;
320
321	if ((iotype & EV_READ) != 0)
322		be_pair_transfer(partner, bev, 1);
323
324	if ((iotype & EV_WRITE) != 0)
325		be_pair_transfer(bev, partner, 1);
326
327	if (mode == BEV_FINISHED) {
328		bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0);
329	}
330	decref_and_unlock(bev);
331	return 0;
332}
333
334struct bufferevent *
335bufferevent_pair_get_partner(struct bufferevent *bev)
336{
337	struct bufferevent_pair *bev_p;
338	struct bufferevent *partner = NULL;
339	bev_p = upcast(bev);
340	if (! bev_p)
341		return NULL;
342
343	incref_and_lock(bev);
344	if (bev_p->partner)
345		partner = downcast(bev_p->partner);
346	decref_and_unlock(bev);
347	return partner;
348}
349
350const struct bufferevent_ops bufferevent_ops_pair = {
351	"pair_elt",
352	evutil_offsetof(struct bufferevent_pair, bev.bev),
353	be_pair_enable,
354	be_pair_disable,
355	be_pair_unlink,
356	be_pair_destruct,
357	bufferevent_generic_adj_timeouts_,
358	be_pair_flush,
359	NULL, /* ctrl */
360};
361