% ----------------------------------------------------------------------
% BEGIN LICENSE BLOCK
% Version: CMPL 1.1
%
% The contents of this file are subject to the Cisco-style Mozilla Public
% License Version 1.1 (the "License"); you may not use this file except
% in compliance with the License.  You may obtain a copy of the License
% at www.eclipse-clp.org/license.
%
% Software distributed under the License is distributed on an "AS IS"
% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
% the License for the specific language governing rights and limitations
% under the License.
%
% The Original Code is  The ECLiPSe Constraint Logic Programming System.
% The Initial Developer of the Original Code is  Cisco Systems, Inc.
% Portions created by the Initial Developer are
% Copyright (C) 1989-2006 Cisco Systems, Inc.  All Rights Reserved.
%
% Contributor(s): ECRC GmbH
% Contributor(s): IC-Parc, Imperial College London
%
% END LICENSE BLOCK
%
% System:	ECLiPSe Constraint Logic Programming System
% Version:	$Id: io.pl,v 1.19 2015/05/19 22:16:32 jschimpf Exp $
% ----------------------------------------------------------------------

/*
 * SEPIA PROLOG SOURCE MODULE
 */

/*
 * IDENTIFICATION:	io.pl, part of module(sepia_kernel)
 *
 * DESCRIPTION:
 *
 *
 * CONTENTS:
 *
 */

/*
 * GLOBAL DIRECTIVES
 */
:- pragma(nodebug).
:- pragma(expand).
:- pragma(skip).

:- export
    current_stream/3,
    get_stream_info/3,
    set_stream_property/3,
    current_stream/1,
    current_compiled_file/3,
    dump_header/1,
    dump_term/3,
    exec/2,
    exec/3,
    exec_group/3,
    make/0,
    open/4,
    sh/1,
    system/1,
    get_file_info/3,
    op/3,
    global_op/3,
    phrase/2,
    phrase/3,
    peer/1,
    peer_get_property/3,
    peer_queue_create/5,
    peer_queue_close/1,
    peer_queue_get_property/3,
    peer_multitask_confirm/0,
    peer_multitask_terminate/0,
    peer_register_multitask/2,
    peer_deregister_multitask/1,
    peer_do_multitask/1.


:- tool(phrase/2, phrase_body/3).
:- tool(phrase/3, phrase_body/4).
:- tool(file_query/2, file_query_body/3).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% current_stream(?Stream)
%   Stream unbound: enumerate the handles of the open streams on
%   backtracking, starting from the stream that stdin refers to.
%   Stream bound:   test whether it denotes an open stream; a stream
%   name is accepted here as well as a handle.

current_stream(Stream) :-
    ( var(Stream) ->
        get_stream(stdin, First),               % stream 1
        gen_open_stream(First, Stream)
    ; check_stream_spec(Stream) ->
        is_open_stream(Stream)
    ;
        % check_stream_spec/1 failed: raise whatever error it recorded
        bip_error(current_stream(Stream))
    ).

    % gen_open_stream(+Current, -Stream)
    % Return Current, then (on backtracking) every stream reachable from
    % it through next_open_stream/2.
    % This could be a backtracking external.
    gen_open_stream(Current, Stream) :-
        ( next_open_stream(Current, Following) ->
            ( Stream = Current
            ; gen_open_stream(Following, Stream)
            )
        ;
            Stream = Current
        ).


% current_stream(?File, ?Mode, ?Stream) - DEPRECATED
%   Relates an open stream to its name (stream info item 0) and its
%   old-style mode (stream info item 2).
current_stream(File, Mode, Stream) :-
    check_var_or_atom_string(File),
    check_var_or_atom(Mode),
    check_var_or_stream_spec(Stream),
    !,
    ( var(Stream) ->
        get_stream(stdin, First),
        gen_open_stream(First, Stream)
    ;
        is_open_stream(Stream)                  % else fail
    ),
    stream_info_(Stream, 0, File),
    stream_info_(Stream, 2, Mode).
current_stream(File, Mode, Stream) :-
    % one of the argument checks failed: report the recorded error
    bip_error(current_stream(File, Mode, Stream)).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% get_stream_info(+Stream, ?Info, ?Value)
% task: access various data in the stream descriptor
%
%   Info unbound:  enumerates the public items of stream_info_nr/2.
%   Info an atom:  looks the item up, first in the public table, then in
%                  stream_info_nr_hidden/2; an unknown atom raises error 6.
%   Info other:    raises error 5.
%   An invalid Stream is reported through bip_error/1.

get_stream_info(Stream, Info, Value) :-
	( check_valid_stream(Stream) ->
	    ( var(Info) ->
		stream_info_nr(Info, N),
		stream_info_wrapper(Stream, N, Value)
	    ; atom(Info) ->
		( stream_info_nr(Info, N) ->
		    stream_info_wrapper(Stream, N, Value)
		; stream_info_nr_hidden(Info, N) ->
		    % hidden items bypass the wrapper and are read raw
		    stream_info_(Stream, N, Value)
		; error(6, get_stream_info(Stream, Info, Value))
		)
	    ;
		error(5, get_stream_info(Stream, Info, Value))
	    )
	;
	    bip_error(get_stream_info(Stream, Info, Value))
	).

% stream_info_wrapper(+Stream, +N, ?Value)
% Like stream_info_/3, except for the output_options item: there the
% stored bit mask and the hidden print_depth item are converted back
% into a symbolic option list by options_from_format/3.
stream_info_wrapper(Stream, N, Value) :-
	( stream_info_nr(output_options, N) ->
	    stream_info_(Stream, N, On),
	    stream_info_nr_hidden(print_depth, N1),
	    stream_info_(Stream, N1, Depth),
	    options_from_format(On, Depth, Value)
	;
	    stream_info_(Stream, N, Value)
	).

% stream_info_nr(?Name, -Number)
% Public stream property names and the item numbers passed to
% stream_info_/3 and set_stream_prop_/3.  The clause order is the order
% in which get_stream_info/3 enumerates properties.
:- mode stream_info_nr(?,-).
stream_info_nr(name, 0).
%stream_info_nr(mode, 2).		% old-style mode
%stream_info_nr(physical_stream, 4).	% hidden
stream_info_nr(aliases, 3).
stream_info_nr(system_use, 7).
stream_info_nr(line, 5).
stream_info_nr(offset, 6).
stream_info_nr(prompt, 1).
stream_info_nr(prompt_stream, 8).
stream_info_nr(fd, 9).
stream_info_nr(port, 10).
stream_info_nr(connection, 11).
stream_info_nr(reprompt_only, 12).
stream_info_nr(device, 13).
stream_info_nr(mode, 15).
stream_info_nr(event, 17).
stream_info_nr(flush, 18).
stream_info_nr(yield, 19).
stream_info_nr(end_of_line, 20).
stream_info_nr(scramble, 21).
stream_info_nr(sigio, 22).
stream_info_nr(usable, 23).
stream_info_nr(macro_expansion, 24).
stream_info_nr(output_options, 25).
%stream_info_nr(print_depth, 26).	% hidden
stream_info_nr(compress, 27).
stream_info_nr(last_written, 28).
stream_info_nr(handle, 29).
stream_info_nr(delete_file, 30).
stream_info_nr(path, 31).
stream_info_nr(reposition, 32).
stream_info_nr(encoding, 33).
stream_info_nr(input, 34).
stream_info_nr(output, 35).
stream_info_nr(end_of_stream, 36).
stream_info_nr(eof_action, 37).

% Items that can be read by name but are not enumerated.
stream_info_nr_hidden(physical_stream, 4).
stream_info_nr_hidden(print_depth, 26).


% set_stream_property(+Stream, +Info, +Value)
% Commits to the first solution of the worker; if the worker fails, the
% error it recorded is raised through bip_error/1.
set_stream_property(Stream, Info, Value) :-
    ( set_stream_property1(Stream, Info, Value) ->
        true
    ;
        bip_error(set_stream_property(Stream, Info, Value))
    ).

    % Worker: fails after set_bip_error/1 on bad arguments.
    set_stream_property1(_Stream, Property, _Value) :- var(Property), !,
        set_bip_error(4).
    set_stream_property1(Stream, output_options, OptionList) :- !,
        % symbolic options are stored as a bit mask plus a print depth
        options_to_format(OptionList, 0, _ClearMask, 0, SetMask,
                          0, MaxDepth, 1200, _Precedence),
        stream_info_nr(output_options, MaskIndex),
        set_stream_prop_(Stream, MaskIndex, SetMask),
        stream_info_nr_hidden(print_depth, DepthIndex),
        set_stream_prop_(Stream, DepthIndex, MaxDepth).
    set_stream_property1(Stream, Property, Value) :-
        ( stream_info_nr(Property, Index) -> true ; set_bip_error(6) ),
        set_stream_prop_(Stream, Index, Value).



% current_compiled_file(?File, ?Time, ?Module)
% Three-argument view of current_compiled_file/4, hiding the goal.
current_compiled_file(File, Time, Module) :-
    current_compiled_file(File, Time, Module, _Goal).


% make
% Failure-driven loop over all recorded compiled files: re-run the
% recorded goal for every file whose modification time differs from the
% recorded one.
make :-
    current_compiled_file(File, RecordedTime, Module, Goal),
    get_file_info(File, mtime) =\= RecordedTime,
    Goal@Module,            % normally compile(File)@Module
    fail.
make.



% open(+File, +Mode, ?Stream, +Options)
% open/3 followed by application of the option list to the new stream.
open(File, Mode, Stream, Options) :-
    (
        open(File, Mode, Stream),
        set_stream_options(Options, Stream)
    ->
        true
    ;
        bip_error(open(File, Mode, Stream, Options))
    ).

% set_stream_options(+Options, +Stream)
% Apply each element of a proper list of options, left to right.
set_stream_options(OptionList, _) :- var(OptionList), !, set_bip_error(4).
set_stream_options([], _) :- !.
set_stream_options([First|Rest], Stream) :- !,
    set_stream_option(First, Stream),
    set_stream_options(Rest, Stream).
set_stream_options(_, _) :-
    set_bip_error(5).

    set_stream_option(Option, _) :- var(Option), !, set_bip_error(4).
252 set_stream_option(alias(Name), Stream) ?- !, 253 ( current_stream(Name) -> set_bip_error(192) % ISO requirement 254 ; set_stream(Name, Stream) ). 255 set_stream_option(type(text), _Stream) ?- !. % ISO (only open/4) 256 set_stream_option(type(binary), Stream) ?- !, % ISO (only open/4) 257 stream_info_nr(encoding, I), 258 set_stream_prop_(Stream, I, octet). 259 set_stream_option(reposition(false), _Stream) ?- !. % ISO 260 set_stream_option(reposition(true), Stream) ?- !, 261 ( stream_info_nr(reposition, Nr), stream_info_(Stream, Nr, true) -> true 262 ; set_bip_error(192) ). % ISO 263 set_stream_option(output_options(Options), Stream) ?- 264 options_to_format(Options, 0, _Off, 0, On, 0, Depth, 1200, _Prec), 265 stream_info_nr(output_options, I1), 266 set_stream_prop_(Stream, I1, On), 267 stream_info_nr_hidden(print_depth, I2), 268 set_stream_prop_(Stream, I2, Depth), 269 !. 270 set_stream_option(Option, Stream) :- 271 compound(Option), 272 functor(Option, Name, 1), 273 arg(1, Option, Value), 274 stream_info_nr(Name, Nr), 275 !, 276 set_stream_prop_(Stream, Nr, Value). 277 set_stream_option(_Option, _) :- set_bip_error(6). 278 279 280 281%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 282% 283% OPERATORS 284% 285 286:- tool( op/3, local_op_body/4). 287:- tool( global_op/3, global_op_body/4). 288 289local_op_body(Preced, Assoc, Op, Module):- 290 op_body(local, Preced, Assoc, Op, Module), !. 291local_op_body(Preced, Assoc, Op, Module):- 292 bip_error(op(Preced, Assoc, Op), Module). 293 294global_op_body(Preced, Assoc, Op, Module):- 295 op_body(global, Preced, Assoc, Op, Module), !. 296global_op_body(Preced, Assoc, Op, Module):- 297 bip_error(global_op(Preced, Assoc, Op), Module). 298 299% Note: unfortunately, according to ISO, op(P,A,[]) means op(P,A,[[]]). 300op_body(Visible, Preced, Assoc, Ops, Module) :- nonvar(Ops), Ops=[_|_], !, 301 op_body1(Visible, Preced, Assoc, Ops, Module). 
op_body(Visible, Preced, Assoc, Ops, Module) :-
	% anything that is not a non-empty list: a single operator
	op_(Visible, Preced, Assoc, Ops, Module).

