1/* BEGIN LICENSE BLOCK 2 * Version: CMPL 1.1 3 * 4 * The contents of this file are subject to the Cisco-style Mozilla Public 5 * License Version 1.1 (the "License"); you may not use this file except 6 * in compliance with the License. You may obtain a copy of the License 7 * at www.eclipse-clp.org/license. 8 * 9 * Software distributed under the License is distributed on an "AS IS" 10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 11 * the License for the specific language governing rights and limitations 12 * under the License. 13 * 14 * The Original Code is The ECLiPSe Constraint Logic Programming System. 15 * The Initial Developer of the Original Code is Cisco Systems, Inc. 16 * Portions created by the Initial Developer are 17 * Copyright (C) 1996-2006 Cisco Systems, Inc. All Rights Reserved. 18 * 19 * Contributor(s): 20 * 21 * END LICENSE BLOCK */ 22 23/********************************************************************** 24** System: MPS (Message Passing System) 25** File: mps.c 26** Author: Kees Schuerman 27***********************************************************************/ 28 29#include "config.h" 30#include "sepia.h" 31#include <pds.h> /* PDS Library Interface */ 32#include "types.h" 33#include "embed.h" 34#include "mem.h" 35#include "error.h" 36#include "dict.h" 37 38#include <unistd.h> 39#include <stdio.h> 40#ifdef HAVE_STRING_H 41#include <string.h> 42#endif 43 44 45#define BmsgReturn(bret) { \ 46 switch (bret) { \ 47 case BMSG_OK : \ 48 case BMSG_POPENED : \ 49 case BMSG_PUNBLOCKED : \ 50 case BMSG_POPENING : \ 51 case BMSG_PCLOSING : \ 52 case BMSG_PBLOCKING : \ 53 case BMSG_PUNBLOCKING : \ 54 Succeed_; \ 55 default : \ 56 pds_error_string = bmsg_error_string(bret); \ 57 Bip_Error(MPS_ERROR); \ 58 } \ 59} 60 61 62#define AmsgReturn(aret) { \ 63 switch (aret) { \ 64 case AMSG_OK : \ 65 Succeed_; \ 66 default : \ 67 pds_error_string = amsg_error_string(aret); \ 68 Bip_Error(MPS_ERROR); \ 69 } \ 70} 71 72 73#define NsrvReturn(nret) { \ 74 switch (nret) { \ 75 case NSRV_OK : \ 76 Succeed_; \ 77 default : \ 78 pds_error_string = nsrv_error_string(nret); \ 79 Bip_Error(MPS_ERROR); \ 80 } \ 81} 82 83 84/********************************************************************** 85** MPS Process Connection Establishment 86***********************************************************************/ 87 88#define MPS_RETRIES_MAX 10000 89 90 91 92/********************************************************************** 93** MPS Domains 94***********************************************************************/ 95 96#define DOMAIN_SIZE 0x800000 /* 8 MByte */ 97 98 99 100/********************************************************************** 101** MPS Process Registration 102*********************************************************************** 103** The primitive pds_init() registers the invoking process' bport in 104** the name server. This is done under a name which is the string 105** representation of the process' bport identifier. Since bport 106** identifiers are unique, the name under which it is registered is 107** unique also. A session key is therefore not really necessary. The 108** uniqueness of the signature is ensured by taking the combination 109** of hostname and process identifier. 110** 111** signature: <hostname>.<pid> 112** name: <bport_self> 113** key: <DummyKey> 114***********************************************************************/ 115 116#define PID_MAX 999999 117#define PID_LEN 6 118#define HOST_NAMELEN (NSRV_NAMELEN - 1 - PID_LEN) 119 120#define DummyKey "" 121 122 123 124/********************************************************************** 125** Some Global Variables 126***********************************************************************/ 127 128static nsrv_name_t mps_signature; 129static nsrv_name_t mps_procname; 130static int mps_initialised=0; 131static int mps_nsrv_initialised=0; 132static int mps_amsg_initialised=0; 133static int mps_bmsg_initialised=0; 134 135static char *pds_error_string = (char *) 0; 136 137/********************************************************************** 138** Error primitives 139***********************************************************************/ 140 141int 142p_mps_error(value v, type t) 143{ 144 value vstr; 145 if (!pds_error_string) 146 Fail_; 147 Cstring_To_Prolog(pds_error_string, vstr); 148 pds_error_string = (char *) 0; 149 Return_Unify_String(v, t, vstr.ptr); 150} 151 152/********************************************************************** 153** Name Server Primitives 154***********************************************************************/ 155 156int 157p_mps_ping_1(value v_hostname, type t_hostname) 158{ 159 unsigned portnumber; 160 char * hostname; 161 nsrv_ret_t nret; 162 163 Get_Name(v_hostname, t_hostname, hostname); 164 165 if (strlen(hostname) == 0) { 166 Bip_Error(RANGE_ERROR); 167 } 168 169 portnumber = 0; 170 171 nret = nsrv_ping(hostname,&portnumber); 172 switch(nret) 173 { 174 case NSRV_OK : 175 Succeed_; 176 default: 177 Fail_; 178 } 179} 180 181 182 183int 184p_mps_ping_2(value v_hostname, type t_hostname, value v_portnumber, type t_portnumber) 185{ 186 unsigned portnumber; 187 char * hostname; 188 nsrv_ret_t nret; 189 190 Get_Name(v_hostname, t_hostname, hostname); 191 Check_Integer(t_portnumber); 192 193 if (v_portnumber.nint <= 0) { 194 Bip_Error(RANGE_ERROR); 195 } 196 else 197 portnumber = v_portnumber.nint; 198 199 if (strlen(hostname) == 0) { 200 Bip_Error(RANGE_ERROR); 201 } 202 203 nret = nsrv_ping(hostname,&portnumber); 204 switch(nret) 205 { 206 case NSRV_OK : 207 Succeed_; 208 default: 209 Fail_; 210 } 211} 212 213 214int 215p_mps_port_register_4(value v_key, type t_key, value v_name, type t_name, value v_signature, type t_signature, value v_port, type t_port) 216{ 217 char * key; 218 char * name; 219 char * signature; 220 aport_t port; 221 nsrv_ret_t nret; 222 223 if (!mps_initialised) 224 Bip_Error(MPS_ERROR); 225 226 Get_Name(v_key, t_key, key); 227 Get_Name(v_name, t_name, name); 228 Get_Name(v_signature, t_signature, signature); 229 Check_Integer(t_port); 230 231 port.aport_id = v_port.nint; 232 port.bport_id = aport_bport_id((aport_id_t) (v_port.nint)); 233 port.bdomain_id = bdomain_self(); 234 235 if (port.bport_id != bport_self()) 236 Bip_Error(MPS_ERROR); 237 238 nret = nsrv_aport_register(key,name,signature,&port); 239 NsrvReturn(nret); 240} 241 242 243int 244p_mps_port_lookup_3(value v_key, type t_key, value v_name, type t_name, value v_port, type t_port) 245{ 246 char * key; 247 char * name; 248 aport_t port; 249 nsrv_ret_t nret; 250 251 if (!mps_initialised) 252 Bip_Error(MPS_ERROR); 253 254 Get_Name(v_key, t_key, key); 255 Get_Name(v_name, t_name, name); 256 Check_Output_Integer(t_port); 257 258 nret = nsrv_aport_look_up(key,name,&port); 259 switch(nret) 260 { 261 case NSRV_OK : 262 Return_Unify_Integer(v_port,t_port,port.aport_id); 263 case NSRV_NOT_REGISTERED : 264 Fail_; 265 default: 266 pds_error_string = nsrv_error_string(nret); 267 Bip_Error(MPS_ERROR); 268 } 269} 270 271 272int 273p_mps_port_deregister_3(value v_key, type t_key, value v_name, type t_name, value v_signature, type t_signature) 274{ 275 char * key; 276 char * name; 277 char * signature; 278 aport_t port; 279 nsrv_ret_t nret; 280 281 if (!mps_initialised) 282 Bip_Error(MPS_ERROR); 283 284 Get_Name(v_key, t_key, key); 285 Get_Name(v_name, t_name, name); 286 Get_Name(v_signature, t_signature, signature); 287 288 nret = nsrv_aport_look_up(key,name,&port); 289 if (nret == NSRV_OK) { 290 if (port.bport_id != bport_self()) { 291 Bip_Error(MPS_ERROR); 292 } 293 nret = nsrv_aport_deregister(key,name,signature); 294 } 295 NsrvReturn(nret); 296} 297 298 299 300/********************************************************************** 301** MPS Control Primitives 302***********************************************************************/ 303 304static void 305exit_mps(void) 306{ 307 if (!mps_initialised) 308 return; 309 310 (void) nsrv_bport_deregister(DummyKey,mps_procname,mps_signature); 311 (void) nsrv_free_bdomain_id(mps_signature,bdomain_self()); 312 (void) nsrv_free_bport_id(mps_signature,bport_self()); 313 if (mps_nsrv_initialised) { 314 nsrv_exit(); 315 mps_nsrv_initialised = 0; 316 } 317 if (mps_amsg_initialised) { 318 amsg_exit(); 319 mps_amsg_initialised = 0; 320 } 321 if (mps_bmsg_initialised) { 322 bmsg_exit(); 323 mps_bmsg_initialised = 0; 324 } 325 mps_initialised = 0; 326} 327 328 329int 330p_mps_exit_0(void) 331{ 332 exit_mps(); 333 334 Succeed_; 335} 336 337 338int 339p_mps_init_2(value v_hostname, type t_hostname, value v_portnumber, type t_portnumber) 340{ 341 char localhostname[HOST_NAMELEN+1]; 342 char * hostname; 343 unsigned portnumber; 344 bdomain_id_t domain_id = 0; 345 bport_id_t port_id; 346 bdomain_t domain; 347 bport_t port; 348 nsrv_ret_t nret; 349 bmsg_ret_t bret; 350 amsg_ret_t aret; 351 352 if (ec_options.parallel_worker) 353 Bip_Error(NOT_IN_PARALLEL); 354 355 if (mps_initialised) 356 Bip_Error(MPS_ERROR); 357 358 Get_Name(v_hostname, t_hostname, hostname); 359 Check_Output_Integer(t_portnumber); 360 361 if (IsInteger(t_portnumber)) { 362 if (v_portnumber.nint <= 0) { 363 Bip_Error(RANGE_ERROR); 364 } 365 else 366 portnumber = v_portnumber.nint; 367 } 368 else 369 portnumber = 0; 370 if (strlen(hostname) == 0) { 371 Bip_Error(RANGE_ERROR); 372 } 373 374 if (gethostname(localhostname,HOST_NAMELEN+1) != 0) { 375 Bip_Error(SYS_ERROR); 376 } 377 localhostname[HOST_NAMELEN] = '\0'; 378 379 (void) sprintf(mps_signature, 380 "%s.%6d",localhostname,getpid() % (PID_MAX+1)); 381 382 /* 383 ** Initialise NSRV 384 */ 385 386 if (!nsrv_ready()) { 387 nret = nsrv_init(hostname,&portnumber); 388 if (nret != NSRV_OK) 389 NsrvReturn(nret); 390 mps_nsrv_initialised = 1; 391 } 392 else 393 Bip_Error(MPS_ERROR); 394 395 /* 396 ** Initialise BMSG 397 */ 398 399 if (!bmsg_ready()) { 400 nret = nsrv_new_bdomain_id(mps_signature,&domain_id); 401 if (nret != NSRV_OK) 402 NsrvReturn(nret); 403 nret = nsrv_new_bport_id(mps_signature,&port_id); 404 if (nret != NSRV_OK) { 405 (void) nsrv_free_bdomain_id(mps_signature,domain_id); 406 NsrvReturn(nret); 407 } 408 domain.bdomain_id = domain_id; 409 domain.bdomain_size = DOMAIN_SIZE; 410 if (!shared_mem_base()) 411 domain.bdomain_start = (bmem_address_t) (shared_mem_base()); 412 else 413 domain.bdomain_start = (bmem_address_t) 414 (shared_mem_base() + DOMAIN_SIZE); 415 domain.bdomain_start = 0; 416 (void) sprintf(domain.bdomain_file, 417 "/tmp/mps.%d.map",domain_id); 418 bret = bmsg_init(port_id,&domain,BDOMAIN_CREATE); 419 if (bret != BMSG_OK) { 420 (void) nsrv_free_bdomain_id(mps_signature,domain_id); 421 (void) nsrv_free_bport_id(mps_signature,port_id); 422 BmsgReturn(bret); 423 } 424 mps_bmsg_initialised = 1; 425 } 426 427 /* 428 ** Initialise AMSG 429 */ 430 431 if (!amsg_ready()) { 432 aret = amsg_init((unsigned) 0, 0, 0,0); 433 if (aret != AMSG_OK) { 434 exit_mps(); 435 AmsgReturn(aret); 436 } 437 mps_amsg_initialised = 1; 438 } 439 440 /* 441 ** Initialise NSRV Type System 442 */ 443 444 nret = nsrv_types_init(); 445 if (nret != NSRV_OK) { 446 exit_mps(); 447 NsrvReturn(nret); 448 } 449 450 bret = bport_port(bport_self(),&port); 451 if (bret != BMSG_OK) { 452 exit_mps(); 453 BmsgReturn(bret); 454 } 455 (void) sprintf(mps_procname,"%d",bport_self()); 456 457 nret = nsrv_bport_register(DummyKey,mps_procname,mps_signature,&port); 458 if (nret != NSRV_OK) { 459 exit_mps(); 460 NsrvReturn(nret); 461 } 462 463 mps_initialised = 1; 464 465 Return_Unify_Integer(v_portnumber,t_portnumber,portnumber); 466} 467 468 469 470/********************************************************************** 471** MPS Port Primitives 472***********************************************************************/ 473 474static void 475port_notifier(aport_id_t port_id) 476{ 477 pri *proc; 478 pword *p = TG; 479 value mod; 480 481 /* 482 ** Get port's data pointer which points to client's 483 ** port handler predicate PID 484 */ 485 (void) aport_get_option(port_id, 486 APORT_DATA_PTR, 487 (aport_optval_t *) &proc); 488 489 TG += 3; 490 p[0].tag.kernel = TCOMP; 491 p[0].val.ptr = p + 1; 492 p[1].tag.kernel = TDICT; 493 p[1].val.did = proc->did; 494 p[2].tag.kernel = TINT; 495 p[2].val.nint = (long) port_id; 496 mod.did = proc->module_def; /* call from the lookup module */ 497 498 (void) query_emulc(p->val, p->tag, mod, tdict); 499} 500 501int 502p_mps_port_allocate_3(value v_notifier, type t_notifier, value v_portid, type t_portid, value vmod, type tmod) 503{ 504 aport_id_t portid; 505 dident functor; 506 pri *proc; 507 void (*notifier)(); 508 amsg_ret_t aret; 509 510 if (!mps_initialised) 511 Bip_Error(MPS_ERROR); 512 513 Check_Output_Integer(t_portid); 514 Get_Proc_Did(v_notifier, t_notifier, functor); 515 516 proc = visible_procedure(functor, vmod.did, tmod, PRI_CREATE|PRI_REFER); 517 if (proc == 0) { 518 Bip_Error(NOENTRY) 519 } 520 if (functor == d_.true0 && proc->module_ref == d_.kernel_sepia) 521 notifier = (void (*)()) 0; 522 else 523 notifier = port_notifier; 524 525 aret = aport_allocate(&portid, notifier); 526 if (aret != AMSG_OK) { 527 AmsgReturn(aret); 528 } 529 if (notifier != (void (*)()) 0) { 530 (void) aport_set_option(portid, 531 APORT_DATA_PTR, 532 (aport_optval_t) proc); 533 } 534 Return_Unify_Integer(v_portid,t_portid,portid); 535} 536 537 538int 539p_mps_port_deallocate_1(value v_portid, type t_portid) 540{ 541 if (!mps_initialised) 542 Bip_Error(MPS_ERROR); 543 544 Check_Integer(t_portid); 545 546 AmsgReturn(aport_deallocate((aport_id_t) (v_portid.nint))); 547} 548 549 550 551/********************************************************************** 552** MPS Message Primitives 553***********************************************************************/ 554 555int 556p_mps_str_send_2(value v_portid, type t_portid, value v_str, type t_str) 557{ 558 bport_t peer; 559 aport_id_t portid; 560 amsg_t msg; 561 amsg_data_t * msg_data; 562 amsg_count_t msg_count; 563 static nsrv_name_t peername; 564 int retries; 565 nsrv_ret_t nret; 566 amsg_ret_t aret; 567 amsg_ret_t bret; 568 569 if (!mps_initialised) 570 Bip_Error(MPS_ERROR); 571 572 Check_Integer(t_portid); 573 Check_String(t_str); 574 575 msg_count = StringLength(v_str) + 1; 576 aret = amsg_alloc(msg_count,&msg_data,&msg); 577 if (aret != AMSG_OK) 578 AmsgReturn(aret); 579 bmem_cpy((bmem_address_t) msg_data, (bmem_address_t) StringStart(v_str), 580 msg_count); 581 portid = v_portid.nint; 582 aret = amsg_send(portid,msg,MDT_BYTE,msg_count,0); 583 if (aret == AMSG_NOPORT) { 584 (void) sprintf(peername,"%d",aport_bport_id(portid)); 585 nret = nsrv_bport_look_up(DummyKey,peername,&peer); 586 if (nret == NSRV_OK) { 587 retries = 0; 588 do 589 bret = bport_open(&peer); 590 while (((bret == BMSG_POPENING) || (bret == BMSG_PNOTAVAILABLE)) 591 && (retries++ < MPS_RETRIES_MAX)); 592 if (bret == BMSG_POPENED) 593 aret = amsg_send(portid,msg,MDT_BYTE,msg_count,0); 594 else { 595 (void) amsg_free(msg); 596 BmsgReturn(bret); 597 } 598 } 599 } 600 if (aret != AMSG_OK) 601 (void) amsg_free(msg); 602 AmsgReturn(aret); 603} 604 605 606int 607p_mps_str_receive_2(value v_portid, type t_portid, value v_str, type t_str) 608{ 609 amsg_t msg; 610 amsg_data_t * msg_data; 611 amsg_type_t msg_type; 612 amsg_count_t msg_count; 613 char *buf; 614 value v; 615 amsg_ret_t aret; 616 617 if (!mps_initialised) 618 Bip_Error(MPS_ERROR); 619 620 Check_Integer(t_portid); 621 Check_Output_String(t_str); 622 623 aret = amsg_receive((aport_id_t) (v_portid.nint),&msg,&msg_data,&msg_type,&msg_count,0); 624 switch(aret) 625 { 626 case AMSG_OK: 627 break; 628 case AMSG_NOMESSAGE: 629 Fail_; 630 default: 631 pds_error_string = amsg_error_string(aret); 632 Bip_Error(MPS_ERROR); 633 } 634 if (msg_type != MDT_BYTE) 635 Bip_Error(MPS_ERROR); 636 Make_Stack_String(msg_count, v, buf); 637 Copy_Bytes(buf, (char *) msg_data, msg_count); 638 (void) amsg_free(msg); 639 Return_Unify_String(v_str,t_str,v.ptr); 640} 641 642mps_present(void) 643{ 644 return 1; 645} 646 647void 648msg_init(int flags) 649{ 650 if (flags & INIT_SHARED) 651 { 652 (void) built_in(in_dict("mps_error", 1), 653 p_mps_error, B_SAFE|U_SIMPLE); 654 (void) built_in(in_dict("mps_ping", 1), 655 p_mps_ping_1, B_SAFE|U_NONE); 656 (void) built_in(in_dict("mps_ping", 2), 657 p_mps_ping_2, B_SAFE|U_NONE); 658 (void) built_in(in_dict("mps_port_register", 4), 659 p_mps_port_register_4, B_SAFE|U_NONE); 660 (void) built_in(in_dict("mps_port_deregister", 3), 661 p_mps_port_deregister_3, B_SAFE|U_NONE); 662 built_in(in_dict("mps_port_lookup", 3), 663 p_mps_port_lookup_3, B_SAFE|U_SIMPLE) 664 -> mode = BoundArg(3, CONSTANT); 665 built_in(in_dict("mps_init", 2), 666 p_mps_init_2, B_SAFE|U_SIMPLE) 667 -> mode = BoundArg(2, CONSTANT); 668 (void) built_in(in_dict("mps_exit", 0), 669 p_mps_exit_0, B_SAFE|U_NONE); 670 built_in(in_dict("mps_port_allocate", 3), 671 p_mps_port_allocate_3, B_SAFE|U_SIMPLE) 672 -> mode = BoundArg(1, CONSTANT); 673 (void) built_in(in_dict("mps_port_deallocate", 1), 674 p_mps_port_deallocate_1, B_SAFE|U_NONE); 675 (void) exported_built_in(in_dict("mps_str_send", 2), 676 p_mps_str_send_2, B_SAFE|U_NONE); 677 exported_built_in(in_dict("mps_str_receive", 2), 678 p_mps_str_receive_2, B_SAFE|U_SIMPLE) 679 -> mode = BoundArg(2, CONSTANT); 680 } 681} 682 683 684