1/* BEGIN LICENSE BLOCK
2 * Version: CMPL 1.1
3 *
4 * The contents of this file are subject to the Cisco-style Mozilla Public
5 * License Version 1.1 (the "License"); you may not use this file except
6 * in compliance with the License.  You may obtain a copy of the License
7 * at www.eclipse-clp.org/license.
8 *
9 * Software distributed under the License is distributed on an "AS IS"
10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11 * the License for the specific language governing rights and limitations
12 * under the License.
13 *
14 * The Original Code is  The ECLiPSe Constraint Logic Programming System.
15 * The Initial Developer of the Original Code is  Cisco Systems, Inc.
16 * Portions created by the Initial Developer are
17 * Copyright (C) 1995-2006 Cisco Systems, Inc.  All Rights Reserved.
18 *
19 * Contributor(s):
20 *
21 * END LICENSE BLOCK */
22
23/*
24 * VERSION	$Id: bip_parallel.c,v 1.3 2012/02/11 17:09:31 jschimpf Exp $
25 */
26
27/* ********************************************************************
28 *
 *	ECLiPSe builtins for the parallel version only
30 *
31 ******************************************************************** */
32
33
34#include	"config.h"
35#include        "sepia.h"
36#include        "pds.h"		/* pds.h has to be placed before types.h */
37#include        "types.h"
38#include        "embed.h"
39#include        "mem.h"
40#include        "error.h"
41#include        "ec_io.h"
42#include	"dict.h"
43#include	"emu_export.h"
44#include	"property.h"
45#include	"module.h"
46#include	"opcode.h"
47
48
/* Oracle helpers defined outside this file; called by p_get_oracle3() */
extern int	oracle_size();
extern void	retrieve_oracle();

/* Message written by p_set_par_goal() and read by p_get_par_goal() */
extern amsg_t	par_goal_msg_;
53
54
55/*------------------------------------------------------------------
56 * Distributed bag primitives
57 *
58 * dbag_create(-Bag)
59 * dbag_enter(+Bag, +Term)
60 * dbag_dissolve(+Bag, -List)
 * CAUTION: The bag can no longer be accessed after dbag_dissolve/2 !
62 *
63 * These are similar to the ones in bip_record.c.
64 * The difference is that we convert the terms to dbformat (ie a string)
65 * and fill them into a message buffer. This message may be sent via
66 * the message passing system (if the bag is remote), and the bag itself
67 * just consists of a linked list of the received message buffers.
68 * dbag_dissolve converts back to terms, builds a list of them, and
69 * frees the buffers.
70 *------------------------------------------------------------------*/
71
/*
 * Link cell of a bag's chain of message buffers.
 * p_dbag_enter() reserves room for one such cell at the start of every
 * message's data area; dbag_port_upcall() fills it in on reception.
 */
typedef struct amsg_ref {
	amsg_t msg;			/* handle of the next message in the chain */
	struct amsg_ref *msg_data_hdr;	/* data area (= link cell) of that next message */
} amsg_ref_t;
76
/*
 * Bag descriptor, allocated by p_dbag_create() and attached to the bag's
 * port via the APORT_DATA_PTR option.
 */