% op_body1(+Visible, +Preced, +Assoc, +Ops, +Module)
% Declare every operator of a list.  Each element must be an atom or a
% variable (the latter is left for op_/5 to reject).
op_body1(_, _, _, Ops, _) :- var(Ops), !, set_bip_error(4).
op_body1(_, _, _, [], _) :- !.
op_body1(Visible, Preced, Assoc, [Op|Ops], Module) :-
	( atom(Op) -> true ; var(Op) ),
	!,
	% report errors per-operator, if possible
	( op_(Visible, Preced, Assoc, Op, Module) -> true
	; Visible == local -> bip_error(op(Preced, Assoc, Op), Module)
	; bip_error(global_op(Preced, Assoc, Op), Module) ),
	op_body1(Visible, Preced, Assoc, Ops, Module).
op_body1(_, _, _, _, _) :- set_bip_error(5).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% read_term/3 and write_term/3 (ISO compatible)
% In case of conflict, use the rightmost option

:- export
	read_term/2,
	read_term/3.

:- tool(read_term/2, read_term_/3).
:- tool(read_term/3, read_term_/4).

% read_term_(?Term, +Options, +Module): read from the stream 'input'.
read_term_(Term, Options, Module) :-
	read_term_(input, Term, Options, Module).

% read_term_(+Stream, ?Term, +Options, +Module)
% All options are validated before anything is read; each option is
% then answered from the term and the name/variable list that
% readvar/4 returns.
read_term_(Stream, Term, Options, Module) :-		% 8.14.1
	check_read_options(Options),
	!,
	readvar(Stream, Term, Vars, Module),
	handle_read_options(Options, Term, Vars).
read_term_(Stream, Term, Options, Module) :-
	bip_error(read_term(Stream, Term, Options), Module).

    % check_read_options(+Options): fails with bip_error unless Options
    % is a proper list of known read options.
    check_read_options(Options) :- var(Options), !,
	set_bip_error(4).
    check_read_options([]) :- !.
    check_read_options([O|Os]) :- !,
	check_read_option(O),
	check_read_options(Os).
    check_read_options(_Options) :-
	set_bip_error(5).

    :- mode handle_read_options(+,?,+).
    handle_read_options([], _, _) :- !.
    handle_read_options([O|Os], Term, Vars) :- !,
	handle_read_option(O, Term, Vars),
	handle_read_options(Os, Term, Vars).

    % Always change the next 2 predicates together!
    % The accepting clauses cut, so that a later failure (e.g. an invalid
    % option further down the list) cannot backtrack into the catch-all
    % clause and record error 6 for an option that was in fact valid.
    check_read_option(Option) :- var(Option), !, set_bip_error(4).
    check_read_option(variables(_)) :- !.
    check_read_option(variable_names(_)) :- !.
    check_read_option(singletons(_)) :- !.
    check_read_option(_) :- set_bip_error(6).

    % If you make a change here, change also check_read_option/1!
    % Vars is the list of [Name|Var] pairs delivered by readvar/4.
    :- mode handle_read_option(+,?,+).
    handle_read_option(variables(Vs), Term, _Vars) :-
	term_variables_lr(Term, Vs).
    handle_read_option(variable_names(VNs), _Term, Vars) :-
	name_eq_var(Vars, VNs).
    handle_read_option(singletons(NamesSingletons), Term, NsVs) :-
	collect_variables(Term, [], Vars),
	( Vars = [] ->
	    NamesSingletons = []
	;
	    % sort without removing duplicates, so that multiple
	    % occurrences of a variable become adjacent
	    sort(0, =<, Vars, SortedVars),
	    SortedVars = [_X|Xs],
	    collect_singletons(_X, Xs, Singletons),
	    add_names(Singletons, NsVs, NamesSingletons)
	).

    % [Name|Var] pairs -> list of the variables only
    vars_only([], []).
    vars_only([[_|V]|Vars], [V|Vs]) :-
	vars_only(Vars, Vs).

    % [Name|Var] pairs -> list of Name=Var terms
    name_eq_var([], []).
    name_eq_var([[N|V]|VNs], [N=V|Vs]) :-
	name_eq_var(VNs, Vs).

    % collect_singletons(+X, +Rest, -Singletons)
    % X is an element not yet known to be repeated; keep it only if the
    % next element of the sorted list is a different variable.
    collect_singletons(_X, [], [_X]).
    collect_singletons(_X, [_Y|Ys], Singletons) :-
	( _X == _Y ->
	     skip_multiples(_Y, Ys, Singletons)
	;
	     Singletons = [_X|Singletons1],
	     collect_singletons(_Y, Ys, Singletons1)
	).

    % skip_multiples(+X, +Rest, -Singletons)
    % X is known to be repeated; skip its further occurrences.
    skip_multiples(_, [], []).
    skip_multiples(_X, [_Y|Ys], Singletons) :-
	( _X == _Y ->
	     skip_multiples(_Y, Ys, Singletons)
	;
	     collect_singletons(_Y, Ys, Singletons)
	).

    % Pair every singleton with its source name; variables for which no
    % name is found in NsVs are dropped.
    add_names([], _, []).
    add_names([S|Ss], NsVs, NsSs) :-
	( varnamelookup(S, NsVs, N) -> NsSs = [N=S|NsSs1] ; NsSs = NsSs1 ),
	add_names(Ss, NsVs, NsSs1).

    varnamelookup(X, [[N|Y]|_], N) :- X==Y, !.
    varnamelookup(X, [_|T], N):- varnamelookup(X, T, N).


:- export
	write_term/2,
	write_term/3.

:- tool(write_term/2, write_term_/3).
:- tool(write_term/3, write_term_/4).

% write_term_(?Term, +Options, +Module): write to the stream 'output'.
write_term_(Term, Options, Module) :-
	write_term_(output, Term, Options, Module).

% write_term_(+Stream, ?Term, +Options, +Module)
% Convert the symbolic options into masks, depth and precedence, and
% hand them to write_term/7.  If either step fails, the recorded error
% is raised against write_term/3.
write_term_(Stream, Term, Options, Module) :-		% 8.14.2
	options_to_format(Options, 0, Off, 0, On, 0, Depth, 1200, Prec),
	write_term(Stream, Term, Off, On, Depth, Prec, Module),
	!.
write_term_(Stream, Term, Options, Module) :-
	bip_error(write_term(Stream, Term, Options), Module).


% The following auxiliary predicates map symbolic write-options to
% bitmask+depth used on the C level (in write.c) and vice versa

% options_to_format(+Options, +Off0, -Off, +On0, -On,
%                   +Depth0, -Depth, +Prec0, -Prec)
% Fold the option list left to right into a mask of bits to clear (Off),
% a mask of bits to set (On), a depth and a precedence.  A later option
% overrides an earlier one: its set-bits are removed from Off and its
% clear-bits are removed from On.
:- mode options_to_format(?,+,-,+,-,+,-,+,-).	% may fail with bip_error
options_to_format(List, _, _, _, _, _, _, _, _) :- var(List), !,
	set_bip_error(4).
options_to_format([], Off, Off, On, On, Depth, Depth, Prec, Prec) :- !.
options_to_format([O|Os], Off0, Off, On0, On, Depth0, Depth, Prec0, Prec) :- !,
	option_to_format(O, ThisOff, ThisOn, ThisDepth, ThisPrec),
	Off1 is Off0 /\ \ThisOn \/ ThisOff,
	On1 is On0 /\ \ThisOff \/ ThisOn,
	% an option that does not mention depth/precedence leaves them unbound
	( var(ThisDepth) -> Depth1 = Depth0 ; Depth1 = ThisDepth ),
	( var(ThisPrec) -> Prec1 = Prec0 ; Prec1 = ThisPrec ),
	options_to_format(Os, Off1, Off, On1, On, Depth1, Depth, Prec1, Prec).
options_to_format(_, _, _, _, _, _, _, _, _) :-
	set_bip_error(5).

    % option_to_format(+Option, -Clear, -Set, -Depth, -Prec)
    % Translate a single option; fails with bip_error for anything that
    % is not a known option (error 6 for compound terms, 5 otherwise).
    option_to_format(Junk, _, _, _, _) :- var(Junk), !,
	set_bip_error(4).
    option_to_format(Option, C, S, D, _P) :-
	option_format(Option, C, S, D), !.
    option_to_format(Option, C, S, D, _P) :-
	option_format_compat(Option, C, S, D), !.
    option_to_format(precedence(P0), 0, 0, _D, P) :- !,
	P = P0.
    option_to_format(priority(P0), 0, 0, _D, P) :- !,	% SICStus/SWI compat
	P = P0.
    option_to_format(Junk, _, _, _, _) :- compound(Junk), !,
	set_bip_error(6).
    option_to_format(_, _, _, _, _) :-
	set_bip_error(5).

    % same as before, but just check
    % (fails silently, without recording a bip_error)
    valid_write_option(Junk) :- var(Junk), !,
	fail.
    valid_write_option(Option) :-
	option_format(Option, _, _, _), !.
    valid_write_option(Option) :-
	option_format_compat(Option, _, _, _), !.
    valid_write_option(precedence(_)).
    valid_write_option(priority(_)).


% options_from_format(+On, +Depth, -Options)
% Inverse mapping: list every option of option_format/4 whose set-bits
% intersect the mask On (and whose depth entry is compatible with
% Depth).  depth(Depth) is prepended when the bit of depth(full) is not
% set in On and Depth is not 0.
options_from_format(On, Depth, Options) :-
	findall(Option, (
		option_format(Option, _, Bits, Depth),
		On /\ Bits =\= 0
	    ), Options0),
	once option_format(depth(full), _, FullDepthBit, _),
	( On /\ FullDepthBit =:= 0, Depth \== 0 ->
	    Options = [depth(Depth)|Options0]
	;
	    Options = Options0
	).


% Output options
%
% ISO compatible:	ignore_ops, quoted, numbervars
% SICStus compatible:	max_depth, portrayed
% CAUTION: The numeric constants must match the definitions in ec_io.h!

