1/* BEGIN LICENSE BLOCK
2 * Version: CMPL 1.1
3 *
4 * The contents of this file are subject to the Cisco-style Mozilla Public
5 * License Version 1.1 (the "License"); you may not use this file except
6 * in compliance with the License.  You may obtain a copy of the License
7 * at www.eclipse-clp.org/license.
8 *
9 * Software distributed under the License is distributed on an "AS IS"
10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11 * the License for the specific language governing rights and limitations
12 * under the License.
13 *
14 * The Original Code is  The ECLiPSe Constraint Logic Programming System.
15 * The Initial Developer of the Original Code is  Cisco Systems, Inc.
16 * Portions created by the Initial Developer are
17 * Copyright (C) 1996-2006 Cisco Systems, Inc.  All Rights Reserved.
18 *
19 * Contributor(s):
20 *
21 * END LICENSE BLOCK */
22
23/**********************************************************************
24**      System: MPS (Message Passing System)
25**        File: mps.c
26**      Author: Kees Schuerman
27***********************************************************************/
28
29#include "config.h"
30#include "sepia.h"
31#include <pds.h>	/* PDS Library Interface		      */
32#include "types.h"
33#include "embed.h"
34#include "mem.h"
35#include "error.h"
36#include "dict.h"
37
38#include <unistd.h>
39#include <stdio.h>
40#ifdef HAVE_STRING_H
41#include <string.h>
42#endif
43
44
/*
 * Finish a built-in with the outcome of a BMSG call.
 * BMSG_OK and the port state/progress codes listed below make the
 * built-in succeed; any other code stores its BMSG error text in
 * pds_error_string and raises MPS_ERROR.
 * The argument is evaluated exactly once, so passing a call expression
 * does not repeat the call.
 */
#define BmsgReturn(bret) {                                        \
    bmsg_ret_t mps_bmsg_ret_once_ = (bret);                       \
    switch (mps_bmsg_ret_once_) {                                 \
        case BMSG_OK :                                            \
        case BMSG_POPENED :                                       \
        case BMSG_PUNBLOCKED :                                    \
        case BMSG_POPENING :                                      \
        case BMSG_PCLOSING :                                      \
        case BMSG_PBLOCKING :                                     \
        case BMSG_PUNBLOCKING :                                   \
            Succeed_;                                             \
        default :                                                 \
            pds_error_string = bmsg_error_string(mps_bmsg_ret_once_); \
            Bip_Error(MPS_ERROR);                                 \
    }                                                             \
}
60
61
/*
 * Finish a built-in with the outcome of an AMSG call.
 * AMSG_OK makes the built-in succeed; any other code stores its AMSG
 * error text in pds_error_string and raises MPS_ERROR.
 * The argument is evaluated exactly once: callers may pass a call such
 * as aport_deallocate(...) without it being executed a second time on
 * the error path.
 */
#define AmsgReturn(aret) {                                        \
    amsg_ret_t mps_amsg_ret_once_ = (aret);                       \
    switch (mps_amsg_ret_once_) {                                 \
        case AMSG_OK :                                            \
            Succeed_;                                             \
        default :                                                 \
            pds_error_string = amsg_error_string(mps_amsg_ret_once_); \
            Bip_Error(MPS_ERROR);                                 \
    }                                                             \
}
71
72
/*
 * Finish a built-in with the outcome of a name server (NSRV) call.
 * NSRV_OK makes the built-in succeed; any other code stores its NSRV
 * error text in pds_error_string and raises MPS_ERROR.
 * The argument is evaluated exactly once, so passing a call expression
 * does not repeat the call.
 */
#define NsrvReturn(nret) {                                        \
    nsrv_ret_t mps_nsrv_ret_once_ = (nret);                       \
    switch (mps_nsrv_ret_once_) {                                 \
        case NSRV_OK :                                            \
            Succeed_;                                             \
        default :                                                 \
            pds_error_string = nsrv_error_string(mps_nsrv_ret_once_); \
            Bip_Error(MPS_ERROR);                                 \
    }                                                             \
}
82
83
84/**********************************************************************
85** MPS Process Connection Establishment
86***********************************************************************/
87
/* Limit on repeated bport_open() attempts in p_mps_str_send_2() while
 * the connection to the peer's bport is not yet open. */
