1/* 2 * Unix SMB/CIFS implementation. 3 * Async fn calls 4 * Copyright (C) Volker Lendecke 2009 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation; either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with this program. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20#include "includes.h" 21 22#if WITH_PTHREADPOOL 23 24#include "pthreadpool.h" 25 26struct fncall_state { 27 struct fncall_context *ctx; 28 int job_id; 29 bool done; 30 31 void *private_parent; 32 void *job_private; 33}; 34 35struct fncall_context { 36 struct pthreadpool *pool; 37 int next_job_id; 38 int sig_fd; 39 struct tevent_req **pending; 40 41 struct fncall_state **orphaned; 42 int num_orphaned; 43 44 struct fd_event *fde; 45}; 46 47static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde, 48 uint16_t flags, void *private_data); 49 50static int fncall_context_destructor(struct fncall_context *ctx) 51{ 52 while (talloc_array_length(ctx->pending) != 0) { 53 /* No TALLOC_FREE here */ 54 talloc_free(ctx->pending[0]); 55 } 56 57 while (ctx->num_orphaned != 0) { 58 /* 59 * We've got jobs in the queue for which the tevent_req has 60 * been finished already. Wait for all of them to finish. 61 */ 62 fncall_handler(NULL, NULL, TEVENT_FD_READ, ctx); 63 } 64 65 pthreadpool_destroy(ctx->pool); 66 ctx->pool = NULL; 67 68 return 0; 69} 70 71struct fncall_context *fncall_context_init(TALLOC_CTX *mem_ctx, 72 int max_threads) 73{ 74 struct fncall_context *ctx; 75 int ret; 76 77 ctx = talloc_zero(mem_ctx, struct fncall_context); 78 if (ctx == NULL) { 79 return NULL; 80 } 81 82 ret = pthreadpool_init(max_threads, &ctx->pool); 83 if (ret != 0) { 84 TALLOC_FREE(ctx); 85 return NULL; 86 } 87 talloc_set_destructor(ctx, fncall_context_destructor); 88 89 ctx->sig_fd = pthreadpool_sig_fd(ctx->pool); 90 if (ctx->sig_fd == -1) { 91 TALLOC_FREE(ctx); 92 return NULL; 93 } 94 95 return ctx; 96} 97 98static int fncall_next_job_id(struct fncall_context *ctx) 99{ 100 int num_pending = talloc_array_length(ctx->pending); 101 int result; 102 103 while (true) { 104 int i; 105 106 result = ctx->next_job_id++; 107 if (result == 0) { 108 continue; 109 } 110 111 for (i=0; i<num_pending; i++) { 112 struct fncall_state *state = tevent_req_data( 113 ctx->pending[i], struct fncall_state); 114 115 if (result == state->job_id) { 116 break; 117 } 118 } 119 if (i == num_pending) { 120 return result; 121 } 122 } 123} 124 125static void fncall_unset_pending(struct tevent_req *req); 126static int fncall_destructor(struct tevent_req *req); 127 128static bool fncall_set_pending(struct tevent_req *req, 129 struct fncall_context *ctx, 130 struct tevent_context *ev) 131{ 132 struct tevent_req **pending; 133 int num_pending, orphaned_array_length; 134 135 num_pending = talloc_array_length(ctx->pending); 136 137 pending = talloc_realloc(ctx, ctx->pending, struct tevent_req *, 138 num_pending+1); 139 if (pending == NULL) { 140 return false; 141 } 142 pending[num_pending] = req; 143 num_pending += 1; 144 ctx->pending = pending; 145 talloc_set_destructor(req, fncall_destructor); 146 147 /* 148 * Make sure that the orphaned array of fncall_state structs has 149 * enough space. A job can change from pending to orphaned in 150 * fncall_destructor, and to fail in a talloc destructor should be 151 * avoided if possible. 152 */ 153 154 orphaned_array_length = talloc_array_length(ctx->orphaned); 155 if (num_pending > orphaned_array_length) { 156 struct fncall_state **orphaned; 157 158 orphaned = talloc_realloc(ctx, ctx->orphaned, 159 struct fncall_state *, 160 orphaned_array_length + 1); 161 if (orphaned == NULL) { 162 fncall_unset_pending(req); 163 return false; 164 } 165 ctx->orphaned = orphaned; 166 } 167 168 if (ctx->fde != NULL) { 169 return true; 170 } 171 172 ctx->fde = tevent_add_fd(ev, ctx->pending, ctx->sig_fd, TEVENT_FD_READ, 173 fncall_handler, ctx); 174 if (ctx->fde == NULL) { 175 fncall_unset_pending(req); 176 return false; 177 } 178 return true; 179} 180 181static void fncall_unset_pending(struct tevent_req *req) 182{ 183 struct fncall_state *state = tevent_req_data(req, struct fncall_state); 184 struct fncall_context *ctx = state->ctx; 185 int num_pending = talloc_array_length(ctx->pending); 186 int i; 187 188 if (num_pending == 1) { 189 TALLOC_FREE(ctx->fde); 190 TALLOC_FREE(ctx->pending); 191 return; 192 } 193 194 for (i=0; i<num_pending; i++) { 195 if (req == ctx->pending[i]) { 196 break; 197 } 198 } 199 if (i == num_pending) { 200 return; 201 } 202 if (num_pending > 1) { 203 ctx->pending[i] = ctx->pending[num_pending-1]; 204 } 205 ctx->pending = talloc_realloc(NULL, ctx->pending, struct tevent_req *, 206 num_pending - 1); 207} 208 209static int fncall_destructor(struct tevent_req *req) 210{ 211 struct fncall_state *state = tevent_req_data( 212 req, struct fncall_state); 213 struct fncall_context *ctx = state->ctx; 214 215 fncall_unset_pending(req); 216 217 if (state->done) { 218 return 0; 219 } 220 221 /* 222 * Keep around the state of the deleted request until the request has 223 * finished in the helper thread. fncall_handler will destroy it. 224 */ 225 ctx->orphaned[ctx->num_orphaned] = talloc_move(ctx->orphaned, &state); 226 ctx->num_orphaned += 1; 227 228 return 0; 229} 230 231struct tevent_req *fncall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, 232 struct fncall_context *ctx, 233 void (*fn)(void *private_data), 234 void *private_data) 235{ 236 struct tevent_req *req; 237 struct fncall_state *state; 238 int ret; 239 240 req = tevent_req_create(mem_ctx, &state, struct fncall_state); 241 if (req == NULL) { 242 return NULL; 243 } 244 state->ctx = ctx; 245 state->job_id = fncall_next_job_id(state->ctx); 246 state->done = false; 247 248 /* 249 * We need to keep the private data we handed out to the thread around 250 * as long as the job is not finished. This is a bit of an abstraction 251 * violation, because the "req->state1->subreq->state2" (we're 252 * "subreq", "req" is the request our caller creates) is broken to 253 * "ctx->state2->state1", but we are right now in the destructor for 254 * "subreq2", so what can we do. We need to keep state1 around, 255 * otherwise the helper thread will have no place to put its results. 256 */ 257 258 state->private_parent = talloc_parent(private_data); 259 state->job_private = talloc_move(state, &private_data); 260 261 ret = pthreadpool_add_job(state->ctx->pool, state->job_id, fn, 262 state->job_private); 263 if (ret == -1) { 264 tevent_req_error(req, errno); 265 return tevent_req_post(req, ev); 266 } 267 if (!fncall_set_pending(req, state->ctx, ev)) { 268 tevent_req_nomem(NULL, req); 269 return tevent_req_post(req, ev); 270 } 271 return req; 272} 273 274static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde, 275 uint16_t flags, void *private_data) 276{ 277 struct fncall_context *ctx = talloc_get_type_abort( 278 private_data, struct fncall_context); 279 int i, num_pending; 280 int job_id; 281 282 job_id = pthreadpool_finished_job(ctx->pool); 283 if (job_id <= 0) { 284 return; 285 } 286 287 num_pending = talloc_array_length(ctx->pending); 288 289 for (i=0; i<num_pending; i++) { 290 struct fncall_state *state = tevent_req_data( 291 ctx->pending[i], struct fncall_state); 292 293 if (job_id == state->job_id) { 294 state->done = true; 295 talloc_move(state->private_parent, 296 &state->job_private); 297 tevent_req_done(ctx->pending[i]); 298 return; 299 } 300 } 301 302 for (i=0; i<ctx->num_orphaned; i++) { 303 if (job_id == ctx->orphaned[i]->job_id) { 304 break; 305 } 306 } 307 if (i == ctx->num_orphaned) { 308 return; 309 } 310 311 TALLOC_FREE(ctx->orphaned[i]); 312 313 if (i < ctx->num_orphaned-1) { 314 ctx->orphaned[i] = ctx->orphaned[ctx->num_orphaned-1]; 315 } 316 ctx->num_orphaned -= 1; 317} 318 319int fncall_recv(struct tevent_req *req, int *perr) 320{ 321 if (tevent_req_is_unix_error(req, perr)) { 322 return -1; 323 } 324 return 0; 325} 326 327#else /* WITH_PTHREADPOOL */ 328 329struct fncall_context { 330 uint8_t dummy; 331}; 332 333struct fncall_context *fncall_context_init(TALLOC_CTX *mem_ctx, 334 int max_threads) 335{ 336 return talloc(mem_ctx, struct fncall_context); 337} 338 339struct fncall_state { 340 uint8_t dummy; 341}; 342 343struct tevent_req *fncall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, 344 struct fncall_context *ctx, 345 void (*fn)(void *private_data), 346 void *private_data) 347{ 348 struct tevent_req *req; 349 struct fncall_state *state; 350 351 req = tevent_req_create(mem_ctx, &state, struct fncall_state); 352 if (req == NULL) { 353 return NULL; 354 } 355 fn(private_data); 356 tevent_req_post(req, ev); 357 return req; 358} 359 360int fncall_recv(struct tevent_req *req, int *perr) 361{ 362 return 0; 363} 364 365#endif 366