% option_format(?Option, -BitsToClear, -BitsToSet, ?MaxDepth).
% Clause order matters: option_to_format/5 and valid_write_option/1
% commit to the first matching clause, and options_from_format/3 lists
% options in this order.
:- mode option_format(?,-,-,?).
option_format(variables(anonymous),	16'4030, 16'4000, _). % VAR_ANON
option_format(variables(default),	16'4030, 16'0000, _).
option_format(variables(raw),		16'4030, 16'0010, _). % VAR_NUMBERS
option_format(variables(full),		16'4030, 16'0020, _). % VAR_NAMENUM
option_format(attributes(none),		16'0500, 16'0000, _).
option_format(attributes(pretty),	16'0500, 16'0100, _). % ATTRIBUTE
option_format(attributes(full),		16'0500, 16'0400, _). % STD_ATTR
option_format(as(term),			16'1200, 16'0000, _).
option_format(as(clause),		16'1200, 16'1000, _). % WRITE_CLAUSE
option_format(as(goal),			16'1200, 16'0200, _). % WRITE_GOAL
option_format(newlines(true),		16'0000, 16'2000, _). % DONT_QUOTE_NL
option_format(newlines(false),		16'2000, 16'0000, _).
option_format(operators(true),		16'0001, 16'0000, _).
option_format(operators(false),		16'0000, 16'0001, _). % CANONICAL
option_format(dotlists(true),		16'0000, 16'0004, _). % DOTLIST
option_format(dotlists(false),		16'0004, 16'0000, _).
option_format(transform(true),		16'0800, 16'0000, _).
option_format(transform(false),		16'0000, 16'0800, _). % NO_MACROS
option_format(quoted(true),		16'0000, 16'0008, _). % QUOTED
option_format(quoted(false),		16'0008, 16'0000, _).
option_format(numbervars(true),		16'0000, 16'8000, _). % OUT_DOLLAR_VAR
option_format(numbervars(false),	16'8000, 16'0000, _).
option_format(portrayed(true),		16'0000, 16'0040, _). % PRINT_CALL
option_format(portrayed(false),		16'0040, 16'0000, _).
option_format(depth(full),		16'0000, 16'0002, 0). % FULLDEPTH
option_format(depth(N),			16'0002, 16'0000, N).
option_format(compact(true),		16'0000, 16'0080, _). % WRITE_COMPACT
option_format(compact(false),		16'0080, 16'0000, _).
option_format(fullstop(true),		16'00000,16'20000, _). % TERM_FULLSTOP
option_format(fullstop(false),		16'20000,16'00000, _).
option_format(nl(true),			16'00000,16'40000, _). % TERM_NEWLINE
option_format(nl(false),		16'40000,16'00000, _).

% Alternative spellings accepted on input only: options_from_format/3
% consults option_format/4 alone, so these are never reported back.
option_format_compat(ignore_ops(true),	16'0000, 16'0805, _). % ISO compat
option_format_compat(ignore_ops(false),	16'0805, 16'0000, _).
option_format_compat(max_depth(0),	16'0000, 16'0002, 0). % SICS compat
option_format_compat(max_depth(N),	16'0002, 16'0000, N).


%
% term_string(?Term, ?String, +Options)
% This is currently strict wrt Options: they must fit with the direction.
% SWI is permissive and ignores all unknown options in either direction.
%

:- export term_string/3.
:- tool(term_string/3, term_string_/4).

% Term -> String direction (String unbound): write the term to a string
% stream with a fixed set of base options, followed by the caller's
% options with the read options filtered out, then fetch the stream's
% name item, which holds the text written.
term_string_(T, S, Options, Module) :- var(S), !,
	open(string(""), write, Stream),
	filter_options(Options, write, WOptions),
	write_term_(Stream, T, [
		attributes(full),quoted(true),numbervars(true),
		variables(raw),depth(full),transform(false)|WOptions], Module),
	stream_info_(Stream, 0, S),	% = get_stream_info(Stream,name,S)
	close(Stream).
% String -> Term direction: the string must contain exactly one term,
% i.e. the token after the term must be end_of_file.  The empty string,
% a failed read and trailing tokens all raise error 7.  The culprit is
% reported as term_string/3, matching the exported predicate and the
% type-error clause below.
term_string_(T, S, Options, Module) :- string(S), !,
	( S \== "" ->
	    open(string(S), read, Stream),
	    (
		filter_options(Options, read, ROptions),
		read_term_(Stream, T0, ROptions, Module),
		read_token_(Stream, end_of_file, _, Module)
	    ->
		close(Stream),
		% unify only after the stream is closed
		T = T0
	    ;
		close(Stream),
		error(7, term_string(T, S, Options), Module)
	    )
	;
	    error(7, term_string(T, S, Options), Module)
	).
term_string_(T, S, Options, Module) :-
	% S is neither a variable nor a string
	error(5, term_string(T, S, Options), Module).

    % filter_options(+Options, +Direction, -Filtered)
    % Drop the options that belong to the OTHER direction; everything
    % else (including junk) is kept, to be diagnosed by read_term_ or
    % write_term_ later.
    filter_options([Option|Options], RW, FOptions) ?- !,
	( ignore_option(Option, RW) ->
	    FOptions = FOptions0
	;
	    FOptions = [Option|FOptions0],
	    get_bip_error(_)	% clear the error code
	),
	filter_options(Options, RW, FOptions0).
    filter_options(Options, _RW, Options).	% [] and non-lists (for error later)

    % fails with bip_error
    % When reading, valid write options are ignored, and vice versa.
    ignore_option(Option, read) ?- option_to_format(Option, _, _, _, _).
    ignore_option(Option, write) ?- check_read_option(Option).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

/**** REMEMBER TO UPDATE annotated_term used in raw form by expand_macros
 **** and friends when changing the definition here
 **** definition now moved to kernel.pl, update it there
:- export struct(annotated_term(
	term,		% var, atomic or compound
	type,		% atom or var/1
	file,		% atom
	line,		% integer
	from,		% integer
	to		% integer
	% may be extended in future
    )).
****/

:- export read_annotated/2.
:- tool(read_annotated/2, read_annotated_/3).

% read_annotated_(+Stream, -AnnTerm, +Module)
% Read an annotated term; the plain term is built and macro expansion
% is run only when the reader reports macros (HasMacros == 1).
read_annotated_(Stream, AnnTerm, Module) :-
	read_annotated_raw(Stream, RawAnnTerm, HasMacros, Module),
	( HasMacros == 1 ->
	    unannotate_term(RawAnnTerm, RawTerm),
	    expand_macros_annotated_(RawTerm, RawAnnTerm, _Term, AnnTerm, Module)
	;
	    AnnTerm = RawAnnTerm
	).


:- export read_annotated/3.
:- tool(read_annotated/3, read_annotated_/4).

% read_annotated_(+Stream, -Term, -AnnTerm, +Module)
% Deliver both the plain and the annotated form of the next term.  The
% plain form is always derived from the raw annotated term; macro
% expansion is applied only when the reader reports macros.
read_annotated_(Stream, Term, AnnTerm, Module) :-
    read_annotated_raw(Stream, RawAnnotated, HasMacros, Module),
    unannotate_term(RawAnnotated, RawPlain),
    ( HasMacros \== 1 ->
        Term = RawPlain,
        AnnTerm = RawAnnotated
    ;
        expand_macros_annotated_(RawPlain, RawAnnotated, Term, AnnTerm, Module)
    ).


% unannotate_term(+Annotated, -Plain)
% Strip the annotated_term wrappers from every level of a term.
unannotate_term(end_of_file, Plain) :- -?->
    Plain = end_of_file.
unannotate_term(annotated_term{term:Wrapped}, Plain) :- -?->
    ( compound(Wrapped) ->
        % same functor, arguments unwrapped recursively
        functor(Wrapped, Name, Arity),
        functor(Plain, Name, Arity),
        unannotate_term_args(Arity, Wrapped, Plain)
    ;
        Plain = Wrapped
    ).

    % Unwrap arguments Index, Index-1, ..., 1.
    unannotate_term_args(Index, Wrapped, Plain) :-
        ( Index == 0 ->
            true
        ;
            Previous is Index-1,
            arg(Index, Wrapped, WrappedArg),
            arg(Index, Plain, PlainArg),
            unannotate_term(WrappedArg, PlainArg),
            unannotate_term_args(Previous, Wrapped, Plain)
        ).



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% write the header for a .eco file

