• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt-6.x.4708/router/samba-3.5.8/source4/lib/messaging/
1/*
2   Unix SMB/CIFS implementation.
3
4   Samba internal messaging functions
5
6   Copyright (C) Andrew Tridgell 2004
7
8   This program is free software; you can redistribute it and/or modify
9   it under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 3 of the License, or
11   (at your option) any later version.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with this program.  If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include "includes.h"
23#include "lib/events/events.h"
24#include "system/filesys.h"
25#include "messaging/messaging.h"
26#include "../lib/util/dlinklist.h"
27#include "lib/socket/socket.h"
28#include "librpc/gen_ndr/ndr_irpc.h"
29#include "lib/messaging/irpc.h"
30#include "tdb_wrap.h"
31#include "../lib/util/unix_privs.h"
32#include "librpc/rpc/dcerpc.h"
33#include "../tdb/include/tdb.h"
34#include "../lib/util/util_tdb.h"
35#include "cluster/cluster.h"
36
37/* change the message version with any incompatible changes in the protocol */
38#define MESSAGING_VERSION 1
39
40struct messaging_context {
41	struct server_id server_id;
42	struct socket_context *sock;
43	const char *base_path;
44	const char *path;
45	struct dispatch_fn **dispatch;
46	uint32_t num_types;
47	struct idr_context *dispatch_tree;
48	struct messaging_rec *pending;
49	struct messaging_rec *retry_queue;
50	struct smb_iconv_convenience *iconv_convenience;
51	struct irpc_list *irpc;
52	struct idr_context *idr;
53	const char **names;
54	struct timeval start_time;
55	struct tevent_timer *retry_te;
56	struct {
57		struct tevent_context *ev;
58		struct tevent_fd *fde;
59	} event;
60};
61
62/* we have a linked list of dispatch handlers for each msg_type that
63   this messaging server can deal with */
64struct dispatch_fn {
65	struct dispatch_fn *next, *prev;
66	uint32_t msg_type;
67	void *private_data;
68	msg_callback_t fn;
69};
70
71/* an individual message */
72struct messaging_rec {
73	struct messaging_rec *next, *prev;
74	struct messaging_context *msg;
75	const char *path;
76
77	struct messaging_header {
78		uint32_t version;
79		uint32_t msg_type;
80		struct server_id from;
81		struct server_id to;
82		uint32_t length;
83	} *header;
84
85	DATA_BLOB packet;
86	uint32_t retries;
87};
88
89
90static void irpc_handler(struct messaging_context *, void *,
91			 uint32_t, struct server_id, DATA_BLOB *);
92
93
94/*
95 A useful function for testing the message system.
96*/
97static void ping_message(struct messaging_context *msg, void *private_data,
98			 uint32_t msg_type, struct server_id src, DATA_BLOB *data)
99{
100	DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
101		 (uint_t)src.node, (uint_t)src.id, (int)data->length,
102		 data->data?(const char *)data->data:""));
103	messaging_send(msg, src, MSG_PONG, data);
104}
105
106/*
107  return uptime of messaging server via irpc
108*/
109static NTSTATUS irpc_uptime(struct irpc_message *msg,
110			    struct irpc_uptime *r)
111{
112	struct messaging_context *ctx = talloc_get_type(msg->private_data, struct messaging_context);
113	*r->out.start_time = timeval_to_nttime(&ctx->start_time);
114	return NT_STATUS_OK;
115}
116
117/*
118   return the path to a messaging socket
119*/
120static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
121{
122	return talloc_asprintf(msg, "%s/msg.%s", msg->base_path,
123			       cluster_id_string(msg, server_id));
124}
125
126/*
127  dispatch a fully received message
128
129  note that this deliberately can match more than one message handler
130  per message. That allows a single messasging context to register
131  (for example) a debug handler for more than one piece of code
132*/
133static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec)
134{
135	struct dispatch_fn *d, *next;
136
137	/* temporary IDs use an idtree, the rest use a array of pointers */
138	if (rec->header->msg_type >= MSG_TMP_BASE) {
139		d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
140						   rec->header->msg_type);
141	} else if (rec->header->msg_type < msg->num_types) {
142		d = msg->dispatch[rec->header->msg_type];
143	} else {
144		d = NULL;
145	}
146
147	for (; d; d = next) {
148		DATA_BLOB data;
149		next = d->next;
150		data.data = rec->packet.data + sizeof(*rec->header);
151		data.length = rec->header->length;
152		d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
153	}
154	rec->header->length = 0;
155}
156
157/*
158  handler for messages that arrive from other nodes in the cluster
159*/
160static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
161{
162	struct messaging_rec *rec;
163
164	rec = talloc(msg, struct messaging_rec);
165	if (rec == NULL) {
166		smb_panic("Unable to allocate messaging_rec");
167	}
168
169	rec->msg           = msg;
170	rec->path          = msg->path;
171	rec->header        = (struct messaging_header *)packet.data;
172	rec->packet        = packet;
173	rec->retries       = 0;
174
175	if (packet.length != sizeof(*rec->header) + rec->header->length) {
176		DEBUG(0,("messaging: bad message header size %d should be %d\n",
177			 rec->header->length, (int)(packet.length - sizeof(*rec->header))));
178		talloc_free(rec);
179		return;
180	}
181
182	messaging_dispatch(msg, rec);
183	talloc_free(rec);
184}
185
186
187
188/*
189  try to send the message
190*/
191static NTSTATUS try_send(struct messaging_rec *rec)
192{
193	struct messaging_context *msg = rec->msg;
194	size_t nsent;
195	void *priv;
196	NTSTATUS status;
197	struct socket_address *path;
198
199	/* rec->path is the path of the *other* socket, where we want
200	 * this to end up */
201	path = socket_address_from_strings(msg, msg->sock->backend_name,
202					   rec->path, 0);
203	if (!path) {
204		return NT_STATUS_NO_MEMORY;
205	}
206
207	/* we send with privileges so messages work from any context */
208	priv = root_privileges();
209	status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
210	talloc_free(path);
211	talloc_free(priv);
212
213	return status;
214}
215
216/*
217  retry backed off messages
218*/
219static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te,
220			    struct timeval t, void *private_data)
221{
222	struct messaging_context *msg = talloc_get_type(private_data,
223							struct messaging_context);
224	msg->retry_te = NULL;
225
226	/* put the messages back on the main queue */
227	while (msg->retry_queue) {
228		struct messaging_rec *rec = msg->retry_queue;
229		DLIST_REMOVE(msg->retry_queue, rec);
230		DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
231	}
232
233	EVENT_FD_WRITEABLE(msg->event.fde);
234}
235
236/*
237  handle a socket write event
238*/
239static void messaging_send_handler(struct messaging_context *msg)
240{
241	while (msg->pending) {
242		struct messaging_rec *rec = msg->pending;
243		NTSTATUS status;
244		status = try_send(rec);
245		if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
246			rec->retries++;
247			if (rec->retries > 3) {
248				/* we're getting continuous write errors -
249				   backoff this record */
250				DLIST_REMOVE(msg->pending, rec);
251				DLIST_ADD_END(msg->retry_queue, rec,
252					      struct messaging_rec *);
253				if (msg->retry_te == NULL) {
254					msg->retry_te =
255						event_add_timed(msg->event.ev, msg,
256								timeval_current_ofs(1, 0),
257								msg_retry_timer, msg);
258				}
259			}
260			break;
261		}
262		rec->retries = 0;
263		if (!NT_STATUS_IS_OK(status)) {
264			DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
265				 cluster_id_string(debug_ctx(), rec->header->from),
266				 cluster_id_string(debug_ctx(), rec->header->to),
267				 rec->header->msg_type,
268				 nt_errstr(status)));
269		}
270		DLIST_REMOVE(msg->pending, rec);
271		talloc_free(rec);
272	}
273	if (msg->pending == NULL) {
274		EVENT_FD_NOT_WRITEABLE(msg->event.fde);
275	}
276}
277
278/*
279  handle a new incoming packet
280*/
281static void messaging_recv_handler(struct messaging_context *msg)
282{
283	struct messaging_rec *rec;
284	NTSTATUS status;
285	DATA_BLOB packet;
286	size_t msize;
287
288	/* see how many bytes are in the next packet */
289	status = socket_pending(msg->sock, &msize);
290	if (!NT_STATUS_IS_OK(status)) {
291		DEBUG(0,("socket_pending failed in messaging - %s\n",
292			 nt_errstr(status)));
293		return;
294	}
295
296	packet = data_blob_talloc(msg, NULL, msize);
297	if (packet.data == NULL) {
298		/* assume this is temporary and retry */
299		return;
300	}
301
302	status = socket_recv(msg->sock, packet.data, msize, &msize);
303	if (!NT_STATUS_IS_OK(status)) {
304		data_blob_free(&packet);
305		return;
306	}
307
308	if (msize < sizeof(*rec->header)) {
309		DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
310		data_blob_free(&packet);
311		return;
312	}
313
314	rec = talloc(msg, struct messaging_rec);
315	if (rec == NULL) {
316		smb_panic("Unable to allocate messaging_rec");
317	}
318
319	talloc_steal(rec, packet.data);
320	rec->msg           = msg;
321	rec->path          = msg->path;
322	rec->header        = (struct messaging_header *)packet.data;
323	rec->packet        = packet;
324	rec->retries       = 0;
325
326	if (msize != sizeof(*rec->header) + rec->header->length) {
327		DEBUG(0,("messaging: bad message header size %d should be %d\n",
328			 rec->header->length, (int)(msize - sizeof(*rec->header))));
329		talloc_free(rec);
330		return;
331	}
332
333	messaging_dispatch(msg, rec);
334	talloc_free(rec);
335}
336
337
338/*
339  handle a socket event
340*/
341static void messaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
342			      uint16_t flags, void *private_data)
343{
344	struct messaging_context *msg = talloc_get_type(private_data,
345							struct messaging_context);
346	if (flags & EVENT_FD_WRITE) {
347		messaging_send_handler(msg);
348	}
349	if (flags & EVENT_FD_READ) {
350		messaging_recv_handler(msg);
351	}
352}
353
354
355/*
356  Register a dispatch function for a particular message type.
357*/
358NTSTATUS messaging_register(struct messaging_context *msg, void *private_data,
359			    uint32_t msg_type, msg_callback_t fn)
360{
361	struct dispatch_fn *d;
362
363	/* possibly expand dispatch array */
364	if (msg_type >= msg->num_types) {
365		struct dispatch_fn **dp;
366		int i;
367		dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
368		NT_STATUS_HAVE_NO_MEMORY(dp);
369		msg->dispatch = dp;
370		for (i=msg->num_types;i<=msg_type;i++) {
371			msg->dispatch[i] = NULL;
372		}
373		msg->num_types = msg_type+1;
374	}
375
376	d = talloc_zero(msg->dispatch, struct dispatch_fn);
377	NT_STATUS_HAVE_NO_MEMORY(d);
378	d->msg_type = msg_type;
379	d->private_data = private_data;
380	d->fn = fn;
381
382	DLIST_ADD(msg->dispatch[msg_type], d);
383
384	return NT_STATUS_OK;
385}
386
387/*
388  register a temporary message handler. The msg_type is allocated
389  above MSG_TMP_BASE
390*/
391NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private_data,
392				msg_callback_t fn, uint32_t *msg_type)
393{
394	struct dispatch_fn *d;
395	int id;
396
397	d = talloc_zero(msg->dispatch, struct dispatch_fn);
398	NT_STATUS_HAVE_NO_MEMORY(d);
399	d->private_data = private_data;
400	d->fn = fn;
401
402	id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
403	if (id == -1) {
404		talloc_free(d);
405		return NT_STATUS_TOO_MANY_CONTEXT_IDS;
406	}
407
408	d->msg_type = (uint32_t)id;
409	(*msg_type) = d->msg_type;
410
411	return NT_STATUS_OK;
412}
413
414/*
415  De-register the function for a particular message type.
416*/
417void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private_data)
418{
419	struct dispatch_fn *d, *next;
420
421	if (msg_type >= msg->num_types) {
422		d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
423						   msg_type);
424		if (!d) return;
425		idr_remove(msg->dispatch_tree, msg_type);
426		talloc_free(d);
427		return;
428	}
429
430	for (d = msg->dispatch[msg_type]; d; d = next) {
431		next = d->next;
432		if (d->private_data == private_data) {
433			DLIST_REMOVE(msg->dispatch[msg_type], d);
434			talloc_free(d);
435		}
436	}
437}
438
439/*
440  Send a message to a particular server
441*/
442NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
443			uint32_t msg_type, DATA_BLOB *data)
444{
445	struct messaging_rec *rec;
446	NTSTATUS status;
447	size_t dlength = data?data->length:0;
448
449	rec = talloc(msg, struct messaging_rec);
450	if (rec == NULL) {
451		return NT_STATUS_NO_MEMORY;
452	}
453
454	rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
455	if (rec->packet.data == NULL) {
456		talloc_free(rec);
457		return NT_STATUS_NO_MEMORY;
458	}
459
460	rec->retries       = 0;
461	rec->msg              = msg;
462	rec->header           = (struct messaging_header *)rec->packet.data;
463	/* zero padding */
464	ZERO_STRUCTP(rec->header);
465	rec->header->version  = MESSAGING_VERSION;
466	rec->header->msg_type = msg_type;
467	rec->header->from     = msg->server_id;
468	rec->header->to       = server;
469	rec->header->length   = dlength;
470	if (dlength != 0) {
471		memcpy(rec->packet.data + sizeof(*rec->header),
472		       data->data, dlength);
473	}
474
475	if (!cluster_node_equal(&msg->server_id, &server)) {
476		/* the destination is on another node - dispatch via
477		   the cluster layer */
478		status = cluster_message_send(server, &rec->packet);
479		talloc_free(rec);
480		return status;
481	}
482
483	rec->path = messaging_path(msg, server);
484	talloc_steal(rec, rec->path);
485
486	if (msg->pending != NULL) {
487		status = STATUS_MORE_ENTRIES;
488	} else {
489		status = try_send(rec);
490	}
491
492	if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
493		if (msg->pending == NULL) {
494			EVENT_FD_WRITEABLE(msg->event.fde);
495		}
496		DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
497		return NT_STATUS_OK;
498	}
499
500	talloc_free(rec);
501
502	return status;
503}
504
505/*
506  Send a message to a particular server, with the message containing a single pointer
507*/
508NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server,
509			    uint32_t msg_type, void *ptr)
510{
511	DATA_BLOB blob;
512
513	blob.data = (uint8_t *)&ptr;
514	blob.length = sizeof(void *);
515
516	return messaging_send(msg, server, msg_type, &blob);
517}
518
519
520/*
521  destroy the messaging context
522*/
523static int messaging_destructor(struct messaging_context *msg)
524{
525	unlink(msg->path);
526	while (msg->names && msg->names[0]) {
527		irpc_remove_name(msg, msg->names[0]);
528	}
529	return 0;
530}
531
532/*
533  create the listening socket and setup the dispatcher
534*/
535struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
536					 const char *dir,
537					 struct server_id server_id,
538					 struct smb_iconv_convenience *iconv_convenience,
539					 struct tevent_context *ev)
540{
541	struct messaging_context *msg;
542	NTSTATUS status;
543	struct socket_address *path;
544
545	if (ev == NULL) {
546		return NULL;
547	}
548
549	msg = talloc_zero(mem_ctx, struct messaging_context);
550	if (msg == NULL) {
551		return NULL;
552	}
553
554	/* setup a handler for messages from other cluster nodes, if appropriate */
555	status = cluster_message_init(msg, server_id, cluster_message_handler);
556	if (!NT_STATUS_IS_OK(status)) {
557		talloc_free(msg);
558		return NULL;
559	}
560
561	/* create the messaging directory if needed */
562	mkdir(dir, 0700);
563
564	msg->base_path     = talloc_reference(msg, dir);
565	msg->path          = messaging_path(msg, server_id);
566	msg->server_id     = server_id;
567	msg->iconv_convenience = iconv_convenience;
568	msg->idr           = idr_init(msg);
569	msg->dispatch_tree = idr_init(msg);
570	msg->start_time    = timeval_current();
571
572	status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
573	if (!NT_STATUS_IS_OK(status)) {
574		talloc_free(msg);
575		return NULL;
576	}
577
578	/* by stealing here we ensure that the socket is cleaned up (and even
579	   deleted) on exit */
580	talloc_steal(msg, msg->sock);
581
582	path = socket_address_from_strings(msg, msg->sock->backend_name,
583					   msg->path, 0);
584	if (!path) {
585		talloc_free(msg);
586		return NULL;
587	}
588
589	status = socket_listen(msg->sock, path, 50, 0);
590	if (!NT_STATUS_IS_OK(status)) {
591		DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
592		talloc_free(msg);
593		return NULL;
594	}
595
596	/* it needs to be non blocking for sends */
597	set_blocking(socket_get_fd(msg->sock), false);
598
599	msg->event.ev   = ev;
600	msg->event.fde	= event_add_fd(ev, msg, socket_get_fd(msg->sock),
601				       EVENT_FD_READ, messaging_handler, msg);
602
603	talloc_set_destructor(msg, messaging_destructor);
604
605	messaging_register(msg, NULL, MSG_PING, ping_message);
606	messaging_register(msg, NULL, MSG_IRPC, irpc_handler);
607	IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
608
609	return msg;
610}
611
612/*
613   A hack, for the short term until we get 'client only' messaging in place
614*/
615struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx,
616						const char *dir,
617						struct smb_iconv_convenience *iconv_convenience,
618						struct tevent_context *ev)
619{
620	struct server_id id;
621	ZERO_STRUCT(id);
622	id.id = random() % 0x10000000;
623	return messaging_init(mem_ctx, dir, id, iconv_convenience, ev);
624}
625/*
626  a list of registered irpc server functions
627*/
628struct irpc_list {
629	struct irpc_list *next, *prev;
630	struct GUID uuid;
631	const struct ndr_interface_table *table;
632	int callnum;
633	irpc_function_t fn;
634	void *private_data;
635};
636
637
638/*
639  register a irpc server function
640*/
641NTSTATUS irpc_register(struct messaging_context *msg_ctx,
642		       const struct ndr_interface_table *table,
643		       int callnum, irpc_function_t fn, void *private_data)
644{
645	struct irpc_list *irpc;
646
647	/* override an existing handler, if any */
648	for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
649		if (irpc->table == table && irpc->callnum == callnum) {
650			break;
651		}
652	}
653	if (irpc == NULL) {
654		irpc = talloc(msg_ctx, struct irpc_list);
655		NT_STATUS_HAVE_NO_MEMORY(irpc);
656		DLIST_ADD(msg_ctx->irpc, irpc);
657	}
658
659	irpc->table   = table;
660	irpc->callnum = callnum;
661	irpc->fn      = fn;
662	irpc->private_data = private_data;
663	irpc->uuid = irpc->table->syntax_id.uuid;
664
665	return NT_STATUS_OK;
666}
667
668
669/*
670  handle an incoming irpc reply message
671*/
672static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)
673{
674	struct irpc_request *irpc;
675	enum ndr_err_code ndr_err;
676
677	irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
678	if (irpc == NULL) return;
679
680	/* parse the reply data */
681	ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
682	if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
683		irpc->status = m->header.status;
684		talloc_steal(irpc->mem_ctx, m);
685	} else {
686		irpc->status = ndr_map_error2ntstatus(ndr_err);
687		talloc_steal(irpc, m);
688	}
689	irpc->done = true;
690	if (irpc->async.fn) {
691		irpc->async.fn(irpc);
692	}
693}
694
695/*
696  send a irpc reply
697*/
698NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
699{
700	struct ndr_push *push;
701	DATA_BLOB packet;
702	enum ndr_err_code ndr_err;
703
704	m->header.status = status;
705
706	/* setup the reply */
707	push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience);
708	if (push == NULL) {
709		status = NT_STATUS_NO_MEMORY;
710		goto failed;
711	}
712
713	m->header.flags |= IRPC_FLAG_REPLY;
714
715	/* construct the packet */
716	ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
717	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
718		status = ndr_map_error2ntstatus(ndr_err);
719		goto failed;
720	}
721
722	ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
723	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
724		status = ndr_map_error2ntstatus(ndr_err);
725		goto failed;
726	}
727
728	/* send the reply message */
729	packet = ndr_push_blob(push);
730	status = messaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
731	if (!NT_STATUS_IS_OK(status)) goto failed;
732
733failed:
734	talloc_free(m);
735	return status;
736}
737
738/*
739  handle an incoming irpc request message
740*/
741static void irpc_handler_request(struct messaging_context *msg_ctx,
742				 struct irpc_message *m)
743{
744	struct irpc_list *i;
745	void *r;
746	enum ndr_err_code ndr_err;
747
748	for (i=msg_ctx->irpc; i; i=i->next) {
749		if (GUID_equal(&i->uuid, &m->header.uuid) &&
750		    i->table->syntax_id.if_version == m->header.if_version &&
751		    i->callnum == m->header.callnum) {
752			break;
753		}
754	}
755
756	if (i == NULL) {
757		/* no registered handler for this message */
758		talloc_free(m);
759		return;
760	}
761
762	/* allocate space for the structure */
763	r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
764	if (r == NULL) goto failed;
765
766	/* parse the request data */
767	ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
768	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
769
770	/* make the call */
771	m->private_data= i->private_data;
772	m->defer_reply = false;
773	m->msg_ctx     = msg_ctx;
774	m->irpc        = i;
775	m->data        = r;
776	m->ev          = msg_ctx->event.ev;
777
778	m->header.status = i->fn(m, r);
779
780	if (m->defer_reply) {
781		/* the server function has asked to defer the reply to later */
782		talloc_steal(msg_ctx, m);
783		return;
784	}
785
786	irpc_send_reply(m, m->header.status);
787	return;
788
789failed:
790	talloc_free(m);
791}
792
793/*
794  handle an incoming irpc message
795*/
796static void irpc_handler(struct messaging_context *msg_ctx, void *private_data,
797			 uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
798{
799	struct irpc_message *m;
800	enum ndr_err_code ndr_err;
801
802	m = talloc(msg_ctx, struct irpc_message);
803	if (m == NULL) goto failed;
804
805	m->from = src;
806
807	m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience);
808	if (m->ndr == NULL) goto failed;
809
810	m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
811
812	ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
813	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
814
815	if (m->header.flags & IRPC_FLAG_REPLY) {
816		irpc_handler_reply(msg_ctx, m);
817	} else {
818		irpc_handler_request(msg_ctx, m);
819	}
820	return;
821
822failed:
823	talloc_free(m);
824}
825
826
827/*
828  destroy a irpc request
829*/
830static int irpc_destructor(struct irpc_request *irpc)
831{
832	if (irpc->callid != -1) {
833		idr_remove(irpc->msg_ctx->idr, irpc->callid);
834		irpc->callid = -1;
835	}
836
837	if (irpc->reject_free) {
838		return -1;
839	}
840	return 0;
841}
842
843/*
844  timeout a irpc request
845*/
846static void irpc_timeout(struct tevent_context *ev, struct tevent_timer *te,
847			 struct timeval t, void *private_data)
848{
849	struct irpc_request *irpc = talloc_get_type(private_data, struct irpc_request);
850	irpc->status = NT_STATUS_IO_TIMEOUT;
851	irpc->done = true;
852	if (irpc->async.fn) {
853		irpc->async.fn(irpc);
854	}
855}
856
857
858/*
859  make a irpc call - async send
860*/
861struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
862				    struct server_id server_id,
863				    const struct ndr_interface_table *table,
864				    int callnum, void *r, TALLOC_CTX *ctx)
865{
866	struct irpc_header header;
867	struct ndr_push *ndr;
868	NTSTATUS status;
869	DATA_BLOB packet;
870	struct irpc_request *irpc;
871	enum ndr_err_code ndr_err;
872
873	irpc = talloc(msg_ctx, struct irpc_request);
874	if (irpc == NULL) goto failed;
875
876	irpc->msg_ctx  = msg_ctx;
877	irpc->table    = table;
878	irpc->callnum  = callnum;
879	irpc->callid   = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
880	if (irpc->callid == -1) goto failed;
881	irpc->r        = r;
882	irpc->done     = false;
883	irpc->async.fn = NULL;
884	irpc->mem_ctx  = ctx;
885	irpc->reject_free = false;
886
887	talloc_set_destructor(irpc, irpc_destructor);
888
889	/* setup the header */
890	header.uuid = table->syntax_id.uuid;
891
892	header.if_version = table->syntax_id.if_version;
893	header.callid     = irpc->callid;
894	header.callnum    = callnum;
895	header.flags      = 0;
896	header.status     = NT_STATUS_OK;
897
898	/* construct the irpc packet */
899	ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience);
900	if (ndr == NULL) goto failed;
901
902	ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
903	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
904
905	ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
906	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
907
908	/* and send it */
909	packet = ndr_push_blob(ndr);
910	status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);
911	if (!NT_STATUS_IS_OK(status)) goto failed;
912
913	event_add_timed(msg_ctx->event.ev, irpc,
914			timeval_current_ofs(IRPC_CALL_TIMEOUT, 0),
915			irpc_timeout, irpc);
916
917	talloc_free(ndr);
918	return irpc;
919
920failed:
921	talloc_free(irpc);
922	return NULL;
923}
924
925/*
926  wait for a irpc reply
927*/
928NTSTATUS irpc_call_recv(struct irpc_request *irpc)
929{
930	NTSTATUS status;
931
932	NT_STATUS_HAVE_NO_MEMORY(irpc);
933
934	irpc->reject_free = true;
935
936	while (!irpc->done) {
937		if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
938			return NT_STATUS_CONNECTION_DISCONNECTED;
939		}
940	}
941
942	irpc->reject_free = false;
943
944	status = irpc->status;
945	talloc_free(irpc);
946	return status;
947}
948
949/*
950  perform a synchronous irpc request
951*/
952NTSTATUS irpc_call(struct messaging_context *msg_ctx,
953		   struct server_id server_id,
954		   const struct ndr_interface_table *table,
955		   int callnum, void *r,
956		   TALLOC_CTX *mem_ctx)
957{
958	struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id,
959						   table, callnum, r, mem_ctx);
960	return irpc_call_recv(irpc);
961}
962
963/*
964  open the naming database
965*/
966static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx)
967{
968	struct tdb_wrap *t;
969	char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
970	if (path == NULL) {
971		return NULL;
972	}
973	t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
974	talloc_free(path);
975	return t;
976}
977
978
979/*
980  add a string name that this irpc server can be called on
981*/
982NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
983{
984	struct tdb_wrap *t;
985	TDB_DATA rec;
986	int count;
987	NTSTATUS status = NT_STATUS_OK;
988
989	t = irpc_namedb_open(msg_ctx);
990	NT_STATUS_HAVE_NO_MEMORY(t);
991
992	if (tdb_lock_bystring(t->tdb, name) != 0) {
993		talloc_free(t);
994		return NT_STATUS_LOCK_NOT_GRANTED;
995	}
996	rec = tdb_fetch_bystring(t->tdb, name);
997	count = rec.dsize / sizeof(struct server_id);
998	rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
999	rec.dsize += sizeof(struct server_id);
1000	if (rec.dptr == NULL) {
1001		tdb_unlock_bystring(t->tdb, name);
1002		talloc_free(t);
1003		return NT_STATUS_NO_MEMORY;
1004	}
1005	((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
1006	if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
1007		status = NT_STATUS_INTERNAL_ERROR;
1008	}
1009	free(rec.dptr);
1010	tdb_unlock_bystring(t->tdb, name);
1011	talloc_free(t);
1012
1013	msg_ctx->names = str_list_add(msg_ctx->names, name);
1014	talloc_steal(msg_ctx, msg_ctx->names);
1015
1016	return status;
1017}
1018
1019/*
1020  return a list of server ids for a server name
1021*/
1022struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
1023				      TALLOC_CTX *mem_ctx,
1024				      const char *name)
1025{
1026	struct tdb_wrap *t;
1027	TDB_DATA rec;
1028	int count, i;
1029	struct server_id *ret;
1030
1031	t = irpc_namedb_open(msg_ctx);
1032	if (t == NULL) {
1033		return NULL;
1034	}
1035
1036	if (tdb_lock_bystring(t->tdb, name) != 0) {
1037		talloc_free(t);
1038		return NULL;
1039	}
1040	rec = tdb_fetch_bystring(t->tdb, name);
1041	if (rec.dptr == NULL) {
1042		tdb_unlock_bystring(t->tdb, name);
1043		talloc_free(t);
1044		return NULL;
1045	}
1046	count = rec.dsize / sizeof(struct server_id);
1047	ret = talloc_array(mem_ctx, struct server_id, count+1);
1048	if (ret == NULL) {
1049		tdb_unlock_bystring(t->tdb, name);
1050		talloc_free(t);
1051		return NULL;
1052	}
1053	for (i=0;i<count;i++) {
1054		ret[i] = ((struct server_id *)rec.dptr)[i];
1055	}
1056	ret[i] = cluster_id(0, 0);
1057	free(rec.dptr);
1058	tdb_unlock_bystring(t->tdb, name);
1059	talloc_free(t);
1060
1061	return ret;
1062}
1063
1064/*
1065  remove a name from a messaging context
1066*/
1067void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
1068{
1069	struct tdb_wrap *t;
1070	TDB_DATA rec;
1071	int count, i;
1072	struct server_id *ids;
1073
1074	str_list_remove(msg_ctx->names, name);
1075
1076	t = irpc_namedb_open(msg_ctx);
1077	if (t == NULL) {
1078		return;
1079	}
1080
1081	if (tdb_lock_bystring(t->tdb, name) != 0) {
1082		talloc_free(t);
1083		return;
1084	}
1085	rec = tdb_fetch_bystring(t->tdb, name);
1086	if (rec.dptr == NULL) {
1087		tdb_unlock_bystring(t->tdb, name);
1088		talloc_free(t);
1089		return;
1090	}
1091	count = rec.dsize / sizeof(struct server_id);
1092	if (count == 0) {
1093		free(rec.dptr);
1094		tdb_unlock_bystring(t->tdb, name);
1095		talloc_free(t);
1096		return;
1097	}
1098	ids = (struct server_id *)rec.dptr;
1099	for (i=0;i<count;i++) {
1100		if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
1101			if (i < count-1) {
1102				memmove(ids+i, ids+i+1,
1103					sizeof(struct server_id) * (count-(i+1)));
1104			}
1105			rec.dsize -= sizeof(struct server_id);
1106			break;
1107		}
1108	}
1109	tdb_store_bystring(t->tdb, name, rec, 0);
1110	free(rec.dptr);
1111	tdb_unlock_bystring(t->tdb, name);
1112	talloc_free(t);
1113}
1114
1115struct server_id messaging_get_server_id(struct messaging_context *msg_ctx)
1116{
1117	return msg_ctx->server_id;
1118}
1119