1/* BEGIN LICENSE BLOCK
2 * Version: CMPL 1.1
3 *
4 * The contents of this file are subject to the Cisco-style Mozilla Public
5 * License Version 1.1 (the "License"); you may not use this file except
6 * in compliance with the License.  You may obtain a copy of the License
7 * at www.eclipse-clp.org/license.
8 *
9 * Software distributed under the License is distributed on an "AS IS"
10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11 * the License for the specific language governing rights and limitations
12 * under the License.
13 *
14 * The Original Code is  The ECLiPSe Constraint Logic Programming System.
15 * The Initial Developer of the Original Code is  Cisco Systems, Inc.
16 * Portions created by the Initial Developer are
17 * Copyright (C) 1994-2006 Cisco Systems, Inc.  All Rights Reserved.
18 *
19 * Contributor(s):
20 *
21 * END LICENSE BLOCK */
22
23/**********************************************************************
24**      System: Parallel Distributed System
25**        File: worker.c
26**      Author: Shyam Mudambi
27**		Liang-Liang Li
28** Description: Slave process mps setup and handling routines
29***********************************************************************/
30
31#include "config.h"
32#include <sys/types.h>
33#include <sys/param.h>
34#include <signal.h>
35#include <stdlib.h>
36#include <stdio.h>
37#include <sys/time.h>
38#include <sys/times.h>
39
40#ifdef HAVE_UNISTD_H
41#include <unistd.h>
42#endif
43
44#if HAVE_STRING_H
45#  include <string.h>
46#  ifdef MEMCPY_STRING
47#    define bcopy(s1, s2, n)	(void) memcpy((void *)(s2),(void *)(s1), n)
48#  endif
49#endif
50#ifdef MEMCPY_MEMORY
51#  include <memory.h>
52#  define bcopy(s1, s2, n)	(void) memcpy((char *)(s2), (char *)(s1), n)
53#endif
54
55#ifdef PATH_IN_LIMITS
56#  include 	<limits.h>
57#  define MAX_PATH_LEN	PATH_MAX
58#else
59#  include <sys/param.h>
60#  define MAX_PATH_LEN	MAXPATHLEN
61#endif
62
63#ifdef BSD_TIMES
64#include <sys/timeb.h>
65#endif
66
67#if !defined(HAVE_GETHOSTID)
68#include <sys/utsname.h>	/* for uname() */
69#endif
70
71#ifdef HAVE_SYS_SYSTEMINFO_H
72#include <sys/systeminfo.h>
73#endif
74
75#include "sepia.h"
76#include <pds.h>   /* pds.h has to be included before types.h since both
77                      define machine word size dependent types */
78#include <nsrv.h>
79#include "types.h"
80#include "embed.h"
81#include "error.h"
82#include "mem.h"
83#include "dict.h"
84#include "memman.h"
85#include "trace.h"
86#include "wm_msgs.h"
87#include "wm.h"
88
89
90#define ST_HANDLE_DS_DEFINED 1
91#include "sch_types.h"
92#include "sch_eng_interface.h"
93
94
95/*------------------------------------------------------*/
96/* Types and Constant Definitions */
97/*------------------------------------------------------*/
98
/* Index of this worker, read from the global ec_options structure. */
#define ParallelWorker (ec_options.parallel_worker)

/*
 * Print "<worker index>: <text>" on stdout when worker_verbose is non-zero.
 * The body is wrapped in do { } while (0) so that `Notify(x);` is exactly
 * one statement and can be used as the unbraced branch of an if/else.
 */
#define Notify(text) \
do { if (worker_verbose) { printf("%d: %s\n", ParallelWorker, (text)); } } while (0)
103
/* Pointer to one entry of the worker list. */
typedef struct worker_struct * worker_ptr;
/* One worker known to this process; entries form a singly linked list. */
struct worker_struct {
  int index;			/* worker id, matched against wid in get_worker() */
  int pid;			/* NOTE(review): never assigned in the visible code */
  nsrv_name_t bport_name;	/* name handed to nsrv_bport_look_up() */
  bport_id_t bport_id;		/* taken from the looked-up bport in open_worker_connections() */
  aport_id_t wm_aport_id;	/* aport that wm messages for this worker are sent to */
  int status;			/* set to 0 by insert_port() */
  worker_ptr next;		/* next list entry, NULL at the end */
} ;

typedef struct worker_struct worker_t;

/* Head of the worker list together with its entry count. */
typedef struct {
  int num_workers;
  worker_ptr list;
} hdr_worker_list_t;
121
122/*------------------------------------------------------*/
123/* External Variables */
124/*------------------------------------------------------*/
125
/* Defined elsewhere; presumably the handler for the message signal -- confirm. */
extern RETSIGTYPE sigmsg_handler();

/* Routines defined outside this file. sch_wake_eng()/sch_idle_eng() are
   called from wm_notify() for WAKEUP/GOTO_SLEEP, ec_worker_cleanup() from
   halt1_notify(). */
extern void	sch_wake_eng(),
		sch_idle_eng(),
		ec_worker_cleanup(),
		unblock_signals();

/* extern void	exit(); */

/* short_sleep() is used by the polling loops in this file. */
extern void short_sleep();
void poll_short_sleep();
#ifdef WORKER_TRACING
extern void trace_types_init();
extern trace_header_t * get_trace_ptr();
#endif
141
142/*------------------------------------------------------*/
143/* Global Variables */
144/*------------------------------------------------------*/
145
/* Aport ids of the worker manager. */
aport_id_t	wm_halt1_aport_id;
aport_id_t	wm_halt2_aport_id;
aport_id_t	wm_high_aport_id;
aport_id_t	wm_low_aport_id;  /* used in trace.c */

/* Non-zero makes Notify() print the messages it is given. */
static int worker_verbose = 0;
/* List of the workers announced through PORT_NAME messages. */
static hdr_worker_list_t worker_list;

/* Root node handle, stored by wm_notify() on a SET_ROOT message. */
st_handle_t root_id;

nsrv_name_t bdomain_key;
nsrv_name_t session_key;
static nsrv_name_t my_port_name;
static nsrv_name_t my_signature;
bdomain_t bdomain;


bdomain_id_t domain_id; /* only used if this worker creates a new
			   shared memory domain */

/* Last status replies, copied out of the messages in wm_notify(). */
status_msg_t status;
host_status_msg_t host_status;

static bport_t	wm_bport;
aport_id_t	aports[TOTAL_APORT_NUMBER];
static bport_t my_bport;
static bport_id_t my_bport_id;
static int my_pid;

/*
 * Flags and counters written by the notify routines in this file and
 * read elsewhere (e.g. sent_init_table is polled by
 * wait_for_session_table()):
 *   config_ret        msg_value of the last CONFIG_NOTIFY message
 *   status_ret        worker total of the last (host) status reply
 *   worker_info_ret   set to 1 when a worker info/statistics/trace reply arrived
 *   got_wm_hostname   set to 1 when the worker manager host name arrived
 *   all_connected     set to 1 on ALL_CONNECTED
 *   got_halt          set to 1 by halt2_notify()
 *   got_root_id       set to 1 on SET_ROOT
 *   root_initialised  set to 1 on ROOT_INITIALISED
 *   sent_init_table   set to 1 on SENT_INIT_PORT_NAMES
 *   wm_b_connected    set to 1 when the open of wm_bport is acknowledged
 *   open_bports       incremented in worker_bport_notify()/worker_bport_ack()
 *   closed_bports     incremented in worker_bport_notify()/worker_bport_ack()
 *   cur_time          time value of the last SEND_TIME message
 * They are declared volatile whenever the compiler is an ANSI one.
 */
#if defined(__STDC__)
static volatile int config_ret = 0;
static volatile int status_ret = 0;
static volatile int worker_info_ret = 0;
static volatile int got_wm_hostname = 0;
static volatile int all_connected = 0;
static volatile int got_halt = 0;
static volatile int got_root_id = 0;
static volatile int root_initialised = 0;
static volatile int sent_init_table = 0;
static volatile int wm_b_connected = 0;
static volatile int open_bports = 0;
static volatile int closed_bports = 0;
static volatile double cur_time = 0.0;
#else
static int config_ret = 0;
static int status_ret = 0;
static int worker_info_ret = 0;
static int got_wm_hostname = 0;
static int all_connected = 0;
static int got_halt = 0;
static int got_root_id = 0;
static int root_initialised = 0;
static int sent_init_table = 0;
static int wm_b_connected = 0;
static int open_bports = 0;
static int closed_bports = 0;
static double cur_time = 0.0;
#endif
/* Directory component of the map file name built in fill_in_domain(). */
static char * map_dir;
nsrv_name_t wm_hostname; /* name of worker manager host */
int local_wm = 0;  /* set if worker manager is on the same host */

/* Buffers and sizes for sending and receiving worker info. */
void_ptr worker_info_buf, worker_info_bufin;
int worker_info_bufsize, worker_info_bufinsize;

/* Destination for received worker statistics. */
struct worker_stat_ext * wstat_rec_buf;

#ifdef WORKER_TRACING
/* Destination for a received trace header. */
trace_header_t * trace_header_buf;
#endif

/* The variables below are used for calculating the elapsed session time. */
/* wm_start_time is only used if we are on the same host as the
   worker manager. */
