1% ----------------------------------------------------------------------
2% BEGIN LICENSE BLOCK
3% Version: CMPL 1.1
4%
5% The contents of this file are subject to the Cisco-style Mozilla Public
6% License Version 1.1 (the "License"); you may not use this file except
7% in compliance with the License.  You may obtain a copy of the License
8% at www.eclipse-clp.org/license.
9%
10% Software distributed under the License is distributed on an "AS IS"
11% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
12% the License for the specific language governing rights and limitations
13% under the License.
14%
15% The Original Code is  The ECLiPSe Constraint Logic Programming System.
16% The Initial Developer of the Original Code is  Cisco Systems, Inc.
17% Portions created by the Initial Developer are
18% Copyright (C) 1989-2006 Cisco Systems, Inc.  All Rights Reserved.
19%
20% Contributor(s): ECRC GmbH
21% Contributor(s): IC-Parc, Imperal College London
22%
23% END LICENSE BLOCK
24%
25% System:	ECLiPSe Constraint Logic Programming System
26% Version:	$Id: io.pl,v 1.19 2015/05/19 22:16:32 jschimpf Exp $
27% ----------------------------------------------------------------------
28
29/*
30 * SEPIA PROLOG SOURCE MODULE
31 */
32
33/*
34 * IDENTIFICATION:	io.pl, part of module(sepia_kernel)
35 *
36 * DESCRIPTION:
37 *
38 *
39 * CONTENTS:
40 *
41 */
42
43/*
44 * GLOBAL DIRECTIVES
45 */
46:- pragma(nodebug).
47:- pragma(expand).
48:- pragma(skip).
49
50:- export
51	current_stream/3,
52	get_stream_info/3,
53	set_stream_property/3,
54	current_stream/1,
55	current_compiled_file/3,
56	dump_header/1,
57	dump_term/3,
58	exec/2,
59	exec/3,
60	exec_group/3,
61	make/0,
62	open/4,
63	sh/1,
64	system/1,
65	get_file_info/3,
66	op/3,
67	global_op/3,
68	phrase/2,
69	phrase/3,
70	peer/1,
71	peer_get_property/3,
72	peer_queue_create/5,
73	peer_queue_close/1,
74	peer_queue_get_property/3,
75        peer_multitask_confirm/0,
76        peer_multitask_terminate/0,
77        peer_register_multitask/2,
78        peer_deregister_multitask/1,
79        peer_do_multitask/1.
80
81
82:- tool(phrase/2, phrase_body/3).
83:- tool(phrase/3, phrase_body/4).
84:- tool(file_query/2, file_query_body/3).
85
86
87%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
88% current_stream(?Stream)
89% if Stream is uninstantiated, then stream handles returned.
90% if used for testing, a stream name is accepted as well.
91
92current_stream(Stream) :- var(Stream), !,
93	get_stream(stdin, StdIn),	% stream 1
94	gen_open_stream(StdIn, Stream).
95current_stream(Stream) :- check_stream_spec(Stream), !,
96	is_open_stream(Stream).
97current_stream(Stream) :-
98	bip_error(current_stream(Stream)).
99
100    % This could be a backtacking external
101    gen_open_stream(Prev, Stream) :-
102	( next_open_stream(Prev, Next) ->
103	    ( Stream = Prev ; gen_open_stream(Next, Stream) )
104	;
105	    Stream = Prev
106	).
107
108
109% current_stream(?File, ?Mode, ?Stream) - DEPRECATED
110current_stream(File, Mode, Stream) :-
111	(
112	    check_var_or_atom_string(File),
113	    check_var_or_atom(Mode),
114	    check_var_or_stream_spec(Stream)
115	->
116		( var(Stream) ->
117		    get_stream(stdin, StdIn),
118		    gen_open_stream(StdIn, Stream)
119		;
120		    is_open_stream(Stream)	% else fail
121		),
122		stream_info_(Stream, 0, File),
123		stream_info_(Stream, 2, Mode)
124	;
125	    bip_error(current_stream(File, Mode, Stream))
126	).
127
128
129%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
130% get_stream_info(+Stream, ?Info, ?Value)
131% t a s k : accesss various data in the stream descriptor
132
133get_stream_info(Stream, Info, Value) :-
134	( check_valid_stream(Stream) ->
135	    (   var(Info) ->
136		    stream_info_nr(Info, N),
137		    stream_info_wrapper(Stream, N, Value)
138	    ;   atom(Info) ->
139		(   stream_info_nr(Info, N) ->
140			stream_info_wrapper(Stream, N, Value)
141		;   stream_info_nr_hidden(Info, N) ->
142			stream_info_(Stream, N, Value)
143		;    error(6, get_stream_info(Stream, Info, Value))
144		)
145	    ;
146		error(5, get_stream_info(Stream, Info, Value))
147	    )
148	;
149	    bip_error(get_stream_info(Stream, Info, Value))
150	).
151
152stream_info_wrapper(Stream, N, Value) :-
153	( stream_info_nr(output_options, N) ->
154	    stream_info_(Stream, N, On),
155	    stream_info_nr_hidden(print_depth, N1),
156	    stream_info_(Stream, N1, Depth),
157	    options_from_format(On, Depth, Value)
158	;
159	    stream_info_(Stream, N, Value)
160	).
161
162:- mode stream_info_nr(?,-).
163stream_info_nr(name, 0).
164%stream_info_nr(mode, 2).	% old-style mode
165%stream_info_nr(physical_stream, 4).	% hidden
166stream_info_nr(aliases, 3).
167stream_info_nr(system_use, 7).
168stream_info_nr(line, 5).
169stream_info_nr(offset, 6).
170stream_info_nr(prompt, 1).
171stream_info_nr(prompt_stream, 8).
172stream_info_nr(fd, 9).
173stream_info_nr(port, 10).
174stream_info_nr(connection, 11).
175stream_info_nr(reprompt_only, 12).
176stream_info_nr(device, 13).
177stream_info_nr(mode, 15).
178stream_info_nr(event, 17).
179stream_info_nr(flush, 18).
180stream_info_nr(yield, 19).
181stream_info_nr(end_of_line, 20).
182stream_info_nr(scramble, 21).
183stream_info_nr(sigio, 22).
184stream_info_nr(usable, 23).
185stream_info_nr(macro_expansion, 24).
186stream_info_nr(output_options, 25).
187%stream_info_nr(print_depth, 26).	% hidden
188stream_info_nr(compress, 27).
189stream_info_nr(last_written, 28).
190stream_info_nr(handle, 29).
191stream_info_nr(delete_file, 30).
192stream_info_nr(path, 31).
193stream_info_nr(reposition, 32).
194stream_info_nr(encoding, 33).
195stream_info_nr(input, 34).
196stream_info_nr(output, 35).
197stream_info_nr(end_of_stream, 36).
198stream_info_nr(eof_action, 37).
199
200stream_info_nr_hidden(physical_stream, 4).
201stream_info_nr_hidden(print_depth, 26).
202
203
204set_stream_property(Stream, Info, Value) :-
205	set_stream_property1(Stream, Info, Value),
206	!.
207set_stream_property(Stream, Info, Value) :-
208	bip_error(set_stream_property(Stream, Info, Value)).
209
210    set_stream_property1(_Stream, Info, _Value) :- var(Info), !,
211	set_bip_error(4).
212    set_stream_property1(Stream, output_options, Options) :- !,
213	options_to_format(Options, 0, _Off, 0, On, 0, Depth, 1200, _Prec),
214	stream_info_nr(output_options, I1),
215	set_stream_prop_(Stream, I1, On),
216	stream_info_nr_hidden(print_depth, I2),
217	set_stream_prop_(Stream, I2, Depth).
218    set_stream_property1(Stream, Info, Value) :-
219	( stream_info_nr(Info, Nr) -> true ; set_bip_error(6) ),
220	set_stream_prop_(Stream, Nr, Value).
221
222
223
224current_compiled_file(File, Time, Module) :-
225	current_compiled_file(File, Time, Module, _Goal).
226
227
228make :-
229	current_compiled_file(File, Time, Module, Goal),
230	    get_file_info(File, mtime) =\= Time,
231	    Goal@Module,	% normally compile(File)@Module
232	fail.
233make.
234
235
236
237open(File, Mode, Stream, Options) :-
238	open(File, Mode, Stream),
239	set_stream_options(Options, Stream), !.
240open(File, Mode, Stream, Options) :-
241	bip_error(open(File, Mode, Stream, Options)).
242
243set_stream_options(Options, _) :- var(Options), !, set_bip_error(4).
244set_stream_options([], _) :- !.
245set_stream_options([O|Os], Stream) :- !,
246	set_stream_option(O, Stream),
247	set_stream_options(Os, Stream).
248set_stream_options(_, _) :-
249	set_bip_error(5).
250
251    set_stream_option(Option, _) :- var(Option), !, set_bip_error(4).
252    set_stream_option(alias(Name), Stream) ?- !,
253	( current_stream(Name) -> set_bip_error(192)	% ISO requirement
254	; set_stream(Name, Stream) ).
255    set_stream_option(type(text), _Stream) ?- !.	% ISO (only open/4)
256    set_stream_option(type(binary), Stream) ?- !,	% ISO (only open/4)
257	stream_info_nr(encoding, I),
258	set_stream_prop_(Stream, I, octet).
259    set_stream_option(reposition(false), _Stream) ?- !.	% ISO
260    set_stream_option(reposition(true), Stream) ?- !,
261    	( stream_info_nr(reposition, Nr), stream_info_(Stream, Nr, true) -> true
262	; set_bip_error(192) ).				% ISO
263    set_stream_option(output_options(Options), Stream) ?-
264	options_to_format(Options, 0, _Off, 0, On, 0, Depth, 1200, _Prec),
265	stream_info_nr(output_options, I1),
266	set_stream_prop_(Stream, I1, On),
267	stream_info_nr_hidden(print_depth, I2),
268	set_stream_prop_(Stream, I2, Depth),
269	!.
270    set_stream_option(Option, Stream) :-
271	compound(Option),
272        functor(Option, Name, 1),
273	arg(1, Option, Value),
274	stream_info_nr(Name, Nr),
275	!,
276	set_stream_prop_(Stream, Nr, Value).
277    set_stream_option(_Option, _) :- set_bip_error(6).
278
279
280
281%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
282%
283% OPERATORS
284%
285
286:- tool( op/3,	local_op_body/4).
287:- tool( global_op/3,		global_op_body/4).
288
289local_op_body(Preced, Assoc, Op, Module):-
290	op_body(local, Preced, Assoc, Op, Module), !.
291local_op_body(Preced, Assoc, Op, Module):-
292	bip_error(op(Preced, Assoc, Op), Module).
293
294global_op_body(Preced, Assoc, Op, Module):-
295	op_body(global, Preced, Assoc, Op, Module), !.
296global_op_body(Preced, Assoc, Op, Module):-
297	bip_error(global_op(Preced, Assoc, Op), Module).
298
299% Note: unfortunately, according to ISO, op(P,A,[]) means op(P,A,[[]]).
300op_body(Visible, Preced, Assoc, Ops, Module) :- nonvar(Ops), Ops=[_|_], !,
301	op_body1(Visible, Preced, Assoc, Ops, Module).
302op_body(Visible, Preced, Assoc, Ops, Module) :-
303	op_(Visible, Preced, Assoc, Ops, Module).
304
305op_body1(_, _, _, Ops, _) :- var(Ops), !, set_bip_error(4).
306op_body1(_, _, _, [], _) :- !.
307op_body1(Visible, Preced, Assoc, [Op|Ops], Module) :-
308	( atom(Op) -> true ; var(Op) ),
309	!,
310	% report errors per-operator, if possible
311	( op_(Visible, Preced, Assoc, Op, Module) -> true
312	; Visible == local -> bip_error(op(Preced, Assoc, Op), Module)
313	; bip_error(global_op(Preced, Assoc, Op), Module) ),
314        op_body1(Visible, Preced, Assoc, Ops, Module).
315op_body1(_, _, _, _, _) :- set_bip_error(5).
316
317
318%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
319%
320% read_term/3 and write_term/3 (ISO compatible)
321% In case of conflict, use the rightmost option
322
323:- export
324	read_term/2,
325	read_term/3.
326
327:- tool(read_term/2, read_term_/3).
328:- tool(read_term/3, read_term_/4).
329
330read_term_(Term, Options, Module) :-
331	read_term_(input, Term, Options, Module).
332
333read_term_(Stream, Term, Options, Module) :-		% 8.14.1
334	check_read_options(Options),
335	!,
336	readvar(Stream, Term, Vars, Module),
337	handle_read_options(Options, Term, Vars).
338read_term_(Stream, Term, Options, Module) :-
339	bip_error(read_term(Stream, Term, Options), Module).
340
341    check_read_options(Options) :- var(Options), !,
342    	set_bip_error(4).
343    check_read_options([]) :- !.
344    check_read_options([O|Os]) :- !,
345	check_read_option(O),
346	check_read_options(Os).
347    check_read_options(_Options) :-
348    	set_bip_error(5).
349
350    :- mode handle_read_options(+,?,+).
351    handle_read_options([], _, _) :- !.
352    handle_read_options([O|Os], Term, Vars) :- !,
353	handle_read_option(O, Term, Vars),
354	handle_read_options(Os, Term, Vars).
355
356    % Always change the next 2 predicates together!
357    check_read_option(Option) :- var(Option), !, set_bip_error(4).
358    check_read_option(variables(_)).
359    check_read_option(variable_names(_)).
360    check_read_option(singletons(_)).
361    check_read_option(_) :- set_bip_error(6).
362
363    % If you make a change here, change also check_read_option/1!
364    :- mode handle_read_option(+,?,+).
365    handle_read_option(variables(Vs), Term, _Vars) :-
366	term_variables_lr(Term, Vs).
367    handle_read_option(variable_names(VNs), _Term, Vars) :-
368    	name_eq_var(Vars, VNs).
369    handle_read_option(singletons(NamesSingletons), Term, NsVs) :-
370	collect_variables(Term, [], Vars),
371	( Vars = [] ->
372	    NamesSingletons = []
373	;
374	    sort(0, =<, Vars, SortedVars),
375	    SortedVars = [_X|Xs],
376	    collect_singletons(_X, Xs, Singletons),
377	    add_names(Singletons, NsVs, NamesSingletons)
378	).
379
380    vars_only([], []).
381    vars_only([[_|V]|Vars], [V|Vs]) :-
382	vars_only(Vars, Vs).
383
384    name_eq_var([], []).
385    name_eq_var([[N|V]|VNs], [N=V|Vs]) :-
386	name_eq_var(VNs, Vs).
387
388    collect_singletons(_X, [], [_X]).
389    collect_singletons(_X, [_Y|Ys], Singletons) :-
390	( _X == _Y ->
391	     skip_multiples(_Y, Ys, Singletons)
392	;
393	     Singletons = [_X|Singletons1],
394	     collect_singletons(_Y, Ys, Singletons1)
395	).
396
397    skip_multiples(_, [], []).
398    skip_multiples(_X, [_Y|Ys], Singletons) :-
399	( _X == _Y ->
400	     skip_multiples(_Y, Ys, Singletons)
401	;
402	     collect_singletons(_Y, Ys, Singletons)
403	).
404
405    add_names([], _, []).
406    add_names([S|Ss], NsVs, NsSs) :-
407	( varnamelookup(S, NsVs, N) -> NsSs = [N=S|NsSs1] ; NsSs = NsSs1 ),
408	add_names(Ss, NsVs, NsSs1).
409
410    varnamelookup(X, [[N|Y]|_], N) :- X==Y, !.
411    varnamelookup(X, [_|T], N):- varnamelookup(X, T, N).
412
413
414:- export
415	write_term/2,
416	write_term/3.
417
418:- tool(write_term/2, write_term_/3).
419:- tool(write_term/3, write_term_/4).
420
421write_term_(Term, Options, Module) :-
422	write_term_(output, Term, Options, Module).
423
424write_term_(Stream, Term, Options, Module) :-		% 8.14.2
425	options_to_format(Options, 0, Off, 0, On, 0, Depth, 1200, Prec),
426	write_term(Stream, Term, Off, On, Depth, Prec, Module),
427	!.
428write_term_(Stream, Term, Options, Module) :-
429	bip_error(write_term(Stream, Term, Options), Module).
430
431
432% The following auxiliary predicates map symbolic write-options to
433% bitmask+depth used on the C level (in write.c) and vice versa
434
435:- mode options_to_format(?,+,-,+,-,+,-,+,-).	% may fail with bip_error
436options_to_format(List, _, _, _, _, _, _, _, _) :- var(List), !,
437	set_bip_error(4).
438options_to_format([], Off, Off, On, On, Depth, Depth, Prec, Prec) :- !.
439options_to_format([O|Os], Off0, Off, On0, On, Depth0, Depth, Prec0, Prec) :- !,
440	option_to_format(O, ThisOff, ThisOn, ThisDepth, ThisPrec),
441	Off1 is Off0 /\ \ThisOn \/ ThisOff,
442	On1 is On0 /\ \ThisOff \/ ThisOn,
443	( var(ThisDepth) -> Depth1 = Depth0 ; Depth1 = ThisDepth ),
444	( var(ThisPrec) -> Prec1 = Prec0 ; Prec1 = ThisPrec ),
445	options_to_format(Os, Off1, Off, On1, On, Depth1, Depth, Prec1, Prec).
446options_to_format(_, _, _, _, _, _, _, _, _) :-
447	set_bip_error(5).
448
449    option_to_format(Junk, _, _, _, _) :- var(Junk), !,
450	set_bip_error(4).
451    option_to_format(Option, C, S, D, _P) :-
452	option_format(Option, C, S, D), !.
453    option_to_format(Option, C, S, D, _P) :-
454	option_format_compat(Option, C, S, D), !.
455    option_to_format(precedence(P0), 0, 0, _D, P) :- !,
456    	P = P0.
457    option_to_format(priority(P0), 0, 0, _D, P) :- !,	% SICStus/SWI compat
458    	P = P0.
459    option_to_format(Junk, _, _, _, _) :- compound(Junk), !,
460	set_bip_error(6).
461    option_to_format(_, _, _, _, _) :-
462	set_bip_error(5).
463
464    % same as before, but just check
465    valid_write_option(Junk) :- var(Junk), !,
466    	fail.
467    valid_write_option(Option) :-
468	option_format(Option, _, _, _), !.
469    valid_write_option(Option) :-
470	option_format_compat(Option, _, _, _), !.
471    valid_write_option(precedence(_)).
472    valid_write_option(priority(_)).
473
474
475options_from_format(On, Depth, Options) :-
476	findall(Option, (
477		option_format(Option, _, Bits, Depth),
478		On /\ Bits =\= 0
479	    ), Options0),
480	once option_format(depth(full), _, FullDepthBit, _),
481	( On /\ FullDepthBit =:= 0, Depth \== 0 ->
482	    Options = [depth(Depth)|Options0]
483	;
484	    Options = Options0
485	).
486
487
488% Output options
489%
490% ISO compatible:	ignore_ops, quoted, numbervars
491% SICStus compatible:	max_depth, portrayed
492% CAUTION: The numeric constants must match the definitions in ec_io.h!
493
494% option_format(?Option, -BitsToClear, -BitsToSet, ?MaxDepth).
495:- mode option_format(?,-,-,?).
496option_format(variables(anonymous),	16'4030, 16'4000, _).	% VAR_ANON
497option_format(variables(default),	16'4030, 16'0000, _).
498option_format(variables(raw),		16'4030, 16'0010, _).	% VAR_NUMBERS
499option_format(variables(full),		16'4030, 16'0020, _).	% VAR_NAMENUM
500option_format(attributes(none),		16'0500, 16'0000, _).
501option_format(attributes(pretty),	16'0500, 16'0100, _).	% ATTRIBUTE
502option_format(attributes(full),		16'0500, 16'0400, _).	% STD_ATTR
503option_format(as(term),			16'1200, 16'0000, _).
504option_format(as(clause),		16'1200, 16'1000, _).	% WRITE_CLAUSE
505option_format(as(goal),			16'1200, 16'0200, _).	% WRITE_GOAL
506option_format(newlines(true),		16'0000, 16'2000, _).	% DONT_QUOTE_NL
507option_format(newlines(false),		16'2000, 16'0000, _).
508option_format(operators(true),		16'0001, 16'0000, _).
509option_format(operators(false),		16'0000, 16'0001, _).	% CANONICAL
510option_format(dotlists(true),		16'0000, 16'0004, _).	% DOTLIST
511option_format(dotlists(false),		16'0004, 16'0000, _).
512option_format(transform(true),		16'0800, 16'0000, _).
513option_format(transform(false),		16'0000, 16'0800, _).	% NO_MACROS
514option_format(quoted(true),		16'0000, 16'0008, _).	% QUOTED
515option_format(quoted(false),		16'0008, 16'0000, _).
516option_format(numbervars(true),		16'0000, 16'8000, _).	% OUT_DOLLAR_VAR
517option_format(numbervars(false),	16'8000, 16'0000, _).
518option_format(portrayed(true),		16'0000, 16'0040, _).	% PRINT_CALL
519option_format(portrayed(false),		16'0040, 16'0000, _).
520option_format(depth(full),		16'0000, 16'0002, 0).	% FULLDEPTH
521option_format(depth(N),			16'0002, 16'0000, N).
522option_format(compact(true),		16'0000, 16'0080, _).	% WRITE_COMPACT
523option_format(compact(false),		16'0080, 16'0000, _).
524option_format(fullstop(true),          16'00000,16'20000, _).	% TERM_FULLSTOP
525option_format(fullstop(false),         16'20000,16'00000, _).
526option_format(nl(true),                16'00000,16'40000, _).	% TERM_NEWLINE
527option_format(nl(false),               16'40000,16'00000, _).
528
529option_format_compat(ignore_ops(true),	16'0000, 16'0805, _).	% ISO compat
530option_format_compat(ignore_ops(false),	16'0805, 16'0000, _).
531option_format_compat(max_depth(0),	16'0000, 16'0002, 0).	% SICS compat
532option_format_compat(max_depth(N),	16'0002, 16'0000, N).
533
534
535%
536% term_string(?Term, ?String, +Options)
537% This is currently strict wrt Options: they must fit with the direction.  % SWI is permissive and ignores all unknown options in either direction.
538%
539
540:- export term_string/3.
541:- tool(term_string/3, term_string_/4).
542
543term_string_(T, S, Options, Module) :- var(S), !,
544	open(string(""), write, Stream),
545	filter_options(Options, write, WOptions),
546	write_term_(Stream, T, [
547	    attributes(full),quoted(true),numbervars(true),
548	    variables(raw),depth(full),transform(false)|WOptions], Module),
549	stream_info_(Stream, 0, S),  % = get_stream_info(Stream,name,S)
550	close(Stream).
551term_string_(T, S, Options, Module) :- string(S), !,
552	( S \== "" ->
553	    open(string(S), read, Stream),
554	    (
555		filter_options(Options, read, ROptions),
556		read_term_(Stream, T0, ROptions, Module),
557		read_token_(Stream, end_of_file, _, Module)
558	    ->
559		close(Stream),
560		T = T0
561	    ;
562		close(Stream),
563		error(7, term_string(T, S), Module)
564	    )
565	;
566	    error(7, term_string(T, S), Module)
567	).
568term_string_(T, S, Options, Module) :-
569	error(5, term_string(T, S, Options), Module).
570
571    filter_options([Option|Options], RW, FOptions) ?- !,
572	( ignore_option(Option, RW) ->
573	    FOptions = FOptions0
574	;
575	    FOptions = [Option|FOptions0],
576	    get_bip_error(_)	% clear the error code
577	),
578	filter_options(Options, RW, FOptions0).
579    filter_options(Options, _RW, Options).	% [] and non-lists (for error later)
580
581    % fails with bip_error
582    ignore_option(Option, read) ?- option_to_format(Option, _, _, _, _).
583    ignore_option(Option, write) ?- check_read_option(Option).
584
585
586%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
587
588/**** REMEMBER TO UPDATE annotated_term used in raw form by expand_macros
589 **** and friends when changing the definition here
590 **** definition now moved to kernel.pl, update it there
591:- export struct(annotated_term(
592	term,		% var, atomic or compound
593	type,		% atom or var/1
594	file,		% atom
595	line,		% integer
596	from,		% integer
597	to		% integer
598	% may be extended in future
599    )).
600****/
601
602:- export read_annotated/2.
603:- tool(read_annotated/2, read_annotated_/3).
604
605read_annotated_(Stream, AnnTerm, Module) :-
606	read_annotated_raw(Stream, RawAnnTerm, HasMacros, Module),
607	( HasMacros == 1 ->
608	    unannotate_term(RawAnnTerm, RawTerm),
609	    expand_macros_annotated_(RawTerm, RawAnnTerm, _Term, AnnTerm, Module)
610	;
611	    AnnTerm = RawAnnTerm
612	).
613
614
615:- export read_annotated/3.
616:- tool(read_annotated/3, read_annotated_/4).
617
618read_annotated_(Stream, Term, AnnTerm, Module) :-
619	read_annotated_raw(Stream, RawAnnTerm, HasMacros, Module),
620	unannotate_term(RawAnnTerm, RawTerm),
621	( HasMacros == 1 ->
622	    expand_macros_annotated_(RawTerm, RawAnnTerm, Term, AnnTerm, Module)
623	;
624	    Term = RawTerm, AnnTerm = RawAnnTerm
625	).
626
627
628unannotate_term(end_of_file, Term) :- -?->
629	Term = end_of_file.
630unannotate_term(annotated_term{term:TermAnn}, Term) :- -?->
631	( compound(TermAnn) ->
632	    functor(TermAnn, F, A),
633	    functor(Term, F, A),
634	    unannotate_term_args(A, TermAnn, Term)
635	;
636	    Term = TermAnn
637	).
638
639    unannotate_term_args(0, _TermAnn, _Term) :- !.
640    unannotate_term_args(I, TermAnn, Term) :-
641	    I1 is I-1,
642	    arg(I, TermAnn, AnnArg),
643	    arg(I, Term, Arg),
644	    unannotate_term(AnnArg, Arg),
645	    unannotate_term_args(I1, TermAnn, Term).
646
647
648
649%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
650
651% write the header for a .eco file
652
653dump_header(Out) :-
654	% magic .eco header (see procedure.c)
655	put(Out, 16'EC), put(Out, 16'1C), put(Out, 16'29),
656	put(Out, 16'16),	% ECO_CURRENT_VERSION, see procedure.c
657	% flush before switching to scramble mode
658	flush(Out),
659	% next line contains key that must be used in the .eco loader
660	set_stream_property(Out, scramble, 73540),
661	% 8 random bytes to make decryption more difficult
662	% (it may be better to have one after every dumped term)
663	random(R), get_flag(unix_time, T),
664	R1 is R/\255, R2 is R>>8/\255, R3 is R>>16/\255, R4 is R>>24/\255,
665	R5 is T/\255, R6 is T>>8/\255, R7 is T>>16/\255, R8 is T>>24/\255,
666	put(Out, R1), put(Out, R7), put(Out, R3), put(Out, R5),
667	put(Out, R2), put(Out, R8), put(Out, R4), put(Out, R6).
668
669
670% write a term in .eco format
671
672dump_term(Out, Term, Module) :-
673        term_to_bytes_(Term, String, Module),
674        string_length(String, Length),
675        write_integer(Out, Length),
676        printf(Out, "%Tw", String).             % no macros!
677
678write_integer(Out, N) :-
679        Byte0 is N /\ 16'ff,
680        Byte1 is (N >> 8) /\ 16'ff,
681        Byte2 is (N >> 16) /\ 16'ff,
682        Byte3 is (N >> 24) /\ 16'ff,
683        put(Out, Byte3),
684        put(Out, Byte2),
685        put(Out, Byte1),
686        put(Out, Byte0).
687
688
689%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
690
691:- mode file_query_body(++, +, +).
692file_query_body(call(Goal), _, M) :-		% call/1 forces execution
693	!,
694	call(Goal)@M.
695file_query_body((A, B), Proc, M) :-
696	!,
697	file_query_body(A, Proc, M),
698	file_query_body(B, Proc, M).
699file_query_body((A->B;C), Proc, M) :-
700	!,
701	(file_query_body(A, Proc, M) ->
702	    file_query_body(B, Proc, M)
703	;
704	    file_query_body(C, Proc, M)
705	).
706file_query_body((A;B), Proc, M) :-
707	!,
708	(
709	    file_query_body(A, Proc, M)
710	;
711	    file_query_body(B, Proc, M)
712	).
713file_query_body([File|L], Proc, M) :-
714	!,
715	call_proc(Proc, File, M),
716	(L == [] ->
717	    true
718	;
719	    file_query_body(L, Proc, M)
720	).
721file_query_body(compile(File), Proc, M) :-
722	!,
723	(File = [_|_] ->
724	    file_query_body(File, Proc, M)
725	;
726	    call_proc(Proc, File, M)
727	).
728file_query_body(ensure_loaded(Files), Proc, M) :-
729	!,
730	(Files = [_|_] ->
731	    file_query_body(Files, Proc, M)
732	;
733	    call_proc(Proc, Files, M)
734	).
735file_query_body(:-(Goal), Proc, M) :-
736        !,
737        file_query_body(Goal, Proc, M).
738file_query_body(?-(Goal), Proc, M) :-
739        !,
740        file_query_body(Goal, Proc, M).
741file_query_body(meta_attribute(_, _), _, M) :-
742        !,
743        meta_attribute(M, []).
744file_query_body(Goal, _Proc, M) :-
745	execute(Goal) ->
746	    call(Goal)@M
747	;
748	    true.
749
750:- mode execute(+).
751execute(use_module(_)).
752execute(define_struct(_)).	% library(structures)
753execute(erase_struct(_)).
754execute(op(_, _, _)).
755execute(global_op(_, _, _)).
756execute(local_op(_, _, _)).
757execute(set_flag(A, _)) :- allowed_flag(A).
758execute(get_flag(_, _)).
759execute(define_global_macro(_, _, _)).
760execute(define_local_macro(_, _, _)).
761execute(define_macro(_, _, _)).
762execute(erase_macro(_)).
763execute(set_chtab(_, _)).
764execute(asserta(_)).
765execute(assert(_)).
766execute(assertz(_)).
767execute(compile_term(_)).
768execute(cprolog).
769execute(quintus).
770execute(bsi).
771execute(sicstus).
772
773:- mode allowed_flag(+).
774allowed_flag(library_path).
775allowed_flag(macro_expansion).
776allowed_flag(prolog_suffix).
777
778call_proc(Proc, File, M) :-
779	copy_term(Proc, Copy),
780	arg(1, Copy, File),
781	call(Copy)@M.
782
783%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
784%
785%	EXEC
786%
787
788exec(Command, Streams) :-
789	exec(Command, Streams, Pid, 2),	% fails on error
790	!,
791	wait(Pid, Code),		% waitpid()
792	( Code /\ 8'377 =:= 0 ->	% process exited normally
793	    Status is Code >> 8 /\ 8'377,
794	    Err is Status - 128,
795	    (Err > 0 ->
796		set_last_errno(Err),
797		error(170, exec(Command, Streams))
798	    ;
799		true
800	    )
801	; Code /\ 8'377 =:= 8'177 ->	% process stopped
802	    error(175, exec(Command, Streams))
803	;				% process died
804	    error(174, exec(Command, Streams))
805	).
806exec(Command, Streams) :-
807	bip_error(exec(Command, Streams)).
808
809
810exec(Command, Streams, Pid) :-
811	exec(Command, Streams, Pid, 0), !.
812exec(Command, Streams, Pid) :-
813	bip_error(exec(Command, Streams, Pid)).
814
815exec_group(Command, Streams, Pid) :-
816	exec(Command, Streams, Pid, 1), !.
817exec_group(Command, Streams, Pid) :-
818	bip_error(exec_group(Command, Streams, Pid)).
819
820
821%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
822%
823%	Sh
824%
825
826system(X) :-
827	( get_flag('_system'/1, defined, on) ->
828	    '_system'(X)
829	;
830	    exec(['/bin/sh', '-c', X], [])
831	).
832
833sh(X) :-
834	system(X).
835
836%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
837%
838%	MACROS
839%
840
841phrase_body(Grammar, S, R, M) :-
842	var(Grammar),
843	!,
844	error(4, phrase(Grammar, S, R), M).
845phrase_body(Grammar, S, R, M) :-
846	check_grammar(Grammar, S, R, M, NewGr),
847	call(NewGr)@M.
848
849check_grammar(Grammar, S, R, M, NewGr) :-
850	((number(Grammar) ; string(Grammar)) ->
851	    error(5, phrase(Grammar, S, R), M)
852	;
853	    true
854	),
855	Grammar =.. [F | L],
856	append(L, [S, R], NL),
857	NewGr =.. [F | NL].
858
859phrase_body(Grammar, S, M) :-
860	phrase_body(Grammar, S, [], M).
861
862%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
863%
864%	FILES
865%
866get_file_info(File, Name, Value) :-
867	check_atom_string(File),
868	check_var_or_atom(Name),
869	check_var_or_atomic(Value),
870	!,
871	expand_filename(File, ExpandedFile, 1),  % EXPAND_STANDARD
872	do_get_file_info(ExpandedFile, Name, Value).
873get_file_info(File, Name, Value) :-
874	bip_error(get_file_info(File, Name, Value)).
875
876
877% This predicate expects an already expanded file name!
878do_get_file_info(File, device, X) :-
879	sys_file_flag(File, 9, X).
880do_get_file_info(File, inode, X) :-
881	sys_file_flag(File, 1, X).
882do_get_file_info(File, mode, X) :-
883	sys_file_flag(File, 0, X).
884do_get_file_info(File, nlink, X) :-
885	sys_file_flag(File, 2, X).
886do_get_file_info(File, uid, X) :-
887	sys_file_flag(File, 3, X).
888do_get_file_info(File, uname, X) :-
889	sys_file_flag(File, 15, X).
890do_get_file_info(File, gid, X) :-
891	sys_file_flag(File, 4, X).
892do_get_file_info(File, gname, X) :-
893	sys_file_flag(File, 16, X).
894do_get_file_info(File, size, X) :-
895	sys_file_flag(File, 5, X).
896do_get_file_info(File, atime, X) :-
897	sys_file_flag(File, 6, X).
898do_get_file_info(File, adate, X) :-
899	sys_file_flag(File, 12, X).
900do_get_file_info(File, mtime, X) :-
901	sys_file_flag(File, 7, X).
902do_get_file_info(File, mdate, X) :-
903	sys_file_flag(File, 13, X).
904do_get_file_info(File, ctime, X) :-
905	sys_file_flag(File, 8, X).
906do_get_file_info(File, cdate, X) :-
907	sys_file_flag(File, 14, X).
908do_get_file_info(File, blksize, X) :-
909	sys_file_flag(File, 11, X).
910do_get_file_info(File, blocks, X) :-
911	sys_file_flag(File, 10, X).
912do_get_file_info(File, readable, X) :-
913	process_file_permission(readable, N),
914	sys_file_flag(File, N, X).
915do_get_file_info(File, writable, X) :-
916	process_file_permission(writable, N),
917	sys_file_flag(File, N, X).
918do_get_file_info(File, executable, X) :-
919	process_file_permission(executable, N),
920	sys_file_flag(File, N, X).
921do_get_file_info(File, type, Type) :-
922	sys_file_flag(File, 0, Mode),
923	TypeBits is Mode /\ 8'170000,
924	( TypeBits == 8'010000 -> Type = fifo
925	; TypeBits == 8'020000 -> Type = char_device
926	; TypeBits == 8'040000 -> Type = directory
927	; TypeBits == 8'060000 -> Type = block_device
928	; TypeBits == 8'100000 -> Type = file
929	; TypeBits == 8'120000 -> Type = link
930	; TypeBits == 8'140000 -> Type = socket
931	; Type = unknown
932	).
933do_get_file_info(File, compiled_time, Time) :-
934	current_compiled_file(File, Time, _Module, _Goal).
935
936
937%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
938%
939% Remote Socket Interface
940%
941% Recorded information:
942%      recorded_list(peer_info, PeerInfos) - Infotmation on all the peers
943%
944%  Each PeerInfos is a structure:
945%
946%         PeerName-peer_info(PeerType, Lang, Key, Connect)
947%
948%      PeerName: peer name (atom)
949%      PeerType: either remote or embed.
950%      Lang:     programming language of the peer (atom).
951%      Key:      the key used to access dynamic data associated with the peer.
952%      Connect:  connection information. Either:
953%                   remote(PeerHost,LocalHost,TimeOut)
954%                   embed(PeerHost,LocalHost,TimeOut)
955%
956%            PeerHost is the hostname for the peer side. Used for verifying
957%              that any accepted socket connection comes from the same host.
958%            LocalHost is the ECLiPSe side hostname specification. This is
959%              used in setting up a server connection to the peer side.
960%              What is used has implications for what the remote side can
961%              give for the server (ECLiPSe) hostname when making a client
962%              connection:
963%                   1) actual ECLiPSe side hostname: remote side must also
964%                      use this name for the server.
965%                   2) localhost: remote side must used localhost for the
966%                      server name. This restricts the connection to the
967%                      local machine.
968%                   3) not instantiated: remote side can use either
969%                      localhost or the actual server hostname.
970%              We use the same specification for all connections between
971%              ECLiPSe and the peer for security reasons.
972%            TimeOut is the time-out (in seconds) for accepting any socket
973%              connection, plus waiting time for the initial read of data
974%              during attachment. Can be 'block' for no time-outs
975%
976% Currently, the only dynamic data associated with a peer are the peer queues.
977% Each queue is an record item recorded under the Key for the peer as:
978%
979%         queue(StreamNum)
980%
981% From StreamNum, a queue key can be derived by calling get_peer_queue_key/2.
982% Information for the queue is recorded under this key:
983%
984%         peer_queue(PeerType, PeerName, QueueType, Direction)
985%
986%      PeerType: peer type for the queue: Either embed or remote
987%      PeerName: peer name for the queue.
988%      QueueType: the type of the queue. This is either:
989%             sync(SocketName): synchronous remote queue, with socket
990%             async           : asynchronous remote queue
991%             embed           : queue in a embedded peer
992%      Direction: direction of queue. Either fromec, toec or bidirect
993%
994% The above information is used to clean up a remote side when it is
995% disconnected
996%
997% Dealing with events:
998% To ensure that the remote interface protocol is followed at all times,
999% event handling is deferred during most of the code below. We only
1000% allow events during
1001%	- running rpcs
1002%	- running user goals, e.g. remote_init
1003%	- remote flush (but not if inside ec_waitio)
1004% but these goals must be safely wrapped in catch/3 to make sure the
1005% events are deferred again even on failure/throw.
1006% Note that, since we re-enable events temporarily from within events-deferred
1007% code, we cannot allow nesting, i.e. remote_accept, peer_queue_xxx and
1008% flush/waitio handlers must be called from non-events-deferred contexts.
1009%
1010% First clause is the current version of the remote protocol.
1011% The version information should not occur anywhere else on the ECLiPSe side.
1012%
1013
1014remote_version(1).
1015
1016:- local variable(rpeer_count, 0).
1017:- local variable(in_ec_waitio, []).
1018:- local struct(peer_info(type,lang,key,connect)).
1019:- local struct(peer_queue(ptype,pname,qtype,dir)).
1020
1021
1022non_interruptable(Goal) :-
1023	( events_defer ->
1024	    call(Goal),
1025	    events_nodefer
1026	;
1027	    printf(warning_output, "Warning: Illegal events_defer nesting detected during remote protocol (%w)",[Goal]),
1028	    call(Goal)
1029	).
1030
1031
1032remote_connect(Address, Control, Init, Mod) :-
1033	remote_connect_setup(Address, Control, Soc), !,
1034	printf(log_output, "Socket created at address %w%n%b", [Address]),
1035        remote_connect_accept(Control, Soc, block, Init, "", _, Mod).
1036remote_connect(Address, Control, Init, _Mod) :-
1037	error(5, remote_connect(Address, Control, Init)).
1038
1039remote_connect_setup(Host/Port, Control, Soc) :-
1040	check_var_or_integer(Port),
1041	check_var_or_atom(Control),
1042	check_var(Soc),
1043	!,
1044        copy_term(Host,OrigHost), % OrigHost can be a variable
1045	new_socket_server(Soc, Host/Port, 2),
1046        (var(Control) ->
1047	    new_remote_peer_name(Control)
1048        ;
1049	    not_existing_peer_name(Control)
1050	),
1051	recorda(remote_control_host, Control-OrigHost).
1052remote_connect_setup(Address, Control, Soc) :-
1053	bip_error(remote_connect_setup(Address, Control, Soc)).
1054
1055new_remote_peer_name(Name) :-
1056	repeat,
1057	incval(rpeer_count),
1058	getval(rpeer_count, NPeer),
1059	concat_atom([peer, NPeer], Name),
1060	not_existing_peer_name(Name), !.
1061
1062not_existing_peer_name(Name) :-
1063% fails if Name is either an existing or potential peer
1064	\+ peer(Name), \+ recorded(remote_control_host, Name-_).
1065
1066
1067remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod) :-
1068	check_nonvar(Pass),
1069	erase(remote_control_host, Control-Host),
1070	get_rpcstream_names(Control, Rpc),
1071	timed_accept(Soc, TimeOut, RemoteHost, Control),
1072	check_remote_version(Control),
1073	timed_read_exdr(Control, TimeOut, Pass0),
1074        (Pass == Pass0 -> true ; set_bip_error(1)),
1075	write_exdr(Control, Control), flush(Control),
1076	% Host is the host name that will be used in any subsequent connections
1077        timed_read_exdr(Control, TimeOut, RemoteLang),
1078	timed_accept(Soc, TimeOut, RemoteHost, Rpc),
1079	write_exdr(Rpc, Control), flush(Rpc),
1080	set_peer_property(Control, peer_info{type:remote,lang:RemoteLang,
1081                                     connect:remote(RemoteHost,Host,TimeOut)}),
1082	set_event_handler(Control, true/0),
1083	close(Soc),
1084	events_defer,	% fail if already deferred (can't handle nesting)
1085	!,
1086	catch((
1087		run_remote_init(Init, Res, Mod),
1088		remote_control_read(Control, Message),
1089		handle_ec_resume(Message, Control),
1090		events_nodefer
1091	    ), Tag, (
1092		events_nodefer,
1093		throw(Tag)
1094	    )).
1095remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod) :-
1096	(nonvar(Soc),current_stream(Soc) -> close(Soc) ; true),
1097	(nonvar(Control), current_stream(Control) -> close(Control) ; true),
1098	get_bip_error(Err),
1099	error(Err, remote_connect_accept(Control, Soc, TimeOut, Init, Pass, Res, Mod)).
1100
1101
1102check_remote_version(Control) :-
1103	(timed_read_exdr(Control, 100, RemoteVersion) ->
1104             true ; set_bip_error(6)
1105        ),
1106	get_flag(remote_protocol_version, Version),
1107	(RemoteVersion == remote_protocol(Version) ->
1108	     write_exdr(Control, "yes"), flush(Control)
1109	;    write_exdr(Control, Version), flush(Control),
1110	     printf(error, "Incompatible remote protocol on remote side: %w%n",
1111                 [RemoteVersion]),
1112	     set_bip_error(141)
1113	).
1114
1115timed_read_exdr(Stream, TimeOut, Data) :-
1116	stream_select([Stream], TimeOut, [Stream]),
1117	catch(read_exdr(Stream, Data), _, fail).
1118
1119timed_accept(Server, TimeOut, RemoteHost, NewQueue) :-
1120	stream_select([Server], TimeOut, [Server]),
1121	accept(Server, RemoteHost0/_, NewQueue),
1122	(RemoteHost = RemoteHost0 ->
1123            true ; close(NewQueue), fail
1124        ).
1125
1126
1127% events deferred!
1128run_remote_init(Init, Res, Mod) :-
1129	( nonvar(Init), var(Res) ->
1130	    catch((
1131		     events_nodefer,
1132		     (call(Init)@Mod -> Res = Init ; Res = fail),
1133		     events_defer
1134		 ),
1135		 _,
1136		 (events_defer, Res = throw)
1137            )
1138	; nonvar(Res) ->
1139	      printf(warning_output, "Warning: result argument %w for initial goal not a variable in remote_control_accept/5. Initial Goal not executed.", [Res])
1140	; true
1141	).
1142
1143peer_info(Peer, Info) :-
1144	recorded(peer_info, Peer-Info).
1145
1146peer(Peer) :-
1147	( var(Peer) ->
1148	     peer_info(Peer, _)
1149	; atom(Peer) ->
1150	     once peer_info(Peer, _)
1151	; error(5, peer(Peer))
1152	).
1153
1154peer_get_property(Peer, Property, Value) :-
1155	check_atom(Peer),
1156	check_var_or_atom(Property),
1157	!,
1158	once(peer_info(Peer, Info)),
1159	get_a_peer_property(Property, Info, Value).
1160peer_get_property(Peer, Property, Value) :-
1161	bip_error(peer_get_property(Peer,Property,Value)).
1162
1163set_embed_peer(Peer, Lang) :-
1164	\+peer(Peer),
1165	get_flag(hostname, Host),
1166	set_peer_property(Peer, peer_info{type:embed,lang:Lang,connect:embed(Host,Host,block)}).
1167
1168% all the predicates that access peer_info directly should be put here
1169get_embed_peer(Peer) :-
1170	recorded(peer_info, Peer-(peer_info{type: embed})), !.
1171
1172set_peer_property(Peer, Info) :-
1173	get_peer_dyn_info_key(Peer, Key),
1174        Info = peer_info{key: Key},
1175	recorda(peer_info, Peer-Info).
1176
1177get_a_peer_property(type, peer_info{type:Type}, Type).
1178get_a_peer_property(language, peer_info{lang:Lang}, Lang).
1179get_a_peer_property(connect, peer_info{connect:Connect}, Connect).
1180get_a_peer_property(queues, peer_info{key:Key}, Qs) :-
1181	findall(Queue,(recorded(Key,queue(Nr)),get_stream(Nr,Queue)), Qs).
1182
1183
1184peer_queue_get_property(Queue, Prop, Value) :-
1185	check_stream_spec(Queue),
1186	check_var_or_atom(Prop),
1187	!,
1188	get_queueinfo_st(Queue, _, QueueInfo),
1189	get_queueinfo_item(Prop, QueueInfo, Value).
1190peer_queue_get_property(Queue, Prop, Value) :-
1191	bip_error(peer_queue_get_property(Queue,Prop,Value)).
1192
1193
1194get_queue_info(Name, Nr, Peer, QType, Dir) :-
1195	get_queueinfo_st(Name, Nr, QueueInfo),
1196	get_queueinfo_item(peer, QueueInfo, Peer),
1197	get_queueinfo_item(type, QueueInfo, QType),
1198	get_queueinfo_item(direction, QueueInfo, Dir).
1199
1200get_queueinfo_st(Name, Nr, QueueInfo) :-
1201	current_stream(Name),
1202	get_stream_info(Name, physical_stream, Nr),
1203	get_peer_queue_key(Nr, Key),
1204	recorded(Key, QueueInfo), !.
1205
1206get_queueinfo_item(peer_type,  peer_queue{ptype:PeerType}, PeerType).
1207get_queueinfo_item(peer_name,  peer_queue{pname:Peer}, Peer).
1208get_queueinfo_item(type,      peer_queue{qtype:Type}, Type).
1209get_queueinfo_item(direction, peer_queue{dir:Direction}, Direction).
1210get_queueinfo_item(peer,  peer_queue{ptype:PeerType,pname:Peer},
1211                   PInfo) :- % for backwards compatibility
1212        PInfo =.. [PeerType, Peer].
1213
1214
1215is_remote_sync_queue(PhysicalStream, Socket, ControlStream) :-
1216	peer_queue_get_property(PhysicalStream, peer, remote(ControlStream)),
1217	peer_queue_get_property(PhysicalStream, type, sync(Socket)).
1218
1219
1220deregister_queue(Stream, Control) :-
1221	get_stream_info(Stream, physical_stream, StreamNum),
1222	get_peer_queue_key(StreamNum, Key),
1223	erase_all(Key),
1224	get_peer_dyn_info_key(Control, ControlKey),
1225	recorded(ControlKey, queue(StreamNum), Ref), !,
1226	erase(Ref).
1227
1228register_remote_queue(Name, Control, Type, Direction) :-
1229	get_stream_info(Name, physical_stream, Nr),
1230	get_peer_queue_key(Nr, Key),
1231	get_peer_dyn_info_key(Control, ControlKey),
1232	recorda(ControlKey, queue(Nr)),
1233	recorda(Key, peer_queue{ptype:remote, pname:Control, qtype:Type, dir:Direction}).
1234
1235
1236register_embed_queue(Name, Peer, Direction) :-
1237	get_stream_info(Name, physical_stream, Nr),
1238	get_peer_queue_key(Nr, Key),
1239	peer_get_property(Peer, type, embed),
1240	get_peer_dyn_info_key(Peer, PeerKey),
1241	recorda(PeerKey, queue(Nr)),
1242	recorda(Key, peer_queue{ptype:embed, pname:Peer, qtype:sync(Nr), dir:Direction}).
1243
1244
1245get_peer_dyn_info_key(Control, ControlKey) :-
1246	concat_atom([peer_dynamic_info, Control], ControlKey).
1247
1248get_peer_queue_key(N, Key) :-
1249	concat_atom([peer_queue, N], Key).
1250
1251
1252new_socket_server(Soc, Address, N) :-
1253	socket(internet, stream, Soc),
1254        bind(Soc, Address),
1255	listen(Soc, N).
1256
1257
1258peer_queue_close(Queue) :-
1259	check_stream_spec(Queue), !,
1260	get_queue_info(Queue, StreamNum, Peer, QType, _Direction),
1261	non_interruptable(
1262	    close_peer_queue_type(Peer, StreamNum, QType)
1263	).
1264peer_queue_close(Queue) :-
1265	bip_error(peer_queue_close(Queue)).
1266
1267
1268    close_peer_queue_type(remote(Peer), StreamNum, QType) :-
1269	remote_control_send(Peer, queue_close(StreamNum)),
1270	remote_control_read(Peer, ResumeMessage),
1271	close_remote_queue_eclipseside(Peer, StreamNum, QType),
1272	handle_ec_resume(ResumeMessage, Peer).
1273	close_peer_queue_type(embed(Peer), StreamNum, _QType) :-
1274	write_exdr(embed_info, queue_close(StreamNum)),
1275	flush(embed_info),
1276	close_embed_queue_eclipseside(Peer, StreamNum).
1277
1278    close_embed_queue_eclipseside(Peer, StreamNum) :-
1279	deregister_queue(StreamNum, Peer),
1280	close(StreamNum).
1281
1282    close_remote_queue_eclipseside(Control, StreamNum, QType) :-
1283	deregister_queue(StreamNum, Control),
1284	close_remote_physical_streams(QType, StreamNum).
1285
1286    close_remote_physical_streams(sync(Socket), StreamNum) :-
1287	(current_stream(StreamNum) -> close(StreamNum) ; true),
1288	(current_stream(Socket) -> close(Socket) ; true).
1289    close_remote_physical_streams(async, StreamNum) :-
1290	(current_stream(StreamNum) -> close(StreamNum) ; true).
1291
1292
1293
1294peer_queue_create(Name, Control, Sync, Direction, Event) :-
1295	non_interruptable(
1296	    peer_queue_create1(Name, Control, Sync, Direction, Event)
1297	).
1298
1299
1300peer_queue_create1(Name, Control, Sync, Direction, Event) :-
1301	(atom(Name), atom(Control), is_event(Event) ->
1302            true ; set_bip_error(5)
1303        ),
1304	peer_get_property(Control, connect, Type),
1305	(Sync == sync ->
1306	    (Direction == fromec ; Direction == toec ; set_bip_error(6))
1307	;
1308	 (Sync == async,  functor(Type,remote,_))
1309        ;
1310	 set_bip_error(6)
1311        ), !,
1312	create_peer_queue_type(Type, Name, Control, Sync, Direction, Event).
1313peer_queue_create1(Name, Control, Sync, Direction, Event) :-
1314	get_bip_error(E),
1315	error(E, peer_queue_create(Name, Control, Sync, Direction, Event)).
1316
1317
1318    % events deferred!
1319    create_peer_queue_type(remote(PeerHost,LocalHost,TimeOut), Name, Control, Sync, Direction, Event) ?-
1320	new_socket_server(Soc, LocalHost/Port, 1),
1321	remote_control_send(Control, socket_client(Port, Name, Sync, Direction)),
1322	remote_control_read(Control, ResumeMessage),
1323	(is_disconnection(ResumeMessage) ->
1324	    close(Soc),
1325	    handle_ec_resume(ResumeMessage, Control)
1326	;
1327	 ResumeMessage = socket_connect(Name,Status) ->
1328	    connect_remote_queue(Status, Soc, Name, Control, Sync, Direction, Event, TimeOut, PeerHost, Return),
1329	 Return \== fail % fails if connection failed
1330	;
1331	 printf(error, "Unexpected control message %w while creating peer queue %w on remote side %w; disconnecting.%n", [ResumeMessage, Name, Control]),
1332         close(Soc),
1333	 handle_ec_resume(disconnect, Control)
1334        ).
1335    create_peer_queue_type(embed(_,_,_), Name, _Peer, _Sync, Direction, Event) ?-
1336	ecl_create_embed_queue(Name, Direction, Event),
1337	get_stream_info(Name, physical_stream, Nr),
1338	write_exdr(embed_info, queue_connect(Name, Nr, Direction)),
1339	flush(embed_info).
1340
1341
1342    ecl_create_embed_queue(Name, Direction, Event) :-
1343	get_embed_peer(Peer),
1344	(Direction == fromec ->
1345	    Options = [yield(on)],
1346	    Mode = write
1347	;
1348	    (Event == '' -> Options = [yield(on)] ; Options = [event(Event)]),
1349	    Mode = read
1350	),
1351	open(queue(""), Mode, Name, Options),
1352	register_embed_queue(Name, Peer, Direction).
1353
1354
1355    is_disconnection(disconnect).
1356    is_disconnection(disconnect_resume).
1357    is_disconnection(end_of_file).
1358
1359    % events deferred!
1360    connect_remote_queue(success, Soc, Name, Control, Sync, Direction, Event, TimeOut, RHost, StreamId) :-
1361	catch(
1362	      (create_remote_queue(Sync, Direction, Soc, Name, Control, TimeOut, RHost, Event) ->
1363                   get_stream_info(Name, physical_stream, StreamId)
1364	      ;
1365		  % Timed out or other problem
1366		  close(Soc),
1367		  StreamId = fail
1368              ), _, ((current_stream(Soc) -> close(Soc);true), StreamId = fail)
1369	),
1370	remote_control_send(Control, socket_accept(Name,StreamId)),
1371	remote_control_read(Control, ResumeMessage),
1372	handle_ec_resume(ResumeMessage, Control).
1373    connect_remote_queue(fail, Soc, Name, Control, _, _, _, _, _, StreamId) :-
1374	close(Soc),
1375	StreamId = fail,
1376	remote_control_send(Control, socket_accept(Name, fail)),
1377	remote_control_read(Control, ResumeMessage),
1378	handle_ec_resume(ResumeMessage, Control).
1379
1380
1381    create_remote_queue(async, _, Soc, Name, Control, TimeOut, RHost, Event) ?-
1382	remote_create_async_queue(Soc, Name, Control, TimeOut, RHost, Event).
1383    create_remote_queue(sync, fromec, Soc, Name, Control, TimeOut, RHost, Event) ?-
1384	remote_create_fromec_queue(Soc, Name, Control, TimeOut, RHost, Event).
1385    create_remote_queue(sync, toec, Soc, Name, Control, TimeOut, RHost, Event) ?-
1386	remote_create_toec_queue(Soc, Name, Control, TimeOut, RHost, Event).
1387
1388    % memory queue needed to allow eof event to be raised reading empty queue
1389    remote_create_toec_queue(Soc, Name, Control, TimeOut, RemoteHost, Event) :-
1390	open(queue(""), update, Name),
1391	concat_atom([Name, soc], SocName),
1392	timed_accept(Soc, TimeOut, RemoteHost, SocName),
1393	close(Soc),
1394	(Event == '' ->
1395	    set_stream_property(Name, yield, on)
1396        ;   set_stream_property(Name, event, Event)
1397        ),
1398	register_remote_queue(Name, Control, sync(SocName), toec).
1399
1400
1401    % memory queue needed for buffering output.
1402    % Event is dummy for now, to be used for remote side requesting data
1403    remote_create_fromec_queue(Soc, Name, Control, TimeOut, RemoteHost, _Event) :-
1404	open(queue(""), update, Name, [yield(on)]),
1405	concat_atom([Name, soc], SocName),
1406	timed_accept(Soc, TimeOut, RemoteHost, SocName),
1407	close(Soc),
1408	register_remote_queue(Name, Control, sync(SocName), fromec).
1409
1410    remote_create_async_queue(Soc, Name, Control, TimeOut, RemoteHost, Event) :-
1411    % use Control to remember which remote process this stream is connected to
1412	timed_accept(Soc, TimeOut, RemoteHost, Name),
1413	(Event == '' ->
1414	    true
1415        ;   set_stream_property(Name, event, Event)
1416        ),
1417	close(Soc),
1418	register_remote_queue(Name, Control, async, bidirect).
1419
1420
1421% returns end_of_file as a message if something goes wrong
1422remote_control_read(Control, Message) :-
1423	catch((read_exdr(Control, Message) -> true ; Message = end_of_file),
1424	           _, Message = end_of_file
1425	).
1426
1427% catches any prblems before sending control message
1428remote_control_send(Control, Message) :-
1429	(stream_select([Control], 0, [Control]) ->
1430	    % unexpected message arrived on control stream
1431	    remote_control_read(Control, InMessage),
1432	    ((InMessage == disconnect_resume; InMessage == end_of_file) ->
1433		% unilateral disconnect from remote side; disconnect locally now
1434		remote_cleanup(Control),
1435		throw(peer_abort_disconnected)
1436	    ;   printf(error, "Unexpected incoming message %w on remote %w.\n", [InMessage,Control]),
1437	        throw(peer_abort_error)
1438	    )
1439	;
1440	    write_exdr(Control, Message),
1441	    flush(Control)
1442	).
1443
1444
1445:- local finalization(disconnect_remotes).
1446
1447disconnect_remotes :-
1448	recorded_list(peer_info, Remotes),
1449	disconnect_remotes(Remotes).
1450
1451disconnect_remotes([]).
1452disconnect_remotes([Control-_|Controls]) :-
1453	remote_disconnect(Control),
1454	disconnect_remotes(Controls).
1455
1456
1457remote_disconnect(Control) :-
1458	((nonvar(Control), current_stream(Control),
1459          peer_get_property(Control,type,remote)
1460         ) ->
1461	    remote_control_send(Control, disconnect),
1462	    (read_exdr(Control, disconnect_resume) ->
1463		remote_cleanup(Control)
1464	    ;   % if not resume, then problem....
1465	        true
1466	    )
1467	;   true  % Control is not a current remote peer...
1468	).
1469
1470
1471% events not deferred!
1472remote_output(PhysicalStream, ControlStream, RemoteStream) :-
1473	non_interruptable((
1474	    read_string(PhysicalStream, end_of_file, Len, Data),
1475	    yield_to_remote(ControlStream, ec_flushio(PhysicalStream, Len), RemoteStream, Data)
1476	)).
1477
1478    % events deferred!
1479    yield_to_remote(ControlStream, YieldMessage, DataStream, Data) :-
1480	remote_control_send(ControlStream, YieldMessage),
1481	write(DataStream, Data),
1482	flush(DataStream),
1483	remote_control_read(ControlStream, ResumeMessage),
1484	handle_ec_resume(ResumeMessage, ControlStream).
1485
1486
1487% events not deferred!
1488remote_input(PhysicalStream, ControlStream) :-
1489	non_interruptable((
1490	    remote_control_send(ControlStream, ec_waitio(PhysicalStream)),
1491	    wait_for_remote_input(PhysicalStream, ControlStream)
1492	)).
1493
1494    % wait for remote input to arrive, handle any messages before this,
1495    % data is then copied from the socket to the queue stream (physical stream)
1496    % events deferred!
1497    wait_for_remote_input(PhysicalStream, ControlStream) :-
1498	% we expect at least one rem_flushio-message and a resume
1499	setval(in_ec_waitio, PhysicalStream),
1500	remote_control_read(ControlStream, Message0),
1501	expect_control(ControlStream,
1502		[rem_flushio(PhysicalStream, _), rem_flushio(PhysicalStream)],
1503		Message0, Message1),
1504	handle_control(Message1, ControlStream, Message2),
1505	setval(in_ec_waitio, []),
1506	expect_control(ControlStream, [resume], Message2, _).
1507
1508
1509remote_rpc_handler(Rpc, Control) :-
1510	% The socket rpc can only handle a single rpc
1511	% the rpc goal corresponding to the control message must eventually
1512        % arrive on the Rpc socket stream
1513	stream_select([Rpc], block, [Rpc]), % wait until Rpc stream is ready..
1514	catch(execute_remote_rpc(Rpc, Control), _, handle_remote_rpc_throw(Rpc, Control)).
1515
1516    execute_remote_rpc(Rpc, Control) :-
1517	read_exdr(Rpc, Goal),
1518	events_nodefer,
1519	execute_rpc(Rpc, Goal, (
1520		events_defer,
1521		remote_control_send(Control,yield)
1522	    )).
1523
1524    handle_remote_rpc_throw(Rpc, Control) :-
1525	events_defer,
1526	remote_control_send(Control, yield),
1527	write_exdr(Rpc, throw), flush(Rpc).
1528
1529
1530% Handle initial message Message0 (and possibly further messages on Control)
1531% until we get one of the messages specified in the list Expected.
1532% The expected message itself is not handled, but returned as ExpectedMessage.
1533
1534% events deferred!
1535expect_control(Control, Expected, Message0, ExpectedMessage) :-
1536	( nonmember(Message0, Expected) ->
1537	    ( Message0 = resume ->
1538		printf(warning_output,
1539		    "Unexpected resume from remote peer %w while waiting for %w%n%b",
1540		    [Control, Expected]),
1541		% yield back and hope for the best
1542		remote_yield(Control, Message1)
1543	    ;
1544		% some other message, try to process it
1545		handle_control(Message0, Control, Message1)
1546	    ),
1547	    expect_control(Control, Expected, Message1, ExpectedMessage)
1548	;
1549	    ExpectedMessage = Message0
1550	).
1551
1552
1553% Handle initial message Message (and possibly further messages on Control).
1554% Return as soon as we get a resume message.
1555
1556% events deferred!
1557handle_ec_resume(Message, Control) :-
1558	expect_control(Control, [resume], Message, _Message).
1559
1560
1561% events deferred!
1562handle_control(rpc, Control, NextMsg) :- -?->  !, % rpc call
1563	get_rpcstream_names(Control, Rpc),
1564	remote_rpc_handler(Rpc, Control),
1565	remote_control_read(Control, NextMsg).
1566handle_control(disconnect, Control, _NextMsg) :- -?->  !, % disconnect request
1567	write_exdr(Control, disconnect_yield), % acknowledge disconnect
1568	flush(Control),
1569	remote_cleanup(Control),
1570	throw(peer_abort_disconnected).
1571handle_control(rem_flushio(Queue), Control, NextMsg) :- -?-> !,
1572	get_stream_info(Queue, device, Device),
1573	deal_with_remote_flush(Device, Queue, unknown),
1574	remote_yield(Control, NextMsg).
1575handle_control(rem_flushio(Queue, Len), Control, NextMsg) :- -?-> !,
1576	get_stream_info(Queue, device, Device),
1577	deal_with_remote_flush(Device, Queue, Len),
1578	remote_yield(Control, NextMsg).
1579handle_control(queue_create(Name,Sync,Direction,Event), Control, NextMsg) :- -?-> !,
1580	catch((
1581	   peer_queue_create1(Name, Control, Sync, Direction, Event) -> true;true),
1582           _, true
1583        ),
1584	remote_yield(Control, NextMsg).
1585handle_control(queue_close(Queue), Control, NextMsg) :- -?-> !,
1586	((current_stream(Queue),get_queue_info(Queue, Queue, remote(Control), QType, _)) ->
1587	    close_remote_queue_eclipseside(Control, Queue, QType)
1588	;   % not a remote queue, just ignore
1589	    true
1590	), remote_yield(Control, NextMsg).
1591handle_control(disconnect_resume, Control, _NextMsg) :- -?-> !,
1592% remote side already disconnected, no acknowledgement
1593	remote_cleanup(Control),
1594	throw(peer_abort_disconnected).
1595handle_control(end_of_file, Control, _NextMsg) :- -?-> !,
1596% Control is disconnected. Assume remote side disconnected unexpectedly
1597	remote_cleanup(Control),
1598	throw(peer_abort_disconnected).
1599handle_control(Message, Control, NextMsg) :-
1600	printf(error, "Unrecognised control signal %w; disconnecting.%n",
1601            [Message]),
1602	handle_control(disconnect, Control, NextMsg).
1603
1604
1605% events deferred!
1606deal_with_remote_flush(Device, Queue, Len) :-
1607	( getval(in_ec_waitio, Queue) ->
1608	    % this flush is the input corresponding to a ec_waitio
1609	    % don't handle events
1610	    catch((
1611		deal_with_remote_flush1(Device, Queue, Len) -> true ; true
1612		), _, true)	% ignore any problems with the handler
1613
1614	; events_nodefer ->     %%%% this can't fail!
1615	    % handle events during remote flush
1616	    catch((
1617		deal_with_remote_flush1(Device, Queue, Len) -> true ; true
1618		), _, true),	% ignore any problems with the handler
1619	    events_defer
1620	;
1621	    printf(error, "Unexpected events_nodefer state in remote flush %w%n", [Queue])
1622	).
1623
1624    deal_with_remote_flush1(socket, Queue, Len) ?- !,
1625	% raw socket, is an asyn. queue; user process the data
1626	get_stream_info(Queue, event, Event),
1627	error(Event, rem_flushio(Queue, Len)).
1628    deal_with_remote_flush1(_, Queue, Len) :-
1629	% non-socket case, read data into a buffer
1630	peer_queue_get_property(Queue, type, sync(SockName)),
1631	read_sync_data_to_buffer(Len, Queue, SockName).
1632
1633    read_sync_data_to_buffer(Len, Queue, SockName) :-
1634	(integer(Len) ->
1635	    (read_string(SockName, end_of_file, Len, Data) -> true ; Data = end_of_file),
1636	    write(Queue, Data)
1637	;   % Length unknown, read as exdr term
1638	    (read_exdr(SockName, Data) -> true ; Data = end_of_file),
1639	    write_exdr(Queue, Data)
1640	).
1641
1642
1643% make remote_cleanup more robust so that problems will not choke eclipse
1644% events deferred on entry, undeferred om exit
1645remote_cleanup(Control) :-
1646	catch(remote_cleanup_raw(Control), _, fail), !.
1647remote_cleanup(Control) :-
1648	printf(error, "Problem with cleaning up remote peer %w.%n", [Control]).
1649
1650
1651remote_cleanup_raw(Control) :-
1652	events_nodefer,			% to make next line work
1653	(event(Control) -> true ; true), % user defined cleanup
1654	reset_event_handler(Control),
1655	get_peer_dyn_info_key(Control, ControlKey),
1656	% get all the socket streams associated with this remote process
1657	recorded_list(ControlKey, RemoteDynInfo),
1658	cleanup_dynamic_infos(RemoteDynInfo, Control),
1659        cleanup_peer_multitask_infos(Control),
1660	erase_all(ControlKey),
1661	get_rpcstream_names(Control, Rpc),
1662	(erase(peer_info, Control-_) -> true;true),
1663	close(Rpc),
1664	close(Control).
1665
1666cleanup_dynamic_infos([Item|Infos], Control) :-
1667	(Item = queue(Queue) ->
1668             get_queue_info(Queue, StreamNum, remote(Peer), QType, _Dir),
1669	    close_remote_queue_eclipseside(Peer, StreamNum, QType)
1670	;
1671	    true
1672	),
1673	cleanup_dynamic_infos(Infos, Control).
1674cleanup_dynamic_infos([], _).
1675
1676
1677% events deferred!
1678remote_yield(Control, ResumeMessage) :-
1679	nonvar(Control),
1680        peer(Control),
1681	current_stream(Control),
1682	remote_control_send(Control, yield),
1683	remote_control_read(Control, ResumeMessage).
1684
1685% events deferred or undeferred!
1686remote_yield(Control) :-
1687        ( events_defer -> Reset=events_nodefer ; Reset=true ),
1688        remote_yield(Control, ResumeMessage),
1689        handle_ec_resume(ResumeMessage, Control),
1690        Reset.
1691
1692
1693get_rpcstream_names(Control, Rpc) :-
1694	concat_atom([Control, '_rpc'], Rpc).
1695
1696
1697%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1698% Peer Multitasking
1699
1700:- local struct(mt_peer(peer,msgq)).
1701:- local variable(peer_mt_status).
1702:- setval(peer_mt_status, off).
1703
1704peers_are_multitasking :-
1705        \+getval(peer_mt_status, off).
1706
1707peers_should_cycle :-
1708        getval(peer_mt_status, mt_set).
1709
1710
1711peer_register_multitask(Peer, MsgQ) :-
1712        (peer(Peer) ->
1713             \+ recorded(multitask_peers, mt_peer{peer:Peer}),
1714             concat_atom([Peer, multifrom], MsgQ),
1715             peer_queue_create(MsgQ, Peer, sync, fromec, ''),
1716             record(multitask_peers, mt_peer{peer:Peer,msgq:MsgQ})
1717        ;
1718             error(6, peer_register_multitask(Peer, MsgQ))
1719        ).
1720
1721peer_deregister_multitask(Peer) :-
1722        (peer(Peer) ->
1723             recorded(multitask_peers, mt_peer{peer:Peer,msgq:MsgQ}),
1724             cleanup_peer_multitask_infos(Peer),
1725             peer_queue_close(MsgQ)
1726        ;
1727             error(6, peer_deregister_multitask(Peer))
1728        ).
1729
1730peer_do_multitask(Type) :-
1731        \+peers_are_multitasking,
1732        /* multitasking will terminate if peers do not confirm multitasking */
1733        catch(( (peer_multitask_terminate,
1734                 peer_multitask_phase(Type, Err)
1735                )-> true
1736              ;     peer_end_multitask(Err)
1737              ),
1738              Tag, (peer_end_multitask(_Err2), Tag = Err)
1739        ),
1740        (nonvar(Err) -> throw(Err) ; true).
1741
1742    peer_multitask_phase(Type, Err) :-
1743        peers_mt_broadcast_with_cleanup(start_multitask(Type), Err),
1744        (nonvar(Err) -> true ; peers_mt_cycle(Err)),
1745        peer_end_multitask(Err).
1746
1747/* ensure that multitask phase is ended properly: if failure or
1748   throw occurs, broadcast end_multitask again */
1749peer_end_multitask(Err) :-
1750        catch(( (peers_mt_broadcast_with_cleanup(end_multitask, Err),
1751                 peer_multitask_off
1752                 ) -> true
1753                ;     peer_end_multitask(Err)
1754              ), _, peer_end_multitask(_)).
1755
1756
1757peer_multitask_terminate :-    setval(peer_mt_status, mt_reset).
1758peer_multitask_confirm   :-    setval(peer_mt_status, mt_set).
1759peer_multitask_off   :-    setval(peer_mt_status, off).
1760
1761% avoids pushing witness pword onto global stack by avoiding a CP here
1762% all peer_mt_status state must be given by the clauses
1763do_peers_mt_cycle(mt_set, Err) ?-
1764        sleep(0.01),
1765        peers_mt_broadcast_with_cleanup(interact, Err),
1766        peers_mt_cycle(Err).
1767do_peers_mt_cycle(mt_reset, _Err) ?- true.
1768do_peers_mt_cycle(off, _Err) ?- true.
1769
1770peers_mt_cycle(Err) :-
1771        getval(peer_mt_status, Status),
1772        do_peers_mt_cycle(Status, Err).
1773
1774peers_mt_broadcast_with_cleanup(Msg, Err) :-
1775        % rollback the garbage generated by peers_mt_broadcast/2
1776        % if no error occurred
1777        (peers_mt_broadcast(Msg, Err), nonvar(Err) -> true ; true).
1778
1779
1780peers_mt_broadcast(Msg, Err) :-
1781        recorded_list(multitask_peers, Ps),
1782        (Ps \== [] ->
1783             peers_mt_broadcast1(Ps, Msg, Err)
1784        ;
1785             peer_multitask_terminate,
1786             (Err = peer_multitask_empty -> true; true)
1787        ).
1788
1789peers_mt_broadcast1([], _, _).
1790peers_mt_broadcast1([mt_peer{peer:Peer,msgq:MQ}|Ps], Msg, Err) :-
1791        catch(send_mt_message(MQ, Msg), Tag,
1792              peer_mt_error_recover(Tag,Peer,Err)),
1793        peers_mt_broadcast1(Ps, Msg, Err).
1794
1795
1796send_mt_message(ToPQ, Msg) :-
1797        % ignore failure (invalid terms substituted by _)
1798        (write_exdr(ToPQ, Msg) -> true;true),
1799        flush(ToPQ).
1800
1801% First case happens if a remote peer has disconnected. In this case, the
1802% remote peer code should have cleaned up already
1803peer_mt_error_recover(peer_abort_disconnected, _, _) :- !.
1804peer_mt_error_recover(abort, _Peer, Err) :- !,
1805        % abort raised. Stop multitasking and allow abort to continue
1806        peer_multitask_terminate,
1807        (Err = abort -> true ; true).
1808peer_mt_error_recover(Tag, Peer, Err) :-
1809        % something went wrong, remove problematic peer from multitasking
1810        % list and end multitask, follow by aborting with first error
1811        peer(Peer),
1812        cleanup_peer_multitask_infos(Peer),
1813        peer_multitask_terminate,
1814        (Tag = Err -> true ; true).
1815
1816cleanup_peer_multitask_infos(Peer) :-
1817        (erase(multitask_peers, mt_peer{peer:Peer}) -> true ; true).
1818
1819
1820%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
1821
1822% Allow users to trace grammar rules through phrase/2/3
1823?- unskipped
1824	phrase_body/3,
1825	phrase_body/4.
1826
1827% tool interfaces must be set to skipped explicitely
1828:- skipped
1829	file_query/2,
1830	global_op/3,
1831	op/3,
1832	read_token/2.
1833
1834% Set all output predicates to skipped in order not to trace the
1835% flush event handler (io_yield_handler) when it happens.
1836:- skipped
1837	flush/1,
1838	display/1,
1839	display/2,
1840	nl/0,
1841	nl/1,
1842	put/1,
1843	put/2,
1844	print/1,
1845	print/2,
1846	printf/2,
1847	printf/3,
1848	tyo/1,
1849	tyo/2,
1850	write/1,
1851	write/2,
1852	write_canonical/1,
1853	write_canonical/2,
1854	write_exdr/2,
1855	write_term/2,
1856	write_term/3,
1857	writeln/1,
1858	writeln/2,
1859	writeq/1,
1860	writeq/2.
1861
1862:- untraceable
1863	make/0.
1864
1865:- export
1866	file_query/2.
1867