#define MPS_RETRIES_MAX 10000
89
90
91
92/**********************************************************************
93** MPS Domains
94***********************************************************************/
95
/* Size requested for the BMSG domain created by p_mps_init_2(): 8 MByte */
#define DOMAIN_SIZE (8 * 1024 * 1024)
97
98
99
100/**********************************************************************
101** MPS Process Registration
102***********************************************************************
** The primitive mps_init/2 (p_mps_init_2) registers the invoking process' bport in
104** the name server. This is done under a name which is the string
105** representation of the process' bport identifier. Since bport
106** identifiers are unique, the name under which it is registered is
107** unique also. A session key is therefore not really necessary. The
108** uniqueness of the signature is ensured by taking the combination
109** of hostname and process identifier.
110**
111** signature: <hostname>.<pid>
112**      name: <bport_self>
113**       key: <DummyKey>
114***********************************************************************/
115
/*
 * Layout of the process signature "<hostname>.<pid>":
 * the pid is reduced modulo (PID_MAX + 1) and printed in a field of
 * PID_LEN characters; the hostname gets whatever is left of an NSRV
 * name after the pid field and the separating dot.
 */
#define PID_LEN         6
#define PID_MAX         999999
#define HOST_NAMELEN    (NSRV_NAMELEN - 1 - PID_LEN)