#ifdef HAVE_GETHRTIME
static volatile hrtime_t wm_start_time;
#else
#ifdef BSD_TIMES
static volatile time_t wm_start_time;
#else
static volatile clock_t wm_start_time;
#endif
#endif
extern int clock_hz;
233
234/*-----------------------------------------------------------------*/
235/*            Port list handling routines                          */
236/*-----------------------------------------------------------------*/
237
238void init_worker_list()
239{
240  worker_list.num_workers = 0;
241  worker_list.list = NULL;
242}
243
244void insert_port(index,port_name,wm_aport_id)
245int index;
246nsrv_name_t port_name;
247aport_id_t wm_aport_id;
248
249{
250  worker_ptr worker;
251
252  worker = (worker_ptr) hp_alloc(sizeof(worker_t));
253  worker->index = index;
254  (void) strcpy(worker->bport_name,port_name);
255  worker->status = 0;
256  worker->wm_aport_id = wm_aport_id;
257
258  if(worker_list.list == NULL)
259    worker->next = NULL;
260  else
261    worker->next = worker_list.list;
262
263  worker_list.list = worker;
264  worker_list.num_workers++;
265}
266
267worker_ptr get_worker(wid)
268int wid;
269{
270  worker_ptr cur;
271
272  for(cur = worker_list.list; cur != NULL; cur = cur->next)
273    if (cur->index == wid)
274      return(cur);
275
276  return(0);
277}
278
279
/*
 * Return 1 if the worker list contains an entry with index wid,
 * 0 otherwise.
 */
int valid_wid(wid)
int wid;
{
  if (get_worker(wid) != NULL)
    return(1);

  return(0);
}
291
292/*-----------------------------------------------------------------*/
293
294void check_nsrv(nret,line,err)
295nsrv_ret_t nret;
296int line;
297int err;
298
299{
300  if (nret != NSRV_OK)
301    {
302      printf("**Worker %d: Name Server (nsrv) Fatal Error**\n",ParallelWorker);
303      switch(err)
304	{
305	case 1:
306	  printf("Looks like your nameserver has crashed.\n");
307	  printf("Kill all workers, remove all files in $ECLIPSETMP ");
308	  printf("and restart peclipse.\n");
309	  break;
310	case 2:
311	  printf("Could not register information with nsrv.\n");
312	  break;
313	case 3:
314	  printf("nsrv_new_bport_id call failed.\n");
315	  break;
316	case 4:
317	  printf("nsrv_bdomain_look_up failed.\n");
318	  break;
319	case 5:
320	  printf("nsrv_bport_look_up failed.\n");
321	  break;
322	case 6:
323	  printf("nsrv_aport_look_up failed.\n");
324	  break;
325	default:
326	  break;
327	}
328      printf("pid = %d nret = %d at line %d in file %s\n",
329	     getpid(),nret,line,__FILE__);
330      exit(1);
331    }
332}
333
334void check_nsrv_soft(nret,line,err)
335nsrv_ret_t nret;
336int line;
337int err;
338{
339  if (nret != NSRV_OK)
340    {
341      printf("**Worker %d: Name Server (nsrv) Error** \n",ParallelWorker);
342      switch(err)
343	{
344	case 1:
345	  printf("Could not deregister nameserver information");
346	  break;
347	case 2:
348	  printf("Could not free nameserver ids");
349	  break;
350	default:
351	  break;
352	}
353      printf("pid = %d nret = %d at line %d in file %s\n",
354	     getpid(),nret,line,__FILE__);
355      printf("Trying to continue execution..\n");
356    }
357}
358
359void check_bmsg(bret,line,err)
360bmsg_ret_t bret;
361int line;
362int err;
363{
364  if (bret != BMSG_OK)
365    {
366      printf("**Worker %d: MPS B-layer Fatal Error: Aborting!\n",ParallelWorker);
367      switch(err)
368	{
369	case 1:
370	  printf("Could not initialize Message Passing System.\n");
371	  break;
372	case 2:
373	  printf("bport_port call failed.\n");
374	  break;
375	case 3:
376	  printf("bport_open call failed.\n");
377	  break;
378	default:
379	  break;
380	}
381      printf("pid = %d bret = %d at line %d in file %s\n",
382	     getpid(),bret,line,__FILE__);
383      exit(2);
384    }
385}
386
387void check_amsg(aret,line,err)
388amsg_ret_t aret;
389int line;
390int err;
391{
392  if (aret != AMSG_OK)
393    {
394      printf("**Worker %d: MPS A-Layer Fatal Error: Aborting!",ParallelWorker);
395      switch(err)
396	{
397	case 1:
398	  printf("Could not initialize Message Passing System.\n");
399	  break;
400	case 2:
401	  printf("Could not allocate message buffer.");
402	  printf("Probably no memory left.\n");
403	  break;
404	case 3:
405	  printf("Error in amsg_send.\n");
406	default:
407	  break;
408	}
409      printf("pid = %d aret = %d at line %d in file %s\n",
410	     getpid(),aret,line,__FILE__);
411      exit(3);
412    }
413}
414
415void check_amsg_soft(aret,line,err)
416amsg_ret_t aret;
417int line;
418int err;
419{
420  if (aret != AMSG_OK)
421    {
422      printf("**Worker %d: MPS A-Layer Error: Warning",ParallelWorker);
423      switch(err)
424	{
425	case 1:
426	  printf("Could not free message buffer.\n");
427	  break;
428	case 2:
429	  printf("Error in receiving messages.\n");
430	  break;
431	case 3:
432	  printf("Error in amsg_send.\n");
433	default:
434	  break;
435	}
436      printf("pid = %d aret = %d at line %d in file %s\n",
437	     getpid(),aret,line,__FILE__);
438      printf("Trying to continue execution..\n");
439    }
440}
441
442
443/*-----------------------------------------------------------*/
/*  B-port notify and acknowledge routines */
445/*-----------------------------------------------------------*/
446
/* Id of an opened bport that is neither the worker manager's nor that of
   a listed worker; recorded by worker_bport_ack(). */