dump_header(Out) :-
    % magic .eco header (see procedure.c)
    put(Out, 16'EC),
    put(Out, 16'1C),
    put(Out, 16'29),
    put(Out, 16'16),            % ECO_CURRENT_VERSION, see procedure.c
    % flush before switching to scramble mode
    flush(Out),
    % next line contains key that must be used in the .eco loader
    set_stream_property(Out, scramble, 73540),
    % 8 random bytes to make decryption more difficult
    % (it may be better to have one after every dumped term)
    random(Rand),
    get_flag(unix_time, Now),
    RandB0 is Rand /\ 255,
    RandB1 is (Rand >> 8) /\ 255,
    RandB2 is (Rand >> 16) /\ 255,
    RandB3 is (Rand >> 24) /\ 255,
    TimeB0 is Now /\ 255,
    TimeB1 is (Now >> 8) /\ 255,
    TimeB2 is (Now >> 16) /\ 255,
    TimeB3 is (Now >> 24) /\ 255,
    % random and time bytes are interleaved in this fixed order
    put(Out, RandB0), put(Out, TimeB2), put(Out, RandB2), put(Out, TimeB0),
    put(Out, RandB1), put(Out, TimeB3), put(Out, RandB3), put(Out, TimeB1).


% write a term in .eco format

dump_term(Out, Term, Module) :-
    term_to_bytes_(Term, Bytes, Module),
    string_length(Bytes, ByteCount),
    % length prefix first, then the serialised term itself
    write_integer(Out, ByteCount),
    printf(Out, "%Tw", Bytes).          % no macros!

% write_integer(+Out, +N)
% Emit the low 32 bits of N as four bytes, most significant byte first.
write_integer(Out, N) :-
    Highest is (N >> 24) /\ 16'ff,
    High    is (N >> 16) /\ 16'ff,
    Low     is (N >> 8) /\ 16'ff,
    Lowest  is N /\ 16'ff,
    put(Out, Highest),
    put(Out, High),
    put(Out, Low),
    put(Out, Lowest).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% file_query_body(++Query, +Proc, +Module)
% Walk a file query.  File-loading goals (lists, compile/1,
% ensure_loaded/1) are redirected to Proc via call_proc/3; of the
% remaining goals only those admitted by execute/1 are actually run,
% all others are skipped.
:- mode file_query_body(++, +, +).
file_query_body(call(Goal), _, Module) :-       % call/1 forces execution
    !,
    call(Goal)@Module.
file_query_body((Left, Right), Proc, Module) :-
    !,
    file_query_body(Left, Proc, Module),
    file_query_body(Right, Proc, Module).
file_query_body((Cond->Then;Else), Proc, Module) :-
    !,
    ( file_query_body(Cond, Proc, Module) ->
        file_query_body(Then, Proc, Module)
    ;
        file_query_body(Else, Proc, Module)
    ).
file_query_body((Left;Right), Proc, Module) :-
    !,
    ( file_query_body(Left, Proc, Module)
    ; file_query_body(Right, Proc, Module)
    ).
file_query_body([File|Others], Proc, Module) :-
    !,
    call_proc(Proc, File, Module),
    ( Others == [] ->
        true
    ;
        file_query_body(Others, Proc, Module)
    ).
file_query_body(compile(FileSpec), Proc, Module) :-
    !,
    ( FileSpec = [_|_] ->
        file_query_body(FileSpec, Proc, Module)
    ;
        call_proc(Proc, FileSpec, Module)
    ).
file_query_body(ensure_loaded(FileSpec), Proc, Module) :-
    !,
    ( FileSpec = [_|_] ->
        file_query_body(FileSpec, Proc, Module)
    ;
        call_proc(Proc, FileSpec, Module)
    ).
file_query_body(:-(Goal), Proc, Module) :-
    !,
    file_query_body(Goal, Proc, Module).
file_query_body(?-(Goal), Proc, Module) :-
    !,
    file_query_body(Goal, Proc, Module).
file_query_body(meta_attribute(_, _), _, Module) :-
    !,
    meta_attribute(Module, []).
file_query_body(Goal, _Proc, Module) :-
    ( execute(Goal) ->
        call(Goal)@Module
    ;
        true
    ).

% execute(+Goal): goals that file_query_body/3 runs as they stand.
:- mode execute(+).
execute(use_module(_)).
execute(define_struct(_)).      % library(structures)
execute(erase_struct(_)).
execute(op(_, _, _)).
execute(global_op(_, _, _)).
execute(local_op(_, _, _)).
execute(set_flag(Flag, _)) :- allowed_flag(Flag).
execute(get_flag(_, _)).
execute(define_global_macro(_, _, _)).
execute(define_local_macro(_, _, _)).
execute(define_macro(_, _, _)).
execute(erase_macro(_)).
execute(set_chtab(_, _)).
execute(asserta(_)).
execute(assert(_)).
execute(assertz(_)).
execute(compile_term(_)).
execute(cprolog).
execute(quintus).
execute(bsi).
execute(sicstus).

% allowed_flag(+Flag): the flags a file query is allowed to set.
:- mode allowed_flag(+).
allowed_flag(library_path).
allowed_flag(macro_expansion).
allowed_flag(prolog_suffix).

% call_proc(+Proc, +File, +Module)
% Call a fresh copy of Proc with its first argument unified with File.
call_proc(Proc, File, Module) :-
    copy_term(Proc, Instance),
    arg(1, Instance, File),
    call(Instance)@Module.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% EXEC
%

% exec(+Command, +Streams)
% Start the command, wait for it and decode the wait status:
%   low byte 0      -> exited; an exit status above 128 is reported as
%                      error 170 with errno set to (status - 128)
%   low byte 8'177  -> stopped, error 175
%   otherwise       -> died, error 174
exec(Command, Streams) :-
    exec(Command, Streams, Pid, 2),             % fails on error
    !,
    wait(Pid, WaitCode),                        % waitpid()
    ( WaitCode /\ 8'377 =:= 0 ->                % process exited normally
        ExitStatus is (WaitCode >> 8) /\ 8'377,
        Errno is ExitStatus - 128,
        ( Errno > 0 ->
            set_last_errno(Errno),
            error(170, exec(Command, Streams))
        ;
            true
        )
    ; WaitCode /\ 8'377 =:= 8'177 ->            % process stopped
        error(175, exec(Command, Streams))
    ;                                           % process died
        error(174, exec(Command, Streams))
    ).
exec(Command, Streams) :-
    bip_error(exec(Command, Streams)).


% exec(+Command, +Streams, -Pid): start the command without waiting.
exec(Command, Streams, Pid) :-
    ( exec(Command, Streams, Pid, 0) ->
        true
    ;
        bip_error(exec(Command, Streams, Pid))
    ).

% exec_group(+Command, +Streams, -Pid): as exec/3, but with mode 1.
exec_group(Command, Streams, Pid) :-
    ( exec(Command, Streams, Pid, 1) ->
        true
    ;
        bip_error(exec_group(Command, Streams, Pid))
    ).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% Sh
%

% system(+ShellCommand)
% Use '_system'/1 when that predicate is defined, else run /bin/sh -c.
system(ShellCommand) :-
    ( get_flag('_system'/1, defined, on) ->
        '_system'(ShellCommand)
    ;
        exec(['/bin/sh', '-c', ShellCommand], [])
    ).

sh(ShellCommand) :-
    system(ShellCommand).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% MACROS
%

% phrase_body(+Grammar, ?List, ?Rest, +Module): tool body of phrase/3.
phrase_body(Grammar, List, Rest, Module) :-
    var(Grammar),
    !,
    error(4, phrase(Grammar, List, Rest), Module).
phrase_body(Grammar, List, Rest, Module) :-
    check_grammar(Grammar, List, Rest, Module, Goal),
    call(Goal)@Module.

% check_grammar(+Grammar, ?List, ?Rest, +Module, -Goal)
% Raise a type error for numbers and strings, then build Goal by
% appending the two list arguments to the grammar body.
check_grammar(Grammar, List, Rest, Module, Goal) :-
    ( ( number(Grammar) ; string(Grammar) ) ->
        error(5, phrase(Grammar, List, Rest), Module)
    ;
        true
    ),
    Grammar =.. [Name | Args],
    append(Args, [List, Rest], ExtendedArgs),
    Goal =.. [Name | ExtendedArgs].

% phrase_body(+Grammar, ?List, +Module): tool body of phrase/2, which
% requires the whole list to be consumed.
phrase_body(Grammar, List, Module) :-
    phrase_body(Grammar, List, [], Module).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% FILES
%

% get_file_info(+File, ?Name, ?Value)
% Check the arguments, expand the file name, then look the item up
% (enumerating all items on backtracking when Name is unbound).
get_file_info(File, Name, Value) :-
    (
        check_atom_string(File),
        check_var_or_atom(Name),
        check_var_or_atomic(Value)
    ->
        expand_filename(File, FullName, 1),     % EXPAND_STANDARD
        do_get_file_info(FullName, Name, Value)
    ;
        bip_error(get_file_info(File, Name, Value))
    ).


% This predicate expects an already expanded file name!
% Each item maps onto a numbered sys_file_flag/3 request.
do_get_file_info(File, device, Value) :-
    sys_file_flag(File, 9, Value).
do_get_file_info(File, inode, Value) :-
    sys_file_flag(File, 1, Value).
do_get_file_info(File, mode, Value) :-
    sys_file_flag(File, 0, Value).
do_get_file_info(File, nlink, Value) :-
    sys_file_flag(File, 2, Value).
do_get_file_info(File, uid, Value) :-
    sys_file_flag(File, 3, Value).
do_get_file_info(File, uname, Value) :-
    sys_file_flag(File, 15, Value).
do_get_file_info(File, gid, Value) :-
    sys_file_flag(File, 4, Value).
do_get_file_info(File, gname, Value) :-
    sys_file_flag(File, 16, Value).
do_get_file_info(File, size, Value) :-
    sys_file_flag(File, 5, Value).
do_get_file_info(File, atime, Value) :-
    sys_file_flag(File, 6, Value).
do_get_file_info(File, adate, Value) :-
    sys_file_flag(File, 12, Value).
do_get_file_info(File, mtime, Value) :-
    sys_file_flag(File, 7, Value).
do_get_file_info(File, mdate, Value) :-
    sys_file_flag(File, 13, Value).
do_get_file_info(File, ctime, Value) :-
    sys_file_flag(File, 8, Value).
do_get_file_info(File, cdate, Value) :-
    sys_file_flag(File, 14, Value).
do_get_file_info(File, blksize, X) :-
	sys_file_flag(File, 11, X).
do_get_file_info(File, blocks, X) :-
	sys_file_flag(File, 10, X).
% For the three permission items, the request number is obtained from
% process_file_permission/2.
do_get_file_info(File, readable, X) :-
	process_file_permission(readable, N),
	sys_file_flag(File, N, X).
do_get_file_info(File, writable, X) :-
	process_file_permission(writable, N),
	sys_file_flag(File, N, X).
do_get_file_info(File, executable, X) :-
	process_file_permission(executable, N),
	sys_file_flag(File, N, X).
% The type is decoded from the mode item (request 0), masked with
% 8'170000.
do_get_file_info(File, type, Type) :-
	sys_file_flag(File, 0, Mode),
	TypeBits is Mode /\ 8'170000,
	( TypeBits == 8'010000 -> Type = fifo
	; TypeBits == 8'020000 -> Type = char_device
	; TypeBits == 8'040000 -> Type = directory
	; TypeBits == 8'060000 -> Type = block_device
	; TypeBits == 8'100000 -> Type = file
	; TypeBits == 8'120000 -> Type = link
	; TypeBits == 8'140000 -> Type = socket
	; Type = unknown
	).
% The time recorded for File by current_compiled_file/4.
do_get_file_info(File, compiled_time, Time) :-
	current_compiled_file(File, Time, _Module, _Goal).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%
% Remote Socket Interface
%
% Recorded information:
%    recorded_list(peer_info, PeerInfos) - Information on all the peers
%
% Each PeerInfos is a structure:
%
%        PeerName-peer_info(PeerType, Lang, Key, Connect)
%
%    PeerName: peer name (atom)
%    PeerType: either remote or embed.
%    Lang:     programming language of the peer (atom).
%    Key:      the key used to access dynamic data associated with the peer.
%    Connect:  connection information. Either:
%                   remote(PeerHost,LocalHost,TimeOut)
%                   embed(PeerHost,LocalHost,TimeOut)
%
%              PeerHost is the hostname for the peer side. Used for verifying
%               that any accepted socket connection comes from the same host.
%              LocalHost is the ECLiPSe side hostname specification. This is
%               used in setting up a server connection to the peer side.
%               What is used has implications for what the remote side can
%               give for the server (ECLiPSe) hostname when making a client
%               connection:
%                  1) actual ECLiPSe side hostname: remote side must also
%                     use this name for the server.
%                  2) localhost: remote side must use localhost for the
%                     server name. This restricts the connection to the
%                     local machine.
%                  3) not instantiated: remote side can use either
%                     localhost or the actual server hostname.
%               We use the same specification for all connections between
%               ECLiPSe and the peer for security reasons.
%              TimeOut is the time-out (in seconds) for accepting any socket
%               connection, plus waiting time for the initial read of data
%               during attachment. Can be 'block' for no time-outs
%
% Currently, the only dynamic data associated with a peer are the peer queues.
% Each queue is a record item recorded under the Key for the peer as:
%
%       queue(StreamNum)
%
% From StreamNum, a queue key can be derived by calling get_peer_queue_key/2.
% Information for the queue is recorded under this key:
%
%       peer_queue(PeerType, PeerName, QueueType, Direction)
%
%    PeerType:  peer type for the queue: Either embed or remote
%    PeerName:  peer name for the queue.
%    QueueType: the type of the queue. This is either:
%                    sync(SocketName): synchronous remote queue, with socket
%                    async           : asynchronous remote queue
%                    embed           : queue in an embedded peer
%    Direction: direction of queue. Either fromec, toec or bidirect
%
% The above information is used to clean up a remote side when it is
% disconnected
%
% Dealing with events:
% To ensure that the remote interface protocol is followed at all times,
% event handling is deferred during most of the code below.  We only
% allow events during
%	- running rpcs
%	- running user goals, e.g.
%         remote_init
%       - remote flush (but not if inside ec_waitio)
% but these goals must be safely wrapped in catch/3 to make sure the
% events are deferred again even on failure/throw.
% Note that, since we re-enable events temporarily from within events-deferred
% code, we cannot allow nesting, i.e. remote_accept, peer_queue_xxx and
% flush/waitio handlers must be called from non-events-deferred contexts.
%
% First clause is the current version of the remote protocol.
% The version information should not occur anywhere else on the ECLiPSe side.
%

remote_version(1).

:- local variable(rpeer_count, 0).
:- local variable(in_ec_waitio, []).
:- local struct(peer_info(type,lang,key,connect)).
:- local struct(peer_queue(ptype,pname,qtype,dir)).


% non_interruptable(Goal)
% Run Goal with event handling deferred and re-enable events afterwards.
% If events_defer/0 fails (taken to mean illegal nesting), a warning is
% printed and Goal is still run, without touching the deferral state.
% Note that events_nodefer/0 is only reached when Goal succeeds.
non_interruptable(Goal) :-
        ( events_defer ->
            call(Goal),
            events_nodefer
        ;
            printf(warning_output, "Warning: Illegal events_defer nesting detected during remote protocol (%w)",[Goal]),
            call(Goal)
        ).


% remote_connect(Address, Control, Init, Mod)
% Create the server socket for Address (Host/Port), log the address, then
% wait without time-out (block) for the remote side to attach, using the
% empty string as password.  Init is the initial goal, run in module Mod.
% Raises a type error (5) on the 3-argument form if the setup step fails.
remote_connect(Address, Control, Init, Mod) :-
        remote_connect_setup(Address, Control, Soc), !,
        printf(log_output, "Socket created at address %w%n%b", [Address]),
        remote_connect_accept(Control, Soc, block, Init, "", _, Mod).
remote_connect(Address, Control, Init, _Mod) :-
        error(5, remote_connect(Address, Control, Init)).

% remote_connect_setup(Host/Port, Control, Soc)
% Create a listening server socket Soc and reserve the peer name Control
% by recording Control-Host under remote_control_host.
%   Port    - integer or variable
%   Control - atom, or a variable for which a fresh name is generated
%   Soc     - must be a variable
% A caller-supplied Control is validated BEFORE the socket is created, so
% that a name clash fails without leaving a bound server socket behind.
% Argument type problems are reported through bip_error/1.
remote_connect_setup(Host/Port, Control, Soc) :-
        check_var_or_integer(Port),
        check_var_or_atom(Control),
        check_var(Soc),
        !,
        copy_term(Host,OrigHost), % OrigHost can be a variable
        ( var(Control) ->
            new_socket_server(Soc, Host/Port, 2),
            new_remote_peer_name(Control)
        ;
            % fail early: nothing has been allocated yet
            not_existing_peer_name(Control),
            new_socket_server(Soc, Host/Port, 2)
        ),
        recorda(remote_control_host, Control-OrigHost).
remote_connect_setup(Address, Control, Soc) :-
        bip_error(remote_connect_setup(Address, Control, Soc)).
% new_remote_peer_name(Name)
% Generate a fresh peer name of the form peerN: the rpeer_count counter is
% incremented (repeat loop) until the resulting atom is neither an existing
% nor a pending peer name.
new_remote_peer_name(Name) :-
        repeat,
        incval(rpeer_count),
        getval(rpeer_count, NPeer),
        concat_atom([peer, NPeer], Name),
        not_existing_peer_name(Name), !.

not_existing_peer_name(Name) :-
% fails if Name is either an existing or potential peer
% (potential = reserved under remote_control_host but not yet attached)
        \+ peer(Name), \+ recorded(remote_control_host, Name-_).


% remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod)
% Complete the attachment of a remote peer on server socket Soc.
% The steps before the cut are, in this order:
%   1. remove the Control-Host reservation made at setup time
%   2. accept the control connection (stream named Control)
%   3. check the protocol version, then read and compare the password
%   4. send the control name, read the remote language
%   5. accept the rpc connection from the same host and send the control
%      name on it
%   6. record the peer_info, install a dummy event handler, close Soc
%   7. defer events
% After the cut the initial goal is run and control messages are handled
% until the remote side resumes; events are re-enabled on both normal exit
% and throw.
% If any step before the cut fails, the second clause closes Soc and
% Control (when open) and raises the error fetched with get_bip_error/1.
remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod) :-
        check_nonvar(Pass),
        erase(remote_control_host, Control-Host),
        get_rpcstream_names(Control, Rpc),
        timed_accept(Soc, TimeOut, RemoteHost, Control),
        check_remote_version(Control),
        timed_read_exdr(Control, TimeOut, Pass0),
        (Pass == Pass0 -> true ; set_bip_error(1)),
        write_exdr(Control, Control), flush(Control),
        % Host is the host name that will be used in any subsequent connections
        timed_read_exdr(Control, TimeOut, RemoteLang),
        timed_accept(Soc, TimeOut, RemoteHost, Rpc),
        write_exdr(Rpc, Control), flush(Rpc),
        set_peer_property(Control, peer_info{type:remote,lang:RemoteLang,
                                        connect:remote(RemoteHost,Host,TimeOut)}),
        set_event_handler(Control, true/0),
        close(Soc),
        events_defer,   % fail if already deferred (can't handle nesting)
        !,
        catch((
                run_remote_init(Init, Res, Mod),
                remote_control_read(Control, Message),
                handle_ec_resume(Message, Control),
                events_nodefer
            ), Tag, (
                events_nodefer,
                throw(Tag)
            )).
remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod) :-
        % best-effort cleanup of whatever was opened before the failure
        (nonvar(Soc),current_stream(Soc) -> close(Soc) ; true),
        (nonvar(Control), current_stream(Control) -> close(Control) ; true),
        get_bip_error(Err),
        error(Err, remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod)).
