/*
 * Copyright (c) 2009 Mark Heily <mark@heily.com>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>

#include "sys/event.h"
#include "private.h"

#ifndef NDEBUG
int KQUEUE_DEBUG = 0;
#endif

static RB_HEAD(kqt, kqueue) kqtree       = RB_INITIALIZER(&kqtree);
static pthread_rwlock_t     kqtree_mtx   = PTHREAD_RWLOCK_INITIALIZER;

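/*
 * Ordering function for the kqueue tree.  Objects are keyed on
 * kq_sockfd[1], the descriptor returned to the caller by kqueue().
 * memcmp() compares the two ints byte-wise; this is not a numeric
 * ordering, but it is a consistent total order, which is all the
 * red-black tree needs for exact-match lookups.
 */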
static int
kqueue_cmp(struct kqueue *a, struct kqueue *b)
{
    return memcmp(&a->kq_sockfd[1], &b->kq_sockfd[1], sizeof(int));
}

RB_GENERATE(kqt, kqueue, entries, kqueue_cmp)

/* Must hold the kqtree_mtx when calling this */
static void
kqueue_free(struct kqueue *kq)
{
    RB_REMOVE(kqt, &kqtree, kq);
    filter_unregister_all(kq);
#if defined(__sun__)
    port_event_t *pe = (port_event_t *) pthread_getspecific(kq->kq_port_event);

    if (kq->kq_port > 0)
        close(kq->kq_port);
    free(pe);
#endif
    free(kq);
}

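/*
 * Reap kqueue objects whose descriptor has been closed or that are
 * otherwise invalid.  The caller must hold kqtree_mtx for writing.
 */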
static int
kqueue_gc(void)
{
    int rv;
    struct kqueue *n1, *n2;

    /* Free any kqueue object that is no longer needed.
       Sadly O(N), but necessary for the case where a descriptor is
       closed and kevent(2) is never called on it again. */
    for (n1 = RB_MIN(kqt, &kqtree); n1 != NULL; n1 = n2) {
        n2 = RB_NEXT(kqt, &kqtree, n1);

        if (n1->kq_ref == 0) {
            kqueue_free(n1);
        } else {
            rv = kqueue_validate(n1);
            if (rv == 0)
                kqueue_free(n1);
            else if (rv < 0)
                return (-1);
        }
    }

    return (0);
}

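/*
 * Check whether the descriptor associated with a kqueue object is
 * still open on the caller's side.  Returns 1 if it is still valid,
 * 0 if the caller has closed its end of the socketpair (the object
 * may be freed), or -1 on error or unexpected data on the socket.
 */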
int
kqueue_validate(struct kqueue *kq)
{
    int rv;
    char buf[1];
    struct pollfd pfd;

    pfd.fd = kq->kq_sockfd[0];
    pfd.events = POLLIN | POLLHUP;
    pfd.revents = 0;

    rv = poll(&pfd, 1, 0);
    if (rv == 0)
        return (1);
    if (rv < 0) {
        dbg_perror("poll(2)");
        return (-1);
    }
    if (rv > 0) {
        /* NOTE: If the caller accidentally writes to the kqfd, it will
                 be considered invalid. */
        rv = recv(kq->kq_sockfd[0], buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
        if (rv == 0)
            return (0);
        else
            return (-1);
    }

    return (0);
}

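/*
 * Drop a reference previously taken by kqueue_get().
 */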
void
kqueue_put(struct kqueue *kq)
{
    atomic_dec(&kq->kq_ref);
}

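/*
 * Look up the kqueue object backing the descriptor 'kq' and take a
 * reference on it.  Returns NULL if the descriptor does not belong to
 * a live kqueue.  Each successful call must be paired with kqueue_put().
 */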
struct kqueue *
kqueue_get(int kq)
{
    struct kqueue query;
    struct kqueue *ent = NULL;

    query.kq_sockfd[1] = kq;
    pthread_rwlock_rdlock(&kqtree_mtx);
    ent = RB_FIND(kqt, &kqtree, &query);
    pthread_rwlock_unlock(&kqtree_mtx);

    /* Check for invalid kqueue objects still in the tree */
    if (ent != NULL) {
        if (ent->kq_sockfd[0] < 0 || ent->kq_ref == 0) {
            ent = NULL;
        } else {
            atomic_inc(&ent->kq_ref);
        }
    }

    return (ent);
}
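
/*
 * Typical use of the get/put pair inside the library (a sketch, not a
 * verbatim excerpt of the kevent() implementation):
 *
 *     struct kqueue *kq = kqueue_get(kqfd);
 *     if (kq == NULL) {
 *         errno = EBADF;
 *         return (-1);
 *     }
 *     ...                     operate on kq
 *     kqueue_put(kq);
 */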

/* Non-portable kqueue initialization code. */
static int
kqueue_sys_init(struct kqueue *kq)
{
#if defined(__sun__)
    port_event_t *pe;

    if ((kq->kq_port = port_create()) < 0) {
        dbg_perror("port_create(2)");
        return (-1);
    }
    if (pthread_key_create(&kq->kq_port_event, NULL) != 0)
        abort();
    if ((pe = calloc(1, sizeof(*pe))) == NULL)
        abort();
    if (pthread_setspecific(kq->kq_port_event, pe) != 0)
        abort();
#endif
    return (0);
}

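/*
 * Create a new kqueue.  A connected socketpair stands in for the kqueue
 * descriptor: kq_sockfd[1] is handed back to the caller, while the
 * library keeps kq_sockfd[0] so that kqueue_validate() can detect when
 * the caller closes its end.
 *
 * Applications use the emulated descriptor exactly like a native kqueue
 * (a minimal sketch):
 *
 *     int kqfd = kqueue();
 *     struct kevent kev;
 *     EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *     if (kevent(kqfd, &kev, 1, NULL, 0, NULL) < 0)
 *         ...                 handle the error
 */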
int __attribute__((visibility("default")))
kqueue(void)
{
    struct kqueue *kq;
    int tmp;

    kq = calloc(1, sizeof(*kq));
    if (kq == NULL)
        return (-1);
    kq->kq_ref = 1;
    pthread_mutex_init(&kq->kq_mtx, NULL);

#ifndef NDEBUG
    /* KQUEUE_DEBUG is only defined when assertions are enabled. */
    KQUEUE_DEBUG = (getenv("KQUEUE_DEBUG") == NULL) ? 0 : 1;
#endif

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, kq->kq_sockfd) < 0)
        goto errout_unlocked;

    if (kqueue_sys_init(kq) < 0)
        goto errout_unlocked;

    pthread_rwlock_wrlock(&kqtree_mtx);
    if (kqueue_gc() < 0)
        goto errout;
    /* TODO: move outside of the lock if it is safe */
    if (filter_register_all(kq) < 0)
        goto errout;
    RB_INSERT(kqt, &kqtree, kq);
    pthread_rwlock_unlock(&kqtree_mtx);

    dbg_printf("created kqueue, fd=%d", kq->kq_sockfd[1]);
    return (kq->kq_sockfd[1]);

errout:
    pthread_rwlock_unlock(&kqtree_mtx);

errout_unlocked:
    /* Close the socketpair only if it was actually created: calloc()
       zeroed both slots, so they are equal unless socketpair(2) succeeded. */
    if (kq->kq_sockfd[0] != kq->kq_sockfd[1]) {
        tmp = errno;
        (void)close(kq->kq_sockfd[0]);
        (void)close(kq->kq_sockfd[1]);
        errno = tmp;
    }
#if defined(__sun__)
    if (kq->kq_port > 0)
        close(kq->kq_port);
#endif
    free(kq);
    return (-1);
}