1/* 2 Unix SMB/CIFS implementation. 3 4 Samba internal messaging functions 5 6 Copyright (C) Andrew Tridgell 2004 7 8 This program is free software; you can redistribute it and/or modify 9 it under the terms of the GNU General Public License as published by 10 the Free Software Foundation; either version 3 of the License, or 11 (at your option) any later version. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program. If not, see <http://www.gnu.org/licenses/>. 20*/ 21 22#include "includes.h" 23#include "lib/events/events.h" 24#include "system/filesys.h" 25#include "messaging/messaging.h" 26#include "../lib/util/dlinklist.h" 27#include "lib/socket/socket.h" 28#include "librpc/gen_ndr/ndr_irpc.h" 29#include "lib/messaging/irpc.h" 30#include "tdb_wrap.h" 31#include "../lib/util/unix_privs.h" 32#include "librpc/rpc/dcerpc.h" 33#include "../tdb/include/tdb.h" 34#include "../lib/util/util_tdb.h" 35#include "cluster/cluster.h" 36 37/* change the message version with any incompatible changes in the protocol */ 38#define MESSAGING_VERSION 1 39 40struct messaging_context { 41 struct server_id server_id; 42 struct socket_context *sock; 43 const char *base_path; 44 const char *path; 45 struct dispatch_fn **dispatch; 46 uint32_t num_types; 47 struct idr_context *dispatch_tree; 48 struct messaging_rec *pending; 49 struct messaging_rec *retry_queue; 50 struct smb_iconv_convenience *iconv_convenience; 51 struct irpc_list *irpc; 52 struct idr_context *idr; 53 const char **names; 54 struct timeval start_time; 55 struct tevent_timer *retry_te; 56 struct { 57 struct tevent_context *ev; 58 struct tevent_fd *fde; 59 } event; 60}; 61 62/* we have a linked list of dispatch handlers for each msg_type that 63 this messaging server can deal with */ 64struct dispatch_fn { 65 struct dispatch_fn *next, *prev; 66 uint32_t msg_type; 67 void *private_data; 68 msg_callback_t fn; 69}; 70 71/* an individual message */ 72struct messaging_rec { 73 struct messaging_rec *next, *prev; 74 struct messaging_context *msg; 75 const char *path; 76 77 struct messaging_header { 78 uint32_t version; 79 uint32_t msg_type; 80 struct server_id from; 81 struct server_id to; 82 uint32_t length; 83 } *header; 84 85 DATA_BLOB packet; 86 uint32_t retries; 87}; 88 89 90static void irpc_handler(struct messaging_context *, void *, 91 uint32_t, struct server_id, DATA_BLOB *); 92 93 94/* 95 A useful function for testing the message system. 96*/ 97static void ping_message(struct messaging_context *msg, void *private_data, 98 uint32_t msg_type, struct server_id src, DATA_BLOB *data) 99{ 100 DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n", 101 (uint_t)src.node, (uint_t)src.id, (int)data->length, 102 data->data?(const char *)data->data:"")); 103 messaging_send(msg, src, MSG_PONG, data); 104} 105 106/* 107 return uptime of messaging server via irpc 108*/ 109static NTSTATUS irpc_uptime(struct irpc_message *msg, 110 struct irpc_uptime *r) 111{ 112 struct messaging_context *ctx = talloc_get_type(msg->private_data, struct messaging_context); 113 *r->out.start_time = timeval_to_nttime(&ctx->start_time); 114 return NT_STATUS_OK; 115} 116 117/* 118 return the path to a messaging socket 119*/ 120static char *messaging_path(struct messaging_context *msg, struct server_id server_id) 121{ 122 return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, 123 cluster_id_string(msg, server_id)); 124} 125 126/* 127 dispatch a fully received message 128 129 note that this deliberately can match more than one message handler 130 per message. That allows a single messasging context to register 131 (for example) a debug handler for more than one piece of code 132*/ 133static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec) 134{ 135 struct dispatch_fn *d, *next; 136 137 /* temporary IDs use an idtree, the rest use a array of pointers */ 138 if (rec->header->msg_type >= MSG_TMP_BASE) { 139 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 140 rec->header->msg_type); 141 } else if (rec->header->msg_type < msg->num_types) { 142 d = msg->dispatch[rec->header->msg_type]; 143 } else { 144 d = NULL; 145 } 146 147 for (; d; d = next) { 148 DATA_BLOB data; 149 next = d->next; 150 data.data = rec->packet.data + sizeof(*rec->header); 151 data.length = rec->header->length; 152 d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data); 153 } 154 rec->header->length = 0; 155} 156 157/* 158 handler for messages that arrive from other nodes in the cluster 159*/ 160static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet) 161{ 162 struct messaging_rec *rec; 163 164 rec = talloc(msg, struct messaging_rec); 165 if (rec == NULL) { 166 smb_panic("Unable to allocate messaging_rec"); 167 } 168 169 rec->msg = msg; 170 rec->path = msg->path; 171 rec->header = (struct messaging_header *)packet.data; 172 rec->packet = packet; 173 rec->retries = 0; 174 175 if (packet.length != sizeof(*rec->header) + rec->header->length) { 176 DEBUG(0,("messaging: bad message header size %d should be %d\n", 177 rec->header->length, (int)(packet.length - sizeof(*rec->header)))); 178 talloc_free(rec); 179 return; 180 } 181 182 messaging_dispatch(msg, rec); 183 talloc_free(rec); 184} 185 186 187 188/* 189 try to send the message 190*/ 191static NTSTATUS try_send(struct messaging_rec *rec) 192{ 193 struct messaging_context *msg = rec->msg; 194 size_t nsent; 195 void *priv; 196 NTSTATUS status; 197 struct socket_address *path; 198 199 /* rec->path is the path of the *other* socket, where we want 200 * this to end up */ 201 path = socket_address_from_strings(msg, msg->sock->backend_name, 202 rec->path, 0); 203 if (!path) { 204 return NT_STATUS_NO_MEMORY; 205 } 206 207 /* we send with privileges so messages work from any context */ 208 priv = root_privileges(); 209 status = socket_sendto(msg->sock, &rec->packet, &nsent, path); 210 talloc_free(path); 211 talloc_free(priv); 212 213 return status; 214} 215 216/* 217 retry backed off messages 218*/ 219static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 220 struct timeval t, void *private_data) 221{ 222 struct messaging_context *msg = talloc_get_type(private_data, 223 struct messaging_context); 224 msg->retry_te = NULL; 225 226 /* put the messages back on the main queue */ 227 while (msg->retry_queue) { 228 struct messaging_rec *rec = msg->retry_queue; 229 DLIST_REMOVE(msg->retry_queue, rec); 230 DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); 231 } 232 233 EVENT_FD_WRITEABLE(msg->event.fde); 234} 235 236/* 237 handle a socket write event 238*/ 239static void messaging_send_handler(struct messaging_context *msg) 240{ 241 while (msg->pending) { 242 struct messaging_rec *rec = msg->pending; 243 NTSTATUS status; 244 status = try_send(rec); 245 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { 246 rec->retries++; 247 if (rec->retries > 3) { 248 /* we're getting continuous write errors - 249 backoff this record */ 250 DLIST_REMOVE(msg->pending, rec); 251 DLIST_ADD_END(msg->retry_queue, rec, 252 struct messaging_rec *); 253 if (msg->retry_te == NULL) { 254 msg->retry_te = 255 event_add_timed(msg->event.ev, msg, 256 timeval_current_ofs(1, 0), 257 msg_retry_timer, msg); 258 } 259 } 260 break; 261 } 262 rec->retries = 0; 263 if (!NT_STATUS_IS_OK(status)) { 264 DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 265 cluster_id_string(debug_ctx(), rec->header->from), 266 cluster_id_string(debug_ctx(), rec->header->to), 267 rec->header->msg_type, 268 nt_errstr(status))); 269 } 270 DLIST_REMOVE(msg->pending, rec); 271 talloc_free(rec); 272 } 273 if (msg->pending == NULL) { 274 EVENT_FD_NOT_WRITEABLE(msg->event.fde); 275 } 276} 277 278/* 279 handle a new incoming packet 280*/ 281static void messaging_recv_handler(struct messaging_context *msg) 282{ 283 struct messaging_rec *rec; 284 NTSTATUS status; 285 DATA_BLOB packet; 286 size_t msize; 287 288 /* see how many bytes are in the next packet */ 289 status = socket_pending(msg->sock, &msize); 290 if (!NT_STATUS_IS_OK(status)) { 291 DEBUG(0,("socket_pending failed in messaging - %s\n", 292 nt_errstr(status))); 293 return; 294 } 295 296 packet = data_blob_talloc(msg, NULL, msize); 297 if (packet.data == NULL) { 298 /* assume this is temporary and retry */ 299 return; 300 } 301 302 status = socket_recv(msg->sock, packet.data, msize, &msize); 303 if (!NT_STATUS_IS_OK(status)) { 304 data_blob_free(&packet); 305 return; 306 } 307 308 if (msize < sizeof(*rec->header)) { 309 DEBUG(0,("messaging: bad message of size %d\n", (int)msize)); 310 data_blob_free(&packet); 311 return; 312 } 313 314 rec = talloc(msg, struct messaging_rec); 315 if (rec == NULL) { 316 smb_panic("Unable to allocate messaging_rec"); 317 } 318 319 talloc_steal(rec, packet.data); 320 rec->msg = msg; 321 rec->path = msg->path; 322 rec->header = (struct messaging_header *)packet.data; 323 rec->packet = packet; 324 rec->retries = 0; 325 326 if (msize != sizeof(*rec->header) + rec->header->length) { 327 DEBUG(0,("messaging: bad message header size %d should be %d\n", 328 rec->header->length, (int)(msize - sizeof(*rec->header)))); 329 talloc_free(rec); 330 return; 331 } 332 333 messaging_dispatch(msg, rec); 334 talloc_free(rec); 335} 336 337 338/* 339 handle a socket event 340*/ 341static void messaging_handler(struct tevent_context *ev, struct tevent_fd *fde, 342 uint16_t flags, void *private_data) 343{ 344 struct messaging_context *msg = talloc_get_type(private_data, 345 struct messaging_context); 346 if (flags & EVENT_FD_WRITE) { 347 messaging_send_handler(msg); 348 } 349 if (flags & EVENT_FD_READ) { 350 messaging_recv_handler(msg); 351 } 352} 353 354 355/* 356 Register a dispatch function for a particular message type. 357*/ 358NTSTATUS messaging_register(struct messaging_context *msg, void *private_data, 359 uint32_t msg_type, msg_callback_t fn) 360{ 361 struct dispatch_fn *d; 362 363 /* possibly expand dispatch array */ 364 if (msg_type >= msg->num_types) { 365 struct dispatch_fn **dp; 366 int i; 367 dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1); 368 NT_STATUS_HAVE_NO_MEMORY(dp); 369 msg->dispatch = dp; 370 for (i=msg->num_types;i<=msg_type;i++) { 371 msg->dispatch[i] = NULL; 372 } 373 msg->num_types = msg_type+1; 374 } 375 376 d = talloc_zero(msg->dispatch, struct dispatch_fn); 377 NT_STATUS_HAVE_NO_MEMORY(d); 378 d->msg_type = msg_type; 379 d->private_data = private_data; 380 d->fn = fn; 381 382 DLIST_ADD(msg->dispatch[msg_type], d); 383 384 return NT_STATUS_OK; 385} 386 387/* 388 register a temporary message handler. The msg_type is allocated 389 above MSG_TMP_BASE 390*/ 391NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private_data, 392 msg_callback_t fn, uint32_t *msg_type) 393{ 394 struct dispatch_fn *d; 395 int id; 396 397 d = talloc_zero(msg->dispatch, struct dispatch_fn); 398 NT_STATUS_HAVE_NO_MEMORY(d); 399 d->private_data = private_data; 400 d->fn = fn; 401 402 id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX); 403 if (id == -1) { 404 talloc_free(d); 405 return NT_STATUS_TOO_MANY_CONTEXT_IDS; 406 } 407 408 d->msg_type = (uint32_t)id; 409 (*msg_type) = d->msg_type; 410 411 return NT_STATUS_OK; 412} 413 414/* 415 De-register the function for a particular message type. 416*/ 417void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private_data) 418{ 419 struct dispatch_fn *d, *next; 420 421 if (msg_type >= msg->num_types) { 422 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 423 msg_type); 424 if (!d) return; 425 idr_remove(msg->dispatch_tree, msg_type); 426 talloc_free(d); 427 return; 428 } 429 430 for (d = msg->dispatch[msg_type]; d; d = next) { 431 next = d->next; 432 if (d->private_data == private_data) { 433 DLIST_REMOVE(msg->dispatch[msg_type], d); 434 talloc_free(d); 435 } 436 } 437} 438 439/* 440 Send a message to a particular server 441*/ 442NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, 443 uint32_t msg_type, DATA_BLOB *data) 444{ 445 struct messaging_rec *rec; 446 NTSTATUS status; 447 size_t dlength = data?data->length:0; 448 449 rec = talloc(msg, struct messaging_rec); 450 if (rec == NULL) { 451 return NT_STATUS_NO_MEMORY; 452 } 453 454 rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength); 455 if (rec->packet.data == NULL) { 456 talloc_free(rec); 457 return NT_STATUS_NO_MEMORY; 458 } 459 460 rec->retries = 0; 461 rec->msg = msg; 462 rec->header = (struct messaging_header *)rec->packet.data; 463 /* zero padding */ 464 ZERO_STRUCTP(rec->header); 465 rec->header->version = MESSAGING_VERSION; 466 rec->header->msg_type = msg_type; 467 rec->header->from = msg->server_id; 468 rec->header->to = server; 469 rec->header->length = dlength; 470 if (dlength != 0) { 471 memcpy(rec->packet.data + sizeof(*rec->header), 472 data->data, dlength); 473 } 474 475 if (!cluster_node_equal(&msg->server_id, &server)) { 476 /* the destination is on another node - dispatch via 477 the cluster layer */ 478 status = cluster_message_send(server, &rec->packet); 479 talloc_free(rec); 480 return status; 481 } 482 483 rec->path = messaging_path(msg, server); 484 talloc_steal(rec, rec->path); 485 486 if (msg->pending != NULL) { 487 status = STATUS_MORE_ENTRIES; 488 } else { 489 status = try_send(rec); 490 } 491 492 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { 493 if (msg->pending == NULL) { 494 EVENT_FD_WRITEABLE(msg->event.fde); 495 } 496 DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); 497 return NT_STATUS_OK; 498 } 499 500 talloc_free(rec); 501 502 return status; 503} 504 505/* 506 Send a message to a particular server, with the message containing a single pointer 507*/ 508NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, 509 uint32_t msg_type, void *ptr) 510{ 511 DATA_BLOB blob; 512 513 blob.data = (uint8_t *)&ptr; 514 blob.length = sizeof(void *); 515 516 return messaging_send(msg, server, msg_type, &blob); 517} 518 519 520/* 521 destroy the messaging context 522*/ 523static int messaging_destructor(struct messaging_context *msg) 524{ 525 unlink(msg->path); 526 while (msg->names && msg->names[0]) { 527 irpc_remove_name(msg, msg->names[0]); 528 } 529 return 0; 530} 531 532/* 533 create the listening socket and setup the dispatcher 534*/ 535struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 536 const char *dir, 537 struct server_id server_id, 538 struct smb_iconv_convenience *iconv_convenience, 539 struct tevent_context *ev) 540{ 541 struct messaging_context *msg; 542 NTSTATUS status; 543 struct socket_address *path; 544 545 if (ev == NULL) { 546 return NULL; 547 } 548 549 msg = talloc_zero(mem_ctx, struct messaging_context); 550 if (msg == NULL) { 551 return NULL; 552 } 553 554 /* setup a handler for messages from other cluster nodes, if appropriate */ 555 status = cluster_message_init(msg, server_id, cluster_message_handler); 556 if (!NT_STATUS_IS_OK(status)) { 557 talloc_free(msg); 558 return NULL; 559 } 560 561 /* create the messaging directory if needed */ 562 mkdir(dir, 0700); 563 564 msg->base_path = talloc_reference(msg, dir); 565 msg->path = messaging_path(msg, server_id); 566 msg->server_id = server_id; 567 msg->iconv_convenience = iconv_convenience; 568 msg->idr = idr_init(msg); 569 msg->dispatch_tree = idr_init(msg); 570 msg->start_time = timeval_current(); 571 572 status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); 573 if (!NT_STATUS_IS_OK(status)) { 574 talloc_free(msg); 575 return NULL; 576 } 577 578 /* by stealing here we ensure that the socket is cleaned up (and even 579 deleted) on exit */ 580 talloc_steal(msg, msg->sock); 581 582 path = socket_address_from_strings(msg, msg->sock->backend_name, 583 msg->path, 0); 584 if (!path) { 585 talloc_free(msg); 586 return NULL; 587 } 588 589 status = socket_listen(msg->sock, path, 50, 0); 590 if (!NT_STATUS_IS_OK(status)) { 591 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status))); 592 talloc_free(msg); 593 return NULL; 594 } 595 596 /* it needs to be non blocking for sends */ 597 set_blocking(socket_get_fd(msg->sock), false); 598 599 msg->event.ev = ev; 600 msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), 601 EVENT_FD_READ, messaging_handler, msg); 602 603 talloc_set_destructor(msg, messaging_destructor); 604 605 messaging_register(msg, NULL, MSG_PING, ping_message); 606 messaging_register(msg, NULL, MSG_IRPC, irpc_handler); 607 IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg); 608 609 return msg; 610} 611 612/* 613 A hack, for the short term until we get 'client only' messaging in place 614*/ 615struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 616 const char *dir, 617 struct smb_iconv_convenience *iconv_convenience, 618 struct tevent_context *ev) 619{ 620 struct server_id id; 621 ZERO_STRUCT(id); 622 id.id = random() % 0x10000000; 623 return messaging_init(mem_ctx, dir, id, iconv_convenience, ev); 624} 625/* 626 a list of registered irpc server functions 627*/ 628struct irpc_list { 629 struct irpc_list *next, *prev; 630 struct GUID uuid; 631 const struct ndr_interface_table *table; 632 int callnum; 633 irpc_function_t fn; 634 void *private_data; 635}; 636 637 638/* 639 register a irpc server function 640*/ 641NTSTATUS irpc_register(struct messaging_context *msg_ctx, 642 const struct ndr_interface_table *table, 643 int callnum, irpc_function_t fn, void *private_data) 644{ 645 struct irpc_list *irpc; 646 647 /* override an existing handler, if any */ 648 for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) { 649 if (irpc->table == table && irpc->callnum == callnum) { 650 break; 651 } 652 } 653 if (irpc == NULL) { 654 irpc = talloc(msg_ctx, struct irpc_list); 655 NT_STATUS_HAVE_NO_MEMORY(irpc); 656 DLIST_ADD(msg_ctx->irpc, irpc); 657 } 658 659 irpc->table = table; 660 irpc->callnum = callnum; 661 irpc->fn = fn; 662 irpc->private_data = private_data; 663 irpc->uuid = irpc->table->syntax_id.uuid; 664 665 return NT_STATUS_OK; 666} 667 668 669/* 670 handle an incoming irpc reply message 671*/ 672static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m) 673{ 674 struct irpc_request *irpc; 675 enum ndr_err_code ndr_err; 676 677 irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid); 678 if (irpc == NULL) return; 679 680 /* parse the reply data */ 681 ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r); 682 if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { 683 irpc->status = m->header.status; 684 talloc_steal(irpc->mem_ctx, m); 685 } else { 686 irpc->status = ndr_map_error2ntstatus(ndr_err); 687 talloc_steal(irpc, m); 688 } 689 irpc->done = true; 690 if (irpc->async.fn) { 691 irpc->async.fn(irpc); 692 } 693} 694 695/* 696 send a irpc reply 697*/ 698NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status) 699{ 700 struct ndr_push *push; 701 DATA_BLOB packet; 702 enum ndr_err_code ndr_err; 703 704 m->header.status = status; 705 706 /* setup the reply */ 707 push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience); 708 if (push == NULL) { 709 status = NT_STATUS_NO_MEMORY; 710 goto failed; 711 } 712 713 m->header.flags |= IRPC_FLAG_REPLY; 714 715 /* construct the packet */ 716 ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header); 717 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { 718 status = ndr_map_error2ntstatus(ndr_err); 719 goto failed; 720 } 721 722 ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data); 723 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { 724 status = ndr_map_error2ntstatus(ndr_err); 725 goto failed; 726 } 727 728 /* send the reply message */ 729 packet = ndr_push_blob(push); 730 status = messaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet); 731 if (!NT_STATUS_IS_OK(status)) goto failed; 732 733failed: 734 talloc_free(m); 735 return status; 736} 737 738/* 739 handle an incoming irpc request message 740*/ 741static void irpc_handler_request(struct messaging_context *msg_ctx, 742 struct irpc_message *m) 743{ 744 struct irpc_list *i; 745 void *r; 746 enum ndr_err_code ndr_err; 747 748 for (i=msg_ctx->irpc; i; i=i->next) { 749 if (GUID_equal(&i->uuid, &m->header.uuid) && 750 i->table->syntax_id.if_version == m->header.if_version && 751 i->callnum == m->header.callnum) { 752 break; 753 } 754 } 755 756 if (i == NULL) { 757 /* no registered handler for this message */ 758 talloc_free(m); 759 return; 760 } 761 762 /* allocate space for the structure */ 763 r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size); 764 if (r == NULL) goto failed; 765 766 /* parse the request data */ 767 ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r); 768 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; 769 770 /* make the call */ 771 m->private_data= i->private_data; 772 m->defer_reply = false; 773 m->msg_ctx = msg_ctx; 774 m->irpc = i; 775 m->data = r; 776 m->ev = msg_ctx->event.ev; 777 778 m->header.status = i->fn(m, r); 779 780 if (m->defer_reply) { 781 /* the server function has asked to defer the reply to later */ 782 talloc_steal(msg_ctx, m); 783 return; 784 } 785 786 irpc_send_reply(m, m->header.status); 787 return; 788 789failed: 790 talloc_free(m); 791} 792 793/* 794 handle an incoming irpc message 795*/ 796static void irpc_handler(struct messaging_context *msg_ctx, void *private_data, 797 uint32_t msg_type, struct server_id src, DATA_BLOB *packet) 798{ 799 struct irpc_message *m; 800 enum ndr_err_code ndr_err; 801 802 m = talloc(msg_ctx, struct irpc_message); 803 if (m == NULL) goto failed; 804 805 m->from = src; 806 807 m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience); 808 if (m->ndr == NULL) goto failed; 809 810 m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC; 811 812 ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header); 813 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; 814 815 if (m->header.flags & IRPC_FLAG_REPLY) { 816 irpc_handler_reply(msg_ctx, m); 817 } else { 818 irpc_handler_request(msg_ctx, m); 819 } 820 return; 821 822failed: 823 talloc_free(m); 824} 825 826 827/* 828 destroy a irpc request 829*/ 830static int irpc_destructor(struct irpc_request *irpc) 831{ 832 if (irpc->callid != -1) { 833 idr_remove(irpc->msg_ctx->idr, irpc->callid); 834 irpc->callid = -1; 835 } 836 837 if (irpc->reject_free) { 838 return -1; 839 } 840 return 0; 841} 842 843/* 844 timeout a irpc request 845*/ 846static void irpc_timeout(struct tevent_context *ev, struct tevent_timer *te, 847 struct timeval t, void *private_data) 848{ 849 struct irpc_request *irpc = talloc_get_type(private_data, struct irpc_request); 850 irpc->status = NT_STATUS_IO_TIMEOUT; 851 irpc->done = true; 852 if (irpc->async.fn) { 853 irpc->async.fn(irpc); 854 } 855} 856 857 858/* 859 make a irpc call - async send 860*/ 861struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, 862 struct server_id server_id, 863 const struct ndr_interface_table *table, 864 int callnum, void *r, TALLOC_CTX *ctx) 865{ 866 struct irpc_header header; 867 struct ndr_push *ndr; 868 NTSTATUS status; 869 DATA_BLOB packet; 870 struct irpc_request *irpc; 871 enum ndr_err_code ndr_err; 872 873 irpc = talloc(msg_ctx, struct irpc_request); 874 if (irpc == NULL) goto failed; 875 876 irpc->msg_ctx = msg_ctx; 877 irpc->table = table; 878 irpc->callnum = callnum; 879 irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX); 880 if (irpc->callid == -1) goto failed; 881 irpc->r = r; 882 irpc->done = false; 883 irpc->async.fn = NULL; 884 irpc->mem_ctx = ctx; 885 irpc->reject_free = false; 886 887 talloc_set_destructor(irpc, irpc_destructor); 888 889 /* setup the header */ 890 header.uuid = table->syntax_id.uuid; 891 892 header.if_version = table->syntax_id.if_version; 893 header.callid = irpc->callid; 894 header.callnum = callnum; 895 header.flags = 0; 896 header.status = NT_STATUS_OK; 897 898 /* construct the irpc packet */ 899 ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience); 900 if (ndr == NULL) goto failed; 901 902 ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header); 903 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; 904 905 ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r); 906 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed; 907 908 /* and send it */ 909 packet = ndr_push_blob(ndr); 910 status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet); 911 if (!NT_STATUS_IS_OK(status)) goto failed; 912 913 event_add_timed(msg_ctx->event.ev, irpc, 914 timeval_current_ofs(IRPC_CALL_TIMEOUT, 0), 915 irpc_timeout, irpc); 916 917 talloc_free(ndr); 918 return irpc; 919 920failed: 921 talloc_free(irpc); 922 return NULL; 923} 924 925/* 926 wait for a irpc reply 927*/ 928NTSTATUS irpc_call_recv(struct irpc_request *irpc) 929{ 930 NTSTATUS status; 931 932 NT_STATUS_HAVE_NO_MEMORY(irpc); 933 934 irpc->reject_free = true; 935 936 while (!irpc->done) { 937 if (event_loop_once(irpc->msg_ctx->event.ev) != 0) { 938 return NT_STATUS_CONNECTION_DISCONNECTED; 939 } 940 } 941 942 irpc->reject_free = false; 943 944 status = irpc->status; 945 talloc_free(irpc); 946 return status; 947} 948 949/* 950 perform a synchronous irpc request 951*/ 952NTSTATUS irpc_call(struct messaging_context *msg_ctx, 953 struct server_id server_id, 954 const struct ndr_interface_table *table, 955 int callnum, void *r, 956 TALLOC_CTX *mem_ctx) 957{ 958 struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id, 959 table, callnum, r, mem_ctx); 960 return irpc_call_recv(irpc); 961} 962 963/* 964 open the naming database 965*/ 966static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx) 967{ 968 struct tdb_wrap *t; 969 char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path); 970 if (path == NULL) { 971 return NULL; 972 } 973 t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660); 974 talloc_free(path); 975 return t; 976} 977 978 979/* 980 add a string name that this irpc server can be called on 981*/ 982NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name) 983{ 984 struct tdb_wrap *t; 985 TDB_DATA rec; 986 int count; 987 NTSTATUS status = NT_STATUS_OK; 988 989 t = irpc_namedb_open(msg_ctx); 990 NT_STATUS_HAVE_NO_MEMORY(t); 991 992 if (tdb_lock_bystring(t->tdb, name) != 0) { 993 talloc_free(t); 994 return NT_STATUS_LOCK_NOT_GRANTED; 995 } 996 rec = tdb_fetch_bystring(t->tdb, name); 997 count = rec.dsize / sizeof(struct server_id); 998 rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1); 999 rec.dsize += sizeof(struct server_id); 1000 if (rec.dptr == NULL) { 1001 tdb_unlock_bystring(t->tdb, name); 1002 talloc_free(t); 1003 return NT_STATUS_NO_MEMORY; 1004 } 1005 ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id; 1006 if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) { 1007 status = NT_STATUS_INTERNAL_ERROR; 1008 } 1009 free(rec.dptr); 1010 tdb_unlock_bystring(t->tdb, name); 1011 talloc_free(t); 1012 1013 msg_ctx->names = str_list_add(msg_ctx->names, name); 1014 talloc_steal(msg_ctx, msg_ctx->names); 1015 1016 return status; 1017} 1018 1019/* 1020 return a list of server ids for a server name 1021*/ 1022struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx, 1023 TALLOC_CTX *mem_ctx, 1024 const char *name) 1025{ 1026 struct tdb_wrap *t; 1027 TDB_DATA rec; 1028 int count, i; 1029 struct server_id *ret; 1030 1031 t = irpc_namedb_open(msg_ctx); 1032 if (t == NULL) { 1033 return NULL; 1034 } 1035 1036 if (tdb_lock_bystring(t->tdb, name) != 0) { 1037 talloc_free(t); 1038 return NULL; 1039 } 1040 rec = tdb_fetch_bystring(t->tdb, name); 1041 if (rec.dptr == NULL) { 1042 tdb_unlock_bystring(t->tdb, name); 1043 talloc_free(t); 1044 return NULL; 1045 } 1046 count = rec.dsize / sizeof(struct server_id); 1047 ret = talloc_array(mem_ctx, struct server_id, count+1); 1048 if (ret == NULL) { 1049 tdb_unlock_bystring(t->tdb, name); 1050 talloc_free(t); 1051 return NULL; 1052 } 1053 for (i=0;i<count;i++) { 1054 ret[i] = ((struct server_id *)rec.dptr)[i]; 1055 } 1056 ret[i] = cluster_id(0, 0); 1057 free(rec.dptr); 1058 tdb_unlock_bystring(t->tdb, name); 1059 talloc_free(t); 1060 1061 return ret; 1062} 1063 1064/* 1065 remove a name from a messaging context 1066*/ 1067void irpc_remove_name(struct messaging_context *msg_ctx, const char *name) 1068{ 1069 struct tdb_wrap *t; 1070 TDB_DATA rec; 1071 int count, i; 1072 struct server_id *ids; 1073 1074 str_list_remove(msg_ctx->names, name); 1075 1076 t = irpc_namedb_open(msg_ctx); 1077 if (t == NULL) { 1078 return; 1079 } 1080 1081 if (tdb_lock_bystring(t->tdb, name) != 0) { 1082 talloc_free(t); 1083 return; 1084 } 1085 rec = tdb_fetch_bystring(t->tdb, name); 1086 if (rec.dptr == NULL) { 1087 tdb_unlock_bystring(t->tdb, name); 1088 talloc_free(t); 1089 return; 1090 } 1091 count = rec.dsize / sizeof(struct server_id); 1092 if (count == 0) { 1093 free(rec.dptr); 1094 tdb_unlock_bystring(t->tdb, name); 1095 talloc_free(t); 1096 return; 1097 } 1098 ids = (struct server_id *)rec.dptr; 1099 for (i=0;i<count;i++) { 1100 if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) { 1101 if (i < count-1) { 1102 memmove(ids+i, ids+i+1, 1103 sizeof(struct server_id) * (count-(i+1))); 1104 } 1105 rec.dsize -= sizeof(struct server_id); 1106 break; 1107 } 1108 } 1109 tdb_store_bystring(t->tdb, name, rec, 0); 1110 free(rec.dptr); 1111 tdb_unlock_bystring(t->tdb, name); 1112 talloc_free(t); 1113} 1114 1115struct server_id messaging_get_server_id(struct messaging_context *msg_ctx) 1116{ 1117 return msg_ctx->server_id; 1118} 1119