% check_remote_version(Control)
% Read the remote side's protocol version from Control (time-out 100) and
% compare it with the remote_protocol_version flag.  On a match "yes" is
% sent back; otherwise the local version is sent, a message is printed and
% bip error 141 is set.  A failed read sets bip error 6.  Both error
% paths make the predicate fail.
check_remote_version(Control) :-
        ( timed_read_exdr(Control, 100, RemoteVersion) ->
            true
        ;
            set_bip_error(6)
        ),
        get_flag(remote_protocol_version, Version),
        ( RemoteVersion == remote_protocol(Version) ->
            write_exdr(Control, "yes"), flush(Control)
        ;
            write_exdr(Control, Version), flush(Control),
            printf(error, "Incompatible remote protocol on remote side: %w%n",
                [RemoteVersion]),
            set_bip_error(141)
        ).

% timed_read_exdr(Stream, TimeOut, Data)
% Read one EXDR term from Stream, waiting at most TimeOut for it to become
% ready.  Fails on time-out and on any exception raised while reading.
timed_read_exdr(Stream, TimeOut, Data) :-
        stream_select([Stream], TimeOut, [Stream]),
        catch(read_exdr(Stream, Data), _, fail).

% timed_accept(Server, TimeOut, RemoteHost, NewQueue)
% Accept a connection on Server within TimeOut as stream NewQueue.
% If the connecting host does not unify with RemoteHost, the new
% connection is closed again and the predicate fails.
timed_accept(Server, TimeOut, RemoteHost, NewQueue) :-
        stream_select([Server], TimeOut, [Server]),
        accept(Server, RemoteHost0/_, NewQueue),
        ( RemoteHost = RemoteHost0 ->
            true
        ;
            close(NewQueue),
            fail
        ).


% events deferred!
% run_remote_init(Init, Res, Mod)
% Run the initial goal Init in module Mod with events temporarily enabled.
% Res is bound to the (instantiated) goal on success, to 'fail' on failure
% and to 'throw' if the goal raised an exception.  Nothing is run if Init
% is a variable; if Res is already instantiated only a warning is printed.
run_remote_init(Init, Res, Mod) :-
        ( nonvar(Init), var(Res) ->
            catch((
                    events_nodefer,
                    (call(Init)@Mod -> Res = Init ; Res = fail),
                    events_defer
                ),
                _,
                (events_defer, Res = throw)
            )
        ; nonvar(Res) ->
            % The message used to name a non-existent predicate
            % (remote_control_accept/5) and lacked a line terminator.
            printf(warning_output, "Warning: result argument %w for initial goal not a variable in remote_connect_accept. Initial Goal not executed.%n", [Res])
        ; true
        ).

% peer_info(Peer, Info): raw access to the recorded Peer-Info pairs.
peer_info(Peer, Info) :-
        recorded(peer_info, Peer-Info).

% peer(?Peer)
% Enumerates all peer names if Peer is a variable, tests (at most once) if
% it is an atom, and raises a type error (5) otherwise.
peer(Peer) :-
        ( var(Peer) ->
            peer_info(Peer, _)
        ; atom(Peer) ->
            once(peer_info(Peer, _))
        ; error(5, peer(Peer))
        ).

% peer_get_property(+Peer, ?Property, ?Value)
% Look up a property of an existing peer; fails if Peer is not a peer.
% Argument type problems are reported through bip_error/1.
peer_get_property(Peer, Property, Value) :-
        check_atom(Peer),
        check_var_or_atom(Property),
        !,
        once(peer_info(Peer, Info)),
        get_a_peer_property(Property, Info, Value).
peer_get_property(Peer, Property, Value) :-
        bip_error(peer_get_property(Peer,Property,Value)).
% set_embed_peer(Peer, Lang)
% Register Peer as an embedded peer implemented in Lang.  Fails if a peer
% of that name exists already.  The local hostname is used for both host
% fields of the connect term, and the time-out is 'block'.
set_embed_peer(Peer, Lang) :-
        \+ peer(Peer),
        get_flag(hostname, ThisHost),
        Connect = embed(ThisHost, ThisHost, block),
        set_peer_property(Peer, peer_info{type:embed,lang:Lang,connect:Connect}).

% all the predicates that access peer_info directly should be put here

% get_embed_peer(Peer): the first recorded peer whose type is embed.
get_embed_peer(Peer) :-
        once(recorded(peer_info, Peer-(peer_info{type: embed}))).

% set_peer_property(Peer, Info)
% Fill in the key field of Info with the peer's dynamic-info key and
% record Peer-Info under peer_info.
set_peer_property(Peer, Info) :-
        get_peer_dyn_info_key(Peer, DynKey),
        peer_info{key: DynKey} = Info,
        recorda(peer_info, Peer-Info).

% get_a_peer_property(Property, Info, Value): extract one property from a
% peer_info structure.  'queues' collects the streams of all queue records
% stored under the peer's key.
get_a_peer_property(type, Info, Type) :-
        Info = peer_info{type:Type}.
get_a_peer_property(language, Info, Lang) :-
        Info = peer_info{lang:Lang}.
get_a_peer_property(connect, Info, Connect) :-
        Info = peer_info{connect:Connect}.
get_a_peer_property(queues, Info, Queues) :-
        Info = peer_info{key:DynKey},
        findall(Q, ( recorded(DynKey, queue(Num)), get_stream(Num, Q) ), Queues).


% peer_queue_get_property(+Queue, ?Prop, ?Value)
% Property lookup for a peer queue; fails if Queue is not a peer queue.
% Argument type problems are reported through bip_error/1.
peer_queue_get_property(Queue, Prop, Value) :-
        ( check_stream_spec(Queue), check_var_or_atom(Prop) ->
            get_queueinfo_st(Queue, _, QInfo),
            get_queueinfo_item(Prop, QInfo, Value)
        ;
            bip_error(peer_queue_get_property(Queue,Prop,Value))
        ).


% get_queue_info(Name, Nr, Peer, QType, Dir)
% Physical stream, owning peer (as PeerType(PeerName)), queue type and
% direction of the peer queue Name.
get_queue_info(Name, Nr, Peer, QType, Dir) :-
        get_queueinfo_st(Name, Nr, QInfo),
        QInfo = peer_queue{ptype:PType, pname:PName},
        Peer =.. [PType, PName],
        QInfo = peer_queue{qtype:QType},
        QInfo = peer_queue{dir:Dir}.

% get_queueinfo_st(Name, Nr, QueueInfo)
% QueueInfo is the first record stored under the queue key derived from
% the physical stream Nr of the open stream Name.
get_queueinfo_st(Name, Nr, QueueInfo) :-
        current_stream(Name),
        get_stream_info(Name, physical_stream, Nr),
        get_peer_queue_key(Nr, QKey),
        recorded(QKey, QueueInfo), !.

% get_queueinfo_item(Item, QueueInfo, Value): field access on peer_queue.
get_queueinfo_item(peer_type, QInfo, PeerType) :-
        QInfo = peer_queue{ptype:PeerType}.
get_queueinfo_item(peer_name, QInfo, PeerName) :-
        QInfo = peer_queue{pname:PeerName}.
get_queueinfo_item(type, QInfo, Type) :-
        QInfo = peer_queue{qtype:Type}.
get_queueinfo_item(direction, QInfo, Direction) :-
        QInfo = peer_queue{dir:Direction}.
% 'peer' item: PeerType(PeerName) term, kept for backwards compatibility.
get_queueinfo_item(peer,      peer_queue{ptype:PeerType,pname:Peer},
                                                  PInfo) :-  % for backwards compatibility
        PInfo =.. [PeerType, Peer].