static bport_id_t nsrv_port_id;
448
449
450/*ARGSUSED*/
451void
452worker_bport_notify(port_id,port_primitive)
453    bport_id_t port_id;
454    bport_primitive_t port_primitive;
455{
456    switch (port_primitive) {
457        case BPORT_OPEN :
458/* 	    printf("%d: bport_notify: BPORT_OPEN:%d\n",ParallelWorker,port_id); */
459	    open_bports++;
460            break;
461        case BPORT_CLOSE :
462/*	    printf("%d: bport_notify: BPORT_CLOSE:%d\n",ParallelWorker,port_id); */
463	    closed_bports++;
464            break;
465        case BPORT_BLOCK :
466            break;
467        case BPORT_UNBLOCK :
468            break;
469        default:
470            break;
471    }
472}
473
474int mem_worker_list(bport_id)
475     bport_id_t bport_id;
476
477{
478  worker_ptr cur;
479
480  for(cur = worker_list.list; cur != NULL; cur = cur->next)
481    if (cur->bport_id == bport_id)
482      return(1);
483
484  return(0);
485}
486
487void
488worker_bport_ack(port_id,port_primitive,ret)
489    bport_id_t port_id;
490    bport_primitive_t port_primitive;
491    bmsg_ret_t ret;
492{
493
494    if (ret == BMSG_OK) {
495        switch (port_primitive) {
496            case BPORT_OPEN :
497/*	        printf("%d: bport_ack: BPORT_OPEN:%d\n",ParallelWorker,port_id); */
498		if (port_id == wm_bport.bport_id)
499		    wm_b_connected = 1;
500		else if (mem_worker_list(port_id))
501		    open_bports++;
502		else
503		  nsrv_port_id = port_id;
504                break;
505            case BPORT_CLOSE :
506/*	        printf("%d: bport_ack: BPORT_CLOSE:%d\n",ParallelWorker,port_id); */
507	        closed_bports++;
508                break;
509            case BPORT_BLOCK :
510                break;
511            case BPORT_UNBLOCK :
512                break;
513            default:
514                break;
515        }
516      }
517    else
518      {
519	printf("%d: Bport_ack: error in ",ParallelWorker);
520	switch (port_primitive) {
521	case BPORT_OPEN :
522	  printf("opening ");
523	  break;
524	case BPORT_CLOSE :
525	  printf("closing ");
526	  break;
527	case BPORT_BLOCK :
528	  printf("blocking ");
529	  break;
530	case BPORT_UNBLOCK :
531	  printf("unblocking ");
532	  break;
533	default:
534	  break;
535        }
536	printf(" bport_id %d\n", port_id);
537      }
538	return;
539}
540
541
542
543/*-----------------------------------------------------------------*/
544/*       Initialise Message Types                                  */
545/*-----------------------------------------------------------------*/
546
/* Message data types, all defined in wm_types_init(). */
amsg_type_t mdt_wm_header;	/* wm_msg_header_t */
amsg_type_t mdt_wm_simple;	/* wm_simple_msg_t */
amsg_type_t mdt_wm_nsrvname;	/* header followed by an nsrv name */
amsg_type_t mdt_wm_config;	/* config_msg_t */
amsg_type_t mdt_wm_node;	/* node_msg_t */
amsg_type_t mdt_sthandlet;	/* st_handle_t */
amsg_type_t mdt_wm_mcstatus;	/* mc_status_t */
amsg_type_t mdt_wm_status;	/* status_msg_t */
amsg_type_t mdt_wm_hoststatus;	/* host_status_msg_t */
amsg_type_t mdt_wm_time;	/* time_msg_t */
amsg_type_t mdt_wm_starttime;	/* start_time_msg_t */
amsg_type_t mdt_wm_portname;	/* port_name_msg_t */
amsg_type_t mdt_wm_wstat;	/* wstat_msg_t */
#ifdef WORKER_TRACING
amsg_type_t mdt_wm_trace;	/* trace_msg_t */
amsg_type_t mdt_trace;		/* filled in by trace_types_init() */
#endif
amsg_type_t mdt_wstat;		/* filled in by wstat_types_init() */
565
566wm_types_init()
567{
568  amsg_typedef_t template[20];
569
570  /* wm_msg_header_t */
571  template[0] = MDT_BEGIN;
572  template[1] = MDT_STRUCT_OPEN;
573  template[2] = MDT_INT32;
574  template[3] = MDT_INT32;
575  template[4] = MDT_STRUCT_CLOSE;
576  template[5] = MDT_END;
577
578  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 1,
579		       template, &mdt_wm_header), __LINE__, 4);
580
581  /* wm_simple_msg_t */
582  template[0] = MDT_BEGIN;
583  template[1] = MDT_STRUCT_OPEN;
584  template[2] = mdt_wm_header;
585  template[3] = MDT_INT32;
586  template[4] = MDT_STRUCT_CLOSE;
587  template[5] = MDT_END;
588
589  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 2,
590		       template, &mdt_wm_simple), __LINE__, 4);
591
592  /* host_status_req_t structures */
593  template[0] = MDT_BEGIN;
594  template[1] = MDT_STRUCT_OPEN;
595  template[2] = mdt_wm_header;
596  template[3] = MDT_NSRVNAME;
597  template[4] = MDT_STRUCT_CLOSE;
598  template[5] = MDT_END;
599
600  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 3,
601		       template, &mdt_wm_nsrvname), __LINE__, 4);
602
603  /* st_handle_t */
604
605  template[0] = MDT_BEGIN;
606  template[1] = MDT_STRUCT_OPEN;
607  template[2] = MDT_APORTID;
608  template[3] = MDT_UINT32;
609  template[4] = MDT_UINT32;
610  template[5] = MDT_STRUCT_CLOSE;
611  template[6] = MDT_END;
612
613  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 4,
614		       template, &mdt_sthandlet), __LINE__, 4);
615
616  /* node_msg_t */
617
618  template[0] = MDT_BEGIN;
619  template[1] = MDT_STRUCT_OPEN;
620  template[2] = mdt_wm_header;
621  template[3] = mdt_sthandlet;
622  template[4] = MDT_STRUCT_CLOSE;
623  template[5] = MDT_END;
624
625  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 5,
626		       template, &mdt_wm_node), __LINE__, 4);
627
628  /* config_msg_t */
629  template[0] = MDT_BEGIN;
630  template[1] = MDT_STRUCT_OPEN;
631  template[2] = mdt_wm_header;
632  template[3] = MDT_INT32;
633  template[4] = MDT_NSRVNAME;
634  template[5] = MDT_STRUCT_CLOSE;
635  template[6] = MDT_END;
636
637  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 6,
638		       template, &mdt_wm_config), __LINE__, 4);
639
640  /* mc_status_t */
641  template[0] = MDT_BEGIN;
642  template[1] = MDT_STRUCT_OPEN;
643  template[2] = MDT_INT32;
644  template[3] = MDT_INT32;
645  template[4] = MDT_NSRVNAME;
646  template[5] = MDT_STRUCT_CLOSE;
647  template[6] = MDT_END;
648
649  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 7,
650		       template, &mdt_wm_mcstatus), __LINE__, 4);
651
652  /* mc_status_t */
653  template[0] = MDT_BEGIN;
654  template[1] = MDT_STRUCT_OPEN;
655  template[2] = mdt_wm_header;
656  template[3] = MDT_INT32;
657  template[4] = MDT_INT32;
658  template[5] = MDT_ARRAY_OF;
659  template[6] = MAX_MACHINES;
660  template[7] = mdt_wm_mcstatus;
661  template[8] = MDT_STRUCT_CLOSE;
662  template[9] = MDT_END;
663
664  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 8,
665		       template, &mdt_wm_status), __LINE__, 4);
666
667  /* host_status_msg_t */
668  template[0] = MDT_BEGIN;
669  template[1] = MDT_STRUCT_OPEN;
670  template[2] = mdt_wm_header;
671  template[3] = MDT_INT32;
672  template[4] = MDT_INT32;
673  template[5] = MDT_ARRAY_OF;
674  template[6] = MAX_PROCS;
675  template[7] = MDT_INT32;
676  template[8] = MDT_ARRAY_OF;
677  template[9] = MAX_PROCS;
678  template[10] = MDT_INT32;
679  template[11] = MDT_STRUCT_CLOSE;
680  template[12] = MDT_END;
681
682  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 9,
683		       template, &mdt_wm_hoststatus), __LINE__, 4);
684
685
686  /* start_time_msg_t */
687
688  template[0] = MDT_BEGIN;
689  template[1] = MDT_STRUCT_OPEN;
690  template[2] = mdt_wm_header;
691#ifdef HAVE_GETHRTIME
692  template[3] = MDT_DWORD;
693#else
694  template[3] = MDT_UINT32;
695#endif
696  template[4] = MDT_STRUCT_CLOSE;
697  template[5] = MDT_END;
698
699  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 10,
700		       template, &mdt_wm_starttime), __LINE__, 4);
701
702  /* time_msg_t */
703
704  template[0] = MDT_BEGIN;
705  template[1] = MDT_STRUCT_OPEN;
706  template[2] = mdt_wm_header;
707  template[3] = MDT_DP_FLOAT;
708  template[4] = MDT_STRUCT_CLOSE;
709  template[5] = MDT_END;
710
711  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 11,
712		       template, &mdt_wm_time), __LINE__, 4);
713
714  /* port_name_msg_t */
715  template[0] = MDT_BEGIN;
716  template[1] = MDT_STRUCT_OPEN;
717  template[2] = mdt_wm_header;
718  template[3] = MDT_INT32;
719  template[4] = MDT_NSRVNAME;
720  template[5] = MDT_APORTID;
721  template[6] = MDT_STRUCT_CLOSE;
722  template[7] = MDT_END;
723
724  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 12,
725		       template, &mdt_wm_portname), __LINE__, 4);
726
727  wstat_types_init(&mdt_wstat);
728  /* wstat_msg_t */
729  template[0] = MDT_BEGIN;
730  template[1] = MDT_STRUCT_OPEN;
731  template[2] = mdt_wm_header;
732  template[3] = mdt_wstat;
733  template[4] = MDT_STRUCT_CLOSE;
734  template[5] = MDT_END;
735
736  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 13,
737		       template, &mdt_wm_wstat), __LINE__, 4);
738
739  /* trace_msg_t */
740#ifdef WORKER_TRACING
741  trace_types_init(&mdt_trace);
742  template[0] = MDT_BEGIN;
743  template[1] = MDT_STRUCT_OPEN;
744  template[2] = mdt_wm_header;
745  template[3] = mdt_trace;
746  template[4] = MDT_STRUCT_CLOSE;
747  template[5] = MDT_END;
748
749  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 14,
750		       template, &mdt_wm_trace), __LINE__, 4);
751#endif
752}
753
754void send_simple_wm_message(port_id,msg_type,msg_value)
755aport_id_t port_id;
756int msg_type;
757int msg_value;
758{
759 amsg_t msg;
760 amsg_data_t * msg_data;
761 wm_simple_msg_t * wm_mess_hdr;
762
763 amsg_size_t size;
764
765 size = sizeof(wm_simple_msg_t);
766
767 check_amsg(amsg_alloc(size,
768		       &msg_data,
769		       &msg),__LINE__, 2);
770
771 wm_mess_hdr = (wm_simple_msg_t *) msg_data;
772 wm_mess_hdr->header.msg_type = msg_type;
773 wm_mess_hdr->header.wid = ParallelWorker;
774 wm_mess_hdr->msg_value = msg_value;
775 check_amsg_soft(amsg_send(port_id,msg,mdt_wm_simple,1,0),__LINE__, 3);
776}
777
778/*-----------------------------------------------------------------*/
779/*       Aport Notify Procedures                                   */
780/*-----------------------------------------------------------------*/
781
782void
783halt1_notify(port_id)
784    aport_id_t port_id;
785{
786    amsg_ret_t ret;
787    amsg_t msg;
788    amsg_data_t * msg_data;
789    amsg_type_t msg_type;
790    amsg_count_t msg_count;
791
792    ret = amsg_receive(port_id,
793                                &msg,
794                                &msg_data,
795                                &msg_type,
796                                &msg_count, 0);
797    if (ret == AMSG_OK) {
798       Notify("Got HALT_SYSTEM1 message");
799       ec_worker_cleanup();
800       exit(1);
801    }
802}
803
804/*ARGSUSED*/
805void
806halt2_notify(port_id)
807   aport_id_t port_id;
808{
809   Notify("Got HALT_SYSTEM2 message");
810   got_halt = 1;
811}
812
813void
814wm_notify(port_id)
815     aport_id_t port_id;
816
817{
818  amsg_ret_t ret;
819  amsg_t msg;
820  amsg_data_t * msg_data;
821  amsg_type_t msg_type;
822  amsg_count_t msg_count;
823  wm_simple_msg_t * wm_simple_msg;
824
825  while (( ret = amsg_receive(port_id,
826			      &msg,
827			      &msg_data,
828			      &msg_type,
829			      &msg_count,0)) == AMSG_OK) {
830
831    if (msg_type == mdt_wm_simple)
832      {
833	wm_simple_msg = (wm_simple_msg_t *) msg_data;
834	switch(wm_simple_msg->header.msg_type) {
835
836	case SENT_INIT_PORT_NAMES:
837	  Notify("Got SENT_INIT_PORT_NAMES message");
838	  sent_init_table = 1;
839	  break;
840
841	case ALL_CONNECTED:
842	  Notify("Got ALL_CONNECTED message");
843	  all_connected = 1;
844	  break;
845
846	case ROOT_INITIALISED:
847	  Notify("Got ROOT_INITIALISED message");
848	  root_initialised = 1;
849	  break;
850
851	case GOTO_SLEEP:
852	  Notify("Got GOTO_SLEEP message");
853	  (void) sch_idle_eng(aports[SCH_APORT_NUMBER]);
854	  break;
855
856	case WAKEUP:
857	  Notify("Got WAKEUP message");
858	  (void) sch_wake_eng(aports[SCH_APORT_NUMBER]);
859	  break;
860
861	case CONFIG_NOTIFY:
862	  Notify("Got CONFIG_NOTIFY message");
863	  config_ret = wm_simple_msg->msg_value;
864	  break;
865
866	case WSTAT_REQ:
867	  {
868	    Notify("Got WSTAT_REQ message");
869	    send_wstat(wm_simple_msg->header.wid);
870	  }
871	  break;
872
873	case WSTAT_RESET:
874	  {
875	    int wid = wm_simple_msg->header.wid;
876
877	    Notify("Got WSTAT_RESET message");
878	    reset_worker_stat();
879	    send_simple_wm_message((get_worker(wid))->wm_aport_id, WSTAT_RET, 0);
880	    break;
881	  }
882
883	case WSTAT_RET:
884	  Notify("Got WSTAT_RET message");
885	  worker_info_ret = 1;
886	  break;
887
888#ifdef WORKER_TRACING
889	case START_TRACING:
890	  Notify("Got START_TRACING message");
891	  start_tracing();
892	  break;
893
894	case STOP_TRACING:
895	  Notify("Got STOP_TRACING message");
896	  stop_tracing();
897	  break;
898
899	case GET_TRACE_HEADER:
900	  Notify("Got GET_TRACE_HEADER message");
901	  send_trace_header(wm_simple_msg->header.wid,
902			    GET_TRACE_RET, get_trace_ptr());
903	  break;
904
905	case SET_TRACE_RET:
906	  Notify("Got SET_TRACE_HEADER message");
907	  worker_info_ret = 1;
908	  break;
909#endif
910
911	default:
912	  break;
913	}
914      }
915    else if (msg_type == mdt_wm_portname)
916      {
917	/*  PORT_NAME message */
918	port_name_msg_t * port_name_msg;
919	Notify("Got PORT_NAME message");
920	port_name_msg = (port_name_msg_t *) msg_data;
921	insert_port(port_name_msg->index,port_name_msg->port_name,
922		    port_name_msg->wm_aport_id);
923      }
924    else if (msg_type == mdt_wm_nsrvname)
925      {
926	/* WM_HOSTNAME message */
927	host_name_msg_t *host_name_msg;
928	host_name_msg = (host_name_msg_t *) msg_data;
929	(void) strcpy(wm_hostname,host_name_msg->hostname);
930	got_wm_hostname = 1;
931      }
932    else if (msg_type == mdt_wm_node)
933      {
934	/* SET_ROOT message */
935	node_msg_t *node_msg;
936	node_msg = (node_msg_t *) msg_data;
937	if(node_msg->header.msg_type == SET_ROOT)
938	  {
939	    Notify("Got SET_ROOT message");
940	    root_id = node_msg->node;
941	    got_root_id = 1;
942	  }
943	else if (node_msg->header.msg_type == GET_ROOT)
944	  {
945	    st_handle_t local;
946	    Notify("Got GET_ROOT message");
947	    local = node_msg->node;
948	    sch_init_lodge(aports[SCH_APORT_NUMBER], node_msg->header.wid,
949			   &local);
950	  }
951      }
952    else if (msg_type == mdt_wm_time)
953      {
954	/*  SEND_TIME message */
955	time_msg_t * time_msg;
956	Notify("Got SEND_TIME message");
957	time_msg = (time_msg_t *) msg_data;
958	cur_time = time_msg->cur_time;
959      }
960
961    else if (msg_type == mdt_wm_starttime)
962      {
963	/*  SEND_START_TIME message */
964	start_time_msg_t * start_time_msg;
965	start_time_msg = (start_time_msg_t *) msg_data;
966	Notify("Got SEND_START_TIME message");
967	wm_start_time = start_time_msg->start_time;
968      }
969    else if (msg_type == mdt_wm_status)
970      {
971	/* STATUS_NOTIFY message */
972	status_msg_t * status_msg;
973	Notify("Got STATUS_NOTIFY message");
974	status_msg = (status_msg_t *) msg_data;
975	{
976	  int i;
977	  status.num_machines = status_msg->num_machines;
978	  for(i = 0 ; i < status_msg->num_machines; i++)
979	    status.machines[i] = status_msg->machines[i];
980	}
981	status_ret = status_msg->total_workers;
982      }
983
984    else if (msg_type == mdt_wm_hoststatus)
985      {
986	/*  HOST_STATUS_NOTIFY */
987	host_status_msg_t * host_status_msg;
988	int i;
989
990	Notify("Got HOST_STATUS_NOTIFY message");
991	host_status_msg = (host_status_msg_t *) msg_data;
992	host_status.num_awake = host_status_msg->num_awake;
993	host_status.num_asleep = host_status_msg->num_asleep;
994	for(i = 0; i < host_status.num_awake; i++)
995	  host_status.awake_ids[i] = host_status_msg->awake_ids[i];
996	for(i = 0; i < host_status.num_asleep; i++)
997	  host_status.asleep_ids[i] = host_status_msg->asleep_ids[i];
998
999	status_ret = host_status.num_awake + host_status.num_asleep;
1000      }
1001    else if (msg_type == mdt_wm_wstat)
1002      {
1003	/* WSTAT_RET */
1004	wstat_msg_t * wstat_msg;
1005	Notify("Got WSTAT_RET message");
1006	wstat_msg = (wstat_msg_t *) msg_data;
1007	*wstat_rec_buf = wstat_msg->stat;
1008	worker_info_ret = 1;
1009	break;
1010      }
1011#ifdef WORKER_TRACING
1012    else if (msg_type == mdt_wm_trace)
1013      {
1014	trace_msg_t * trace_msg;
1015	int wid;
1016
1017	trace_msg = (trace_msg_t *) msg_data;
1018	wid = trace_msg->header.wid;
1019	if(trace_msg->header.msg_type == GET_TRACE_RET)
1020	  {
1021	    *trace_header_buf = trace_msg->trace_header;
1022	    worker_info_ret = 1;
1023	  }
1024	else if (trace_msg->header.msg_type == SET_TRACE_HEADER)
1025	  {
1026	    print_trace(&(trace_msg->trace_header));
1027	    send_simple_wm_message(get_worker(wid)->wm_aport_id,
1028				   SET_TRACE_RET, 0);
1029	  }
1030	break;
1031      }
1032#endif
1033    else if (msg_type == MDT_BYTE)
1034      {
1035	worker_info_msg_t * worker_info_msg;
1036	worker_info_msg = (worker_info_msg_t *) msg_data;
1037	switch (worker_info_msg->header.msg_type)
1038	  {
1039	  case WORKER_INFO_SET:
1040	    Notify("Got WORKER_INFO_SET message");
1041	    set_worker_info(worker_info_msg->req_wid,
1042			    worker_info_msg->infotype,
1043			    (void_ptr) (worker_info_msg + 1));
1044	    send_worker_info(worker_info_msg->req_wid,
1045			     worker_info_msg->infotype,
1046			     0, (void_ptr) NULL);
1047	    break;
1048
1049	  case WORKER_INFO_GET:
1050	    Notify("Got WORKER_INFO_GET message");
1051	    get_worker_info(worker_info_msg->infotype,
1052			    &worker_info_bufinsize,
1053			    &worker_info_bufin);
1054	    send_worker_info(worker_info_msg->req_wid,
1055			     worker_info_msg->infotype,
1056			     worker_info_bufinsize, worker_info_bufin);
1057	    break;
1058
1059	  case WORKER_INFO_NOTIFY:
1060	    Notify("Got WORKER_INFO_NOTIFY message");
1061	      if(worker_info_bufsize >= worker_info_msg->size)
1062		bcopy((char *) (msg_data + sizeof(worker_info_msg_t)),
1063			(char *) worker_info_buf,
1064			worker_info_msg->size);
1065	      else
1066		bcopy((char *) (msg_data + sizeof(worker_info_msg_t)),
1067			(char *) worker_info_buf,
1068		       worker_info_bufsize);
1069	    worker_info_ret = 1;
1070	    break;
1071	  }
1072      }
1073    check_amsg_soft(amsg_free(msg), __LINE__, 1);
1074  }
1075  if (ret != AMSG_NOMESSAGE) {
1076    check_amsg_soft(ret,__LINE__, 2);
1077  }
1078}
1079
1080void fill_in_domain(ldomain_id, lbdomain, hostname)
1081bdomain_id_t ldomain_id;
1082bdomain_t *lbdomain;
1083char *hostname;
1084
1085{
1086  lbdomain->bdomain_id = ldomain_id;
1087  lbdomain->bdomain_size = 0x00800000;  /* 8 MB */
1088  if (!shared_mem_base())
1089    lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base());
1090  else
1091    lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base() + 0x00800000);
1092  sprintf(lbdomain->bdomain_file,"%s/%d.%s.mps.map",
1093	  map_dir, my_pid, hostname);
1094}
1095
/*
 * Set up the message passing system for this worker.
 *
 *   create  non-zero if this worker may create the shared memory domain
 *           of its host when the name server does not know one yet;
 *           zero makes it wait until the domain has been registered
 *
 * Steps: obtain a bport id from the name server, attach to (or create
 * and register) the host's domain via bmsg_init(), register this
 * worker's bport under my_port_name, and start opening the worker
 * manager's bport.  Every failure except BMSG_POPENING from the final
 * bport_open() is fatal.
 */
