/*
 * Copyright (c) 2009 Mark Heily <mark@heily.com>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>

#include "sys/event.h"
#include "private.h"

#ifndef NDEBUG
int KQUEUE_DEBUG = 0;
#endif

static RB_HEAD(kqt, kqueue) kqtree = RB_INITIALIZER(&kqtree);
static pthread_rwlock_t kqtree_mtx = PTHREAD_RWLOCK_INITIALIZER;

/* kq_sockfd[1] is the descriptor handed back to the caller by kqueue(),
   so it serves as the lookup key for the tree. */
static int
kqueue_cmp(struct kqueue *a, struct kqueue *b)
{
    return memcmp(&a->kq_sockfd[1], &b->kq_sockfd[1], sizeof(int));
}

RB_GENERATE(kqt, kqueue, entries, kqueue_cmp)

/* Must hold the kqtree_mtx when calling this */
static void
kqueue_free(struct kqueue *kq)
{
    RB_REMOVE(kqt, &kqtree, kq);
    filter_unregister_all(kq);
#if defined(__sun__)
    port_event_t *pe = (port_event_t *) pthread_getspecific(kq->kq_port_event);

    if (kq->kq_port > 0)
        close(kq->kq_port);
    free(pe);
#endif
    free(kq);
}

static int
kqueue_gc(void)
{
    int rv;
    struct kqueue *n1, *n2;

    /* Free any kqueue descriptor that is no longer needed. */
    /* Sadly this is O(N), but it is needed for the case where a descriptor
       is closed and kevent(2) will never again be called on it. */
    for (n1 = RB_MIN(kqt, &kqtree); n1 != NULL; n1 = n2) {
        n2 = RB_NEXT(kqt, &kqtree, n1);

        if (n1->kq_ref == 0) {
            kqueue_free(n1);
        } else {
            rv = kqueue_validate(n1);
            if (rv == 0)
                kqueue_free(n1);
            else if (rv < 0)
                return (-1);
        }
    }

    return (0);
}


/* Returns 1 if the kqueue is still valid, 0 if the caller has closed
   (or written to) its end, or -1 on error. */
int
kqueue_validate(struct kqueue *kq)
{
    int rv;
    char buf[1];
    struct pollfd pfd;

    pfd.fd = kq->kq_sockfd[0];
    pfd.events = POLLIN | POLLHUP;
    pfd.revents = 0;

    rv = poll(&pfd, 1, 0);
    if (rv == 0)
        return (1);
    if (rv < 0) {
        dbg_perror("poll(2)");
        return (-1);
    }
    if (rv > 0) {
        /* NOTE: If the caller accidentally writes to the kqfd, it will
           be considered invalid. */
        rv = recv(kq->kq_sockfd[0], buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
        if (rv == 0)
            return (0);
        else
            return (-1);
    }

    return (0);
}
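#if 0
/*
 * Illustrative sketch, excluded from the build: the liveness test used by
 * kqueue_validate() above, demonstrated on a plain socketpair rather than
 * a struct kqueue. poll(2) with a zero timeout reports whether the
 * descriptor is readable; a readable descriptor whose recv(MSG_PEEK)
 * returns 0 has been closed by the peer. All names below are local to
 * this sketch; compile it standalone to observe both states.
 */
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int
main(void)
{
    int sd[2];
    char buf[1];
    struct pollfd pfd;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sd) < 0)
        return (1);

    pfd.fd = sd[0];
    pfd.events = POLLIN;
    pfd.revents = 0;

    /* Peer open and silent: poll() times out immediately, returning 0,
       which kqueue_validate() maps to "valid". */
    printf("open: poll=%d (expect 0 => valid)\n", poll(&pfd, 1, 0));

    /* Close the peer: poll() now reports readable, and a MSG_PEEK recv()
       returns 0 (EOF), which kqueue_validate() maps to "stale". */
    close(sd[1]);
    pfd.revents = 0;
    if (poll(&pfd, 1, 0) > 0)
        printf("closed: recv=%zd (expect 0 => stale)\n",
            recv(sd[0], buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT));

    close(sd[0]);
    return (0);
}
#endif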
void
kqueue_put(struct kqueue *kq)
{
    atomic_dec(&kq->kq_ref);
}

struct kqueue *
kqueue_get(int kq)
{
    struct kqueue query;
    struct kqueue *ent = NULL;

    query.kq_sockfd[1] = kq;
    pthread_rwlock_rdlock(&kqtree_mtx);
    ent = RB_FIND(kqt, &kqtree, &query);
    pthread_rwlock_unlock(&kqtree_mtx);

    /* Check for invalid kqueue objects still in the tree */
    if (ent != NULL) {
        if (ent->kq_sockfd[0] < 0 || ent->kq_ref == 0) {
            ent = NULL;
        } else {
            atomic_inc(&ent->kq_ref);
        }
    }

    return (ent);
}

/* Non-portable kqueue initialization code. */
static int
kqueue_sys_init(struct kqueue *kq)
{
#if defined(__sun__)
    port_event_t *pe;

    if ((kq->kq_port = port_create()) < 0) {
        dbg_perror("port_create(2)");
        return (-1);
    }
    if (pthread_key_create(&kq->kq_port_event, NULL) != 0)
        abort();
    if ((pe = calloc(1, sizeof(*pe))) == NULL)
        abort();
    if (pthread_setspecific(kq->kq_port_event, pe) != 0)
        abort();
#endif
    return (0);
}

int __attribute__((visibility("default")))
kqueue(void)
{
    struct kqueue *kq;
    int tmp;

    kq = calloc(1, sizeof(*kq));
    if (kq == NULL)
        return (-1);
    kq->kq_ref = 1;
    pthread_mutex_init(&kq->kq_mtx, NULL);

#ifndef NDEBUG
    /* KQUEUE_DEBUG is only defined when assertions are enabled (see the
       top of this file), so it must not be referenced in NDEBUG builds. */
    KQUEUE_DEBUG = (getenv("KQUEUE_DEBUG") == NULL) ? 0 : 1;
#endif

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, kq->kq_sockfd) < 0)
        goto errout_unlocked;

    if (kqueue_sys_init(kq) < 0)
        goto errout_unlocked;

    pthread_rwlock_wrlock(&kqtree_mtx);
    if (kqueue_gc() < 0)
        goto errout;
    /* TODO: move outside of the lock if it is safe */
    if (filter_register_all(kq) < 0)
        goto errout;
    RB_INSERT(kqt, &kqtree, kq);
    pthread_rwlock_unlock(&kqtree_mtx);

    dbg_printf("created kqueue, fd=%d", kq->kq_sockfd[1]);
    return (kq->kq_sockfd[1]);

errout:
    pthread_rwlock_unlock(&kqtree_mtx);

errout_unlocked:
    /* If socketpair(2) failed, both slots still hold zero from calloc(3)
       and there is nothing to close. */
    if (kq->kq_sockfd[0] != kq->kq_sockfd[1]) {
        tmp = errno;
        (void)close(kq->kq_sockfd[0]);
        (void)close(kq->kq_sockfd[1]);
        errno = tmp;
    }
#if defined(__sun__)
    if (kq->kq_port > 0)
        close(kq->kq_port);
#endif
    free(kq);
    return (-1);
}
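#if 0
/*
 * Usage sketch, excluded from the build: how a caller exercises the
 * kqueue() constructor defined above. The descriptor it returns is the
 * kq_sockfd[1] end of the internal socketpair, but to the caller it
 * behaves like a native kqueue(2) descriptor and is passed to kevent(2)
 * as usual. Watching STDIN_FILENO with EVFILT_READ is an arbitrary choice
 * for the example, and error handling is minimal.
 */
#include <sys/event.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
    int kqfd;
    struct kevent kev, result;

    if ((kqfd = kqueue()) < 0)
        return (1);

    /* Register interest in stdin becoming readable. */
    EV_SET(&kev, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kqfd, &kev, 1, NULL, 0, NULL) < 0)
        return (1);

    /* Block until stdin becomes readable, then report the event. */
    if (kevent(kqfd, NULL, 0, &result, 1, NULL) == 1)
        printf("fd %d is readable (%d bytes pending)\n",
            (int)result.ident, (int)result.data);

    close(kqfd);
    return (0);
}
#endif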