% is_remote_sync_queue(PhysicalStream, Socket, ControlStream)
% Succeeds if PhysicalStream is a synchronous queue of the remote peer
% ControlStream, returning the name of its socket stream.
is_remote_sync_queue(PhysicalStream, Socket, ControlStream) :-
        peer_queue_get_property(PhysicalStream, peer, remote(ControlStream)),
        peer_queue_get_property(PhysicalStream, type, sync(Socket)).


% deregister_queue(Stream, Control)
% Remove all records kept for the queue Stream: everything under its queue
% key, and its queue(StreamNum) entry under the peer's dynamic-info key.
% Fails if the peer has no such entry.
deregister_queue(Stream, Control) :-
        get_stream_info(Stream, physical_stream, StreamNum),
        get_peer_queue_key(StreamNum, Key),
        erase_all(Key),
        get_peer_dyn_info_key(Control, ControlKey),
        recorded(ControlKey, queue(StreamNum), Ref), !,
        erase(Ref).

% register_remote_queue(Name, Control, Type, Direction)
% Record queue Name as belonging to remote peer Control.
register_remote_queue(Name, Control, Type, Direction) :-
        get_stream_info(Name, physical_stream, Nr),
        get_peer_queue_key(Nr, Key),
        get_peer_dyn_info_key(Control, ControlKey),
        recorda(ControlKey, queue(Nr)),
        recorda(Key, peer_queue{ptype:remote, pname:Control, qtype:Type, dir:Direction}).


% register_embed_queue(Name, Peer, Direction)
% Record queue Name as belonging to embedded peer Peer (fails if Peer is
% not of type embed).  The queue type is recorded as sync(Nr).
register_embed_queue(Name, Peer, Direction) :-
        get_stream_info(Name, physical_stream, Nr),
        get_peer_queue_key(Nr, Key),
        peer_get_property(Peer, type, embed),
        get_peer_dyn_info_key(Peer, PeerKey),
        recorda(PeerKey, queue(Nr)),
        recorda(Key, peer_queue{ptype:embed, pname:Peer, qtype:sync(Nr), dir:Direction}).


% Record keys are atoms built from a fixed prefix and the peer name or
% stream number respectively.
get_peer_dyn_info_key(Control, ControlKey) :-
        concat_atom([peer_dynamic_info, Control], ControlKey).

get_peer_queue_key(N, Key) :-
        concat_atom([peer_queue, N], Key).


% new_socket_server(Soc, Address, N)
% Create an internet stream socket Soc, bind it to Address (Host/Port) and
% listen with a queue of N pending connections.
% If bind/2 or listen/2 fails or raises, the freshly created socket is
% closed again before the failure/exception is passed on, because the
% caller has no handle with which to release it.
new_socket_server(Soc, Address, N) :-
        socket(internet, stream, Soc),
        ( catch(( bind(Soc, Address), listen(Soc, N) ),
                Tag,
                ( close(Soc), throw(Tag) )) ->
            true
        ;
            close(Soc),
            fail
        ).


% peer_queue_close(+Queue)
% Close a peer queue on both sides, with events deferred.  Fails if Queue
% is not a peer queue; type problems are reported through bip_error/1.
peer_queue_close(Queue) :-
        check_stream_spec(Queue), !,
        get_queue_info(Queue, StreamNum, Peer, QType, _Direction),
        non_interruptable(
            close_peer_queue_type(Peer, StreamNum, QType)
        ).
peer_queue_close(Queue) :-
        bip_error(peer_queue_close(Queue)).
% close_peer_queue_type(PeerTerm, StreamNum, QType)
% Remote peer: announce the close on the control stream, read the reply,
% close the ECLiPSe side, then process the reply until resume.
% Embedded peer: announce the close on embed_info and close locally.
    close_peer_queue_type(remote(Control), StreamNum, QType) :-
        remote_control_send(Control, queue_close(StreamNum)),
        remote_control_read(Control, Reply),
        close_remote_queue_eclipseside(Control, StreamNum, QType),
        handle_ec_resume(Reply, Control).
    close_peer_queue_type(embed(EmbedPeer), StreamNum, _QType) :-
        write_exdr(embed_info, queue_close(StreamNum)),
        flush(embed_info),
        close_embed_queue_eclipseside(EmbedPeer, StreamNum).

    % drop the queue's records, then close its stream
    close_embed_queue_eclipseside(EmbedPeer, StreamNum) :-
        deregister_queue(StreamNum, EmbedPeer),
        close(StreamNum).

    % drop the queue's records, then close the stream(s) behind it
    close_remote_queue_eclipseside(Control, StreamNum, QType) :-
        deregister_queue(StreamNum, Control),
        close_remote_physical_streams(QType, StreamNum).

    % A synchronous queue consists of the queue stream plus a socket, an
    % asynchronous one of a single stream.  Only open streams are closed.
    close_remote_physical_streams(async, StreamNum) :-
        ( current_stream(StreamNum) -> close(StreamNum) ; true ).
    close_remote_physical_streams(sync(Socket), StreamNum) :-
        close_remote_physical_streams(async, StreamNum),
        ( current_stream(Socket) -> close(Socket) ; true ).



% peer_queue_create(Name, Control, Sync, Direction, Event)
% Events-deferred wrapper around peer_queue_create1/5.
peer_queue_create(Name, Control, Sync, Direction, Event) :-
        non_interruptable(
            peer_queue_create1(Name, Control, Sync, Direction, Event)
        ).


% peer_queue_create1(Name, Control, Sync, Direction, Event)
% Validate the arguments and create the queue according to the peer's
% connect term.  Name and Control must be atoms and Event an event
% (else error 5).  A sync queue needs direction fromec or toec; an async
% queue is only possible for a remote peer (else error 6).  Errors are
% raised against peer_queue_create/5.
peer_queue_create1(Name, Control, Sync, Direction, Event) :-
        ( atom(Name), atom(Control), is_event(Event) ->
            true
        ;
            set_bip_error(5)
        ),
        peer_get_property(Control, connect, Connect),
        ( Sync == sync ->
            ( ( Direction == fromec ; Direction == toec ) ->
                true
            ;
                set_bip_error(6)
            )
        ; Sync == async, functor(Connect, remote, _) ->
            true
        ;
            set_bip_error(6)
        ),
        !,
        create_peer_queue_type(Connect, Name, Control, Sync, Direction, Event).
peer_queue_create1(Name, Control, Sync, Direction, Event) :-
        get_bip_error(ErrNo),
        error(ErrNo, peer_queue_create(Name, Control, Sync, Direction, Event)).


    % events deferred!
% create_peer_queue_type(Connect, Name, Control, Sync, Direction, Event)
% Remote peer: open a one-connection server socket, ask the remote side to
% connect to its port, and act on the reply.  A disconnection reply or an
% unexpected one closes the socket and is processed via handle_ec_resume/2;
% socket_connect(Name, Status) continues with connect_remote_queue/10 and
% fails if that reports 'fail'.
% Embedded peer: create the queue locally and announce it on embed_info.
    create_peer_queue_type(remote(PeerHost,LocalHost,TimeOut), Name, Control, Sync, Direction, Event) ?-
        new_socket_server(Server, LocalHost/Port, 1),
        remote_control_send(Control, socket_client(Port, Name, Sync, Direction)),
        remote_control_read(Control, Reply),
        ( is_disconnection(Reply) ->
            close(Server),
            handle_ec_resume(Reply, Control)
        ; Reply = socket_connect(Name,Status) ->
            connect_remote_queue(Status, Server, Name, Control, Sync, Direction, Event, TimeOut, PeerHost, Outcome),
            Outcome \== fail    % fails if connection failed
        ;
            printf(error, "Unexpected control message %w while creating peer queue %w on remote side %w; disconnecting.%n", [Reply, Name, Control]),
            close(Server),
            handle_ec_resume(disconnect, Control)
        ).
    create_peer_queue_type(embed(_,_,_), Name, _Peer, _Sync, Direction, Event) ?-
        ecl_create_embed_queue(Name, Direction, Event),
        get_stream_info(Name, physical_stream, StreamNum),
        write_exdr(embed_info, queue_connect(Name, StreamNum, Direction)),
        flush(embed_info).


    % ecl_create_embed_queue(Name, Direction, Event)
    % Open the memory queue Name for the embedded peer and register it.
    % A fromec queue is opened for writing and yields; any other direction
    % is opened for reading and either yields (Event == '') or raises Event.
    ecl_create_embed_queue(Name, Direction, Event) :-
        get_embed_peer(EmbedPeer),
        ( Direction == fromec ->
            Mode = write,
            Options = [yield(on)]
        ; Event == '' ->
            Mode = read,
            Options = [yield(on)]
        ;
            Mode = read,
            Options = [event(Event)]
        ),
        open(queue(""), Mode, Name, Options),
        register_embed_queue(Name, EmbedPeer, Direction).


    % control messages that mean the remote side is gone or going
    is_disconnection(disconnect).
    is_disconnection(disconnect_resume).
    is_disconnection(end_of_file).

    % events deferred!
% connect_remote_queue(Status, Soc, Name, Control, Sync, Direction, Event,
%                      TimeOut, RHost, StreamId)
% Second half of remote queue creation.  Status is the remote side's report
% of its connection attempt.  On 'success' the ECLiPSe side of the queue is
% created; StreamId becomes its physical stream, or 'fail' if creation
% failed or raised (the server socket Soc is closed in that case).  On
% 'fail' Soc is closed and StreamId is 'fail'.  Either way the outcome is
% sent to the remote side as socket_accept(Name, StreamId) and control
% messages are processed until resume.
    connect_remote_queue(success, Soc, Name, Control, Sync, Direction, Event, TimeOut, RHost, StreamId) :-
        catch(
               (create_remote_queue(Sync, Direction, Soc, Name, Control, TimeOut, RHost, Event) ->
                    get_stream_info(Name, physical_stream, StreamId)
               ;
                    % Timed out or other problem
                    close(Soc),
                    StreamId = fail
               ), _, ((current_stream(Soc) -> close(Soc);true), StreamId = fail)
        ),
        remote_control_send(Control, socket_accept(Name,StreamId)),
        remote_control_read(Control, ResumeMessage),
        handle_ec_resume(ResumeMessage, Control).
    connect_remote_queue(fail, Soc, Name, Control, _, _, _, _, _, StreamId) :-
        close(Soc),
        StreamId = fail,
        remote_control_send(Control, socket_accept(Name, fail)),
        remote_control_read(Control, ResumeMessage),
        handle_ec_resume(ResumeMessage, Control).


    % dispatch on synchronisation type and direction
    create_remote_queue(async, _, Soc, Name, Control, TimeOut, RHost, Event) ?-
        remote_create_async_queue(Soc, Name, Control, TimeOut, RHost, Event).
    create_remote_queue(sync, fromec, Soc, Name, Control, TimeOut, RHost, Event) ?-
        remote_create_fromec_queue(Soc, Name, Control, TimeOut, RHost, Event).
    create_remote_queue(sync, toec, Soc, Name, Control, TimeOut, RHost, Event) ?-
        remote_create_toec_queue(Soc, Name, Control, TimeOut, RHost, Event).

    % memory queue needed to allow eof event to be raised reading empty queue
    % The queue is opened before the socket connection is accepted.  If the
    % accept times out, is refused (wrong host) or raises, the memory queue
    % is closed again so that no orphaned stream stays open under Name.
    remote_create_toec_queue(Soc, Name, Control, TimeOut, RemoteHost, Event) :-
        open(queue(""), update, Name),
        concat_atom([Name, soc], SocName),
        ( catch(timed_accept(Soc, TimeOut, RemoteHost, SocName),
                Tag,
                ( close(Name), throw(Tag) )) ->
            true
        ;
            close(Name),
            fail
        ),
        close(Soc),
        (Event == '' ->
            set_stream_property(Name, yield, on)
        ;   set_stream_property(Name, event, Event)
        ),
        register_remote_queue(Name, Control, sync(SocName), toec).


    % memory queue needed for buffering output.