void mps_init(create)
int create;
{

  nsrv_ret_t nret;
  char hostname[MAXHOSTLEN];
  nsrv_name_t domain_name;
  bmsg_ret_t bret;

  check_nsrv(nsrv_new_bport_id(my_signature,&my_bport_id),__LINE__, 3);

  mygethostname(hostname);

/* Turn on the define below if you want each worker to create its
   own shared memory domain. This means that all communication will
   take place via sockets i.e. no shared memory communication. */

/*#define TEST_INTER_DOMAIN 1*/
#ifdef TEST_INTER_DOMAIN
  sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,my_signature);
  /* NOTE(review): error code 5 makes check_nsrv() report
     "nsrv_bport_look_up failed." for this nsrv_new_bdomain_id() call */
  check_nsrv(nsrv_new_bdomain_id(my_signature,&domain_id),__LINE__,5);
  fill_in_domain(domain_id,&bdomain,hostname);
  check_bmsg(bmsg_init(my_bport_id,&bdomain,
		       BDOMAIN_CREATE |
#ifdef DEBUG_MPS
		       BMSG_ALOG_ON |
		       BMSG_ALOG_OPEN |
		       BMSG_ALOG_CLOSE |
		       BMSG_ALOG_MASTER |
#endif
		       BPORT_NOTIFICATION), __LINE__, 1);
  check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name,
				   my_signature,&bdomain),__LINE__,2);
