1/* BEGIN LICENSE BLOCK 2 * Version: CMPL 1.1 3 * 4 * The contents of this file are subject to the Cisco-style Mozilla Public 5 * License Version 1.1 (the "License"); you may not use this file except 6 * in compliance with the License. You may obtain a copy of the License 7 * at www.eclipse-clp.org/license. 8 * 9 * Software distributed under the License is distributed on an "AS IS" 10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 11 * the License for the specific language governing rights and limitations 12 * under the License. 13 * 14 * The Original Code is The ECLiPSe Constraint Logic Programming System. 15 * The Initial Developer of the Original Code is Cisco Systems, Inc. 16 * Portions created by the Initial Developer are 17 * Copyright (C) 1995-2006 Cisco Systems, Inc. All Rights Reserved. 18 * 19 * Contributor(s): 20 * 21 * END LICENSE BLOCK */ 22 23/* 24 * VERSION $Id: bip_parallel.c,v 1.3 2012/02/11 17:09:31 jschimpf Exp $ 25 */ 26 27/* ******************************************************************** 28 * 29 * ECLiSPe builtins for the parallel version only 30 * 31 ******************************************************************** */ 32 33 34#include "config.h" 35#include "sepia.h" 36#include "pds.h" /* pds.h has to be placed before types.h */ 37#include "types.h" 38#include "embed.h" 39#include "mem.h" 40#include "error.h" 41#include "ec_io.h" 42#include "dict.h" 43#include "emu_export.h" 44#include "property.h" 45#include "module.h" 46#include "opcode.h" 47 48 49extern int oracle_size(); 50extern void retrieve_oracle(); 51 52extern amsg_t par_goal_msg_; 53 54 55/*------------------------------------------------------------------ 56 * Distributed bag primitives 57 * 58 * dbag_create(-Bag) 59 * dbag_enter(+Bag, +Term) 60 * dbag_dissolve(+Bag, -List) 61 * CAUTION: The bag can no longer be accessed after bag_dissolve/2 ! 62 * 63 * These are similar to the ones in bip_record.c. 64 * The difference is that we convert the terms to dbformat (ie a string) 65 * and fill them into a message buffer. This message may be sent via 66 * the message passing system (if the bag is remote), and the bag itself 67 * just consists of a linked list of the received message buffers. 68 * dbag_dissolve converts back to terms, builds a list of them, and 69 * frees the buffers. 70 *------------------------------------------------------------------*/ 71 72typedef struct amsg_ref { 73 amsg_t msg; 74 struct amsg_ref *msg_data_hdr; 75} amsg_ref_t; 76 77typedef struct { 78 amsg_ref_t first; 79 amsg_ref_t *last; 80} dbag_descr_t; 81 82static void 83dbag_port_upcall(aport_id_t bag_aport_id) 84{ 85 dbag_descr_t *dbag_descr; 86 amsg_ret_t ret; 87 amsg_count_t size; 88 amsg_t msg; 89 amsg_data_t *msg_data; 90 91 if (aport_get_option(bag_aport_id, APORT_DATA_PTR, 92 (aport_optval_t *) &dbag_descr) != AMSG_OK) 93 { 94 p_fprintf(current_err_, "aport_get_option() failed\n"); 95 ec_flush(current_err_); 96 } 97 98 while ((ret = amsg_receive(bag_aport_id, &msg, &msg_data, 99 (amsg_type_t *) 0, &size, 0)) != AMSG_NOMESSAGE) 100 { 101 if (ret != AMSG_OK) 102 { 103 p_fprintf(current_err_, "amsg_receive() failed\n"); 104 ec_flush(current_err_); 105 continue; 106 } 107 108 /* 109 * Handle one message: Add message buffer to the bag 110 */ 111 dbag_descr->last->msg = msg; 112 dbag_descr->last->msg_data_hdr = (amsg_ref_t *) msg_data; 113 dbag_descr->last = (amsg_ref_t *) msg_data; 114 dbag_descr->last->msg_data_hdr = &dbag_descr->first; 115 } 116} 117 118static int 119p_dbag_create(value vbag, type tbag) 120{ 121 dbag_descr_t *dbag_descr; 122 aport_id_t bag_aport_id; 123 124#ifndef lint 125 if (sizeof(aport_id_t) > sizeof(value)) 126 { 127 Bip_Error(UNIMPLEMENTED); /* can't pack aport_id in integer */ 128 } 129#endif 130 131 if (aport_allocate(&bag_aport_id, dbag_port_upcall) != AMSG_OK) 132 { Bip_Error(MPS_ERROR); } 133 if (aport_set_option(bag_aport_id, APORT_NOTIFY_LEVEL, (aport_optval_t) 3) 134 != AMSG_OK) 135 { Bip_Error(MPS_ERROR); } 136 137 dbag_descr = (dbag_descr_t *) hp_alloc_size(sizeof(dbag_descr_t)); 138 dbag_descr->last = dbag_descr->first.msg_data_hdr = &dbag_descr->first; 139 140 if (aport_set_option(bag_aport_id, APORT_DATA_PTR, 141 (aport_optval_t) dbag_descr) != AMSG_OK) 142 { Bip_Error(MPS_ERROR); } 143 144 Return_Unify_Integer(vbag, tbag, (word) bag_aport_id); 145} 146 147static int 148p_dbag_enter(value vbag, type tbag, value vterm, type tterm) 149{ 150 aport_id_t bag_aport_id; 151 pword term, *term_as_bytes; 152 pword *old_tg = TG; 153 amsg_size_t msg_size; 154 amsg_t msg; 155 amsg_data_t *msg_data; 156 157 Check_Integer(tbag); 158 bag_aport_id = (aport_id_t) vbag.nint; 159 160 /* encode the term */ 161 term.val.all = vterm.all; 162 term.tag.kernel = tterm.kernel; 163 term_as_bytes = term_to_dbformat(&term, D_UNKNOWN); 164 165 /* fill into a message buffer */ 166 msg_size = BufferSize(term_as_bytes) + sizeof(amsg_ref_t); 167 if (amsg_alloc(msg_size, &msg_data, &msg) != AMSG_OK) 168 { 169 Bip_Error(MPS_ERROR); 170 } 171 bmem_cpy((generic_ptr) ((char *) msg_data + sizeof(amsg_ref_t)), 172 (generic_ptr) BufferStart(term_as_bytes), 173 (bmem_size_t) BufferSize(term_as_bytes)); 174 TG = old_tg; /* pop the temporary stack string */ 175 176 /* send the message */ 177 if (amsg_send(bag_aport_id, msg, MDT_BYTE, (amsg_count_t) msg_size, 0) != AMSG_OK) 178 { 179 Bip_Error(MPS_ERROR); 180 } 181 Succeed_; 182} 183 184/* 185 * Must be called on the worker that created the bag, no check yet! 186 */ 187static int 188p_dbag_dissolve(value vdbag, type tdbag, value vl, type tl) 189{ 190 aport_id_t bag_aport_id; 191 dbag_descr_t *dbag_descr; 192 amsg_t this_msg; 193 amsg_ref_t *this_msg_data_hdr; 194 pword list; 195 register pword *car, *cdr; 196 197 Check_Integer(tdbag); 198 bag_aport_id = (aport_id_t) vdbag.nint; 199 Check_Output_List(tl); 200 if (aport_get_option(bag_aport_id, APORT_DATA_PTR, 201 (aport_optval_t *) &dbag_descr) != AMSG_OK) 202 { 203 Bip_Error(MPS_ERROR); 204 } 205 206 this_msg = dbag_descr->first.msg; 207 this_msg_data_hdr = dbag_descr->first.msg_data_hdr; 208 hp_free_size((generic_ptr) dbag_descr, sizeof(dbag_descr_t)); 209 cdr = &list; 210 while (this_msg_data_hdr != &dbag_descr->first) 211 { 212 pword *pw1; 213 amsg_t old_msg; 214 215 car = TG; 216 Push_List_Frame(); 217 Make_List(cdr, car); 218 cdr = car + 1; 219 220 pw1 = dbformat_to_term((char*)(this_msg_data_hdr+1), D_UNKNOWN, tdict); 221 if (!pw1) 222 { 223 value va; 224 va.did = d_.abort; 225 Bip_Throw(va, tdict); 226 } 227 car->val.ptr = pw1->val.ptr; 228 car->tag.kernel = pw1->tag.kernel; 229 230 old_msg = this_msg; 231 this_msg = this_msg_data_hdr->msg; 232 this_msg_data_hdr = this_msg_data_hdr->msg_data_hdr; 233 (void) amsg_free(old_msg); 234 } 235 Make_Nil(cdr); 236 if (aport_deallocate(bag_aport_id) != AMSG_OK) 237 { 238 Bip_Error(MPS_ERROR); 239 } 240 Return_Unify_Pw(vl, tl, list.val, list.tag); 241} 242 243 244/*------------------------------------------------------------------ 245 * Oracle-related builtins 246 *------------------------------------------------------------------*/ 247 248#ifdef NEW_ORACLE 249 250/* 251 * get_oracle(+FromChp, +ToChp, -Oracle) 252 * install_oracle(+Oracle) 253 * 254 * These two were used for testing the oracle implementation. 255 */ 256/*ARGSUSED*/ 257static int 258p_get_oracle3(value vfrom, type tfrom, value vto, type tto, value v, type t) 259{ 260 pword *b_aux; 261 char *buf; 262 int size; 263 264 if (IsRef(tto)) 265 { 266 b_aux = B.args; 267 while (!IsParallelFrame(BTop(b_aux))) 268 b_aux = BPrev(b_aux); 269 } 270 else 271 b_aux = vto.ptr; 272 273 size = oracle_size(vfrom.ptr, b_aux); 274 buf = (char *) hp_alloc(size); 275 retrieve_oracle(buf, size, vfrom.ptr, b_aux); 276 277 Return_Unify_Integer(v, t, (word) buf); 278} 279 280static int 281p_install_oracle(value v, type t) 282{ 283 Check_Integer(t); 284 PO = FO = v.str; 285 NTRY = 0; 286 Succeed_; 287} 288 289/* 290 * This is supposed to be called after the initialization goal has been 291 * executed and the proper reexecution starts. It sets the FO register. 292 */ 293static int 294p_install_pending_oracle(void) 295{ 296 if (FO || !PO) 297 { 298 Bip_Error(RECOMP_FAILED); 299 } 300 FO = PO; 301 NTRY = 0; 302 Succeed_; 303} 304 305/* 306 * Check whether we are inside the recomputation phase (including 307 * the execution of the initialization goal) 308 */ 309static int 310p_recomputing(void) 311{ 312 Succeed_If(PO); 313} 314 315/* 316 * This is a primitive to help debugging in case recomputation goes wrong. 317 * A call to oracle_check(N) inserts special checkpoint entries with integer 318 * identifier N into the oracle. During recomputation this identifier is 319 * checked, which helps tracking down where the oracle gets out of phase. 320 */ 321static int 322p_oracle_check(value v, type t) 323{ 324 Check_Integer(t); 325 if (FO) 326 { 327 int i = FoHeader(FO); 328 if (!FoIsChk(i) || FoAlt(FO,i) != v.nint) 329 { 330 PO = FO; /* for debugging and to prevent message handling */ 331 FO = (char *) 0; 332 Bip_Error(RECOMP_FAILED); 333 } 334 } 335 if (TO) 336 { 337 Record_Alternative(v.nint, O_CHK_ORACLE); 338 } 339 Succeed_; 340} 341 342static int 343p_set_par_goal(value v, type t) 344{ 345 pword *old_tg = TG; 346 pword term, *term_as_bytes; 347 amsg_data_t *msg_data; 348 349 if (par_goal_msg_) 350 (void) amsg_free(par_goal_msg_); 351 352 /* encode the term */ 353 term.val.all = v.all; 354 term.tag.kernel = t.kernel; 355 term_as_bytes = term_to_dbformat(&term, D_UNKNOWN); 356 357 /* fill into a message buffer */ 358 if (amsg_alloc((amsg_size_t) BufferSize(term_as_bytes), &msg_data, &par_goal_msg_) 359 != AMSG_OK) 360 { 361 Bip_Error(MPS_ERROR); 362 } 363 bmem_cpy( (generic_ptr) msg_data, 364 (generic_ptr) BufferStart(term_as_bytes), 365 (bmem_size_t) BufferSize(term_as_bytes)); 366 TG = old_tg; /* pop the temporary stack string */ 367 Succeed_; 368} 369 370static int 371p_get_par_goal(value v, type t) 372{ 373 pword *pw1; 374 375 if (!par_goal_msg_) { Fail_; } 376 377 pw1 = dbformat_to_term((char*) amsg_data(par_goal_msg_), D_UNKNOWN, tdict); 378 if (!pw1) 379 { 380 value va; 381 va.did = d_.abort; 382 Bip_Throw(va, tdict); 383 } 384 Return_Unify_Pw(v, t, pw1->val, pw1->tag); 385} 386 387#endif /* NEW_ORACLE */ 388 389 390/*------------------------------------------------------------------ 391 * Initialization code 392 *------------------------------------------------------------------*/ 393 394void 395bip_parallel_init(int flags) 396{ 397 if (flags & INIT_SHARED) 398 { 399#ifdef NEW_ORACLE 400 (void) exported_built_in(in_dict("get_oracle", 3), 401 p_get_oracle3, B_UNSAFE); 402 (void) exported_built_in(in_dict("install_oracle", 1), 403 p_install_oracle, B_SAFE); 404 (void) exported_built_in(in_dict("install_pending_oracle", 0), 405 p_install_pending_oracle, B_SAFE); 406 (void) built_in(in_dict("recomputing", 0), 407 p_recomputing, B_SAFE); 408 (void) exported_built_in(in_dict("oracle_check", 1), 409 p_oracle_check, B_UNSAFE); 410 (void) local_built_in(in_dict("set_par_goal", 1), 411 p_set_par_goal, B_UNSAFE); 412 (void) local_built_in(in_dict("get_par_goal", 1), 413 p_get_par_goal, B_UNSAFE|U_UNIFY); 414#endif 415 (void) local_built_in(in_dict("dbag_create", 1), 416 p_dbag_create, B_SAFE|U_GROUND); 417 (void) local_built_in(in_dict("dbag_enter", 2), 418 p_dbag_enter, B_SAFE|U_NONE); 419 (void) local_built_in(in_dict("dbag_dissolve", 2), 420 p_dbag_dissolve, B_UNSAFE|U_UNIFY); 421 } 422} 423 424 425