% Event is dummy for now, to be used for remote side requesting data
    % As in the toec case, the memory queue is opened before the accept;
    % it is closed again if the accept times out, is refused or raises,
    % so that no orphaned stream stays open under Name.
    remote_create_fromec_queue(Soc, Name, Control, TimeOut, RemoteHost, _Event) :-
        open(queue(""), update, Name, [yield(on)]),
        concat_atom([Name, soc], SocName),
        ( catch(timed_accept(Soc, TimeOut, RemoteHost, SocName),
                Tag,
                ( close(Name), throw(Tag) )) ->
            true
        ;
            close(Name),
            fail
        ),
        close(Soc),
        register_remote_queue(Name, Control, sync(SocName), fromec).

    % An asynchronous queue is the accepted socket itself, named Name.
    remote_create_async_queue(Soc, Name, Control, TimeOut, RemoteHost, Event) :-
        % use Control to remember which remote process this stream is connected to
        timed_accept(Soc, TimeOut, RemoteHost, Name),
        (Event == '' ->
            true
        ;   set_stream_property(Name, event, Event)
        ),
        close(Soc),
        register_remote_queue(Name, Control, async, bidirect).


% returns end_of_file as a message if something goes wrong
% (i.e. if read_exdr/2 fails or raises an exception)
remote_control_read(Control, Message) :-
        catch((read_exdr(Control, Message) -> true ; Message = end_of_file),
               _, Message = end_of_file
        ).

% catches any problems before sending control message
% If input is already pending on Control, it is read instead of sending:
% a disconnect_resume or end_of_file triggers local cleanup and throws
% peer_abort_disconnected; anything else is reported and throws
% peer_abort_error.  Otherwise Message is written and flushed.
remote_control_send(Control, Message) :-
        (stream_select([Control], 0, [Control]) ->
            % unexpected message arrived on control stream
            remote_control_read(Control, InMessage),
            ((InMessage == disconnect_resume; InMessage == end_of_file) ->
                % unilateral disconnect from remote side; disconnect locally now
                remote_cleanup(Control),
                throw(peer_abort_disconnected)
            ;   printf(error, "Unexpected incoming message %w on remote %w.\n", [InMessage,Control]),
                throw(peer_abort_error)
            )
        ;
            write_exdr(Control, Message),
            flush(Control)
        ).


:- local finalization(disconnect_remotes).

% disconnect_remotes/0: finalization goal, disconnects every recorded peer.
disconnect_remotes :-
        recorded_list(peer_info, Remotes),
        disconnect_remotes(Remotes).

% disconnect_remotes(PeerInfos)
% Each peer is disconnected on a best-effort basis: remote_disconnect/1
% can throw (e.g. peer_abort_disconnected from remote_control_send/2), and
% one misbehaving peer must not stop the remaining ones from being
% disconnected during finalization.
disconnect_remotes([]).
disconnect_remotes([Control-_|Controls]) :-
        ( catch(remote_disconnect(Control), _, true) -> true ; true ),
        disconnect_remotes(Controls).
% remote_disconnect(Control)
% Disconnect the remote peer Control: send 'disconnect' and, if the reply
% read from the control stream is disconnect_resume, clean up locally.
% Succeeds without doing anything if Control is not an open stream of a
% peer of type remote.
remote_disconnect(Control) :-
        ((nonvar(Control), current_stream(Control),
          peer_get_property(Control,type,remote)
         ) ->
            remote_control_send(Control, disconnect),
            (read_exdr(Control, disconnect_resume) ->
                remote_cleanup(Control)
            ;   % if not resume, then problem....
                true
            )
        ;   true  % Control is not a current remote peer...
        ).


% events not deferred!
% remote_output(PhysicalStream, ControlStream, RemoteStream)
% Read everything buffered in PhysicalStream and hand it to the remote
% side: an ec_flushio message with the data length goes out on the control
% stream, the data itself on RemoteStream.
remote_output(PhysicalStream, ControlStream, RemoteStream) :-
        non_interruptable((
            read_string(PhysicalStream, end_of_file, Len, Data),
            yield_to_remote(ControlStream, ec_flushio(PhysicalStream, Len), RemoteStream, Data)
        )).

    % events deferred!
    % Send YieldMessage on the control stream, write and flush Data on
    % DataStream, then process control messages until resume.
    yield_to_remote(ControlStream, YieldMessage, DataStream, Data) :-
        remote_control_send(ControlStream, YieldMessage),
        write(DataStream, Data),
        flush(DataStream),
        remote_control_read(ControlStream, ResumeMessage),
        handle_ec_resume(ResumeMessage, ControlStream).


% events not deferred!
% remote_input(PhysicalStream, ControlStream)
% Ask the remote side for input on PhysicalStream (ec_waitio message) and
% wait until it has been delivered.
remote_input(PhysicalStream, ControlStream) :-
        non_interruptable((
            remote_control_send(ControlStream, ec_waitio(PhysicalStream)),
            wait_for_remote_input(PhysicalStream, ControlStream)
        )).

    % wait for remote input to arrive, handle any messages before this,
    % data is then copied from the socket to the queue stream (physical stream)
    % events deferred!
    % While waiting, the variable in_ec_waitio holds PhysicalStream; it is
    % reset to [] once the flush message has been handled.
    % NOTE(review): the reset is skipped if expect_control/4 or
    % handle_control/3 fails or throws, leaving in_ec_waitio set.
    wait_for_remote_input(PhysicalStream, ControlStream) :-
        % we expect at least one rem_flushio-message and a resume
        setval(in_ec_waitio, PhysicalStream),
        remote_control_read(ControlStream, Message0),
        expect_control(ControlStream,
                       [rem_flushio(PhysicalStream, _), rem_flushio(PhysicalStream)],
                       Message0, Message1),
        handle_control(Message1, ControlStream, Message2),
        setval(in_ec_waitio, []),
        expect_control(ControlStream, [resume], Message2, _).
% remote_rpc_handler(Rpc, Control)
% Serve one rpc request from the remote side.  If anything in the
% execution throws, handle_remote_rpc_throw/2 reports it to the peer.
remote_rpc_handler(Rpc, Control) :-
        % The socket rpc can only handle a single rpc
        % the rpc goal corresponding to the control message must eventually
        % arrive on the Rpc socket stream
        stream_select([Rpc], block, [Rpc]),     % wait until Rpc stream is ready..
        catch(execute_remote_rpc(Rpc, Control),
              _,
              handle_remote_rpc_throw(Rpc, Control)).

    % Read the goal and run it with events enabled; the continuation
    % passed to execute_rpc/3 defers events again and yields to the peer.
    execute_remote_rpc(Rpc, Control) :-
        read_exdr(Rpc, RpcGoal),
        events_nodefer,
        AfterGoal = ( events_defer, remote_control_send(Control,yield) ),
        execute_rpc(Rpc, RpcGoal, AfterGoal).

    % Defer events again, yield to the peer and report 'throw' on Rpc.
    handle_remote_rpc_throw(Rpc, Control) :-
        events_defer,
        remote_control_send(Control, yield),
        write_exdr(Rpc, throw),
        flush(Rpc).


% Handle initial message Message0 (and possibly further messages on Control)
% until we get one of the messages specified in the list Expected.
% The expected message itself is not handled, but returned as ExpectedMessage.

% events deferred!
expect_control(Control, Expected, Message0, ExpectedMessage) :-
        ( nonmember(Message0, Expected) ->
            ( Message0 = resume ->
                printf(warning_output,
                    "Unexpected resume from remote peer %w while waiting for %w%n%b",
                    [Control, Expected]),
                % yield back and hope for the best
                remote_yield(Control, Following)
            ;
                % some other message, try to process it
                handle_control(Message0, Control, Following)
            ),
            expect_control(Control, Expected, Following, ExpectedMessage)
        ;
            ExpectedMessage = Message0
        ).


% Handle initial message Message (and possibly further messages on Control).
% Return as soon as we get a resume message.

% events deferred!
handle_ec_resume(Message, Control) :-
        expect_control(Control, [resume], Message, _Resume).


% events deferred!
% handle_control(Message, Control, NextMsg)
% Process one control message received from the remote peer Control and
% return the message that follows it as NextMsg.  The disconnect,
% disconnect_resume and end_of_file cases do not return: they clean up
% the peer and throw peer_abort_disconnected.  An unrecognised message is
% reported and then treated like 'disconnect'.
% Each clause head is matched (-?->) against the message and commits.
handle_control(rpc, Control, NextMsg) :- -?-> !,  % rpc call
        get_rpcstream_names(Control, Rpc),
        remote_rpc_handler(Rpc, Control),
        remote_control_read(Control, NextMsg).
handle_control(disconnect, Control, _NextMsg) :- -?-> !, % disconnect request
        write_exdr(Control, disconnect_yield), % acknowledge disconnect
        flush(Control),
        remote_cleanup(Control),
        throw(peer_abort_disconnected).
% remote side flushed a queue without stating the data length
handle_control(rem_flushio(Queue), Control, NextMsg) :- -?-> !,
        get_stream_info(Queue, device, Device),
        deal_with_remote_flush(Device, Queue, unknown),
        remote_yield(Control, NextMsg).
% remote side flushed Len units of data on a queue
handle_control(rem_flushio(Queue, Len), Control, NextMsg) :- -?-> !,
        get_stream_info(Queue, device, Device),
        deal_with_remote_flush(Device, Queue, Len),
        remote_yield(Control, NextMsg).
% queue creation requested by the remote side; failure and exceptions of
% the creation are ignored here, we yield back in any case
handle_control(queue_create(Name,Sync,Direction,Event), Control, NextMsg) :- -?-> !,
        catch((
            peer_queue_create1(Name, Control, Sync, Direction, Event) -> true;true),
            _, true
        ),
        remote_yield(Control, NextMsg).
% queue closing requested by the remote side; only acted upon if Queue is
% an open queue of this very peer
handle_control(queue_close(Queue), Control, NextMsg) :- -?-> !,
        ((current_stream(Queue),get_queue_info(Queue, Queue, remote(Control), QType, _)) ->
            close_remote_queue_eclipseside(Control, Queue, QType)
        ;   % not a remote queue, just ignore
            true
        ), remote_yield(Control, NextMsg).
handle_control(disconnect_resume, Control, _NextMsg) :- -?-> !,
% remote side already disconnected, no acknowledgement
        remote_cleanup(Control),
        throw(peer_abort_disconnected).
handle_control(end_of_file, Control, _NextMsg) :- -?-> !,
% Control is disconnected. Assume remote side disconnected unexpectedly
        remote_cleanup(Control),
        throw(peer_abort_disconnected).
handle_control(Message, Control, NextMsg) :-
        printf(error, "Unrecognised control signal %w; disconnecting.%n",
            [Message]),
        handle_control(disconnect, Control, NextMsg).