#else /* TEST_INTER_DOMAIN */

  /* one domain per host and session */
  sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,session_key);
  /* Check if domain exists - it will exist if we are on the
     same host as the worker manager */
  nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain);

  /* Loop until the domain is created - should have a timeout? */
  while((nret != NSRV_OK) && !create) {
      short_sleep(1000);
      nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain);
    }

  if (nret == NSRV_OK)
      /* the domain exists already: attach to it */
      check_bmsg(bmsg_init(my_bport_id,&bdomain,
#ifdef DEBUG_MPS
			   BMSG_ALOG_ON |
			   BMSG_ALOG_OPEN |
			   BMSG_ALOG_CLOSE |
			   BMSG_ALOG_MASTER |
#endif
			   BPORT_NOTIFICATION),__LINE__, 1);
  else if ((nret == NSRV_NOT_REGISTERED)  && create)
    { /* we are the first worker on a new host, so create a new
         domain and register it */
      /* NOTE(review): error code 5 makes check_nsrv() report
         "nsrv_bport_look_up failed." for this nsrv_new_bdomain_id() call */
      check_nsrv(nsrv_new_bdomain_id(my_signature,&domain_id),__LINE__,5);
      fill_in_domain(domain_id,&bdomain,hostname);
      check_bmsg(bmsg_init(my_bport_id,&bdomain,
			   BDOMAIN_CREATE |
#ifdef DEBUG_MPS
			   BMSG_ALOG_ON |
			   BMSG_ALOG_OPEN |
			   BMSG_ALOG_CLOSE |
			   BMSG_ALOG_MASTER |
#endif
			   BPORT_NOTIFICATION), __LINE__, 1);
      check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name,
				       my_signature,&bdomain),__LINE__,2);
    }
  else
    {
      /* the lookup failed for a reason other than "not registered" */
      printf("nsrv_bdomain_look_up failed! nret = %d\n",nret);
      exit(1);
    }
#endif /* TEST_INTER_DOMAIN */
  check_bmsg(bport_port(my_bport_id,&my_bport),__LINE__, 2);
  check_nsrv(nsrv_bport_register(session_key,my_port_name,my_signature,&my_bport),__LINE__,2);

  /* BMSG_POPENING is not treated as an error here; worker_bport_ack()
     sets wm_b_connected when the open is acknowledged */
  bret = bport_open(&wm_bport);
  if (bret != BMSG_POPENING)
    check_bmsg(bret,__LINE__, 3);
}
1181
/*
 * Store the name of the local host in host.
 *
 * host: caller-supplied buffer.  The gethostname() and sysinfo()
 *       variants write at most MAXHOSTLEN bytes into it.
 *       NOTE(review): the uname() fallback copies ut.nodename without a
 *       bound - presumably MAXHOSTLEN is large enough; confirm.
 *
 * Returns 0.  The callers visible in this part of the file ignore the
 * result.
 */
int
mygethostname(host)
     char *host;
{
#if defined(HAVE_GETHOSTNAME)
    if (gethostname(host,MAXHOSTLEN) != 0)
      {
	/* on failure the buffer contents are unspecified: hand back "" */
	host[0] = '\0';
      }
    /* a truncated name is not guaranteed to be NUL-terminated */
    host[MAXHOSTLEN - 1] = '\0';
#else
#  if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H)
    sysinfo(SI_HOSTNAME, host, MAXHOSTLEN);
#  else
    struct utsname ut;
    uname(&ut);
    (void) strcpy(host,ut.nodename);
#  endif
#endif
    return 0;
}
1197
1198void wait_for_session_table()
1199{
1200  while(!sent_init_table)
1201    short_sleep(1000);
1202}
1203
1204void open_worker_connections()
1205{
1206  bport_t rem_bport;
1207  bmsg_ret_t bret;
1208  worker_ptr cur;
1209
1210  for(cur = worker_list.list; cur != NULL; cur = cur->next)
1211    {
1212      check_nsrv(nsrv_bport_look_up(session_key,cur->bport_name,&rem_bport),
1213		 __LINE__,5);
1214      cur->bport_id = rem_bport.bport_id;
1215      if(cur->index < ParallelWorker)
1216	{
1217	  bret = bport_open(&rem_bport);
1218	  if (bret != BMSG_POPENING)
1219	    check_bmsg(bret,__LINE__, 3);
1220	}
1221    }
1222
1223  while (open_bports != (worker_list.num_workers - 1))
1224    short_sleep(1000);
1225}
1226
1227void register_std_aports(aport_ids)
1228     aport_id_t aport_ids[];
1229{
1230  nsrv_name_t aport_name;
1231  aport_t aport;
1232
1233  sprintf(aport_name,"%s_halt2_aport",my_port_name);
1234  aport.aport_id = aport_ids[HALT2_APORT_NUMBER];
1235  aport.bport_id = my_bport_id;
1236  aport.bdomain_id = bdomain.bdomain_id;
1237  check_nsrv(nsrv_aport_register(session_key,aport_name,
1238				 my_signature,&aport),__LINE__, 2);
1239  sprintf(aport_name,"%s_halt1_aport",my_port_name);
1240  aport.aport_id = aport_ids[HALT1_APORT_NUMBER];
1241  aport.bport_id = my_bport_id;
1242  aport.bdomain_id = bdomain.bdomain_id;
1243  check_nsrv(nsrv_aport_register(session_key,aport_name,
1244				 my_signature,&aport),__LINE__, 2);
1245  wm_types_init();  /* can only be done after an aport registration
1246		       since MDT_NSRVNAME is only defined at this point */
1247  sprintf(aport_name,"%s_wm_aport",my_port_name);
1248  aport.aport_id = aport_ids[WM_APORT_NUMBER];
1249  aport.bport_id = my_bport_id;
1250  aport.bdomain_id = bdomain.bdomain_id;
1251  check_nsrv(nsrv_aport_register(session_key,aport_name,
1252				 my_signature,&aport),__LINE__, 2);
1253
1254
1255}
1256
1257void deregister_std_aports()
1258{
1259  nsrv_name_t aport_name;
1260
1261  sprintf(aport_name,"%s_wm_aport",my_port_name);
1262  check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature),
1263	     __LINE__, 1);
1264
1265  sprintf(aport_name,"%s_halt1_aport",my_port_name);
1266  check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature),
1267		  __LINE__, 1);
1268  sprintf(aport_name,"%s_halt2_aport",my_port_name);
1269  check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature),
1270		  __LINE__, 1);
1271}
1272
1273
1274void setup_mps(slave_no,session, nsrv_host_name, nsrv_port_number, create)
1275     int slave_no;
1276     char *session;
1277     char *nsrv_host_name;
1278     unsigned nsrv_port_number;
1279     int create;      /* used in mps_init */
1280{
1281  extern void eng_port_upcall();
1282  extern void sch_port_upcall();
1283  extern void io_port_upcall();
1284  aport_t wm_aport; /* needed to lookup wm aportids in name server */
1285  void (* notify_procs [TOTAL_APORT_NUMBER]) ();
1286
1287  /* preliminary setting of the SIGIO handler for message passing */
1288#ifdef HAVE_SIGACTION
1289  {
1290    struct sigaction sa;
1291    sa.sa_handler = sigmsg_handler;
1292    (void) sigemptyset(&sa.sa_mask);
1293    sa.sa_flags = 0;
1294    (void) sigaction(SIGIO, &sa, (struct sigaction *) 0);
1295#ifdef SIGPOLL
1296    (void) sigaction(SIGPOLL, &sa, (struct sigaction *) 0);
1297#endif
1298  }
1299#else
1300  signal(SIGIO, sigmsg_handler);
1301#ifdef SIGPOLL
1302  signal(SIGPOLL, sigmsg_handler);
1303#endif
1304#endif
1305
1306  map_dir = getenv("ECLIPSETMP");
1307  if(!map_dir) map_dir = "/tmp";
1308
1309  check_nsrv(nsrv_init(nsrv_host_name, &nsrv_port_number),__LINE__, 1);
1310
1311  my_pid = getpid();
1312  (void) strcpy(session_key,session);
1313  sprintf(my_port_name,"worker%d",slave_no);
1314  sprintf(my_signature,"%d",my_pid);
1315  sprintf(bdomain_key, WORKER_PORT_NAME);
1316  init_worker_list();
1317
1318  check_nsrv(nsrv_bport_look_up(session_key, WM_PORT_NAME, &wm_bport),
1319	     __LINE__, 5);
1320
1321  mps_init(create);
1322
1323  notify_procs[HALT1_APORT_NUMBER] = halt1_notify;
1324  notify_procs[HALT2_APORT_NUMBER] = halt2_notify;
1325  notify_procs[WM_APORT_NUMBER]     = wm_notify;
1326  notify_procs[SCH_APORT_NUMBER] = sch_port_upcall;
1327  notify_procs[ENG_APORT_NUMBER] = eng_port_upcall;
1328  notify_procs[IO_APORT_NUMBER] = io_port_upcall;
1329  notify_procs[IO_REPLY_APORT_NUMBER] = 0;
1330  check_amsg(amsg_init(TOTAL_APORT_NUMBER,notify_procs,aports,0),__LINE__, 1);
1331
1332  /* set port notification levels */
1333  check_amsg_soft(aport_set_option(aports[IO_APORT_NUMBER],
1334				   APORT_NOTIFY_LEVEL,
1335				   (aport_optval_t) 1),__LINE__, 4);
1336  check_amsg_soft(aport_set_option(aports[ENG_APORT_NUMBER],
1337				   APORT_NOTIFY_LEVEL,
1338				   (aport_optval_t) 2),__LINE__, 4);
1339  check_amsg_soft(aport_set_option(aports[SCH_APORT_NUMBER],
1340				   APORT_NOTIFY_LEVEL,
1341				   (aport_optval_t) 2),__LINE__, 4);
1342  check_amsg_soft(aport_set_option(aports[WM_APORT_NUMBER],
1343				   APORT_NOTIFY_LEVEL,
1344				   (aport_optval_t) 3),__LINE__, 4);
1345  check_amsg_soft(aport_set_option(aports[HALT1_APORT_NUMBER],
1346				   APORT_NOTIFY_LEVEL,
1347				   (aport_optval_t) 4),__LINE__, 4);
1348  check_amsg_soft(aport_set_option(aports[HALT2_APORT_NUMBER],
1349				   APORT_NOTIFY_LEVEL,
1350				   (aport_optval_t) 5),__LINE__, 4);
1351
1352  register_std_aports(aports);
1353
1354  check_nsrv(nsrv_aport_look_up(session_key, WM_HIGH_APORT_NAME, &wm_aport),
1355             __LINE__, 6);
1356  wm_high_aport_id = wm_aport.aport_id;
1357
1358  check_nsrv(nsrv_aport_look_up(session_key, WM_HALT1_APORT_NAME, &wm_aport),
1359             __LINE__, 6);
1360  wm_halt1_aport_id = wm_aport.aport_id;
1361
1362  check_nsrv(nsrv_aport_look_up(session_key, WM_HALT2_APORT_NAME, &wm_aport),
1363             __LINE__, 6);
1364  wm_halt2_aport_id = wm_aport.aport_id;
1365
1366  check_nsrv(nsrv_aport_look_up(session_key, WM_LOW_APORT_NAME,
1367				&wm_aport),__LINE__, 6);
1368  wm_low_aport_id = wm_aport.aport_id;
1369
1370  while (!wm_b_connected) poll_short_sleep(1000);
1371
1372  wait_for_session_table();
1373
1374  open_worker_connections();
1375
1376#ifdef WORKER_TRACING
1377  init_trace_file_names();
1378#endif
1379
1380  set_local_wm_flag();
1381  set_start_time();
1382
1383  /* Tell wm that my mps setup is done */
1384  send_simple_wm_message(wm_high_aport_id,DONE_INIT_OPENS,0);
1385
1386  /* Synchronize with all other workers since even after this point
1387     the worker is not completely set up - it still needs to lodge at
1388     the root node. This can cause workers not to exit if the system
1389     is halted immediately after adding this worker.  Currently the
1390     worker manager just kills the workers and exits (using a timeout)*/
1391
1392  while(!all_connected)
1393    poll_short_sleep(1000);
1394
1395}
1396
1397void block_wm_aports()
1398{
1399  (void) aport_set_option(aports[WM_APORT_NUMBER],
1400                                APORT_NOTIFY, (aport_optval_t) AMSG_OFF);
1401  (void) aport_set_option(aports[SCH_APORT_NUMBER],
1402                                APORT_NOTIFY, (aport_optval_t) AMSG_OFF);
1403  (void) aport_set_option(aports[ENG_APORT_NUMBER],
1404                                APORT_NOTIFY, (aport_optval_t) AMSG_OFF);
1405  (void) aport_set_option(aports[IO_APORT_NUMBER],
1406                                APORT_NOTIFY, (aport_optval_t) AMSG_OFF);
1407  (void) bmsg_set_option(BMSG_INTRA_DOMAIN_TRIGGERING, BMSG_ON);
1408}
1409
1410/*ARGSUSED*/
1411void halt_system(exit_code)
1412int exit_code;
1413{
1414  (void) block_wm_aports();
1415  unblock_signals();
1416  send_simple_wm_message(wm_halt1_aport_id,HALT_SYSTEM_REQ,exit_code);
1417  while (1)
1418    poll_short_sleep(1000);
1419}
/* Currently nothing more than a call of halt_system() with the same
   exit_code. */