/* Key passed to the name server for bport registration and lookup */
#define DummyKey        ""
121
122
123
124/**********************************************************************
125** Some Global Variables
126***********************************************************************/
127
128static nsrv_name_t mps_signature;
129static nsrv_name_t mps_procname;
130static int mps_initialised=0;
131static int mps_nsrv_initialised=0;
132static int mps_amsg_initialised=0;
133static int mps_bmsg_initialised=0;
134
135static char *pds_error_string = (char *) 0;
136
137/**********************************************************************
138** Error primitives
139***********************************************************************/
140
141int
142p_mps_error(value v, type t)
143{
144    value vstr;
145    if (!pds_error_string)
146	Fail_;
147    Cstring_To_Prolog(pds_error_string, vstr);
148    pds_error_string = (char *) 0;
149    Return_Unify_String(v, t, vstr.ptr);
150}
151
152/**********************************************************************
153** Name Server Primitives
154***********************************************************************/
155
156int
157p_mps_ping_1(value v_hostname, type t_hostname)
158{
159    unsigned portnumber;
160    char * hostname;
161    nsrv_ret_t nret;
162
163    Get_Name(v_hostname, t_hostname, hostname);
164
165    if (strlen(hostname) == 0) {
166	Bip_Error(RANGE_ERROR);
167    }
168
169    portnumber = 0;
170
171    nret = nsrv_ping(hostname,&portnumber);
172    switch(nret)
173    {
174    case NSRV_OK :
175        Succeed_;
176    default:
177    	Fail_;
178    }
179}
180
181
182
183int
184p_mps_ping_2(value v_hostname, type t_hostname, value v_portnumber, type t_portnumber)
185{
186    unsigned portnumber;
187    char * hostname;
188    nsrv_ret_t nret;
189
190    Get_Name(v_hostname, t_hostname, hostname);
191    Check_Integer(t_portnumber);
192
193    if (v_portnumber.nint <= 0) {
194        Bip_Error(RANGE_ERROR);
195    }
196    else
197	portnumber = v_portnumber.nint;
198
199    if (strlen(hostname) == 0) {
200	Bip_Error(RANGE_ERROR);
201    }
202
203    nret = nsrv_ping(hostname,&portnumber);
204    switch(nret)
205    {
206    case NSRV_OK :
207        Succeed_;
208    default:
209    	Fail_;
210    }
211}
212
213
214int
215p_mps_port_register_4(value v_key, type t_key, value v_name, type t_name, value v_signature, type t_signature, value v_port, type t_port)
216{
217    char * key;
218    char * name;
219    char * signature;
220    aport_t port;
221    nsrv_ret_t nret;
222
223    if (!mps_initialised)
224	Bip_Error(MPS_ERROR);
225
226    Get_Name(v_key, t_key, key);
227    Get_Name(v_name, t_name, name);
228    Get_Name(v_signature, t_signature, signature);
229    Check_Integer(t_port);
230
231    port.aport_id = v_port.nint;
232    port.bport_id = aport_bport_id((aport_id_t) (v_port.nint));
233    port.bdomain_id = bdomain_self();
234
235    if (port.bport_id != bport_self())
236	Bip_Error(MPS_ERROR);
237
238    nret = nsrv_aport_register(key,name,signature,&port);
239    NsrvReturn(nret);
240}
241
242
243int
244p_mps_port_lookup_3(value v_key, type t_key, value v_name, type t_name, value v_port, type t_port)
245{
246    char * key;
247    char * name;
248    aport_t port;
249    nsrv_ret_t nret;
250
251    if (!mps_initialised)
252	Bip_Error(MPS_ERROR);
253
254    Get_Name(v_key, t_key, key);
255    Get_Name(v_name, t_name, name);
256    Check_Output_Integer(t_port);
257
258    nret = nsrv_aport_look_up(key,name,&port);
259    switch(nret)
260    {
261    case NSRV_OK :
262        Return_Unify_Integer(v_port,t_port,port.aport_id);
263    case NSRV_NOT_REGISTERED :
264    	Fail_;
265    default:
266	pds_error_string = nsrv_error_string(nret);
267        Bip_Error(MPS_ERROR);
268    }
269}
270
271
272int
273p_mps_port_deregister_3(value v_key, type t_key, value v_name, type t_name, value v_signature, type t_signature)
274{
275    char * key;
276    char * name;
277    char * signature;
278    aport_t port;
279    nsrv_ret_t nret;
280
281    if (!mps_initialised)
282	Bip_Error(MPS_ERROR);
283
284    Get_Name(v_key, t_key, key);
285    Get_Name(v_name, t_name, name);
286    Get_Name(v_signature, t_signature, signature);
287
288    nret = nsrv_aport_look_up(key,name,&port);
289    if (nret == NSRV_OK) {
290	if (port.bport_id != bport_self()) {
291	    Bip_Error(MPS_ERROR);
292	}
293        nret = nsrv_aport_deregister(key,name,signature);
294    }
295    NsrvReturn(nret);
296}
297
298
299
300/**********************************************************************
301** MPS Control Primitives
302***********************************************************************/
303
304static void
305exit_mps(void)
306{
307    if (!mps_initialised)
308    	return;
309
310    (void) nsrv_bport_deregister(DummyKey,mps_procname,mps_signature);
311    (void) nsrv_free_bdomain_id(mps_signature,bdomain_self());
312    (void) nsrv_free_bport_id(mps_signature,bport_self());
313    if (mps_nsrv_initialised) {
314	nsrv_exit();
315	mps_nsrv_initialised = 0;
316    }
317    if (mps_amsg_initialised) {
318	amsg_exit();
319	mps_amsg_initialised = 0;
320    }
321    if (mps_bmsg_initialised) {
322	bmsg_exit();
323	mps_bmsg_initialised = 0;
324    }
325    mps_initialised = 0;
326}
327
328
329int
330p_mps_exit_0(void)
331{
332    exit_mps();
333
334    Succeed_;
335}
336
337
338int
339p_mps_init_2(value v_hostname, type t_hostname, value v_portnumber, type t_portnumber)
340{
341    char localhostname[HOST_NAMELEN+1];
342    char * hostname;
343    unsigned portnumber;
344    bdomain_id_t domain_id = 0;
345    bport_id_t port_id;
346    bdomain_t domain;
347    bport_t port;
348    nsrv_ret_t nret;
349    bmsg_ret_t bret;
350    amsg_ret_t aret;
351
352    if (ec_options.parallel_worker)
353	Bip_Error(NOT_IN_PARALLEL);
354
355    if (mps_initialised)
356	Bip_Error(MPS_ERROR);
357
358    Get_Name(v_hostname, t_hostname, hostname);
359    Check_Output_Integer(t_portnumber);
360
361    if (IsInteger(t_portnumber)) {
362	if (v_portnumber.nint <= 0) {
363	    Bip_Error(RANGE_ERROR);
364        }
365	else
366            portnumber = v_portnumber.nint;
367    }
368    else
369        portnumber = 0;
370    if (strlen(hostname) == 0) {
371	Bip_Error(RANGE_ERROR);
372    }
373
374    if (gethostname(localhostname,HOST_NAMELEN+1) != 0) {
375	Bip_Error(SYS_ERROR);
376    }
377    localhostname[HOST_NAMELEN] = '\0';
378
379    (void) sprintf(mps_signature,
380	    "%s.%6d",localhostname,getpid() % (PID_MAX+1));
381
382    /*
383    ** Initialise NSRV
384    */
385
386    if (!nsrv_ready()) {
387        nret = nsrv_init(hostname,&portnumber);
388        if (nret != NSRV_OK)
389	    NsrvReturn(nret);
390	mps_nsrv_initialised = 1;
391    }
392    else
393	    Bip_Error(MPS_ERROR);
394
395    /*
396    ** Initialise BMSG
397    */
398
399    if (!bmsg_ready()) {
400	nret = nsrv_new_bdomain_id(mps_signature,&domain_id);
401	if (nret != NSRV_OK)
402	    NsrvReturn(nret);
403	nret = nsrv_new_bport_id(mps_signature,&port_id);
404	if (nret != NSRV_OK) {
405	    (void) nsrv_free_bdomain_id(mps_signature,domain_id);
406	    NsrvReturn(nret);
407	}
408	domain.bdomain_id = domain_id;
409	domain.bdomain_size = DOMAIN_SIZE;
410	if (!shared_mem_base())
411	    domain.bdomain_start = (bmem_address_t) (shared_mem_base());
412	else
413	    domain.bdomain_start = (bmem_address_t)
414				   (shared_mem_base() + DOMAIN_SIZE);
415	domain.bdomain_start = 0;
416	(void) sprintf(domain.bdomain_file,
417	        "/tmp/mps.%d.map",domain_id);
418	bret = bmsg_init(port_id,&domain,BDOMAIN_CREATE);
419	if (bret != BMSG_OK) {
420	    (void) nsrv_free_bdomain_id(mps_signature,domain_id);
421	    (void) nsrv_free_bport_id(mps_signature,port_id);
422	    BmsgReturn(bret);
423	}
424	mps_bmsg_initialised = 1;
425    }
426
427    /*
428    ** Initialise AMSG
429    */
430
431    if (!amsg_ready()) {
432	aret = amsg_init((unsigned) 0, 0, 0,0);
433	if (aret != AMSG_OK) {
434	    exit_mps();
435	    AmsgReturn(aret);
436	}
437	mps_amsg_initialised = 1;
438    }
439
440    /*
441    ** Initialise NSRV Type System
442    */
443
444    nret = nsrv_types_init();
445    if (nret != NSRV_OK) {
446	exit_mps();
447	NsrvReturn(nret);
448    }
449
450    bret = bport_port(bport_self(),&port);
451    if (bret != BMSG_OK) {
452	exit_mps();
453	BmsgReturn(bret);
454    }
455    (void) sprintf(mps_procname,"%d",bport_self());
456
457    nret = nsrv_bport_register(DummyKey,mps_procname,mps_signature,&port);
458    if (nret != NSRV_OK) {
459	exit_mps();
460	NsrvReturn(nret);
461    }
462
463    mps_initialised = 1;
464
465    Return_Unify_Integer(v_portnumber,t_portnumber,portnumber);
466}
467
468
469
470/**********************************************************************
471** MPS Port Primitives
472***********************************************************************/
473
474static void
475port_notifier(aport_id_t port_id)
476{
477    pri		*proc;
478    pword	*p = TG;
479    value	mod;
480
481    /*
482    ** Get port's data pointer which points to client's
483    ** port handler predicate PID
484    */
485    (void) aport_get_option(port_id,
486                                APORT_DATA_PTR,
487                                (aport_optval_t *) &proc);
488
489    TG += 3;
490    p[0].tag.kernel = TCOMP;
491    p[0].val.ptr = p + 1;
492    p[1].tag.kernel = TDICT;
493    p[1].val.did = proc->did;
494    p[2].tag.kernel = TINT;
495    p[2].val.nint = (long) port_id;
496    mod.did = proc->module_def;	/* call from the lookup module */
497
498    (void) query_emulc(p->val, p->tag, mod, tdict);
499}
500
501int
502p_mps_port_allocate_3(value v_notifier, type t_notifier, value v_portid, type t_portid, value vmod, type tmod)
503{
504    aport_id_t portid;
505    dident	functor;
506    pri		*proc;
507    void 	(*notifier)();
508    amsg_ret_t aret;
509
510    if (!mps_initialised)
511	Bip_Error(MPS_ERROR);
512
513    Check_Output_Integer(t_portid);
514    Get_Proc_Did(v_notifier, t_notifier, functor);
515
516    proc = visible_procedure(functor, vmod.did, tmod, PRI_CREATE|PRI_REFER);
517    if (proc == 0) {
518	Bip_Error(NOENTRY)
519    }
520    if (functor == d_.true0 && proc->module_ref == d_.kernel_sepia)
521	notifier = (void (*)()) 0;
522    else
523	notifier = port_notifier;
524
525    aret = aport_allocate(&portid, notifier);
526    if (aret != AMSG_OK) {
527        AmsgReturn(aret);
528    }
529    if (notifier != (void (*)()) 0) {
530	(void) aport_set_option(portid,
531				    APORT_DATA_PTR,
532				    (aport_optval_t) proc);
533    }
534    Return_Unify_Integer(v_portid,t_portid,portid);
535}
536
537
538int
539p_mps_port_deallocate_1(value v_portid, type t_portid)
540{
541    if (!mps_initialised)
542	Bip_Error(MPS_ERROR);
543
544    Check_Integer(t_portid);
545
546    AmsgReturn(aport_deallocate((aport_id_t) (v_portid.nint)));
547}
548
549
550
551/**********************************************************************
552** MPS Message Primitives
553***********************************************************************/
554
555int
556p_mps_str_send_2(value v_portid, type t_portid, value v_str, type t_str)
557{
558    bport_t peer;
559    aport_id_t portid;
560    amsg_t msg;
561    amsg_data_t * msg_data;
562    amsg_count_t msg_count;
563    static nsrv_name_t peername;
564    int retries;
565    nsrv_ret_t nret;
566    amsg_ret_t aret;
567    amsg_ret_t bret;
568
569    if (!mps_initialised)
570	Bip_Error(MPS_ERROR);
571
572    Check_Integer(t_portid);
573    Check_String(t_str);
574
575    msg_count = StringLength(v_str) + 1;
576    aret = amsg_alloc(msg_count,&msg_data,&msg);
577    if (aret != AMSG_OK)
578        AmsgReturn(aret);
579    bmem_cpy((bmem_address_t) msg_data, (bmem_address_t) StringStart(v_str),
580	msg_count);
581    portid = v_portid.nint;
582    aret = amsg_send(portid,msg,MDT_BYTE,msg_count,0);
583    if (aret == AMSG_NOPORT) {
584	(void) sprintf(peername,"%d",aport_bport_id(portid));
585        nret = nsrv_bport_look_up(DummyKey,peername,&peer);
586        if (nret == NSRV_OK) {
587	    retries = 0;
588	    do
589		bret = bport_open(&peer);
590	    while (((bret == BMSG_POPENING) || (bret == BMSG_PNOTAVAILABLE))
591		   && (retries++ < MPS_RETRIES_MAX));
592	    if (bret == BMSG_POPENED)
593    		aret = amsg_send(portid,msg,MDT_BYTE,msg_count,0);
594	    else {
595        	(void) amsg_free(msg);
596    		BmsgReturn(bret);
597	    }
598        }
599    }
600    if (aret != AMSG_OK)
601        (void) amsg_free(msg);
602    AmsgReturn(aret);
603}
604
605
606int
607p_mps_str_receive_2(value v_portid, type t_portid, value v_str, type t_str)
608{
609    amsg_t msg;
610    amsg_data_t * msg_data;
611    amsg_type_t msg_type;
612    amsg_count_t	msg_count;
613    char	*buf;
614    value v;
615    amsg_ret_t aret;
616
617    if (!mps_initialised)
618	Bip_Error(MPS_ERROR);
619
620    Check_Integer(t_portid);
621    Check_Output_String(t_str);
622
623    aret = amsg_receive((aport_id_t) (v_portid.nint),&msg,&msg_data,&msg_type,&msg_count,0);
624    switch(aret)
625    {
626    case AMSG_OK:
627    	break;
628    case AMSG_NOMESSAGE:
629    	Fail_;
630    default:
631	pds_error_string = amsg_error_string(aret);
632    	Bip_Error(MPS_ERROR);
633    }
634    if (msg_type != MDT_BYTE)
635	Bip_Error(MPS_ERROR);
636    Make_Stack_String(msg_count, v, buf);
637    Copy_Bytes(buf, (char *) msg_data, msg_count);
638    (void) amsg_free(msg);
639    Return_Unify_String(v_str,t_str,v.ptr);
640}
641
/*
 * Always returns 1; lets other code detect that this MPS
 * implementation is linked in.
 */