% events deferred!
% deal_with_remote_flush(Device, Queue, Len)
% React to a flush of Queue by the remote side.  If the flush answers a
% pending ec_waitio on this very queue, the data is taken over with events
% still deferred; otherwise events are enabled for the duration of the
% handling.  Failure and exceptions of the handling itself are ignored.
deal_with_remote_flush(Device, Queue, Len) :-
        ( getval(in_ec_waitio, Queue) ->
            % this flush is the input corresponding to a ec_waitio
            % don't handle events
            catch(
                ( deal_with_remote_flush1(Device, Queue, Len) -> true ; true ),
                _,
                true                    % ignore any problems with the handler
            )
        ; events_nodefer ->             %%%% this can't fail!
            % handle events during remote flush
            catch(
                ( deal_with_remote_flush1(Device, Queue, Len) -> true ; true ),
                _,
                true                    % ignore any problems with the handler
            ),
            events_defer
        ;
            printf(error, "Unexpected events_nodefer state in remote flush %w%n", [Queue])
        ).

    % A socket device is an asynchronous queue: the user's event handler
    % is invoked to process the data.  Anything else is a synchronous
    % queue whose data is copied from its socket into the queue buffer.
    deal_with_remote_flush1(Device, Queue, Len) :-
        ( Device == socket ->
            get_stream_info(Queue, event, Handler),
            error(Handler, rem_flushio(Queue, Len))
        ;
            peer_queue_get_property(Queue, type, sync(SockName)),
            read_sync_data_to_buffer(Len, Queue, SockName)
        ).

    % With a known (integer) length the data is read as a string and
    % written to the queue; otherwise it is transferred as one EXDR term.
    % A failed read transfers end_of_file instead.
    read_sync_data_to_buffer(Len, Queue, SockName) :-
        ( integer(Len) ->
            ( read_string(SockName, end_of_file, Len, Chunk) ->
                true
            ;
                Chunk = end_of_file
            ),
            write(Queue, Chunk)
        ;
            % Length unknown, read as exdr term
            ( read_exdr(SockName, Term) ->
                true
            ;
                Term = end_of_file
            ),
            write_exdr(Queue, Term)
        ).


% make remote_cleanup more robust so that problems will not choke eclipse
% events deferred on entry, undeferred on exit
remote_cleanup(Control) :-
        ( catch(remote_cleanup_raw(Control), _, fail) ->
            true
        ;
            printf(error, "Problem with cleaning up remote peer %w.%n", [Control])
        ).
% remote_cleanup_raw(Control)
% Tear down everything recorded for the remote peer Control: run the
% user's cleanup event, reset its handler, close all of the peer's queues,
% drop its multitasking entry and its records, and finally close the rpc
% and control streams.
% The rpc and control streams are only closed if still open, so that an
% already-closed stream cannot abort the cleanup half-way.
remote_cleanup_raw(Control) :-
        events_nodefer, % to make next line work
        (event(Control) -> true ; true), % user defined cleanup
        reset_event_handler(Control),
        get_peer_dyn_info_key(Control, ControlKey),
        % get all the socket streams associated with this remote process
        recorded_list(ControlKey, RemoteDynInfo),
        cleanup_dynamic_infos(RemoteDynInfo, Control),
        cleanup_peer_multitask_infos(Control),
        erase_all(ControlKey),
        get_rpcstream_names(Control, Rpc),
        (erase(peer_info, Control-_) -> true;true),
        (current_stream(Rpc) -> close(Rpc) ; true),
        (current_stream(Control) -> close(Control) ; true).

% cleanup_dynamic_infos(Items, Control)
% Close the ECLiPSe side of every queue recorded for the peer.
% Each item is handled on a best-effort basis: a stale queue record (for
% instance one whose stream has been closed already, so that
% get_queue_info/5 fails) is skipped instead of failing the whole cleanup,
% which would leave the peer registered and its streams open.
cleanup_dynamic_infos([Item|Infos], Control) :-
        ( Item = queue(Queue),
          get_queue_info(Queue, StreamNum, remote(Peer), QType, _Dir),
          close_remote_queue_eclipseside(Peer, StreamNum, QType) ->
            true
        ;
            true
        ),
        cleanup_dynamic_infos(Infos, Control).
cleanup_dynamic_infos([], _).


% events deferred!
% remote_yield(Control, ResumeMessage)
% Hand control to the remote peer and return the next message it sends.
% Fails if Control is not an open stream of an existing peer.
remote_yield(Control, ResumeMessage) :-
        nonvar(Control),
        peer(Control),
        current_stream(Control),
        remote_control_send(Control, yield),
        remote_control_read(Control, ResumeMessage).

% events deferred or undeferred!
% remote_yield(Control)
% Yield to the remote peer and process its messages until it resumes.
% Events are deferred for the duration if they were not already; in that
% case they are re-enabled at the end.
remote_yield(Control) :-
        ( events_defer -> Reset=events_nodefer ; Reset=true ),
        remote_yield(Control, ResumeMessage),
        handle_ec_resume(ResumeMessage, Control),
        Reset.


% The rpc stream of a peer is named after its control stream.
get_rpcstream_names(Control, Rpc) :-
        concat_atom([Control, '_rpc'], Rpc).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Peer Multitasking

:- local struct(mt_peer(peer,msgq)).
:- local variable(peer_mt_status).
:- setval(peer_mt_status, off).

% peer_mt_status takes the values off, mt_set and mt_reset.
peers_are_multitasking :-
        \+getval(peer_mt_status, off).

peers_should_cycle :-
        getval(peer_mt_status, mt_set).
% peer_register_multitask(+Peer, -MsgQ)
% Register Peer for multitasking and create its message queue, a
% synchronous fromec peer queue named <Peer>multifrom.  Fails if Peer is
% registered already; raises error 6 if Peer is not a peer.
peer_register_multitask(Peer, MsgQ) :-
        ( peer(Peer) ->
            \+ recorded(multitask_peers, mt_peer{peer:Peer}),
            concat_atom([Peer, multifrom], MsgQ),
            peer_queue_create(MsgQ, Peer, sync, fromec, ''),
            Entry = mt_peer{peer:Peer,msgq:MsgQ},
            record(multitask_peers, Entry)
        ;
            error(6, peer_register_multitask(Peer, MsgQ))
        ).

% peer_deregister_multitask(+Peer)
% Remove Peer's multitasking registration and close its message queue.
% Fails if Peer is not registered; raises error 6 if Peer is not a peer.
peer_deregister_multitask(Peer) :-
        ( peer(Peer) ->
            recorded(multitask_peers, mt_peer{peer:Peer,msgq:Queue}),
            cleanup_peer_multitask_infos(Peer),
            peer_queue_close(Queue)
        ;
            error(6, peer_deregister_multitask(Peer))
        ).

% peer_do_multitask(+Type)
% Run one multitasking phase of the given Type; fails if a phase is in
% progress already.  Whatever happens inside the phase (failure or throw),
% the phase is ended properly; an error noted during the phase, or the
% ball of a caught exception, is thrown at the very end.
peer_do_multitask(Type) :-
        \+ peers_are_multitasking,
        /* multitasking will terminate if peers do not confirm multitasking */
        catch(
            ( ( peer_multitask_terminate,
                peer_multitask_phase(Type, Err)
              ) ->
                true
            ;
                peer_end_multitask(Err)
            ),
            Ball,
            ( peer_end_multitask(_Ignored), Ball = Err )
        ),
        ( nonvar(Err) -> throw(Err) ; true ).

    % start the phase, cycle (unless starting noted an error), end it
    peer_multitask_phase(Type, Err) :-
        peers_mt_broadcast_with_cleanup(start_multitask(Type), Err),
        ( nonvar(Err) -> true ; peers_mt_cycle(Err) ),
        peer_end_multitask(Err).

/* ensure that multitask phase is ended properly: if failure or
   throw occurs, broadcast end_multitask again */
peer_end_multitask(Err) :-
        catch(
            ( ( peers_mt_broadcast_with_cleanup(end_multitask, Err),
                peer_multitask_off
              ) ->
                true
            ;
                peer_end_multitask(Err)
            ),
            _,
            peer_end_multitask(_)
        ).


% setters for the multitasking status
peer_multitask_terminate :-
        setval(peer_mt_status, mt_reset).
peer_multitask_confirm :-
        setval(peer_mt_status, mt_set).
peer_multitask_off :-
        setval(peer_mt_status, off).

% avoids pushing witness pword onto global stack by avoiding a CP here
% all peer_mt_status state must be given by the clauses
do_peers_mt_cycle(mt_set, Err) ?-
        sleep(0.01),
        peers_mt_broadcast_with_cleanup(interact, Err),
        peers_mt_cycle(Err).
do_peers_mt_cycle(mt_reset, _Err) ?- true.
do_peers_mt_cycle(off, _Err) ?- true.

% peers_mt_cycle(Err): one more interaction round if the status says so.
peers_mt_cycle(Err) :-
        getval(peer_mt_status, Current),
        do_peers_mt_cycle(Current, Err).

% peers_mt_broadcast_with_cleanup(Msg, Err)
% Broadcast Msg to all multitasking peers.  Always succeeds; bindings are
% kept only when the broadcast succeeded and noted an error in Err.
peers_mt_broadcast_with_cleanup(Msg, Err) :-
        % rollback the garbage generated by peers_mt_broadcast/2
        % if no error occurred
        ( peers_mt_broadcast(Msg, Err), nonvar(Err) ->
            true
        ;
            true
        ).


% peers_mt_broadcast(Msg, Err)
% Send Msg to every registered peer.  With no peer registered,
% multitasking is terminated and Err (if still free) is set to
% peer_multitask_empty.
peers_mt_broadcast(Msg, Err) :-
        recorded_list(multitask_peers, Registered),
        ( Registered == [] ->
            peer_multitask_terminate,
            ( Err = peer_multitask_empty -> true ; true )
        ;
            peers_mt_broadcast1(Registered, Msg, Err)
        ).

% send to each peer in turn; an exception is passed to the recovery code
peers_mt_broadcast1([], _, _).
peers_mt_broadcast1([mt_peer{peer:Peer,msgq:Queue}|Rest], Msg, Err) :-
        catch(send_mt_message(Queue, Msg),
              Ball,
              peer_mt_error_recover(Ball, Peer, Err)),
        peers_mt_broadcast1(Rest, Msg, Err).


% write Msg as EXDR on the peer's message queue and flush it
send_mt_message(ToPQ, Msg) :-
        % ignore failure (invalid terms substituted by _)
        ( write_exdr(ToPQ, Msg) -> true ; true ),
        flush(ToPQ).

% First case happens if a remote peer has disconnected. In this case, the
% remote peer code should have cleaned up already
peer_mt_error_recover(peer_abort_disconnected, _, _) :- !.
peer_mt_error_recover(abort, _Peer, Err) :- !,
        % abort raised. Stop multitasking and allow abort to continue
        peer_multitask_terminate,
        ( Err = abort -> true ; true ).
peer_mt_error_recover(Ball, Peer, Err) :-
        % something went wrong, remove problematic peer from multitasking
        % list and end multitask, follow by aborting with first error
        peer(Peer),
        cleanup_peer_multitask_infos(Peer),
        peer_multitask_terminate,
        ( Ball = Err -> true ; true ).

% remove Peer's multitasking record, if there is one
cleanup_peer_multitask_infos(Peer) :-
        ( erase(multitask_peers, mt_peer{peer:Peer}) ->
            true
        ;
            true
        ).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Debugger visibility settings and late exports for this file.

% Allow users to trace grammar rules through phrase/2/3
?- unskipped
        phrase_body/3,
        phrase_body/4.

% tool interfaces must be set to skipped explicitly
:- skipped
        file_query/2,
        global_op/3,
        op/3,
        read_token/2.

% Set all output predicates to skipped in order not to trace the
% flush event handler (io_yield_handler) when it happens.
:- skipped
        flush/1,
        display/1,
        display/2,
        nl/0,
        nl/1,
        put/1,
        put/2,
        print/1,
        print/2,
        printf/2,
        printf/3,
        tyo/1,
        tyo/2,
        write/1,
        write/2,
        write_canonical/1,
        write_canonical/2,
        write_exdr/2,
        write_term/2,
        write_term/3,
        writeln/1,
        writeln/2,
        writeq/1,
        writeq/2.

% make/0 is hidden from the tracer altogether
:- untraceable
        make/0.

% file_query/2 is exported here rather than in the export list at the
% top of the file
:- export
        file_query/2.