/*ARGSUSED*/
void panic_halt_system(exit_code)
int exit_code;
{
  halt_system(exit_code);
}
1426
1427void exit_mps()
1428{
1429  worker_ptr cur;
1430  bmsg_ret_t bret;
1431
1432  send_simple_wm_message(wm_halt2_aport_id,EXITING,0);
1433  while(!got_halt) {
1434      poll_short_sleep(1000);
1435  }
1436  for(cur = worker_list.list; cur != NULL; cur = cur->next)
1437    {
1438      if (cur->index < ParallelWorker)
1439	{
1440	  bret = bport_close(cur->bport_id);
1441	  while (bret != BMSG_NOPORT)
1442	    {
1443/*	      printf("slave %d: Cannot close port %d bret = %d\n",
1444		     ParallelWorker,cur->bport_id,bret);*/
1445	      poll_short_sleep(100000);
1446	      bret = bport_close(cur->bport_id);
1447	    }
1448	}
1449    }
1450
1451  while (closed_bports != (worker_list.num_workers - 1))
1452      poll_short_sleep(1000);
1453
1454  bret = bport_close(wm_bport.bport_id);
1455  while (bret != BMSG_NOPORT)
1456  {
1457/*    printf("slave %d: Cannot close port %d bret = %d\n",
1458	     ParallelWorker,wm_bport.bport_id,bret);*/
1459      poll_short_sleep(100000);
1460      bret = bport_close(wm_bport.bport_id);
1461  }
1462
1463  while (closed_bports != (worker_list.num_workers))
1464      poll_short_sleep(1000);
1465
1466  deregister_std_aports();
1467  check_nsrv_soft(nsrv_bport_deregister(session_key,my_port_name,my_signature),
1468	     __LINE__, 1);
1469  check_nsrv_soft(nsrv_free_bport_id(my_signature,my_bport_id),__LINE__, 2);
1470
1471  nsrv_exit();
1472  amsg_exit();
1473  bmsg_exit();
1474}
1475
1476/*-------------------------------------------------------------------*/
1477/* Scheduler initialization support functions */
1478/*-------------------------------------------------------------------*/
1479
1480void send_node_msg(wm_aport_id, msg_type, node)
1481aport_id_t wm_aport_id;
1482int msg_type;
1483st_handle_t * node;
1484{
1485  amsg_t msg;
1486  amsg_data_t * msg_data;
1487  node_msg_t * node_msg;
1488  st_handle_t *local_node;
1489
1490  amsg_size_t size;
1491
1492  size = sizeof(node_msg_t);
1493  check_amsg(amsg_alloc(size,
1494			&msg_data,
1495			&msg),__LINE__, 2);
1496
1497  node_msg = (node_msg_t *) msg_data;
1498  node_msg->header.msg_type = msg_type;
1499  node_msg->header.wid = ParallelWorker;
1500
1501  local_node = (st_handle_t *) &(node_msg->node);
1502  *local_node = *node;
1503  check_amsg(amsg_send(wm_aport_id,msg,mdt_wm_node,1,0),__LINE__, 3);
1504}
1505
1506st_handle_t *
1507get_root_id(leaf)
1508st_handle_t * leaf;
1509{
1510  /* wait until first worker has initialised the root */
1511  while(!root_initialised) poll_short_sleep(1000);
1512
1513  /* Now send first worker my leaf id  */
1514  send_node_msg((get_worker(1))->wm_aport_id, GET_ROOT, leaf);
1515
1516  /* wait until I have received my root id */
1517  while(!got_root_id) poll_short_sleep(1000);
1518  return &root_id;
1519}
1520
1521/* Only called by first worker.
1522   Send req_wid the root.
1523   Called in response to GET_ROOT message (see above)
1524*/
1525void wm_init_lodged(req_wid, root)
1526int req_wid;
1527st_handle_t *root;
1528{
1529  send_node_msg((get_worker(req_wid))->wm_aport_id,SET_ROOT, root);
1530}
1531
1532/* Called only by first worker.
1533   Inform wm that root node has been initialised
1534*/
1535void root_node_register(aport, root)
1536aport_id_t aport;
1537st_handle_t *root;
1538
1539{
1540  send_node_msg(aport, ROOT_NODE_REGISTER, root);
1541  root_id = *root;	/* init locally as well */
1542  got_root_id = 1;
1543}
1544
1545/*-----------------------------------------------------------*/
1546/* Session_time support functions */
1547/*-----------------------------------------------------------*/
1548
1549/* get the worker manager hostname and set the local_wm flag
1550   if we are on the same host */
1551int set_local_wm_flag()
1552{
1553  char hostname[MAXHOSTLEN];
1554
1555  mygethostname(hostname);
1556  got_wm_hostname = 0;
1557  send_simple_wm_message(wm_high_aport_id,REQ_WM_HOSTNAME,0);
1558  while (!got_wm_hostname)
1559    short_sleep(1000);
1560  if(strcmp(wm_hostname,hostname) == 0)
1561    local_wm = 1;
1562  else
1563    local_wm = 0;
1564}
1565
1566/* Set the start time if we on the same host as the worker manager */
1567int set_start_time()
1568{
1569  if(local_wm)
1570    {
1571      /* Only useful if the worker manager is on the same host */
1572      wm_start_time = 0;
1573      send_simple_wm_message(wm_high_aport_id,REQ_START_TIME,0);
1574      while(wm_start_time == 0);
1575    }
1576}
1577
1578double calc_elapsed_time()
1579{
1580  double elapsed;
1581
1582#ifdef HAVE_GETHRTIME
1583
1584  elapsed = (gethrtime() - wm_start_time) / 1000.0;
1585  elapsed = elapsed / 1000000.0;
1586
1587#else
1588#ifdef BSD_TIMES
1589
1590  struct timeb	realtime;
1591  (void) ftime(&realtime);
1592  elapsed = (realtime.time - wm_start_time) + (double)realtime.millitm/1000.0;
1593
1594#else
1595
1596  struct tms		dummy;
1597  clock_t		realtime;
1598
1599  if ((realtime = times(&dummy)) == (clock_t) -1)
1600    {
1601      return(0.0);
1602    }
1603  elapsed = (double) (realtime - wm_start_time) / clock_hz;
1604
1605#endif
1606#endif
1607  return(elapsed);
1608}
1609
1610/*---------------------------------------------------------------*/
1611/* Worker Manager builtins */
1612/*---------------------------------------------------------------*/
1613
1614/* If we are on the same host as the worker manager no message is
1615   sent */
1616double elapsed_session_time()
1617{
1618  if(local_wm)
1619    /* can get it locally */
1620      cur_time = calc_elapsed_time();
1621  else
1622    {
1623      /* have to sent a message */
1624      cur_time = 0.0;
1625      send_simple_wm_message(wm_high_aport_id,REQ_TIME,0);
1626      while(cur_time <= 0.0);
1627    }
1628  return(cur_time);
1629}
1630
1631int wm_command(command,hostname,workers)
1632char * command;
1633char * hostname;
1634long workers;
1635{
1636  amsg_t msg;
1637  amsg_data_t * msg_data;
1638  config_msg_t * config_msg;
1639  amsg_size_t size;
1640  int reduce_worker = 0;
1641
1642  size = sizeof(config_msg_t);
1643
1644  check_amsg(amsg_alloc(size,
1645			&msg_data,
1646			&msg),__LINE__, 2);
1647
1648  config_msg = (config_msg_t *) msg_data;
1649
1650  if(strcmp(command,"add") == 0)
1651    config_msg->header.msg_type = ADD_WORKERS;
1652  else if (strcmp(command,"sleep") == 0) {
1653    config_msg->header.msg_type = SLEEP_WORKERS;
1654    reduce_worker = 1;
1655  }
1656  else if (strcmp(command,"wake") == 0)
1657    config_msg->header.msg_type = WAKEUP_WORKERS;
1658  else
1659    return(0);
1660  config_msg->header.wid = ParallelWorker;
1661
1662  config_msg->workers = workers;
1663  (void) strcpy(config_msg->hostname,hostname);
1664
1665  config_ret = -1;
1666  check_amsg(amsg_send(wm_low_aport_id,msg,mdt_wm_config,1,0),
1667	     __LINE__, 3);
1668  while (config_ret < 0) poll_short_sleep(1000);
1669
1670  if (reduce_worker) (void) sch_reduce_worker(aports[SCH_APORT_NUMBER]);
1671
1672  return(config_ret == workers);
1673}
1674
1675
1676int
1677wm_host_status(hostname)
1678char *hostname;
1679{
1680  amsg_t msg;
1681  amsg_data_t * msg_data;
1682  host_name_msg_t * host_name_msg;
1683  amsg_size_t size;
1684
1685  size = sizeof(host_name_msg_t);
1686
1687  check_amsg(amsg_alloc(size,
1688			&msg_data,
1689			&msg),__LINE__, 2);
1690
1691  host_name_msg = (host_name_msg_t *) msg_data;
1692  host_name_msg->header.msg_type = HOST_STATUS_REQ;
1693  host_name_msg->header.wid = ParallelWorker;
1694
1695  (void) strcpy(host_name_msg->hostname,hostname);
1696
1697  status_ret = -1;
1698  check_amsg(amsg_send(wm_low_aport_id,msg,mdt_wm_nsrvname,1,0),
1699	     __LINE__, 3);
1700  while (status_ret < 0) poll_short_sleep(1000);
1701
1702  return(status_ret);
1703}
1704
1705int wm_status()
1706
1707{
1708
1709  status_ret = -1;
1710  send_simple_wm_message(wm_low_aport_id,STATUS_REQ,0);
1711  while (status_ret < 0) poll_short_sleep(1000);
1712
1713  return(status_ret);
1714}
1715
1716int
1717p_wm_interface(vcommand, tcommand)
1718value vcommand;
1719type tcommand;
1720{
1721  char *command;
1722
1723  Get_Name(vcommand,tcommand,command);
1724  if(strcmp(command,"on") == 0)
1725    send_simple_wm_message(wm_low_aport_id,START_INTERFACE,0);
1726  else if (strcmp(command,"off") == 0)
1727    send_simple_wm_message(wm_low_aport_id,REMOVE_INTERFACE,0);
1728  Succeed_;
1729}
1730
1731
/*
 * Builtin front end for wm_command().
 *
 * The command and the host name are extracted from the first two
 * arguments with Get_Name, the third argument's type is checked with
 * Check_Integer, and the result of
 * wm_command(command, hostname, vworkers.nint) is handed to Succeed_If.
 */