int
mps_present(void)
{
    return 1;
}
646
647void
648msg_init(int flags)
649{
650    if (flags & INIT_SHARED)
651    {
652	(void) built_in(in_dict("mps_error", 1),
653				p_mps_error, B_SAFE|U_SIMPLE);
654	(void) built_in(in_dict("mps_ping", 1),
655				p_mps_ping_1, B_SAFE|U_NONE);
656	(void) built_in(in_dict("mps_ping", 2),
657				p_mps_ping_2, B_SAFE|U_NONE);
658	(void) built_in(in_dict("mps_port_register", 4),
659				p_mps_port_register_4, B_SAFE|U_NONE);
660	(void) built_in(in_dict("mps_port_deregister", 3),
661				p_mps_port_deregister_3, B_SAFE|U_NONE);
662	built_in(in_dict("mps_port_lookup", 3),
663				p_mps_port_lookup_3, B_SAFE|U_SIMPLE)
664		-> mode = BoundArg(3, CONSTANT);
665	built_in(in_dict("mps_init", 2),
666				p_mps_init_2, B_SAFE|U_SIMPLE)
667		-> mode = BoundArg(2, CONSTANT);
668	(void) built_in(in_dict("mps_exit", 0),
669				p_mps_exit_0, B_SAFE|U_NONE);
670	built_in(in_dict("mps_port_allocate", 3),
671				p_mps_port_allocate_3, B_SAFE|U_SIMPLE)
672		-> mode = BoundArg(1, CONSTANT);
673	(void) built_in(in_dict("mps_port_deallocate", 1),
674				p_mps_port_deallocate_1, B_SAFE|U_NONE);
675	(void) exported_built_in(in_dict("mps_str_send", 2),
676				p_mps_str_send_2, B_SAFE|U_NONE);
677	exported_built_in(in_dict("mps_str_receive", 2),
678				p_mps_str_receive_2, B_SAFE|U_SIMPLE)
679		-> mode = BoundArg(2, CONSTANT);
680    }
681}
682
683
684