libworker.c revision 296415
1/*
2 * libunbound/worker.c - worker thread or process that resolves
3 *
4 * Copyright (c) 2007, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/**
37 * \file
38 *
39 * This file contains the worker process or thread that performs
40 * the DNS resolving and validation. The worker is called by a procedure
41 * and if in the background continues until exit, if in the foreground
42 * returns from the procedure when done.
43 */
44#include "config.h"
45#ifdef HAVE_SSL
46#include <openssl/ssl.h>
47#endif
48#include "libunbound/libworker.h"
49#include "libunbound/context.h"
50#include "libunbound/unbound.h"
51#include "libunbound/worker.h"
52#include "libunbound/unbound-event.h"
53#include "services/outside_network.h"
54#include "services/mesh.h"
55#include "services/localzone.h"
56#include "services/cache/rrset.h"
57#include "services/outbound_list.h"
58#include "util/fptr_wlist.h"
59#include "util/module.h"
60#include "util/regional.h"
61#include "util/random.h"
62#include "util/config_file.h"
63#include "util/netevent.h"
64#include "util/storage/lookup3.h"
65#include "util/storage/slabhash.h"
66#include "util/net_help.h"
67#include "util/data/dname.h"
68#include "util/data/msgreply.h"
69#include "util/data/msgencode.h"
70#include "util/tube.h"
71#include "iterator/iter_fwd.h"
72#include "iterator/iter_hints.h"
73#include "sldns/sbuffer.h"
74#include "sldns/str2wire.h"
75
76/** handle new query command for bg worker */
77static void handle_newq(struct libworker* w, uint8_t* buf, uint32_t len);
78
79/** delete libworker env */
80static void
81libworker_delete_env(struct libworker* w)
82{
83	if(w->env) {
84		outside_network_quit_prepare(w->back);
85		mesh_delete(w->env->mesh);
86		context_release_alloc(w->ctx, w->env->alloc,
87			!w->is_bg || w->is_bg_thread);
88		sldns_buffer_free(w->env->scratch_buffer);
89		regional_destroy(w->env->scratch);
90		forwards_delete(w->env->fwds);
91		hints_delete(w->env->hints);
92		ub_randfree(w->env->rnd);
93		free(w->env);
94	}
95#ifdef HAVE_SSL
96	SSL_CTX_free(w->sslctx);
97#endif
98	outside_network_delete(w->back);
99}
100
101/** delete libworker struct */
102static void
103libworker_delete(struct libworker* w)
104{
105	if(!w) return;
106	libworker_delete_env(w);
107	comm_base_delete(w->base);
108	free(w);
109}
110
111void
112libworker_delete_event(struct libworker* w)
113{
114	if(!w) return;
115	libworker_delete_env(w);
116	comm_base_delete_no_base(w->base);
117	free(w);
118}
119
120/** setup fresh libworker struct */
121static struct libworker*
122libworker_setup(struct ub_ctx* ctx, int is_bg, struct event_base* eb)
123{
124	unsigned int seed;
125	struct libworker* w = (struct libworker*)calloc(1, sizeof(*w));
126	struct config_file* cfg = ctx->env->cfg;
127	int* ports;
128	int numports;
129	if(!w) return NULL;
130	w->is_bg = is_bg;
131	w->ctx = ctx;
132	w->env = (struct module_env*)malloc(sizeof(*w->env));
133	if(!w->env) {
134		free(w);
135		return NULL;
136	}
137	*w->env = *ctx->env;
138	w->env->alloc = context_obtain_alloc(ctx, !w->is_bg || w->is_bg_thread);
139	if(!w->env->alloc) {
140		libworker_delete(w);
141		return NULL;
142	}
143	w->thread_num = w->env->alloc->thread_num;
144	alloc_set_id_cleanup(w->env->alloc, &libworker_alloc_cleanup, w);
145	if(!w->is_bg || w->is_bg_thread) {
146		lock_basic_lock(&ctx->cfglock);
147	}
148	w->env->scratch = regional_create_custom(cfg->msg_buffer_size);
149	w->env->scratch_buffer = sldns_buffer_new(cfg->msg_buffer_size);
150	w->env->fwds = forwards_create();
151	if(w->env->fwds && !forwards_apply_cfg(w->env->fwds, cfg)) {
152		forwards_delete(w->env->fwds);
153		w->env->fwds = NULL;
154	}
155	w->env->hints = hints_create();
156	if(w->env->hints && !hints_apply_cfg(w->env->hints, cfg)) {
157		hints_delete(w->env->hints);
158		w->env->hints = NULL;
159	}
160	if(cfg->ssl_upstream) {
161		w->sslctx = connect_sslctx_create(NULL, NULL, NULL);
162		if(!w->sslctx) {
163			/* to make the setup fail after unlock */
164			hints_delete(w->env->hints);
165			w->env->hints = NULL;
166		}
167	}
168	if(!w->is_bg || w->is_bg_thread) {
169		lock_basic_unlock(&ctx->cfglock);
170	}
171	if(!w->env->scratch || !w->env->scratch_buffer || !w->env->fwds ||
172		!w->env->hints) {
173		libworker_delete(w);
174		return NULL;
175	}
176	w->env->worker = (struct worker*)w;
177	w->env->probe_timer = NULL;
178	seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
179		(((unsigned int)w->thread_num)<<17);
180	seed ^= (unsigned int)w->env->alloc->next_id;
181	if(!w->is_bg || w->is_bg_thread) {
182		lock_basic_lock(&ctx->cfglock);
183	}
184	if(!(w->env->rnd = ub_initstate(seed, ctx->seed_rnd))) {
185		if(!w->is_bg || w->is_bg_thread) {
186			lock_basic_unlock(&ctx->cfglock);
187		}
188		seed = 0;
189		libworker_delete(w);
190		return NULL;
191	}
192	if(!w->is_bg || w->is_bg_thread) {
193		lock_basic_unlock(&ctx->cfglock);
194	}
195	if(1) {
196		/* primitive lockout for threading: if it overwrites another
197		 * thread it is like wiping the cache (which is likely empty
198		 * at the start) */
199		/* note we are holding the ctx lock in normal threaded
200		 * cases so that is solved properly, it is only for many ctx
201		 * in different threads that this may clash */
202		static int done_raninit = 0;
203		if(!done_raninit) {
204			done_raninit = 1;
205			hash_set_raninit((uint32_t)ub_random(w->env->rnd));
206		}
207	}
208	seed = 0;
209
210	if(eb)
211		w->base = comm_base_create_event(eb);
212	else	w->base = comm_base_create(0);
213	if(!w->base) {
214		libworker_delete(w);
215		return NULL;
216	}
217	if(!w->is_bg || w->is_bg_thread) {
218		lock_basic_lock(&ctx->cfglock);
219	}
220	numports = cfg_condense_ports(cfg, &ports);
221	if(numports == 0) {
222		int locked = !w->is_bg || w->is_bg_thread;
223		libworker_delete(w);
224		if(locked) {
225			lock_basic_unlock(&ctx->cfglock);
226		}
227		return NULL;
228	}
229	w->back = outside_network_create(w->base, cfg->msg_buffer_size,
230		(size_t)cfg->outgoing_num_ports, cfg->out_ifs,
231		cfg->num_out_ifs, cfg->do_ip4, cfg->do_ip6,
232		cfg->do_tcp?cfg->outgoing_num_tcp:0,
233		w->env->infra_cache, w->env->rnd, cfg->use_caps_bits_for_id,
234		ports, numports, cfg->unwanted_threshold,
235		cfg->outgoing_tcp_mss,
236		&libworker_alloc_cleanup, w, cfg->do_udp, w->sslctx,
237		cfg->delay_close, NULL);
238	if(!w->is_bg || w->is_bg_thread) {
239		lock_basic_unlock(&ctx->cfglock);
240	}
241	free(ports);
242	if(!w->back) {
243		libworker_delete(w);
244		return NULL;
245	}
246	w->env->mesh = mesh_create(&ctx->mods, w->env);
247	if(!w->env->mesh) {
248		libworker_delete(w);
249		return NULL;
250	}
251	w->env->send_query = &libworker_send_query;
252	w->env->detach_subs = &mesh_detach_subs;
253	w->env->attach_sub = &mesh_attach_sub;
254	w->env->kill_sub = &mesh_state_delete;
255	w->env->detect_cycle = &mesh_detect_cycle;
256	comm_base_timept(w->base, &w->env->now, &w->env->now_tv);
257	return w;
258}
259
260struct libworker* libworker_create_event(struct ub_ctx* ctx,
261	struct event_base* eb)
262{
263	return libworker_setup(ctx, 0, eb);
264}
265
266/** handle cancel command for bg worker */
267static void
268handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
269{
270	struct ctx_query* q;
271	if(w->is_bg_thread) {
272		lock_basic_lock(&w->ctx->cfglock);
273		q = context_deserialize_cancel(w->ctx, buf, len);
274		lock_basic_unlock(&w->ctx->cfglock);
275	} else {
276		q = context_deserialize_cancel(w->ctx, buf, len);
277	}
278	if(!q) {
279		/* probably simply lookup failed, i.e. the message had been
280		 * processed and answered before the cancel arrived */
281		return;
282	}
283	q->cancelled = 1;
284	free(buf);
285}
286
287/** do control command coming into bg server */
288static void
289libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
290{
291	switch(context_serial_getcmd(msg, len)) {
292		default:
293		case UB_LIBCMD_ANSWER:
294			log_err("unknown command for bg worker %d",
295				(int)context_serial_getcmd(msg, len));
296			/* and fall through to quit */
297		case UB_LIBCMD_QUIT:
298			free(msg);
299			comm_base_exit(w->base);
300			break;
301		case UB_LIBCMD_NEWQUERY:
302			handle_newq(w, msg, len);
303			break;
304		case UB_LIBCMD_CANCEL:
305			handle_cancel(w, msg, len);
306			break;
307	}
308}
309
310/** handle control command coming into server */
311void
312libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
313	uint8_t* msg, size_t len, int err, void* arg)
314{
315	struct libworker* w = (struct libworker*)arg;
316
317	if(err != 0) {
318		free(msg);
319		/* it is of no use to go on, exit */
320		comm_base_exit(w->base);
321		return;
322	}
323	libworker_do_cmd(w, msg, len); /* also frees the buf */
324}
325
326/** the background thread func */
327static void*
328libworker_dobg(void* arg)
329{
330	/* setup */
331	uint32_t m;
332	struct libworker* w = (struct libworker*)arg;
333	struct ub_ctx* ctx;
334	if(!w) {
335		log_err("libunbound bg worker init failed, nomem");
336		return NULL;
337	}
338	ctx = w->ctx;
339	log_thread_set(&w->thread_num);
340#ifdef THREADS_DISABLED
341	/* we are forked */
342	w->is_bg_thread = 0;
343	/* close non-used parts of the pipes */
344	tube_close_write(ctx->qq_pipe);
345	tube_close_read(ctx->rr_pipe);
346#endif
347	if(!tube_setup_bg_listen(ctx->qq_pipe, w->base,
348		libworker_handle_control_cmd, w)) {
349		log_err("libunbound bg worker init failed, no bglisten");
350		return NULL;
351	}
352	if(!tube_setup_bg_write(ctx->rr_pipe, w->base)) {
353		log_err("libunbound bg worker init failed, no bgwrite");
354		return NULL;
355	}
356
357	/* do the work */
358	comm_base_dispatch(w->base);
359
360	/* cleanup */
361	m = UB_LIBCMD_QUIT;
362	tube_remove_bg_listen(w->ctx->qq_pipe);
363	tube_remove_bg_write(w->ctx->rr_pipe);
364	libworker_delete(w);
365	(void)tube_write_msg(ctx->rr_pipe, (uint8_t*)&m,
366		(uint32_t)sizeof(m), 0);
367#ifdef THREADS_DISABLED
368	/* close pipes from forked process before exit */
369	tube_close_read(ctx->qq_pipe);
370	tube_close_write(ctx->rr_pipe);
371#endif
372	return NULL;
373}
374
375int libworker_bg(struct ub_ctx* ctx)
376{
377	struct libworker* w;
378	/* fork or threadcreate */
379	lock_basic_lock(&ctx->cfglock);
380	if(ctx->dothread) {
381		lock_basic_unlock(&ctx->cfglock);
382		w = libworker_setup(ctx, 1, NULL);
383		if(!w) return UB_NOMEM;
384		w->is_bg_thread = 1;
385#ifdef ENABLE_LOCK_CHECKS
386		w->thread_num = 1; /* for nicer DEBUG checklocks */
387#endif
388		ub_thread_create(&ctx->bg_tid, libworker_dobg, w);
389	} else {
390		lock_basic_unlock(&ctx->cfglock);
391#ifndef HAVE_FORK
392		/* no fork on windows */
393		return UB_FORKFAIL;
394#else /* HAVE_FORK */
395		switch((ctx->bg_pid=fork())) {
396			case 0:
397				w = libworker_setup(ctx, 1, NULL);
398				if(!w) fatal_exit("out of memory");
399				/* close non-used parts of the pipes */
400				tube_close_write(ctx->qq_pipe);
401				tube_close_read(ctx->rr_pipe);
402				(void)libworker_dobg(w);
403				exit(0);
404				break;
405			case -1:
406				return UB_FORKFAIL;
407			default:
408				/* close non-used parts, so that the worker
409				 * bgprocess gets 'pipe closed' when the
410				 * main process exits */
411				tube_close_read(ctx->qq_pipe);
412				tube_close_write(ctx->rr_pipe);
413				break;
414		}
415#endif /* HAVE_FORK */
416	}
417	return UB_NOERROR;
418}
419
420/** get msg reply struct (in temp region) */
421static struct reply_info*
422parse_reply(sldns_buffer* pkt, struct regional* region, struct query_info* qi)
423{
424	struct reply_info* rep;
425	struct msg_parse* msg;
426	if(!(msg = regional_alloc(region, sizeof(*msg)))) {
427		return NULL;
428	}
429	memset(msg, 0, sizeof(*msg));
430	sldns_buffer_set_position(pkt, 0);
431	if(parse_packet(pkt, msg, region) != 0)
432		return 0;
433	if(!parse_create_msg(pkt, msg, NULL, qi, &rep, region)) {
434		return 0;
435	}
436	return rep;
437}
438
439/** insert canonname */
440static int
441fill_canon(struct ub_result* res, uint8_t* s)
442{
443	char buf[255+2];
444	dname_str(s, buf);
445	res->canonname = strdup(buf);
446	return res->canonname != 0;
447}
448
449/** fill data into result */
450static int
451fill_res(struct ub_result* res, struct ub_packed_rrset_key* answer,
452	uint8_t* finalcname, struct query_info* rq, struct reply_info* rep)
453{
454	size_t i;
455	struct packed_rrset_data* data;
456	res->ttl = 0;
457	if(!answer) {
458		if(finalcname) {
459			if(!fill_canon(res, finalcname))
460				return 0; /* out of memory */
461		}
462		if(rep->rrset_count != 0)
463			res->ttl = (int)rep->ttl;
464		res->data = (char**)calloc(1, sizeof(char*));
465		res->len = (int*)calloc(1, sizeof(int));
466		return (res->data && res->len);
467	}
468	data = (struct packed_rrset_data*)answer->entry.data;
469	if(query_dname_compare(rq->qname, answer->rk.dname) != 0) {
470		if(!fill_canon(res, answer->rk.dname))
471			return 0; /* out of memory */
472	} else	res->canonname = NULL;
473	res->data = (char**)calloc(data->count+1, sizeof(char*));
474	res->len = (int*)calloc(data->count+1, sizeof(int));
475	if(!res->data || !res->len)
476		return 0; /* out of memory */
477	for(i=0; i<data->count; i++) {
478		/* remove rdlength from rdata */
479		res->len[i] = (int)(data->rr_len[i] - 2);
480		res->data[i] = memdup(data->rr_data[i]+2, (size_t)res->len[i]);
481		if(!res->data[i])
482			return 0; /* out of memory */
483	}
484	/* ttl for positive answers, from CNAME and answer RRs */
485	if(data->count != 0) {
486		size_t j;
487		res->ttl = (int)data->ttl;
488		for(j=0; j<rep->an_numrrsets; j++) {
489			struct packed_rrset_data* d =
490				(struct packed_rrset_data*)rep->rrsets[j]->
491				entry.data;
492			if((int)d->ttl < res->ttl)
493				res->ttl = (int)d->ttl;
494		}
495	}
496	/* ttl for negative answers */
497	if(data->count == 0 && rep->rrset_count != 0)
498		res->ttl = (int)rep->ttl;
499	res->data[data->count] = NULL;
500	res->len[data->count] = 0;
501	return 1;
502}
503
504/** fill result from parsed message, on error fills servfail */
505void
506libworker_enter_result(struct ub_result* res, sldns_buffer* buf,
507	struct regional* temp, enum sec_status msg_security)
508{
509	struct query_info rq;
510	struct reply_info* rep;
511	res->rcode = LDNS_RCODE_SERVFAIL;
512	rep = parse_reply(buf, temp, &rq);
513	if(!rep) {
514		log_err("cannot parse buf");
515		return; /* error parsing buf, or out of memory */
516	}
517	if(!fill_res(res, reply_find_answer_rrset(&rq, rep),
518		reply_find_final_cname_target(&rq, rep), &rq, rep))
519		return; /* out of memory */
520	/* rcode, havedata, nxdomain, secure, bogus */
521	res->rcode = (int)FLAGS_GET_RCODE(rep->flags);
522	if(res->data && res->data[0])
523		res->havedata = 1;
524	if(res->rcode == LDNS_RCODE_NXDOMAIN)
525		res->nxdomain = 1;
526	if(msg_security == sec_status_secure)
527		res->secure = 1;
528	if(msg_security == sec_status_bogus)
529		res->bogus = 1;
530}
531
532/** fillup fg results */
533static void
534libworker_fillup_fg(struct ctx_query* q, int rcode, sldns_buffer* buf,
535	enum sec_status s, char* why_bogus)
536{
537	if(why_bogus)
538		q->res->why_bogus = strdup(why_bogus);
539	if(rcode != 0) {
540		q->res->rcode = rcode;
541		q->msg_security = s;
542		return;
543	}
544
545	q->res->rcode = LDNS_RCODE_SERVFAIL;
546	q->msg_security = 0;
547	q->msg = memdup(sldns_buffer_begin(buf), sldns_buffer_limit(buf));
548	q->msg_len = sldns_buffer_limit(buf);
549	if(!q->msg) {
550		return; /* the error is in the rcode */
551	}
552
553	/* canonname and results */
554	q->msg_security = s;
555	libworker_enter_result(q->res, buf, q->w->env->scratch, s);
556}
557
558void
559libworker_fg_done_cb(void* arg, int rcode, sldns_buffer* buf, enum sec_status s,
560	char* why_bogus)
561{
562	struct ctx_query* q = (struct ctx_query*)arg;
563	/* fg query is done; exit comm base */
564	comm_base_exit(q->w->base);
565
566	libworker_fillup_fg(q, rcode, buf, s, why_bogus);
567}
568
569/** setup qinfo and edns */
570static int
571setup_qinfo_edns(struct libworker* w, struct ctx_query* q,
572	struct query_info* qinfo, struct edns_data* edns)
573{
574	qinfo->qtype = (uint16_t)q->res->qtype;
575	qinfo->qclass = (uint16_t)q->res->qclass;
576	qinfo->qname = sldns_str2wire_dname(q->res->qname, &qinfo->qname_len);
577	if(!qinfo->qname) {
578		return 0;
579	}
580	edns->edns_present = 1;
581	edns->ext_rcode = 0;
582	edns->edns_version = 0;
583	edns->bits = EDNS_DO;
584	if(sldns_buffer_capacity(w->back->udp_buff) < 65535)
585		edns->udp_size = (uint16_t)sldns_buffer_capacity(
586			w->back->udp_buff);
587	else	edns->udp_size = 65535;
588	return 1;
589}
590
591int libworker_fg(struct ub_ctx* ctx, struct ctx_query* q)
592{
593	struct libworker* w = libworker_setup(ctx, 0, NULL);
594	uint16_t qflags, qid;
595	struct query_info qinfo;
596	struct edns_data edns;
597	if(!w)
598		return UB_INITFAIL;
599	if(!setup_qinfo_edns(w, q, &qinfo, &edns)) {
600		libworker_delete(w);
601		return UB_SYNTAX;
602	}
603	qid = 0;
604	qflags = BIT_RD;
605	q->w = w;
606	/* see if there is a fixed answer */
607	sldns_buffer_write_u16_at(w->back->udp_buff, 0, qid);
608	sldns_buffer_write_u16_at(w->back->udp_buff, 2, qflags);
609	if(local_zones_answer(ctx->local_zones, &qinfo, &edns,
610		w->back->udp_buff, w->env->scratch, NULL)) {
611		regional_free_all(w->env->scratch);
612		libworker_fillup_fg(q, LDNS_RCODE_NOERROR,
613			w->back->udp_buff, sec_status_insecure, NULL);
614		libworker_delete(w);
615		free(qinfo.qname);
616		return UB_NOERROR;
617	}
618	/* process new query */
619	if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns,
620		w->back->udp_buff, qid, libworker_fg_done_cb, q)) {
621		free(qinfo.qname);
622		return UB_NOMEM;
623	}
624	free(qinfo.qname);
625
626	/* wait for reply */
627	comm_base_dispatch(w->base);
628
629	libworker_delete(w);
630	return UB_NOERROR;
631}
632
633void
634libworker_event_done_cb(void* arg, int rcode, sldns_buffer* buf,
635	enum sec_status s, char* why_bogus)
636{
637	struct ctx_query* q = (struct ctx_query*)arg;
638	ub_event_callback_t cb = (ub_event_callback_t)q->cb;
639	void* cb_arg = q->cb_arg;
640	int cancelled = q->cancelled;
641
642	/* delete it now */
643	struct ub_ctx* ctx = q->w->ctx;
644	lock_basic_lock(&ctx->cfglock);
645	(void)rbtree_delete(&ctx->queries, q->node.key);
646	ctx->num_async--;
647	context_query_delete(q);
648	lock_basic_unlock(&ctx->cfglock);
649
650	if(!cancelled) {
651		/* call callback */
652		int sec = 0;
653		if(s == sec_status_bogus)
654			sec = 1;
655		else if(s == sec_status_secure)
656			sec = 2;
657		(*cb)(cb_arg, rcode, (void*)sldns_buffer_begin(buf),
658			(int)sldns_buffer_limit(buf), sec, why_bogus);
659	}
660}
661
662int libworker_attach_mesh(struct ub_ctx* ctx, struct ctx_query* q,
663	int* async_id)
664{
665	struct libworker* w = ctx->event_worker;
666	uint16_t qflags, qid;
667	struct query_info qinfo;
668	struct edns_data edns;
669	if(!w)
670		return UB_INITFAIL;
671	if(!setup_qinfo_edns(w, q, &qinfo, &edns))
672		return UB_SYNTAX;
673	qid = 0;
674	qflags = BIT_RD;
675	q->w = w;
676	/* see if there is a fixed answer */
677	sldns_buffer_write_u16_at(w->back->udp_buff, 0, qid);
678	sldns_buffer_write_u16_at(w->back->udp_buff, 2, qflags);
679	if(local_zones_answer(ctx->local_zones, &qinfo, &edns,
680		w->back->udp_buff, w->env->scratch, NULL)) {
681		regional_free_all(w->env->scratch);
682		free(qinfo.qname);
683		libworker_event_done_cb(q, LDNS_RCODE_NOERROR,
684			w->back->udp_buff, sec_status_insecure, NULL);
685		return UB_NOERROR;
686	}
687	/* process new query */
688	if(async_id)
689		*async_id = q->querynum;
690	if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns,
691		w->back->udp_buff, qid, libworker_event_done_cb, q)) {
692		free(qinfo.qname);
693		return UB_NOMEM;
694	}
695	free(qinfo.qname);
696	return UB_NOERROR;
697}
698
699/** add result to the bg worker result queue */
700static void
701add_bg_result(struct libworker* w, struct ctx_query* q, sldns_buffer* pkt,
702	int err, char* reason)
703{
704	uint8_t* msg = NULL;
705	uint32_t len = 0;
706
707	/* serialize and delete unneeded q */
708	if(w->is_bg_thread) {
709		lock_basic_lock(&w->ctx->cfglock);
710		if(reason)
711			q->res->why_bogus = strdup(reason);
712		if(pkt) {
713			q->msg_len = sldns_buffer_remaining(pkt);
714			q->msg = memdup(sldns_buffer_begin(pkt), q->msg_len);
715			if(!q->msg)
716				msg = context_serialize_answer(q, UB_NOMEM,
717				NULL, &len);
718			else	msg = context_serialize_answer(q, err,
719				NULL, &len);
720		} else msg = context_serialize_answer(q, err, NULL, &len);
721		lock_basic_unlock(&w->ctx->cfglock);
722	} else {
723		if(reason)
724			q->res->why_bogus = strdup(reason);
725		msg = context_serialize_answer(q, err, pkt, &len);
726		(void)rbtree_delete(&w->ctx->queries, q->node.key);
727		w->ctx->num_async--;
728		context_query_delete(q);
729	}
730
731	if(!msg) {
732		log_err("out of memory for async answer");
733		return;
734	}
735	if(!tube_queue_item(w->ctx->rr_pipe, msg, len)) {
736		log_err("out of memory for async answer");
737		return;
738	}
739}
740
741void
742libworker_bg_done_cb(void* arg, int rcode, sldns_buffer* buf, enum sec_status s,
743	char* why_bogus)
744{
745	struct ctx_query* q = (struct ctx_query*)arg;
746
747	if(q->cancelled) {
748		if(q->w->is_bg_thread) {
749			/* delete it now */
750			struct ub_ctx* ctx = q->w->ctx;
751			lock_basic_lock(&ctx->cfglock);
752			(void)rbtree_delete(&ctx->queries, q->node.key);
753			ctx->num_async--;
754			context_query_delete(q);
755			lock_basic_unlock(&ctx->cfglock);
756		}
757		/* cancelled, do not give answer */
758		return;
759	}
760	q->msg_security = s;
761	if(!buf)
762		buf = q->w->env->scratch_buffer;
763	if(rcode != 0) {
764		error_encode(buf, rcode, NULL, 0, BIT_RD, NULL);
765	}
766	add_bg_result(q->w, q, buf, UB_NOERROR, why_bogus);
767}
768
769
770/** handle new query command for bg worker */
771static void
772handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
773{
774	uint16_t qflags, qid;
775	struct query_info qinfo;
776	struct edns_data edns;
777	struct ctx_query* q;
778	if(w->is_bg_thread) {
779		lock_basic_lock(&w->ctx->cfglock);
780		q = context_lookup_new_query(w->ctx, buf, len);
781		lock_basic_unlock(&w->ctx->cfglock);
782	} else {
783		q = context_deserialize_new_query(w->ctx, buf, len);
784	}
785	free(buf);
786	if(!q) {
787		log_err("failed to deserialize newq");
788		return;
789	}
790	if(!setup_qinfo_edns(w, q, &qinfo, &edns)) {
791		add_bg_result(w, q, NULL, UB_SYNTAX, NULL);
792		return;
793	}
794	qid = 0;
795	qflags = BIT_RD;
796	/* see if there is a fixed answer */
797	sldns_buffer_write_u16_at(w->back->udp_buff, 0, qid);
798	sldns_buffer_write_u16_at(w->back->udp_buff, 2, qflags);
799	if(local_zones_answer(w->ctx->local_zones, &qinfo, &edns,
800		w->back->udp_buff, w->env->scratch, NULL)) {
801		regional_free_all(w->env->scratch);
802		q->msg_security = sec_status_insecure;
803		add_bg_result(w, q, w->back->udp_buff, UB_NOERROR, NULL);
804		free(qinfo.qname);
805		return;
806	}
807	q->w = w;
808	/* process new query */
809	if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns,
810		w->back->udp_buff, qid, libworker_bg_done_cb, q)) {
811		add_bg_result(w, q, NULL, UB_NOMEM, NULL);
812	}
813	free(qinfo.qname);
814}
815
816void libworker_alloc_cleanup(void* arg)
817{
818	struct libworker* w = (struct libworker*)arg;
819	slabhash_clear(&w->env->rrset_cache->table);
820        slabhash_clear(w->env->msg_cache);
821}
822
823struct outbound_entry* libworker_send_query(uint8_t* qname, size_t qnamelen,
824        uint16_t qtype, uint16_t qclass, uint16_t flags, int dnssec,
825	int want_dnssec, int nocaps, struct sockaddr_storage* addr,
826	socklen_t addrlen, uint8_t* zone, size_t zonelen,
827	struct module_qstate* q)
828{
829	struct libworker* w = (struct libworker*)q->env->worker;
830	struct outbound_entry* e = (struct outbound_entry*)regional_alloc(
831		q->region, sizeof(*e));
832	if(!e)
833		return NULL;
834	e->qstate = q;
835	e->qsent = outnet_serviced_query(w->back, qname,
836		qnamelen, qtype, qclass, flags, dnssec, want_dnssec, nocaps,
837		q->env->cfg->tcp_upstream, q->env->cfg->ssl_upstream, addr,
838		addrlen, zone, zonelen, libworker_handle_service_reply, e,
839		w->back->udp_buff);
840	if(!e->qsent) {
841		return NULL;
842	}
843	return e;
844}
845
846int
847libworker_handle_reply(struct comm_point* c, void* arg, int error,
848        struct comm_reply* reply_info)
849{
850	struct module_qstate* q = (struct module_qstate*)arg;
851	struct libworker* lw = (struct libworker*)q->env->worker;
852	struct outbound_entry e;
853	e.qstate = q;
854	e.qsent = NULL;
855
856	if(error != 0) {
857		mesh_report_reply(lw->env->mesh, &e, reply_info, error);
858		return 0;
859	}
860	/* sanity check. */
861	if(!LDNS_QR_WIRE(sldns_buffer_begin(c->buffer))
862		|| LDNS_OPCODE_WIRE(sldns_buffer_begin(c->buffer)) !=
863			LDNS_PACKET_QUERY
864		|| LDNS_QDCOUNT(sldns_buffer_begin(c->buffer)) > 1) {
865		/* error becomes timeout for the module as if this reply
866		 * never arrived. */
867		mesh_report_reply(lw->env->mesh, &e, reply_info,
868			NETEVENT_TIMEOUT);
869		return 0;
870	}
871	mesh_report_reply(lw->env->mesh, &e, reply_info, NETEVENT_NOERROR);
872	return 0;
873}
874
875int
876libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
877        struct comm_reply* reply_info)
878{
879	struct outbound_entry* e = (struct outbound_entry*)arg;
880	struct libworker* lw = (struct libworker*)e->qstate->env->worker;
881
882	if(error != 0) {
883		mesh_report_reply(lw->env->mesh, e, reply_info, error);
884		return 0;
885	}
886	/* sanity check. */
887	if(!LDNS_QR_WIRE(sldns_buffer_begin(c->buffer))
888		|| LDNS_OPCODE_WIRE(sldns_buffer_begin(c->buffer)) !=
889			LDNS_PACKET_QUERY
890		|| LDNS_QDCOUNT(sldns_buffer_begin(c->buffer)) > 1) {
891		/* error becomes timeout for the module as if this reply
892		 * never arrived. */
893		mesh_report_reply(lw->env->mesh, e, reply_info,
894			NETEVENT_TIMEOUT);
895		return 0;
896	}
897	mesh_report_reply(lw->env->mesh,  e, reply_info, NETEVENT_NOERROR);
898	return 0;
899}
900
901/* --- fake callbacks for fptr_wlist to work --- */
902void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
903	uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
904	int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
905{
906	log_assert(0);
907}
908
909int worker_handle_request(struct comm_point* ATTR_UNUSED(c),
910	void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
911        struct comm_reply* ATTR_UNUSED(repinfo))
912{
913	log_assert(0);
914	return 0;
915}
916
917int worker_handle_reply(struct comm_point* ATTR_UNUSED(c),
918	void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
919        struct comm_reply* ATTR_UNUSED(reply_info))
920{
921	log_assert(0);
922	return 0;
923}
924
925int worker_handle_service_reply(struct comm_point* ATTR_UNUSED(c),
926	void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
927        struct comm_reply* ATTR_UNUSED(reply_info))
928{
929	log_assert(0);
930	return 0;
931}
932
933int remote_accept_callback(struct comm_point* ATTR_UNUSED(c),
934	void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
935        struct comm_reply* ATTR_UNUSED(repinfo))
936{
937	log_assert(0);
938	return 0;
939}
940
941int remote_control_callback(struct comm_point* ATTR_UNUSED(c),
942	void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),
943        struct comm_reply* ATTR_UNUSED(repinfo))
944{
945	log_assert(0);
946	return 0;
947}
948
949void worker_sighandler(int ATTR_UNUSED(sig), void* ATTR_UNUSED(arg))
950{
951	log_assert(0);
952}
953
954struct outbound_entry* worker_send_query(uint8_t* ATTR_UNUSED(qname),
955	size_t ATTR_UNUSED(qnamelen), uint16_t ATTR_UNUSED(qtype),
956	uint16_t ATTR_UNUSED(qclass), uint16_t ATTR_UNUSED(flags),
957	int ATTR_UNUSED(dnssec), int ATTR_UNUSED(want_dnssec),
958	int ATTR_UNUSED(nocaps), struct sockaddr_storage* ATTR_UNUSED(addr),
959	socklen_t ATTR_UNUSED(addrlen), uint8_t* ATTR_UNUSED(zone),
960	size_t ATTR_UNUSED(zonelen), struct module_qstate* ATTR_UNUSED(q))
961{
962	log_assert(0);
963	return 0;
964}
965
966void
967worker_alloc_cleanup(void* ATTR_UNUSED(arg))
968{
969	log_assert(0);
970}
971
972void worker_stat_timer_cb(void* ATTR_UNUSED(arg))
973{
974	log_assert(0);
975}
976
977void worker_probe_timer_cb(void* ATTR_UNUSED(arg))
978{
979	log_assert(0);
980}
981
982void worker_start_accept(void* ATTR_UNUSED(arg))
983{
984	log_assert(0);
985}
986
987void worker_stop_accept(void* ATTR_UNUSED(arg))
988{
989	log_assert(0);
990}
991
992int order_lock_cmp(const void* ATTR_UNUSED(e1), const void* ATTR_UNUSED(e2))
993{
994	log_assert(0);
995	return 0;
996}
997
998int
999codeline_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
1000{
1001	log_assert(0);
1002	return 0;
1003}
1004
1005int replay_var_compare(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
1006{
1007        log_assert(0);
1008        return 0;
1009}
1010
1011void remote_get_opt_ssl(char* ATTR_UNUSED(str), void* ATTR_UNUSED(arg))
1012{
1013        log_assert(0);
1014}
1015
1016#ifdef UB_ON_WINDOWS
1017void
1018worker_win_stop_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(ev), void*
1019        ATTR_UNUSED(arg)) {
1020        log_assert(0);
1021}
1022
1023void
1024wsvc_cron_cb(void* ATTR_UNUSED(arg))
1025{
1026        log_assert(0);
1027}
1028#endif /* UB_ON_WINDOWS */
1029