int
p_wm_set(vcommand, tcommand,vhostname,thostname,vworkers,tworkers)
value vcommand;
type tcommand;
value vhostname;
type thostname;
value vworkers;
type tworkers;
{
  char * hostname;
  char *command;

  Get_Name(vcommand,tcommand,command)
  Get_Name(vhostname,thostname,hostname)
  Check_Integer(tworkers);
  Succeed_If(wm_command(command,hostname,vworkers.nint));
}
1749
1750int
1751p_wm_get(vstatus,tstatus)
1752value vstatus;
1753type tstatus;
1754
1755{
1756  pword  *cur_mc, *prev_mc, *cur_tail, *cur_head;
1757  int i;
1758
1759  if (wm_status())
1760  {
1761    prev_mc = NULL;
1762    for(i = 0; i < status.num_machines; i++)
1763      {
1764	cur_mc = TG;
1765	Push_List_Frame()
1766	cur_head = TG;
1767	Push_List_Frame()
1768	Make_List(cur_mc,cur_head);
1769	Make_Integer(cur_head,status.machines[i].num_workers)
1770	cur_tail = cur_head + 1;
1771	cur_head = TG;
1772	Push_List_Frame()
1773	Make_List(cur_tail,cur_head);
1774	Make_Integer(cur_head,status.machines[i].num_awake)
1775        cur_tail = cur_head + 1;
1776        cur_head = TG;
1777	Push_List_Frame()
1778	Make_List(cur_tail,cur_head);
1779	Make_String(cur_head,status.machines[i].hostname);
1780	cur_tail = cur_head + 1;
1781	Make_Nil(cur_tail)
1782	cur_tail = cur_mc + 1;
1783	if (prev_mc == NULL)
1784	  Make_Nil(cur_tail)
1785	else
1786	  {
1787	    Make_List(cur_tail, prev_mc);
1788	  }
1789	prev_mc = cur_mc;
1790      }
1791    Return_Unify_List(vstatus,tstatus,cur_mc)
1792  }
1793  else
1794      Return_Unify_Nil(vstatus,tstatus)
1795}
1796
/*
 * Builtin: unify vstatus with the worker ids known for a host.
 *
 * The host name is extracted from the first argument with Get_Name and
 * passed to wm_host_status().  If that returns nonzero, a two-element
 * list is built from cells pushed with Push_List_Frame():
 *     [AwakeIds, AsleepIds]
 * AwakeIds holds host_status.awake_ids[0 .. num_awake-1] and AsleepIds
 * holds host_status.asleep_ids[0 .. num_asleep-1]; each is built by
 * prepending, so the last array entry comes first, and each is nil when
 * its count is not positive.
 *
 * If wm_host_status() returns zero, vstatus is unified with nil.
 */