typedef struct {
	amsg_ref_t first;	/* head cell; the chain's final link points back to it */
	amsg_ref_t *last;	/* last cell of the chain (&first while the bag is empty) */
} dbag_descr_t;
81
82static void
83dbag_port_upcall(aport_id_t bag_aport_id)
84{
85    dbag_descr_t *dbag_descr;
86    amsg_ret_t ret;
87    amsg_count_t size;
88    amsg_t msg;
89    amsg_data_t *msg_data;
90
91    if (aport_get_option(bag_aport_id, APORT_DATA_PTR,
92				(aport_optval_t *) &dbag_descr) != AMSG_OK)
93    {
94	p_fprintf(current_err_, "aport_get_option() failed\n");
95	ec_flush(current_err_);
96    }
97
98    while ((ret = amsg_receive(bag_aport_id, &msg, &msg_data,
99			    (amsg_type_t *) 0, &size, 0)) != AMSG_NOMESSAGE)
100    {
101	if (ret != AMSG_OK)
102	{
103	    p_fprintf(current_err_, "amsg_receive() failed\n");
104	    ec_flush(current_err_);
105	    continue;
106	}
107
108	/*
109	 * Handle one message: Add message buffer to the bag
110	 */
111	dbag_descr->last->msg = msg;
112	dbag_descr->last->msg_data_hdr = (amsg_ref_t *) msg_data;
113	dbag_descr->last = (amsg_ref_t *) msg_data;
114	dbag_descr->last->msg_data_hdr = &dbag_descr->first;
115    }
116}
117
118static int
119p_dbag_create(value vbag, type tbag)
120{
121    dbag_descr_t *dbag_descr;
122    aport_id_t bag_aport_id;
123
124#ifndef lint
125    if (sizeof(aport_id_t) > sizeof(value))
126    {
127	Bip_Error(UNIMPLEMENTED);	/* can't pack aport_id in integer */
128    }
129#endif
130
131    if (aport_allocate(&bag_aport_id, dbag_port_upcall) != AMSG_OK)
132	{ Bip_Error(MPS_ERROR); }
133    if (aport_set_option(bag_aport_id, APORT_NOTIFY_LEVEL, (aport_optval_t) 3)
134	    != AMSG_OK)
135	{ Bip_Error(MPS_ERROR); }
136
137    dbag_descr = (dbag_descr_t *) hp_alloc_size(sizeof(dbag_descr_t));
138    dbag_descr->last = dbag_descr->first.msg_data_hdr = &dbag_descr->first;
139
140    if (aport_set_option(bag_aport_id, APORT_DATA_PTR,
141				(aport_optval_t) dbag_descr) != AMSG_OK)
142	{ Bip_Error(MPS_ERROR); }
143
144    Return_Unify_Integer(vbag, tbag, (word) bag_aport_id);
145}
146
147static int
148p_dbag_enter(value vbag, type tbag, value vterm, type tterm)
149{
150    aport_id_t	bag_aport_id;
151    pword	term, *term_as_bytes;
152    pword	*old_tg = TG;
153    amsg_size_t	msg_size;
154    amsg_t	msg;
155    amsg_data_t *msg_data;
156
157    Check_Integer(tbag);
158    bag_aport_id = (aport_id_t) vbag.nint;
159
160    /* encode the term */
161    term.val.all = vterm.all;
162    term.tag.kernel = tterm.kernel;
163    term_as_bytes = term_to_dbformat(&term, D_UNKNOWN);
164
165    /* fill into a message buffer */
166    msg_size = BufferSize(term_as_bytes) + sizeof(amsg_ref_t);
167    if (amsg_alloc(msg_size, &msg_data, &msg) != AMSG_OK)
168    {
169	Bip_Error(MPS_ERROR);
170    }
171    bmem_cpy((generic_ptr) ((char *) msg_data + sizeof(amsg_ref_t)),
172	    (generic_ptr) BufferStart(term_as_bytes),
173	    (bmem_size_t) BufferSize(term_as_bytes));
174    TG = old_tg;	/* pop the temporary stack string */
175
176    /* send the message */
177    if (amsg_send(bag_aport_id, msg, MDT_BYTE, (amsg_count_t) msg_size, 0) != AMSG_OK)
178    {
179	Bip_Error(MPS_ERROR);
180    }
181    Succeed_;
182}
183
184/*
185 * Must be called on the worker that created the bag, no check yet!
186 */
187static int
188p_dbag_dissolve(value vdbag, type tdbag, value vl, type tl)
189{
190    aport_id_t bag_aport_id;
191    dbag_descr_t *dbag_descr;
192    amsg_t this_msg;
193    amsg_ref_t *this_msg_data_hdr;
194    pword list;
195    register pword *car, *cdr;
196
197    Check_Integer(tdbag);
198    bag_aport_id = (aport_id_t) vdbag.nint;
199    Check_Output_List(tl);
200    if (aport_get_option(bag_aport_id, APORT_DATA_PTR,
201				(aport_optval_t *) &dbag_descr) != AMSG_OK)
202    {
203	Bip_Error(MPS_ERROR);
204    }
205
206    this_msg = dbag_descr->first.msg;
207    this_msg_data_hdr = dbag_descr->first.msg_data_hdr;
208    hp_free_size((generic_ptr) dbag_descr, sizeof(dbag_descr_t));
209    cdr = &list;
210    while (this_msg_data_hdr != &dbag_descr->first)
211    {
212	pword *pw1;
213	amsg_t old_msg;
214
215        car = TG;
216        Push_List_Frame();
217        Make_List(cdr, car);
218	cdr = car + 1;
219
220	pw1 = dbformat_to_term((char*)(this_msg_data_hdr+1), D_UNKNOWN, tdict);
221	if (!pw1)
222	{
223	    value va;
224	    va.did = d_.abort;
225	    Bip_Throw(va, tdict);
226	}
227	car->val.ptr = pw1->val.ptr;
228	car->tag.kernel = pw1->tag.kernel;
229
230	old_msg = this_msg;
231	this_msg = this_msg_data_hdr->msg;
232	this_msg_data_hdr = this_msg_data_hdr->msg_data_hdr;
233	(void) amsg_free(old_msg);
234    }
235    Make_Nil(cdr);
236    if (aport_deallocate(bag_aport_id) != AMSG_OK)
237    {
238	Bip_Error(MPS_ERROR);
239    }
240    Return_Unify_Pw(vl, tl, list.val, list.tag);
241}
242
243
244/*------------------------------------------------------------------
245 * Oracle-related builtins
246 *------------------------------------------------------------------*/
247
248#ifdef NEW_ORACLE
249
250/*
251 * get_oracle(+FromChp, +ToChp, -Oracle)
252 * install_oracle(+Oracle)
253 *
254 * These two were used for testing the oracle implementation.
255 */
256/*ARGSUSED*/
257static int
258p_get_oracle3(value vfrom, type tfrom, value vto, type tto, value v, type t)
259{
260    pword *b_aux;
261    char *buf;
262    int size;
263
264    if (IsRef(tto))
265    {
266	b_aux = B.args;
267	while (!IsParallelFrame(BTop(b_aux)))
268	    b_aux = BPrev(b_aux);
269    }
270    else
271	b_aux = vto.ptr;
272
273    size = oracle_size(vfrom.ptr, b_aux);
274    buf = (char *) hp_alloc(size);
275    retrieve_oracle(buf, size, vfrom.ptr, b_aux);
276
277    Return_Unify_Integer(v, t, (word) buf);
278}
279
280static int
281p_install_oracle(value v, type t)
282{
283    Check_Integer(t);
284    PO = FO = v.str;
285    NTRY = 0;
286    Succeed_;
287}
288
289/*
290 * This is supposed to be called after the initialization goal has been
291 * executed and the proper reexecution starts. It sets the FO register.
292 */
293static int
294p_install_pending_oracle(void)
295{
296    if (FO || !PO)
297    {
298	 Bip_Error(RECOMP_FAILED);
299    }
300    FO = PO;
301    NTRY = 0;
302    Succeed_;
303}
304
305/*
306 * Check whether we are inside the recomputation phase (including
307 * the execution of the initialization goal)
308 */
309static int
310p_recomputing(void)
311{
312    Succeed_If(PO);
313}
314
315/*
316 * This is a primitive to help debugging in case recomputation goes wrong.
317 * A call to oracle_check(N) inserts special checkpoint entries with integer
318 * identifier N into the oracle. During recomputation this identifier is
319 * checked, which helps tracking down where the oracle gets out of phase.
320 */
321static int
322p_oracle_check(value v, type t)
323{
324    Check_Integer(t);
325    if (FO)
326    {
327	int i = FoHeader(FO);
328	if (!FoIsChk(i) || FoAlt(FO,i) != v.nint)
329	{
330	     PO = FO;	/* for debugging and to prevent message handling */
331	     FO = (char *) 0;
332	     Bip_Error(RECOMP_FAILED);
333	}
334    }
335    if (TO)
336    {
337	Record_Alternative(v.nint, O_CHK_ORACLE);
338    }
339    Succeed_;
340}
341
342static int
343p_set_par_goal(value v, type t)
344{
345    pword *old_tg = TG;
346    pword term, *term_as_bytes;
347    amsg_data_t *msg_data;
348
349    if (par_goal_msg_)
350	(void) amsg_free(par_goal_msg_);
351
352    /* encode the term */
353    term.val.all = v.all;
354    term.tag.kernel = t.kernel;
355    term_as_bytes = term_to_dbformat(&term, D_UNKNOWN);
356
357    /* fill into a message buffer */
358    if (amsg_alloc((amsg_size_t) BufferSize(term_as_bytes), &msg_data, &par_goal_msg_)
359	!= AMSG_OK)
360    {
361	Bip_Error(MPS_ERROR);
362    }
363    bmem_cpy(	(generic_ptr) msg_data,
364		(generic_ptr) BufferStart(term_as_bytes),
365		(bmem_size_t) BufferSize(term_as_bytes));
366    TG = old_tg;	/* pop the temporary stack string */
367    Succeed_;
368}
369
370static int
371p_get_par_goal(value v, type t)
372{
373    pword *pw1;
374
375    if (!par_goal_msg_) { Fail_; }
376
377    pw1 = dbformat_to_term((char*) amsg_data(par_goal_msg_), D_UNKNOWN, tdict);
378    if (!pw1)
379    {
380	value va;
381	va.did = d_.abort;
382	Bip_Throw(va, tdict);
383    }
384    Return_Unify_Pw(v, t, pw1->val, pw1->tag);
385}
386
387#endif /* NEW_ORACLE */
388
389
390/*------------------------------------------------------------------
391 * Initialization code
392 *------------------------------------------------------------------*/
393
394void
395bip_parallel_init(int flags)
396{
397    if (flags & INIT_SHARED)
398    {
399#ifdef NEW_ORACLE
400	(void) exported_built_in(in_dict("get_oracle", 3),
401				p_get_oracle3, B_UNSAFE);
402	(void) exported_built_in(in_dict("install_oracle", 1),
403				p_install_oracle, B_SAFE);
404	(void) exported_built_in(in_dict("install_pending_oracle", 0),
405				p_install_pending_oracle, B_SAFE);
406	(void) built_in(in_dict("recomputing", 0),
407				p_recomputing, B_SAFE);
408	(void) exported_built_in(in_dict("oracle_check", 1),
409				p_oracle_check, B_UNSAFE);
410	(void) local_built_in(in_dict("set_par_goal", 1),
411				p_set_par_goal, B_UNSAFE);
412	(void) local_built_in(in_dict("get_par_goal", 1),
413				p_get_par_goal, B_UNSAFE|U_UNIFY);
414#endif
415	(void) local_built_in(in_dict("dbag_create", 1),
416				p_dbag_create, B_SAFE|U_GROUND);
417	(void) local_built_in(in_dict("dbag_enter", 2),
418				p_dbag_enter, B_SAFE|U_NONE);
419	(void) local_built_in(in_dict("dbag_dissolve", 2),
420				p_dbag_dissolve, B_UNSAFE|U_UNIFY);
421    }
422}
423
424
425