ds.c revision 1.8
1/*	$OpenBSD: ds.c,v 1.8 2018/07/13 08:46:07 kettenis Exp $	*/
2
3/*
4 * Copyright (c) 2012 Mark Kettenis
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19#include <sys/types.h>
20#include <sys/poll.h>
21#include <sys/queue.h>
22#include <err.h>
23#include <fcntl.h>
24#include <stdio.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28
29#include "ds.h"
30#include "util.h"
31
32void	ldc_rx_ctrl_vers(struct ldc_conn *, struct ldc_pkt *);
33void	ldc_rx_ctrl_rtr(struct ldc_conn *, struct ldc_pkt *);
34void	ldc_rx_ctrl_rts(struct ldc_conn *, struct ldc_pkt *);
35void	ldc_rx_ctrl_rdx(struct ldc_conn *, struct ldc_pkt *);
36
37void	ldc_send_ack(struct ldc_conn *);
38void	ldc_send_nack(struct ldc_conn *);
39void	ldc_send_rtr(struct ldc_conn *);
40void	ldc_send_rts(struct ldc_conn *);
41void	ldc_send_rdx(struct ldc_conn *);
42
43void
44ldc_rx_ctrl(struct ldc_conn *lc, struct ldc_pkt *lp)
45{
46	switch (lp->ctrl) {
47	case LDC_VERS:
48		ldc_rx_ctrl_vers(lc, lp);
49		break;
50
51	case LDC_RTS:
52		ldc_rx_ctrl_rts(lc, lp);
53		break;
54
55	case LDC_RTR:
56		ldc_rx_ctrl_rtr(lc, lp);
57		break;
58
59	case LDC_RDX:
60		ldc_rx_ctrl_rdx(lc, lp);
61		break;
62
63	default:
64		DPRINTF(("CTRL/0x%02x/0x%02x\n", lp->stype, lp->ctrl));
65		ldc_reset(lc);
66		break;
67	}
68}
69
70void
71ldc_rx_ctrl_vers(struct ldc_conn *lc, struct ldc_pkt *lp)
72{
73	struct ldc_pkt *lvp = (struct ldc_pkt *)lp;
74
75	switch (lp->stype) {
76	case LDC_INFO:
77		if (lc->lc_state == LDC_RCV_VERS) {
78			DPRINTF(("Spurious CTRL/INFO/VERS: state %d\n",
79			    lc->lc_state));
80			return;
81		}
82		DPRINTF(("CTRL/INFO/VERS\n"));
83		if (lvp->major == LDC_VERSION_MAJOR &&
84		    lvp->minor == LDC_VERSION_MINOR)
85			ldc_send_ack(lc);
86		else
87			ldc_send_nack(lc);
88		break;
89
90	case LDC_ACK:
91		if (lc->lc_state != LDC_SND_VERS) {
92			DPRINTF(("Spurious CTRL/ACK/VERS: state %d\n",
93			    lc->lc_state));
94			ldc_reset(lc);
95			return;
96		}
97		DPRINTF(("CTRL/ACK/VERS\n"));
98		ldc_send_rts(lc);
99		break;
100
101	case LDC_NACK:
102		DPRINTF(("CTRL/NACK/VERS\n"));
103		ldc_reset(lc);
104		break;
105
106	default:
107		DPRINTF(("CTRL/0x%02x/VERS\n", lp->stype));
108		ldc_reset(lc);
109		break;
110	}
111}
112
113void
114ldc_rx_ctrl_rts(struct ldc_conn *lc, struct ldc_pkt *lp)
115{
116	switch (lp->stype) {
117	case LDC_INFO:
118		if (lc->lc_state != LDC_RCV_VERS) {
119			DPRINTF(("Spurious CTRL/INFO/RTS: state %d\n",
120			    lc->lc_state));
121			ldc_reset(lc);
122			return;
123		}
124		DPRINTF(("CTRL/INFO/RTS\n"));
125		if (lp->env != LDC_MODE_RELIABLE) {
126			ldc_reset(lc);
127			return;
128		}
129		ldc_send_rtr(lc);
130		break;
131
132	case LDC_ACK:
133		DPRINTF(("CTRL/ACK/RTS\n"));
134		ldc_reset(lc);
135		break;
136
137	case LDC_NACK:
138		DPRINTF(("CTRL/NACK/RTS\n"));
139		ldc_reset(lc);
140		break;
141
142	default:
143		DPRINTF(("CTRL/0x%02x/RTS\n", lp->stype));
144		ldc_reset(lc);
145		break;
146	}
147}
148
149void
150ldc_rx_ctrl_rtr(struct ldc_conn *lc, struct ldc_pkt *lp)
151{
152	switch (lp->stype) {
153	case LDC_INFO:
154		if (lc->lc_state != LDC_SND_RTS) {
155			DPRINTF(("Spurious CTRL/INFO/RTR: state %d\n",
156			    lc->lc_state));
157			ldc_reset(lc);
158			return;
159		}
160		DPRINTF(("CTRL/INFO/RTR\n"));
161		if (lp->env != LDC_MODE_RELIABLE) {
162			ldc_reset(lc);
163			return;
164		}
165		ldc_send_rdx(lc);
166#if 0
167		lc->lc_start(lc);
168#endif
169		break;
170
171	case LDC_ACK:
172		DPRINTF(("CTRL/ACK/RTR\n"));
173		ldc_reset(lc);
174		break;
175
176	case LDC_NACK:
177		DPRINTF(("CTRL/NACK/RTR\n"));
178		ldc_reset(lc);
179		break;
180
181	default:
182		DPRINTF(("CTRL/0x%02x/RTR\n", lp->stype));
183		ldc_reset(lc);
184		break;
185	}
186}
187
188void
189ldc_rx_ctrl_rdx(struct ldc_conn *lc, struct ldc_pkt *lp)
190{
191	switch (lp->stype) {
192	case LDC_INFO:
193		if (lc->lc_state != LDC_SND_RTR) {
194			DPRINTF(("Spurious CTRL/INFO/RTR: state %d\n",
195			    lc->lc_state));
196			ldc_reset(lc);
197			return;
198		}
199		DPRINTF(("CTRL/INFO/RDX\n"));
200#if 0
201		lc->lc_start(lc);
202#endif
203		break;
204
205	case LDC_ACK:
206		DPRINTF(("CTRL/ACK/RDX\n"));
207		ldc_reset(lc);
208		break;
209
210	case LDC_NACK:
211		DPRINTF(("CTRL/NACK/RDX\n"));
212		ldc_reset(lc);
213		break;
214
215	default:
216		DPRINTF(("CTRL/0x%02x/RDX\n", lp->stype));
217		ldc_reset(lc);
218		break;
219	}
220}
221
222void
223ldc_rx_data(struct ldc_conn *lc, struct ldc_pkt *lp)
224{
225	size_t len;
226
227	if (lp->stype != LDC_INFO && lp->stype != LDC_ACK) {
228		DPRINTF(("DATA/0x%02x\n", lp->stype));
229		ldc_reset(lc);
230		return;
231	}
232
233	if (lc->lc_state != LDC_SND_RTR &&
234	    lc->lc_state != LDC_SND_RDX) {
235		DPRINTF(("Spurious DATA/INFO: state %d\n", lc->lc_state));
236		ldc_reset(lc);
237		return;
238	}
239
240#if 0
241	if (lp->ackid) {
242		int i;
243
244		for (i = 0; ds_service[i].ds_svc_id; i++) {
245			if (ds_service[i].ds_ackid &&
246			    lp->ackid >= ds_service[i].ds_ackid) {
247				ds_service[i].ds_ackid = 0;
248				ds_service[i].ds_start(lc, ds_service[i].ds_svc_handle);
249			}
250		}
251	}
252#endif
253	if (lp->stype == LDC_ACK)
254		return;
255
256	if (lp->env & LDC_FRAG_START) {
257		lc->lc_len = (lp->env & LDC_LEN_MASK);
258		memcpy((uint8_t *)lc->lc_msg, &lp->data, lc->lc_len);
259	} else {
260		len = (lp->env & LDC_LEN_MASK);
261		if (lc->lc_len + len > sizeof(lc->lc_msg)) {
262			DPRINTF(("Buffer overrun\n"));
263			ldc_reset(lc);
264			return;
265		}
266		memcpy((uint8_t *)lc->lc_msg + lc->lc_len, &lp->data, len);
267		lc->lc_len += len;
268	}
269
270	if (lp->env & LDC_FRAG_STOP) {
271		ldc_ack(lc, lp->seqid);
272		lc->lc_rx_data(lc, lc->lc_msg, lc->lc_len);
273	}
274}
275
276void
277ldc_send_vers(struct ldc_conn *lc)
278{
279	struct ldc_pkt lp;
280	ssize_t nbytes;
281
282	bzero(&lp, sizeof(lp));
283	lp.type = LDC_CTRL;
284	lp.stype = LDC_INFO;
285	lp.ctrl = LDC_VERS;
286	lp.major = 1;
287	lp.minor = 0;
288
289	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
290	if (nbytes != sizeof(lp))
291		err(1, "write");
292
293	lc->lc_state = LDC_SND_VERS;
294}
295
296void
297ldc_send_ack(struct ldc_conn *lc)
298{
299	struct ldc_pkt lp;
300	ssize_t nbytes;
301
302	bzero(&lp, sizeof(lp));
303	lp.type = LDC_CTRL;
304	lp.stype = LDC_ACK;
305	lp.ctrl = LDC_VERS;
306	lp.major = 1;
307	lp.minor = 0;
308
309	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
310	if (nbytes != sizeof(lp))
311		err(1, "write");
312
313	lc->lc_state = LDC_RCV_VERS;
314}
315
316void
317ldc_send_nack(struct ldc_conn *lc)
318{
319	struct ldc_pkt lp;
320	ssize_t nbytes;
321
322	bzero(&lp, sizeof(lp));
323	lp.type = LDC_CTRL;
324	lp.stype = LDC_NACK;
325	lp.ctrl = LDC_VERS;
326	lp.major = 1;
327	lp.minor = 0;
328
329	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
330	if (nbytes != sizeof(lp))
331		err(1, "write");
332
333	lc->lc_state = 0;
334}
335
336void
337ldc_send_rts(struct ldc_conn *lc)
338{
339	struct ldc_pkt lp;
340	ssize_t nbytes;
341
342	bzero(&lp, sizeof(lp));
343	lp.type = LDC_CTRL;
344	lp.stype = LDC_INFO;
345	lp.ctrl = LDC_RTS;
346	lp.env = LDC_MODE_RELIABLE;
347	lp.seqid = lc->lc_tx_seqid++;
348
349	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
350	if (nbytes != sizeof(lp))
351		err(1, "write");
352
353	lc->lc_state = LDC_SND_RTS;
354}
355
356void
357ldc_send_rtr(struct ldc_conn *lc)
358{
359	struct ldc_pkt lp;
360	ssize_t nbytes;
361
362	bzero(&lp, sizeof(lp));
363	lp.type = LDC_CTRL;
364	lp.stype = LDC_INFO;
365	lp.ctrl = LDC_RTR;
366	lp.env = LDC_MODE_RELIABLE;
367	lp.seqid = lc->lc_tx_seqid++;
368
369	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
370	if (nbytes != sizeof(lp))
371		err(1, "write");
372
373	lc->lc_state = LDC_SND_RTR;
374}
375
376void
377ldc_send_rdx(struct ldc_conn *lc)
378{
379	struct ldc_pkt lp;
380	ssize_t nbytes;
381
382	bzero(&lp, sizeof(lp));
383	lp.type = LDC_CTRL;
384	lp.stype = LDC_INFO;
385	lp.ctrl = LDC_RDX;
386	lp.env = LDC_MODE_RELIABLE;
387	lp.seqid = lc->lc_tx_seqid++;
388
389	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
390	if (nbytes != sizeof(lp))
391		err(1, "write");
392
393	lc->lc_state = LDC_SND_RDX;
394}
395
396void
397ldc_reset(struct ldc_conn *lc)
398{
399	lc->lc_tx_seqid = 0;
400	lc->lc_state = 0;
401#if 0
402	lc->lc_reset(lc);
403#endif
404}
405
406void
407ldc_ack(struct ldc_conn *lc, uint32_t ackid)
408{
409	struct ldc_pkt lp;
410	ssize_t nbytes;
411
412	bzero(&lp, sizeof(lp));
413	lp.type = LDC_DATA;
414	lp.stype = LDC_ACK;
415	lp.seqid = lc->lc_tx_seqid++;
416	lp.ackid = ackid;
417	nbytes = write(lc->lc_fd, &lp, sizeof(lp));
418	if (nbytes != sizeof(lp))
419		err(1, "write");
420}
421
422void
423ds_rx_msg(struct ldc_conn *lc, void *data, size_t len)
424{
425	struct ds_conn *dc = lc->lc_cookie;
426	struct ds_msg *dm = data;
427
428	switch(dm->msg_type) {
429	case DS_INIT_REQ:
430	{
431		struct ds_init_req *dr = data;
432
433		DPRINTF(("DS_INIT_REQ %d.%d\n", dr->major_vers,
434		    dr->minor_vers));
435		if (dr->major_vers != 1 || dr->minor_vers != 0){
436			ldc_reset(lc);
437			return;
438		}
439		ds_init_ack(lc);
440		break;
441	}
442
443	case DS_REG_REQ:
444	{
445		struct ds_reg_req *dr = data;
446		struct ds_conn_svc *dcs;
447		uint16_t major = 0;
448
449		DPRINTF(("DS_REG_REQ %s %d.%d 0x%016llx\n", dr->svc_id,
450		    dr->major_vers, dr->minor_vers, dr->svc_handle));
451		TAILQ_FOREACH(dcs, &dc->services, link) {
452			if (strcmp(dr->svc_id, dcs->service->ds_svc_id) == 0 &&
453			    dr->major_vers == dcs->service->ds_major_vers) {
454				dcs->svc_handle = dr->svc_handle;
455				dcs->ackid = lc->lc_tx_seqid;
456				ds_reg_ack(lc, dcs->svc_handle,
457				    dcs->service->ds_minor_vers);
458				dcs->service->ds_start(lc, dcs->svc_handle);
459				return;
460			}
461		}
462
463		TAILQ_FOREACH(dcs, &dc->services, link) {
464			if (strcmp(dr->svc_id, dcs->service->ds_svc_id) == 0 &&
465			    dcs->service->ds_major_vers > major)
466				major = dcs->service->ds_major_vers;
467		}
468
469		ds_reg_nack(lc, dr->svc_handle, major);
470		break;
471	}
472
473	case DS_UNREG:
474	{
475		struct ds_unreg *du = data;
476
477		DPRINTF(("DS_UNREG 0x%016llx\n", du->svc_handle));
478		ds_unreg_ack(lc, du->svc_handle);
479		break;
480	}
481
482	case DS_DATA:
483	{
484		struct ds_data *dd = data;
485		struct ds_conn_svc *dcs;
486
487		DPRINTF(("DS_DATA 0x%016llx\n", dd->svc_handle));
488		TAILQ_FOREACH(dcs, &dc->services, link) {
489			if (dcs->svc_handle == dd->svc_handle)
490				dcs->service->ds_rx_data(lc, dd->svc_handle,
491				    data, len);
492		}
493		break;
494	}
495
496	default:
497		DPRINTF(("Unknown DS message type 0x%x\n", dm->msg_type));
498		ldc_reset(lc);
499		break;
500	}
501}
502
503void
504ds_init_ack(struct ldc_conn *lc)
505{
506	struct ds_init_ack da;
507
508	DPRINTF((" DS_INIT_ACK\n"));
509	bzero(&da, sizeof(da));
510	da.msg_type = DS_INIT_ACK;
511	da.payload_len = sizeof(da) - 8;
512	da.minor_vers = 0;
513	ds_send_msg(lc, &da, sizeof(da));
514}
515
516void
517ds_reg_ack(struct ldc_conn *lc, uint64_t svc_handle, uint16_t minor)
518{
519	struct ds_reg_ack da;
520
521	DPRINTF((" DS_REG_ACK 0x%016llx\n", svc_handle));
522	bzero(&da, sizeof(da));
523	da.msg_type = DS_REG_ACK;
524	da.payload_len = sizeof(da) - 8;
525	da.svc_handle = svc_handle;
526	da.minor_vers = minor;
527	ds_send_msg(lc, &da, sizeof(da));
528}
529
530void
531ds_reg_nack(struct ldc_conn *lc, uint64_t svc_handle, uint16_t major)
532{
533	struct ds_reg_nack dn;
534
535	DPRINTF((" DS_REG_NACK 0x%016llx\n", svc_handle));
536	bzero(&dn, sizeof(dn));
537	dn.msg_type = DS_REG_NACK;
538	dn.payload_len = sizeof(dn) - 8;
539	dn.svc_handle = svc_handle;
540	dn.result = DS_REG_VER_NACK;
541	dn.major_vers = major;
542	ds_send_msg(lc, &dn, sizeof(dn));
543}
544
545void
546ds_unreg_ack(struct ldc_conn *lc, uint64_t svc_handle)
547{
548	struct ds_unreg du;
549
550	DPRINTF((" DS_UNREG_ACK 0x%016llx\n", svc_handle));
551	bzero(&du, sizeof(du));
552	du.msg_type = DS_UNREG_ACK;
553	du.payload_len = sizeof(du) - 8;
554	du.svc_handle = svc_handle;
555	ds_send_msg(lc, &du, sizeof(du));
556}
557
558void
559ds_unreg_nack(struct ldc_conn *lc, uint64_t svc_handle)
560{
561	struct ds_unreg du;
562
563	DPRINTF((" DS_UNREG_NACK 0x%016llx\n", svc_handle));
564	bzero(&du, sizeof(du));
565	du.msg_type = DS_UNREG_NACK;
566	du.payload_len = sizeof(du) - 8;
567	du.svc_handle = svc_handle;
568	ds_send_msg(lc, &du, sizeof(du));
569}
570
571void
572ds_receive_msg(struct ldc_conn *lc, void *buf, size_t len)
573{
574	int env = LDC_FRAG_START;
575	struct ldc_pkt lp;
576	uint8_t *p = buf;
577	ssize_t nbytes;
578
579	while (len > 0) {
580		nbytes = read(lc->lc_fd, &lp, sizeof(lp));
581		if (nbytes != sizeof(lp))
582			err(1, "read");
583
584		if (lp.type != LDC_DATA &&
585		    lp.stype != LDC_INFO) {
586			ldc_reset(lc);
587			return;
588		}
589
590		if ((lp.env & LDC_FRAG_START) != env) {
591			ldc_reset(lc);
592			return;
593		}
594
595		bcopy(&lp.data, p, (lp.env & LDC_LEN_MASK));
596		p += (lp.env & LDC_LEN_MASK);
597		len -= (lp.env & LDC_LEN_MASK);
598
599		if (lp.env & LDC_FRAG_STOP)
600			ldc_ack(lc, lp.seqid);
601
602		env = (lp.env & LDC_FRAG_STOP) ? LDC_FRAG_START : 0;
603	}
604}
605
606void
607ldc_send_msg(struct ldc_conn *lc, void *buf, size_t len)
608{
609	struct ldc_pkt lp;
610	uint8_t *p = buf;
611	ssize_t nbytes;
612
613	while (len > 0) {
614		bzero(&lp, sizeof(lp));
615		lp.type = LDC_DATA;
616		lp.stype = LDC_INFO;
617		lp.env = min(len, LDC_PKT_PAYLOAD);
618		if (p == buf)
619			lp.env |= LDC_FRAG_START;
620		if (len <= LDC_PKT_PAYLOAD)
621			lp.env |= LDC_FRAG_STOP;
622		lp.seqid = lc->lc_tx_seqid++;
623		bcopy(p, &lp.data, min(len, LDC_PKT_PAYLOAD));
624
625		nbytes = write(lc->lc_fd, &lp, sizeof(lp));
626		if (nbytes != sizeof(lp))
627			err(1, "write");
628		p += min(len, LDC_PKT_PAYLOAD);
629		len -= min(len, LDC_PKT_PAYLOAD);
630	}
631}
632
633void
634ds_send_msg(struct ldc_conn *lc, void *buf, size_t len)
635{
636	uint8_t *p = buf;
637	struct ldc_pkt lp;
638	ssize_t nbytes;
639
640	while (len > 0) {
641		ldc_send_msg(lc, p, min(len, LDC_MSG_MAX));
642		p += min(len, LDC_MSG_MAX);
643		len -= min(len, LDC_MSG_MAX);
644
645		if (len > 0) {
646			/* Consume ACK. */
647			nbytes = read(lc->lc_fd, &lp, sizeof(lp));
648			if (nbytes != sizeof(lp))
649				err(1, "read");
650#if 0
651			{
652				uint64_t *msg = (uint64_t *)&lp;
653				int i;
654
655				for (i = 0; i < 8; i++)
656					printf("%02x: %016llx\n", i, msg[i]);
657			}
658#endif
659		}
660	}
661}
662
663TAILQ_HEAD(ds_conn_head, ds_conn) ds_conns =
664    TAILQ_HEAD_INITIALIZER(ds_conns);
665int num_ds_conns;
666
667struct ds_conn *
668ds_conn_open(const char *path, void *cookie)
669{
670	struct ds_conn *dc;
671
672	dc = xmalloc(sizeof(*dc));
673	dc->path = xstrdup(path);
674	dc->cookie = cookie;
675
676	dc->fd = open(path, O_RDWR, 0);
677	if (dc->fd == -1)
678		err(1, "cannot open %s", path);
679
680	memset(&dc->lc, 0, sizeof(dc->lc));
681	dc->lc.lc_fd = dc->fd;
682	dc->lc.lc_cookie = dc;
683	dc->lc.lc_rx_data = ds_rx_msg;
684
685	TAILQ_INIT(&dc->services);
686	TAILQ_INSERT_TAIL(&ds_conns, dc, link);
687	dc->id = num_ds_conns++;
688	return dc;
689}
690
691void
692ds_conn_register_service(struct ds_conn *dc, struct ds_service *ds)
693{
694	struct ds_conn_svc *dcs;
695
696	dcs = xzalloc(sizeof(*dcs));
697	dcs->service = ds;
698
699	TAILQ_INSERT_TAIL(&dc->services, dcs, link);
700}
701
702void
703ds_conn_handle(struct ds_conn *dc)
704{
705	struct ldc_pkt lp;
706	ssize_t nbytes;
707
708	nbytes = read(dc->fd, &lp, sizeof(lp));
709	if (nbytes != sizeof(lp)) {
710		ldc_reset(&dc->lc);
711		return;
712	}
713
714	switch (lp.type) {
715	case LDC_CTRL:
716		ldc_rx_ctrl(&dc->lc, &lp);
717		break;
718	case LDC_DATA:
719		ldc_rx_data(&dc->lc, &lp);
720		break;
721	default:
722		DPRINTF(("0x%02x/0x%02x/0x%02x\n", lp.type, lp.stype,
723		    lp.ctrl));
724		ldc_reset(&dc->lc);
725		break;
726	}
727}
728
729void
730ds_conn_serve(void)
731{
732	struct ds_conn *dc;
733	struct pollfd *pfd;
734	int nfds;
735
736	pfd = xreallocarray(NULL, num_ds_conns, sizeof(*pfd));
737	TAILQ_FOREACH(dc, &ds_conns, link) {
738		pfd[dc->id].fd = dc->fd;
739		pfd[dc->id].events = POLLIN;
740	}
741
742	while (1) {
743		nfds = poll(pfd, num_ds_conns, -1);
744		if (nfds == -1 || nfds == 0)
745			errx(1, "poll");
746
747		TAILQ_FOREACH(dc, &ds_conns, link) {
748			if (pfd[dc->id].revents)
749				ds_conn_handle(dc);
750		}
751	}
752}
753