p_wm_get_ids(vhostname,thostname,vstatus, tstatus)
value vhostname, vstatus;
type thostname, tstatus;
{
  pword *awake_list, *sleep_list, *tail, *stat_list, * prev;
  char * hostname;
  int i;

  Get_Name(vhostname,thostname,hostname)
  if (wm_host_status(hostname))
    {
      /* last cell of the result: its head will hold the asleep ids,
         its tail is nil */
      stat_list = TG;
      Push_List_Frame()
      tail = stat_list + 1;
      Make_Nil(tail)
      prev = NULL;
      if (host_status.num_asleep > 0)
	{
	  /* each new cell is linked in front of the previous one */
	  for(i = 0; i < host_status.num_asleep ; i++)
	    {
	      sleep_list = TG;
	      Push_List_Frame()
  	      Make_Integer(sleep_list,host_status.asleep_ids[i])
	      tail = sleep_list + 1;
	      if (prev == NULL)
		Make_Nil(tail)
	      else
		{
		  Make_List(tail,prev);
		}
	      prev = sleep_list;
	    }
	  Make_List(stat_list, sleep_list);
	}
      else
	{
	  Make_Nil(stat_list);
	}

      /* first cell of the result: its tail points at the cell built
         above, its head will hold the awake ids */
      prev = stat_list;
      stat_list = TG;
      Push_List_Frame()
      tail = stat_list + 1;
      Make_List(tail, prev);

      prev = NULL;
      if (host_status.num_awake > 0)
	{
	  /* same construction as for the asleep ids */
	  for(i = 0; i < host_status.num_awake; i++)
	    {
	      awake_list = TG;
	      Push_List_Frame()
	      Make_Integer(awake_list,host_status.awake_ids[i])
	      tail = awake_list + 1;
	      if (prev == NULL)
		Make_Nil(tail)
	      else
		{
		  Make_List(tail,prev);
		}
	      prev = awake_list;
	    }
	  Make_List(stat_list,awake_list);
	}
      else
	{
	  Make_Nil(stat_list);
	}

      Return_Unify_List(vstatus,tstatus,stat_list);
    }
  else
      Return_Unify_Nil(vstatus,tstatus)
}
1871
1872void wm_set_worker_info(wid, infotype, bufsize, buf)
1873int wid;
1874int infotype;
1875int bufsize;
1876void_ptr buf;
1877
1878{
1879 amsg_t msg;
1880 amsg_data_t * msg_data;
1881 worker_info_msg_t * winfo_mess;
1882 amsg_size_t size;
1883 int dontack = 0;
1884
1885 if (wid < 0)   /* dont send acknowledge */
1886   {
1887     dontack = 1;
1888     wid = -wid;
1889   }
1890 size = sizeof(worker_info_msg_t) + bufsize;
1891
1892 check_amsg(amsg_alloc(size,
1893		       &msg_data,
1894		       &msg),__LINE__, 2);
1895
1896 winfo_mess = (worker_info_msg_t *) msg_data;
1897 winfo_mess->header.msg_type = WORKER_INFO_SET;
1898 winfo_mess->header.wid = ParallelWorker;
1899 if (dontack)
1900   winfo_mess->req_wid = -ParallelWorker;
1901 else
1902   winfo_mess->req_wid = ParallelWorker;
1903 winfo_mess->pro_wid = wid;
1904 winfo_mess->infotype = infotype;
1905 winfo_mess->size = bufsize;
1906 bcopy((char *) buf, (char *) (winfo_mess + 1), bufsize);
1907
1908 if (dontack)
1909   check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3);
1910 else   {
1911   worker_info_ret = -1;
1912   check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3);
1913   while(worker_info_ret < 0)
1914     poll_short_sleep(100);
1915 }
1916}
1917
1918
1919void
1920wm_get_worker_info( wid, infotype, bufsize, buf)
1921int wid;
1922int infotype;
1923int bufsize;
1924void_ptr buf;
1925{
1926  amsg_t msg;
1927  amsg_data_t * msg_data;
1928  amsg_size_t size;
1929  worker_info_msg_t * worker_info_mess;
1930
1931  size = sizeof(worker_info_msg_t);
1932
1933  check_amsg(amsg_alloc(size,
1934			&msg_data,
1935			&msg),__LINE__, 2);
1936
1937  worker_info_mess = (worker_info_msg_t *) msg_data;
1938  worker_info_mess->header.msg_type = WORKER_INFO_GET;
1939  worker_info_mess->header.wid = ParallelWorker;
1940  worker_info_mess->infotype = infotype;
1941  worker_info_mess->pro_wid = wid;
1942  worker_info_mess->req_wid = ParallelWorker;
1943  worker_info_buf = buf;
1944  worker_info_bufsize = bufsize;
1945  worker_info_ret = -1;
1946  check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3);
1947
1948  while (worker_info_ret < 0)
1949    poll_short_sleep(1000);
1950
1951}
1952
1953send_worker_info(recwid,infotype,bufsize,buf)
1954int recwid;
1955int infotype;
1956int bufsize;
1957void_ptr buf;
1958{
1959  amsg_t msg;
1960  amsg_data_t * msg_data;
1961  amsg_size_t size;
1962  worker_info_msg_t * worker_info_mess;
1963
1964  if (recwid > 0)
1965    {
1966      size = sizeof(worker_info_msg_t) + bufsize;
1967
1968      check_amsg(amsg_alloc(size,
1969			    &msg_data,
1970			    &msg),__LINE__, 2);
1971
1972      worker_info_mess = (worker_info_msg_t *) msg_data;
1973      worker_info_mess->header.msg_type = WORKER_INFO_NOTIFY;
1974      worker_info_mess->header.wid = ParallelWorker;
1975      worker_info_mess->req_wid = recwid;
1976      worker_info_mess->pro_wid = ParallelWorker;
1977      worker_info_mess->infotype = infotype;
1978      worker_info_mess->size = bufsize;
1979      bcopy((char *) buf, (char *) (msg_data + sizeof(worker_info_msg_t)),
1980	bufsize);
1981      check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3);
1982    }
1983}
1984
1985
1986
1987get_worker_info(infotype,infosize,infoval)
1988int infotype;
1989int *infosize;
1990void_ptr * infoval;
1991
1992{
1993
1994  switch (infotype) {
1995
1996  case SCHED_INFO:  /* scheduler */
1997    sch_get_info(aports[SCH_APORT_NUMBER], infosize, infoval);
1998    break;
1999
2000  default:
2001    printf("%d: Unimplemented infotype = %d\n", ParallelWorker, infotype);
2002  }
2003}
2004
2005set_worker_info(req_wid, infotype, infoval)
2006int req_wid;  /* worker id of requesting worker */
2007int infotype;
2008void_ptr infoval;
2009{
2010  if (req_wid < 0)
2011    req_wid = - req_wid;
2012  switch (infotype) {
2013
2014  case SCHED_INFO: /* scheduler */
2015    sch_set_info(aports[SCH_APORT_NUMBER],infoval);
2016    break;
2017
2018  default:
2019    printf("%d: Unimplemented infotype = %d\n", ParallelWorker, infotype);
2020  }
2021}
2022
2023/* set a specific worker to sleep */
2024
2025p_set_sleeping(vwid, twid)
2026value vwid;
2027type twid;
2028
2029{
2030  amsg_t msg;
2031  amsg_data_t * msg_data;
2032  wm_simple_msg_t * wm_simple_msg;
2033  amsg_size_t size;
2034
2035  Check_Integer(twid);
2036
2037  size = sizeof(wm_simple_msg_t);
2038
2039  check_amsg(amsg_alloc(size,
2040			&msg_data,
2041			&msg),__LINE__, 2);
2042
2043  wm_simple_msg = (wm_simple_msg_t *) msg_data;
2044  wm_simple_msg->header.msg_type = SET_ONE_SLEEPING;
2045  wm_simple_msg->header.wid = ParallelWorker;
2046  wm_simple_msg->msg_value = vwid.nint;
2047  check_amsg_soft(amsg_send(wm_low_aport_id,msg,mdt_wm_simple,1,0),__LINE__, 3);
2048
2049  Succeed_;
2050}
2051
2052/* Sleep for n usecs and poll for messages*/
2053
2054void poll_short_sleep(n)
2055int n;
2056{
2057  bmsg_optval_t optval;
2058
2059  short_sleep(n);
2060  bmsg_get_option(BMSG_INTRA_DOMAIN_TRIGGERING, &optval);
2061  if(optval == BMSG_OFF)
2062    bmsg_trigger(BMSG_INTRA_DOMAIN);
2063}
2064
2065/* Ask for worker statistics */
2066
2067request_wstat(wid, stat)
2068int wid;
2069struct worker_stat_ext * stat;
2070{
2071  worker_ptr w;
2072
2073  w = get_worker(wid);
2074  wstat_rec_buf = stat;
2075  worker_info_ret = -1;
2076  send_simple_wm_message(w->wm_aport_id, WSTAT_REQ, 0);
2077  while (worker_info_ret < 0)
2078    poll_short_sleep(1000);
2079}
2080
2081/* Ask remote worker (wid) to reset worker statistics and wait for ack*/
2082reset_wstat(wid)
2083int wid;
2084{
2085  worker_ptr w;
2086
2087  w = get_worker(wid);
2088
2089  worker_info_ret = -1;
2090  send_simple_wm_message(w->wm_aport_id, WSTAT_RESET, 0);
2091  while (worker_info_ret < 0)
2092    poll_short_sleep(1000);
2093
2094}
2095
2096/* reply to WSTAT_REQ message from worker wid */
2097send_wstat(wid)
2098int wid;
2099{
2100  amsg_t msg;
2101  amsg_data_t * msg_data;
2102  wstat_msg_t * wstat_msg;
2103  amsg_size_t size;
2104  struct worker_stat_ext * local_stat;
2105  worker_ptr w;
2106
2107  if (wid != 0) /* not a Worker Manager request */
2108    w = get_worker(wid);
2109  size = sizeof(wstat_msg_t);
2110
2111  check_amsg(amsg_alloc(size,
2112			&msg_data,
2113			&msg),__LINE__, 2);
2114
2115  wstat_msg = (wstat_msg_t *) msg_data;
2116  wstat_msg->header.msg_type = WSTAT_RET;
2117  wstat_msg->header.wid = ParallelWorker;
2118
2119  local_stat = (struct worker_stat_ext *) &(wstat_msg->stat);
2120  get_worker_stat(local_stat);
2121  check_amsg(amsg_send(wid ? w->wm_aport_id : wm_high_aport_id,
2122		       msg,mdt_wm_wstat,1,0),__LINE__, 3);
2123}
2124
2125/* tracing messages */
2126#ifdef WORKER_TRACING
2127get_trace_header(wid, trace_header)
2128int wid;
2129trace_header_t * trace_header;
2130{
2131  worker_ptr w;
2132  w = get_worker(wid);
2133
2134  trace_header_buf = trace_header;
2135  worker_info_ret = -1;
2136  send_simple_wm_message(w->wm_aport_id, GET_TRACE_HEADER, 0);
2137
2138  while(worker_info_ret < 0)
2139    poll_short_sleep(1000);
2140}
2141
2142set_trace_header(wid, theader)
2143int wid;
2144trace_header_t * theader;
2145{
2146  worker_info_ret = -1;
2147  send_trace_header(wid, SET_TRACE_HEADER, theader);
2148  while(worker_info_ret < 0)
2149    poll_short_sleep(1000);
2150}
2151
2152send_trace_header(wid,msg_type, theader)
2153int wid;
2154int msg_type;
2155trace_header_t * theader;
2156{
2157  worker_ptr w = get_worker(wid);
2158  amsg_t msg;
2159  amsg_data_t * msg_data;
2160  trace_msg_t * trace_msg;
2161  amsg_size_t size;
2162
2163  size = sizeof(trace_msg_t);
2164
2165  check_amsg(amsg_alloc(size,
2166			&msg_data,
2167			&msg),__LINE__, 2);
2168
2169  trace_msg = (trace_msg_t *) msg_data;
2170  trace_msg->header.msg_type = msg_type;
2171  trace_msg->header.wid = ParallelWorker;
2172  trace_msg->trace_header = *theader;
2173  check_amsg(amsg_send(w->wm_aport_id,msg,mdt_wm_trace,1,0),__LINE__, 3);
2174}
2175#endif
2176