1/* BEGIN LICENSE BLOCK
2 * Version: CMPL 1.1
3 *
4 * The contents of this file are subject to the Cisco-style Mozilla Public
5 * License Version 1.1 (the "License"); you may not use this file except
6 * in compliance with the License.  You may obtain a copy of the License
7 * at www.eclipse-clp.org/license.
8 *
9 * Software distributed under the License is distributed on an "AS IS"
10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11 * the License for the specific language governing rights and limitations
12 * under the License.
13 *
14 * The Original Code is  The ECLiPSe Constraint Logic Programming System.
15 * The Initial Developer of the Original Code is  Cisco Systems, Inc.
16 * Portions created by the Initial Developer are
17 * Copyright (C) 1994-2006 Cisco Systems, Inc.  All Rights Reserved.
18 *
19 * Contributor(s):
20 *
21 * END LICENSE BLOCK */
22
23/**********************************************************************
24**      System: Parallel Eclipse
25**        File: wm.c
26**      Author: Shyam Mudambi
27**		Liang-Liang Li
28** Description: Worker Manager for Parallel Eclipse
29***********************************************************************/
30
31/*LINTLIBRARY*/
32#include <sys/types.h>
33#include <sys/socket.h>
34#include <netdb.h>
35#include <netinet/in.h>
36#include <signal.h>
37#include <stdlib.h>
38#include <stdio.h>
39#include <sys/time.h>
40#include <sys/times.h>
41#include <sys/stat.h>
42#include <errno.h>
43#include <math.h>
44#include <fcntl.h>
45#include "config.h"
46
47#ifdef HAVE_STRING_H
48#include <string.h>
49#else
50extern char *strcpy();
51extern void *memcpy();
52#endif
53
54#ifdef BSD_TIMES
55#include <sys/timeb.h>
56#endif
57
58#ifdef HAVE_UNISTD_H
59#include <unistd.h>
60#endif
61
62#ifdef PATH_IN_LIMITS
63#  include 	<limits.h>
64#  define MAX_PATH_LEN	PATH_MAX
65#else
66#  include <sys/param.h>
67#  define MAX_PATH_LEN	MAXPATHLEN
68#endif
69
70#if !defined(HAVE_GETHOSTID)
71#include <sys/utsname.h>	/* for uname() */
72#endif
73
74#ifdef HAVE_SYS_SYSTEMINFO_H
75#include <sys/systeminfo.h>
76#endif
77
78#include "memman.h"
79#include "pds.h"
80#include "nsrv.h"
81
82#ifndef _PDS_TYPES_H_
83typedef long		*void_ptr;
84#endif /* _PDS_TYPES_H */
85
86#include "sch_types.h"
87#include "trace.h"
88#include "wm_msgs.h"
89#include "wm_types.h"
90
91/* #define DEBUG_MPS*/
92
93#if defined(__STDC__)
94    extern void	short_sleep(int);
95    extern char *eclipsehome(void);
96#else /* __STDC__ */
97    extern void short_sleep();
98    extern char *eclipsehome();
99#endif /* __STDC__ */
100
101/*------------------------------------------------------*/
102/* Types and Constant Definitions */
103/*------------------------------------------------------*/
104
#define STATIC_PORTS 4

#define WM_PORT_NAME		"Worker_Manager"
#define WORKER_PORT_NAME	"parallel_eclipse"

/* Static Port Numbers */
/* A note by LLL: why do we need four aports of different interrupt levels?
** First, we need two separate ports for halting the system with the highest
** priority, in order to implement a two-phase halting protocol.
** Then, the service of adding a new worker involves at least two messages:
** a worker-creation request from somewhere (the wm window or within a
** worker) and a creation-done acknowledgement from the newly created
** worker. A sort of 'atomic adding' is desired so that messages requesting
** the worker-management status get well-defined information, i.e. we avoid
** reporting a wm status with half-added workers. Having a separate aport
** with higher priority serves this purpose: the adding process can loop
** waiting for the acknowledgement without handling any other messages
** (wm-status requests, adding a worker, etc.) until the corresponding
** acknowledgement is received (via the separate, higher-priority port).
** Hence the four ports: halt2, halt1, high and low.
** Accordingly, messages for adding workers, as well as wm-status requests
** and wm-status updates, go through the low port (the underlying message
** passing system enforces mutually exclusive handling of the messages
** arriving at a common aport), while the acknowledgement and other messages
** that are insensitive to the wm status go through the higher one, for
** example requests to read the various clocks administered by the wm.
** For the detailed grouping, see the high/low-port-notify functions.
** Note also that the higher ports have to be attached to an asynchronous
** message handler.
*/

#define HALT1_APORT_NUMBER 0
#define HALT2_APORT_NUMBER 1
#define HIGH_APORT_NUMBER 2
#define LOW_APORT_NUMBER 3

/* Print a trace message when verbose mode (wm_verbose) is on.
** The text is passed as a "%s" argument instead of being used as the
** format string, so a '%' in a message can never be taken as a conversion. */
#define Notify(text) { if (wm_verbose) { printf("%s", (text)); }}
142
143
144/*------------------------------------------------------*/
145/* Global Variables */
146/*------------------------------------------------------*/
147
/* Interface (Tk) functions */
int  tk_OneEvent(), tk_geval();

/* Directory in which the per-host heap map files are created (see add_mc). */
char * map_dir;

/* NOTE(review): presumably the root node handle and the id of the worker
   that sent it (cf. got_root_id below) - confirm against the handlers. */
st_handle_t root_id;
int root_id_sender;

/* Copy of the command line, for use when workers are added asynchronously */
char          **Argv;
int             Argc;

/* Variables set by command-line arguments */
int dont_fork; /* set by -wnf flag; exit_system() then waits without timeout */
int interactive = 0; /* set by -wmi flag */
int usemcfile = 0; /* set by -wf flag - if set we use the .eclipse_machines
		      file for the startup configuration */
static int wm_verbose_startup = 0; /* set by -wv flag */
static int num_workers = 0;        /* set by -w flag */
static int wm_verbose = 0;    /* if non-zero, Notify() prints each message received */

/* Name server Global Ids: the keys, names and signature under which the
   worker manager registers itself; all deregistered again in exit_system() */
nsrv_name_t domain_name;
nsrv_name_t bdomain_key;
nsrv_name_t session_key;
nsrv_name_t wm_signature;
nsrv_name_t wm_port_name;
nsrv_name_t wm_halt1_aport_name,
	    wm_halt2_aport_name,
	    wm_high_aport_name,
	    wm_low_aport_name;

/* MPS Globals */
bdomain_id_t domain_id;
static bport_id_t wm_bport_id;
static int wm_pid;  /* our own pid: used in heap map file names and by bproc_trigger */
bdomain_t bdomain;  /* the domain data structure */
/* presumably indexed by the *_APORT_NUMBER constants - confirm at the setup code */
static aport_id_t local_aport_ids[STATIC_PORTS];
/* presumably the notify handler attached to each static aport */
static void (* notify [STATIC_PORTS]) ();


hdr_mc_list_t mc_list;  /* Holds all the worker-specific information */
char *init_host = NULL; /* may be set by -h flag */
char *wm_host = NULL;   /* machine on which worker_manager is running */
char * nsrv_host = NULL; /* machine on which name server is running */
unsigned  nsrv_port_number = 0; /* port number of nsrv */
static char *init_exec = NULL; /* may be set by -wx flag */

/* MPS map file; removed by kill_system() during emergency cleanup */
static char mps_map_file[MAX_PATH_LEN] = "/tmp/mps.map";

/* Volatiles - updated by message/port handlers and polled while waiting for
   particular events (e.g. num_exited is incremented by halt2_notify and
   closed_bports by bport_notify; exit_system waits on both) */
static volatile int init_acks = 0;
static volatile int num_exited = 0;
static volatile int closed_bports = 0;
static volatile int got_root_id = 0;
static volatile int wstat_received;

/* Start time - used for the session time calculation */
#ifdef HAVE_GETHRTIME
static hrtime_t start_time;
#else
#ifdef BSD_TIMES
static time_t start_time;
#else
static clock_t start_time;
#endif
#endif
int clock_hz;

#ifdef HAVE_SIGPROCMASK
/* signals unblocked again by handle_sigio() */
sigset_t sigio_mask;
#endif
220
221/*-----------------------------------------------------------------*/
222/*            Machine list handling routines                          */
223/*-----------------------------------------------------------------*/
224
225void init_mc_list()
226{
227  mc_list.num_machines = 0;
228  mc_list.next_id = 1;
229  mc_list.total_workers = 0;
230}
231
232machine_t *get_mc(hostname)
233     char * hostname;
234{
235
236 int i;
237 int found = 0;
238
239  for(i = 0; (i < mc_list.num_machines) && !found; i++)
240    if(strcmp(hostname,mc_list.machines[i].hostname) == 0)
241      return(&(mc_list.machines[i]));
242
243 return(NULL);
244}
245
246machine_t *add_mc(hostname,exec_file,auto_start)
247char * hostname;
248char * exec_file;
249int auto_start;
250{
251  int mc_id;
252  machine_t * mc;
253
254  if((mc = get_mc(hostname)) == NULL)
255    {
256      Disable_Int();
257      mc_id = mc_list.num_machines;
258      mc_list.machines[mc_id].num_workers = 0;
259      mc_list.machines[mc_id].num_awake = 0;
260      mc_list.machines[mc_id].auto_start = auto_start;
261      strcpy(mc_list.machines[mc_id].hostname, hostname);
262      strcpy(mc_list.machines[mc_id].exec_file, exec_file);
263      sprintf(mc_list.machines[mc_id].heap_map_file,"%s/%d.%s.heap.map",
264	      map_dir,wm_pid,hostname);
265      mc_list.machines[mc_id].list = NULL;
266      mc_list.num_machines++;
267      Enable_Int();
268      return(&(mc_list.machines[mc_id]));
269    }
270  else
271    {
272      Disable_Int();
273      mc->auto_start = auto_start;
274      strcpy(mc->exec_file,exec_file);
275      Enable_Int();
276      return(mc);
277    }
278}
279
280int init_worker(hostname,redraw)
281char *hostname;
282int redraw;
283
284{
285  worker_ptr w;
286  machine_t *mc;
287
288  mc = get_mc(hostname);
289
290  if (mc == NULL)
291    {
292      fprintf(stderr,
293	      "Error: host %s not in Worker Manager host list\n",hostname);
294      return(0);
295    }
296
297  w = (worker_ptr) malloc(sizeof(worker_t));
298  w->index = mc_list.next_id;
299  sprintf(w->bport_name,"worker%d",w->index);
300  w->status = NOT_READY;
301  w->start_wstat.job_count = -1;
302  w->pid = 0;
303  Disable_Int();
304  if(mc->list == NULL)
305    {
306      w->next = NULL;
307      w->first = 1;
308    }
309  else
310    {
311      w->next = mc->list;
312      w->first = 0;
313    }
314
315  mc->list = w;
316  mc->num_workers++;
317  mc_list.total_workers++;
318  mc->num_awake++;
319  mc_list.next_id++;
320  Enable_Int();
321  if (interactive && redraw) {
322    tk_geval("remake_status");
323  }
324  return(w->index);
325}
326
327worker_ptr get_worker(index)
328     int index;
329{
330  worker_ptr w;
331  int i;
332
333  w = NULL;
334  for(i = 0; (i < mc_list.num_machines) && (w == NULL); i++)
335      for(w = mc_list.machines[i].list; (w != NULL) && (w->index != index);
336	  w = w->next);
337
338  if (w == NULL)
339    fprintf(stderr, "WM: get_worker - no worker with index %d\n",index);
340  return(w);
341}
342
343machine_t * get_mc_id(index)
344int index;
345{
346  worker_ptr w;
347  int i;
348
349  w = NULL;
350  for(i = 0; (i < mc_list.num_machines) && (w == NULL); i++)
351      for(w = mc_list.machines[i].list; w != NULL ; w = w->next)
352	if (w->index == index)
353	  return(&(mc_list.machines[i]));
354
355  if (w == NULL)
356    {
357      fprintf(stderr, "WM: get_mc_id - no worker with index %d\n",index);
358      return(NULL);
359    }
360}
361
362
363/*-----------------------------------------------------------------*/
364/*               Error handling routines                           */
365/*-----------------------------------------------------------------*/
366
367void check_nsrv(nret,line,err)
368nsrv_ret_t nret;
369int line;
370int err;
371
372{
373  if (nret != NSRV_OK)
374    {
375      fprintf(stderr,"**Worker Manager: Name Server (nsrv) Fatal Error**\n");
376      switch(err)
377	{
378	case 1:
379	  fprintf(stderr,"Looks like your nameserver has crashed.\n");
380	  fprintf(stderr,"Remove all files in $ECLIPSETMP ");
381	  fprintf(stderr,"and restart peclipse.\n");
382	  break;
383	case 2:
384	  fprintf(stderr,"Could not register information with nsrv.\n");
385	  break;
386	case 3:
387	  fprintf(stderr,"nsrv_new_bport_id call failed.\n");
388	  break;
389	case 4:
390	  fprintf(stderr,"nsrv_bdomain_look_up failed.\n");
391	  break;
392	case 5:
393	  fprintf(stderr,"nsrv_new_bdomain_id call failed.\n");
394	  break;
395	default:
396	  break;
397	}
398      fprintf(stderr,"pid = %d nret = %d at line %d in file %s\n",
399	     getpid(),nret,line,__FILE__);
400      exit(1);
401    }
402}
403
404void check_nsrv_soft(nret,line,err)
405nsrv_ret_t nret;
406int line;
407int err;
408{
409  if (nret != NSRV_OK)
410    {
411      fprintf(stderr,"**Worker Manager: Name Server (nsrv) Error** \n");
412      switch(err)
413	{
414	case 1:
415	  fprintf(stderr,"Could not deregister nameserver information");
416	  break;
417	case 2:
418	  fprintf(stderr,"Could not free nameserver ids");
419	  break;
420	default:
421	  break;
422	}
423      fprintf(stderr,"pid = %d nret = %d at line %d in file %s\n",
424	     getpid(),nret,line,__FILE__);
425      fprintf(stderr,"Trying to continue execution..\n");
426    }
427}
428
429void check_bmsg(bret,line,err)
430bmsg_ret_t bret;
431int line;
432int err;
433{
434  if (bret != BMSG_OK)
435    {
436      fprintf(stderr,"**Worker Manager: MPS B-layer Fatal Error: Aborting!\n");
437      switch(err)
438	{
439	case 1:
440	  fprintf(stderr,"Could not initialize Message Passing System.\n");
441	  break;
442	case 2:
443	  fprintf(stderr,"bport_port call failed.\n");
444	  break;
445	default:
446	  break;
447	}
448      fprintf(stderr,"pid = %d bret = %d at line %d in file %s\n",
449	     getpid(),bret,line,__FILE__);
450      exit(2);
451    }
452}
453
454void check_amsg(aret,line,err)
455amsg_ret_t aret;
456int line;
457int err;
458{
459  if (aret != AMSG_OK)
460    {
461      fprintf(stderr,"**Worker Manager: MPS A-Layer Fatal Error: Aborting!");
462      switch(err)
463	{
464	case 1:
465	  fprintf(stderr,"Could not initialize Message Passing System.\n");
466	  break;
467	case 2:
468	  fprintf(stderr,"Could not allocate message buffer.");
469	  fprintf(stderr,"Probably no memory left.\n");
470	  break;
471	case 3:
472	  fprintf(stderr,"Error in amsg_send.\n");
473	case 4:
474	  fprintf(stderr,"Error in amsg_type_define\n");
475	default:
476	  break;
477	}
478      fprintf(stderr,"pid = %d aret = %d at line %d in file %s\n",
479	     getpid(),aret,line,__FILE__);
480      exit(3);
481    }
482}
483
484void check_amsg_soft(aret,line,err)
485amsg_ret_t aret;
486int line;
487int err;
488{
489  if (aret != AMSG_OK)
490    {
491      fprintf(stderr,"**Worker Manager: MPS A-Layer Error: Warning");
492      switch(err)
493	{
494	case 1:
495	  fprintf(stderr,"Could not free message buffer.\n");
496	  break;
497	case 2:
498	  fprintf(stderr,"Error in recieving messages.\n");
499	  break;
500	case 3:
501	  fprintf(stderr,"Error in amsg_send.\n");
502	default:
503	  break;
504	}
505      fprintf(stderr,"pid = %d aret = %d at line %d in file %s\n",
506	     getpid(),aret,line,__FILE__);
507      fprintf(stderr,"Trying to continue execution..\n");
508    }
509}
510
/* Report a fatal worker-manager error on stderr.  This only prints; it
   does not terminate the process. */
void wm_abort_error(msg)
char * msg;
{
  fputs("** Worker Manager : Fatal error**\n", stderr);
  fprintf(stderr, "%s\n", msg);
}
517
518/*----------------------------------------------------------------*/
519/* Signal Handling */
520/*----------------------------------------------------------------*/
521
/* Re-enable delivery of SIGIO (and SIGPOLL where defined), then let the
   message passing system process pending inter- and intra-domain messages
   via bmsg_trigger().  Called directly by sigio_handler, and by
   delayed_signal_handler (presumably once interrupts are re-enabled after a
   signal was deferred). */
static void
handle_sigio()
{
    /* unblock SIGIO and SIGPOLL signals so that further signals can be
       delivered while messages are processed (sigio_mask presumably holds
       exactly these signals - confirm where it is initialised) */
#ifdef HAVE_SIGPROCMASK
    (void) sigprocmask(SIG_UNBLOCK, &sigio_mask, (sigset_t *) 0);
#else
    {
	int mask;
	/* BSD: sigblock() returns the previous mask; reinstall it with the
	   I/O signal bits cleared */
#ifdef SIGPOLL
	mask = sigblock(sigmask(SIGIO) | sigmask(SIGPOLL));
	sigsetmask(mask & ~(sigmask(SIGIO) | sigmask(SIGPOLL)));
#else
	mask = sigblock(sigmask(SIGIO));
	sigsetmask(mask & ~sigmask(SIGIO));
#endif
    }
#endif
    (void) bmsg_trigger((BMSG_INTER_DOMAIN |
		    BMSG_INTRA_DOMAIN  ));
}
543
544
/* SIGIO signal handler.  While interrupts are disabled (presumably inside
   a Disable_Int()/Enable_Int() section) the signal is only recorded as
   pending; otherwise it is handled immediately by handle_sigio(). */
RETSIGTYPE
sigio_handler()
{
    if (InterruptsDisabled) {
        Set_Interrupts_Pending();
    } else {
	handle_sigio();
    }
}
554
/* Process a SIGIO that sigio_handler deferred while interrupts were
   disabled: clear the pending flag, then handle it as usual. */
static void
delayed_signal_handler()
{
    Clr_Interrupts_Pending();
    handle_sigio();
}
561
562
563/*-----------------------------------------------------------*/
564/*  Panic and warning routines */
565/*-----------------------------------------------------------*/
566
/* Fatal memory-management error: report what failed and where on stderr,
   then terminate the process with exit(-1). */
void
mem_panic(what, where)
    char * what;
    char * where;
{
    (void) fprintf(stderr, "Panic: %s in %s\n", what, where);
    exit(-1);
}
575
576/*ARGSUSED*/
577void amsg_warn(msg_warn, culprit)
578     amsg_warn_t msg_warn;
579     aport_id_t culprit;
580{
581/*    printf("%d: amsg_warn: ...\n", bport_self());*/
582}
583
584/*ARGSUSED*/
585void
586amsg_panic(msg_panic,culprit)
587    amsg_panic_t msg_panic;
588    aport_id_t culprit;
589{
590}
591
592/*ARGSUSED*/
593void
594amsg_error(msg_error,culprit)
595    amsg_error_t msg_error;
596    aport_id_t culprit;
597{
598}
599
600/*ARGSUSED*/
601#if defined(__STDC__)
602void bmsg_warn(bmsg_warn_t msg_warn,
603	       bport_id_t culprit)
604#else
605void
606bmsg_warn(msg_warn, culprit)
607    bmsg_warn_t msg_warn;
608    bport_id_t culprit;
609#endif
610{
611/*
612    printf("Worker Manager: bmsg_warn: %d :error no: %d ...\n",
613           culprit,msg_warn);
614*/
615}
616
617/*ARGSUSED*/
618#if defined(__STDC__)
619void bmsg_panic(bmsg_warn_t msg_panic,
620	       bport_id_t culprit)
621#else
622void
623bmsg_panic(msg_panic,culprit)
624    bmsg_panic_t msg_panic;
625    bport_id_t culprit;
626#endif
627{
628    printf("Worker Manager: bmsg_panic: %d error no: %d\n",
629           culprit,msg_panic);
630}
631
632/*ARGSUSED*/
633#if defined(__STDC__)
634void bmsg_error(bmsg_warn_t msg_error,
635	       bport_id_t culprit)
636#else
637void
638bmsg_error(msg_error,culprit)
639    bmsg_error_t msg_error;
640    bport_id_t culprit;
641#endif
642{
643    switch (msg_error) {
644        case BMSG_WEP_PDIED :
645            if (culprit == NSRV_BPORT_ID)
646                printf("Worker Manager: bmsg_error: name server died !\n");
647            else
648                printf("Worker Manager: bmsg_error: bport %d died !\n",
649		       culprit);
650            return;
651        default :
652            break;
653    }
654    printf("Worker Manager: unknown bmsg_error: culprit %d, error no %d\n",
655           culprit,msg_error);
656}
657
658/*ARGSUSED*/
659void
660bmem_ack(mem_id,mem_primitive,ret)
661    bmem_id_t mem_id;
662    bmem_primitive_t mem_primitive;
663    bmsg_ret_t ret;
664{
665    printf("%d: bmem_ack: ...\n", bport_self());
666}
667
668/*ARGSUSED*/
669#if defined(__STDC__)
670void
671bmem_notify(bport_id_t port_id,
672	    bmem_primitive_t mem_primitive,
673	    bmem_address_t mem_address,
674	    bmem_size_t mem_data_size)
675#else
676void
677bmem_notify(port_id,mem_primitive,mem_address,mem_data_size)
678    bport_id_t port_id;
679    bmem_primitive_t mem_primitive;
680    bmem_address_t mem_address;
681    bmem_size_t mem_data_size;
682#endif
683{
684    printf("%d: bmem_notify: ...\n", bport_self());
685}
686
687/*----------------------------------------------------------------*/
688/* MPS B-layer port notify and acknowledge routines               */
689/*----------------------------------------------------------------*/
690
691/*ARGSUSED*/
692#if defined(__STDC__)
693void
694bport_notify(bport_id_t port_id,
695	     bport_primitive_t port_primitive)
696#else
697void
698bport_notify(port_id, port_primitive)
699    bport_id_t port_id;
700    bport_primitive_t port_primitive;
701#endif
702{
703    switch (port_primitive) {
704        case BPORT_OPEN :
705            break;
706        case BPORT_CLOSE :
707	  closed_bports++;
708            break;
709        case BPORT_BLOCK :
710            break;
711        case BPORT_UNBLOCK :
712            break;
713        default:
714            break;
715    }
716}
717
718/*ARGSUSED*/
719#if defined(__STDC__)
720void
721bport_ack(bport_id_t port_id,
722	  bport_primitive_t port_primitive,
723	  bmsg_ret_t ret)
724#else
725void
726bport_ack(port_id, port_primitive, ret)
727    bport_id_t port_id;
728    bport_primitive_t port_primitive;
729    bmsg_ret_t ret;
730#endif
731{
732
733    if (ret != BMSG_OK) {
734        switch (port_primitive) {
735            case BPORT_OPEN :
736                break;
737            case BPORT_CLOSE :
738                break;
739            case BPORT_BLOCK :
740                break;
741            case BPORT_UNBLOCK :
742                break;
743            default:
744                break;
745        }
746	return;
747    }
748}
749
750void
751bproc_trigger(port)
752    bport_t * port;
753{
754    if (port->bpid == wm_pid)
755    {
756	(void) bmsg_trigger(BMSG_INTRA_DOMAIN);
757    }
758    else if (kill(port->bpid, SIGIO) != 0)
759    {
760	fprintf(stderr, "bport %d died\n", (int) port->bport_id);
761    }
762}
763
764/*-----------------------------------------------------------------*/
765/*       Initialise Message Types                                  */
766/*-----------------------------------------------------------------*/
767
/* MPS message type ids for the worker-manager messages.  They are filled in
   by wm_types_init() via amsg_type_define(); mdt_wstat is set by
   wstat_types_init(). */
amsg_type_t mdt_wm_header;
amsg_type_t mdt_wm_simple;
amsg_type_t mdt_wm_nsrvname;
amsg_type_t mdt_wm_config;
amsg_type_t mdt_wm_node;
amsg_type_t mdt_sthandlet;
amsg_type_t mdt_wm_mcstatus;
amsg_type_t mdt_wm_status;
amsg_type_t mdt_wm_hoststatus;
amsg_type_t mdt_wm_workerinfo;
amsg_type_t mdt_wm_time;
amsg_type_t mdt_wm_starttime;
amsg_type_t mdt_wm_portname;
amsg_type_t mdt_wm_wstat;
amsg_type_t mdt_wstat;
783
784
785wm_types_init()
786{
787  amsg_typedef_t template[20];
788
789  /* wm_msg_header_t */
790  template[0] = MDT_BEGIN;
791  template[1] = MDT_STRUCT_OPEN;
792  template[2] = MDT_INT32;
793  template[3] = MDT_INT32;
794  template[4] = MDT_STRUCT_CLOSE;
795  template[5] = MDT_END;
796
797  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 1,
798		       template, &mdt_wm_header), __LINE__, 4);
799
800  /* wm_simple_msg_t */
801  template[0] = MDT_BEGIN;
802  template[1] = MDT_STRUCT_OPEN;
803  template[2] = mdt_wm_header;
804  template[3] = MDT_INT32;
805  template[4] = MDT_STRUCT_CLOSE;
806  template[5] = MDT_END;
807
808  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 2,
809		       template, &mdt_wm_simple), __LINE__, 4);
810
811  /* host_name_msg_t structures */
812  template[0] = MDT_BEGIN;
813  template[1] = MDT_STRUCT_OPEN;
814  template[2] = mdt_wm_header;
815  template[3] = MDT_NSRVNAME;
816  template[4] = MDT_STRUCT_CLOSE;
817  template[5] = MDT_END;
818
819  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 3,
820		       template, &mdt_wm_nsrvname), __LINE__, 4);
821
822  /* st_handle_t */
823
824  template[0] = MDT_BEGIN;
825  template[1] = MDT_STRUCT_OPEN;
826  template[2] = MDT_APORTID;
827  template[3] = MDT_UINT32;
828  template[4] = MDT_UINT32;
829  template[5] = MDT_STRUCT_CLOSE;
830  template[6] = MDT_END;
831
832  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 4,
833		       template, &mdt_sthandlet), __LINE__, 4);
834
835  /* node_msg_t */
836
837  template[0] = MDT_BEGIN;
838  template[1] = MDT_STRUCT_OPEN;
839  template[2] = mdt_wm_header;
840  template[3] = mdt_sthandlet;
841  template[4] = MDT_STRUCT_CLOSE;
842  template[5] = MDT_END;
843
844  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 5,
845		       template, &mdt_wm_node), __LINE__, 4);
846
847  /* config_msg_t */
848  template[0] = MDT_BEGIN;
849  template[1] = MDT_STRUCT_OPEN;
850  template[2] = mdt_wm_header;
851  template[3] = MDT_INT32;
852  template[4] = MDT_NSRVNAME;
853  template[5] = MDT_STRUCT_CLOSE;
854  template[6] = MDT_END;
855
856  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 6,
857		       template, &mdt_wm_config), __LINE__, 4);
858
859  /* mc_status_t */
860  template[0] = MDT_BEGIN;
861  template[1] = MDT_STRUCT_OPEN;
862  template[2] = MDT_INT32;
863  template[3] = MDT_INT32;
864  template[4] = MDT_NSRVNAME;
865  template[5] = MDT_STRUCT_CLOSE;
866  template[6] = MDT_END;
867
868  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 7,
869		       template, &mdt_wm_mcstatus), __LINE__, 4);
870
871  /* mc_status_t */
872  template[0] = MDT_BEGIN;
873  template[1] = MDT_STRUCT_OPEN;
874  template[2] = mdt_wm_header;
875  template[3] = MDT_INT32;
876  template[4] = MDT_INT32;
877  template[5] = MDT_ARRAY_OF;
878  template[6] = MAX_MACHINES;
879  template[7] = mdt_wm_mcstatus;
880  template[8] = MDT_STRUCT_CLOSE;
881  template[9] = MDT_END;
882
883  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 8,
884		       template, &mdt_wm_status), __LINE__, 4);
885
886  /* host_status_msg_t */
887  template[0] = MDT_BEGIN;
888  template[1] = MDT_STRUCT_OPEN;
889  template[2] = mdt_wm_header;
890  template[3] = MDT_INT32;
891  template[4] = MDT_INT32;
892  template[5] = MDT_ARRAY_OF;
893  template[6] = MAX_PROCS;
894  template[7] = MDT_INT32;
895  template[8] = MDT_ARRAY_OF;
896  template[9] = MAX_PROCS;
897  template[10] = MDT_INT32;
898  template[11] = MDT_STRUCT_CLOSE;
899  template[12] = MDT_END;
900
901  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 9,
902		       template, &mdt_wm_hoststatus), __LINE__, 4);
903
904  /* start_time_msg_t */
905
906  template[0] = MDT_BEGIN;
907  template[1] = MDT_STRUCT_OPEN;
908  template[2] = mdt_wm_header;
909#ifdef HAVE_GETHRTIME
910  template[3] = MDT_DWORD;
911#else
912  template[3] = MDT_UINT32;
913#endif
914  template[4] = MDT_STRUCT_CLOSE;
915  template[5] = MDT_END;
916
917  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 10,
918		       template, &mdt_wm_starttime), __LINE__, 4);
919
920  /* time_msg_t */
921
922  template[0] = MDT_BEGIN;
923  template[1] = MDT_STRUCT_OPEN;
924  template[2] = mdt_wm_header;
925  template[3] = MDT_DP_FLOAT;
926  template[4] = MDT_STRUCT_CLOSE;
927  template[5] = MDT_END;
928
929  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 11,
930		       template, &mdt_wm_time), __LINE__, 4);
931
932  /* port_name_msg_t */
933  template[0] = MDT_BEGIN;
934  template[1] = MDT_STRUCT_OPEN;
935  template[2] = mdt_wm_header;
936  template[3] = MDT_INT32;
937  template[4] = MDT_NSRVNAME;
938  template[5] = MDT_APORTID;
939  template[6] = MDT_STRUCT_CLOSE;
940  template[7] = MDT_END;
941
942  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 12,
943		       template, &mdt_wm_portname), __LINE__, 4);
944
945  wstat_types_init(&mdt_wstat);
946  /* wstat_msg_t */
947  template[0] = MDT_BEGIN;
948  template[1] = MDT_STRUCT_OPEN;
949  template[2] = mdt_wm_header;
950  template[3] = mdt_wstat;
951  template[4] = MDT_STRUCT_CLOSE;
952  template[5] = MDT_END;
953
954  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 13,
955		       template, &mdt_wm_wstat), __LINE__, 4);
956}
957
958
959/*-----------------------------------------------------------------*/
960/*       Procedures for sending simple messages                    */
961/*-----------------------------------------------------------------*/
962
963
964amsg_ret_t send_simple_wm_message(port_id,msg_type,msg_value)
965     aport_id_t port_id;
966     int msg_type;
967     int msg_value;
968
969{
970  amsg_t msg;
971  amsg_data_t * msg_data;
972  wm_simple_msg_t * wm_simple_msg;
973  amsg_ret_t aret;
974
975  amsg_size_t size;
976
977  size = sizeof(wm_simple_msg_t);
978
979  check_amsg(amsg_alloc(size,
980			&msg_data,
981			&msg),__LINE__,2);
982
983  wm_simple_msg = (wm_simple_msg_t *) msg_data;
984  wm_simple_msg->header.msg_type = msg_type;
985  wm_simple_msg->header.wid = 0;
986  wm_simple_msg->msg_value = msg_value;
987  aret = amsg_send(port_id,msg,mdt_wm_simple, 1, 0);
988  return(aret);
989}
990
991int broadcast_halt(mtype)
992int mtype;
993{
994  worker_ptr cur;
995  int i;
996
997  for(i = 0; i < mc_list.num_machines; i++)
998    for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
999      if(cur->status != NOT_READY) {
1000         if (mtype == HALT_SYSTEM1)
1001	   check_amsg_soft(send_simple_wm_message(cur->halt1_aport_id,mtype,0),
1002		 __LINE__, 3);
1003	 else
1004	   check_amsg_soft(send_simple_wm_message(cur->halt2_aport_id,mtype,0),
1005		 __LINE__, 3);
1006      }
1007}
1008
1009int broadcast(mtype,mmsg)
1010int mtype;
1011int mmsg;
1012{
1013  worker_ptr cur;
1014  int i;
1015
1016  for(i = 0; i < mc_list.num_machines; i++)
1017    for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
1018      if(cur->status != NOT_READY)
1019	check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,mtype,mmsg),
1020		 __LINE__, 3);
1021}
1022
1023int broadcast_awake(mtype,mmsg)
1024int mtype;
1025int mmsg;
1026{
1027  worker_ptr cur;
1028  int i;
1029
1030  for(i = 0; i < mc_list.num_machines; i++)
1031    for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
1032      if(cur->status == AWAKE)
1033	check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,mtype,mmsg),
1034		 __LINE__, 3);
1035}
1036
1037/* emergency cleanup - use rsh to kill remote workers */
1038/* Try to remove mapped files */
1039int
1040kill_system()
1041{
1042  worker_ptr cur;
1043  int i, local;
1044
1045  for(i = 0; i < mc_list.num_machines; i++)
1046    {
1047      local = (strcmp(mc_list.machines[i].hostname,wm_host) == 0);
1048      for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
1049	if(cur->pid)
1050	  if(local)
1051	    {
1052	      if(kill(cur->pid,SIGKILL) != 0)
1053		perror("Kill Failed");
1054	    }
1055	  else
1056	    {
1057	      char command[1024];
1058	      sprintf(command,"rsh %s kill -9 %d",
1059		      mc_list.machines[i].hostname, cur->pid);
1060	      system(command);
1061	    }
1062      if(ec_unlink(mc_list.machines[i].heap_map_file) != 0)
1063	perror("Could not remove heap_map_file\n");
1064    }
1065  if(ec_unlink(mps_map_file) != 0)
1066	perror("Could not remove mps_map_file\n");
1067}
1068
1069/*-----------------------------------------------------------------*/
1070/*------------------ Notify Procedures ----------------------------*/
1071/*-----------------------------------------------------------------*/
1072
1073/* There are four aports used to communicate with the worker manager-
1074a. Halt1/2 aports - to halt the system. Either a normal exit or
1075		    a panic will cause a halt request. Two separate ports
1076		    to implement a two-phase exit protocol.
1077
b. High wm-aport  - used for start-up, and for some messages which do not
		    rely on the well-being of the wm status.
1080
1081c. Low wm-aport   - used for all other messages, in particular those
1082		    requesting the wm status. Therefore this low port messages
1083		    should not be handled when the manager is in the middle of
1084		    a worker startup or exiting protocols. I.e. this port
1085		    should have lower priority.
1086*/
1087
1088void
1089halt2_notify(port_id)
1090  aport_id_t port_id;
1091{
1092    amsg_ret_t ret;
1093    amsg_t msg;
1094    amsg_data_t * msg_data;
1095    amsg_type_t msg_type;
1096    amsg_count_t msg_count;
1097
1098    while (( ret = amsg_receive(port_id,
1099                                &msg,
1100                                &msg_data,
1101                                &msg_type,
1102                                &msg_count, 0)) == AMSG_OK) {
1103       Notify("WM: Got EXITING message\n");
1104       num_exited++;
1105    }
1106}
1107
1108/*ARGSUSED*/
1109void
1110halt1_notify(port_id)
1111  aport_id_t port_id;
1112{
1113  void exit_system();
1114
1115  Notify("WM: Got HALT_SYSTEM_REQ message\n");
1116  exit_system(0,60);
1117}
1118
1119/*-----------------------------------------------------------------*/
1120/* Implements a two-phase exit protocol and each phase uses
1121   the supplied timeout */
1122/*-----------------------------------------------------------------*/
1123void exit_system(exit_code, timeout)
1124int exit_code;
1125int timeout;  /* in seconds */
1126
1127{
1128  int timer1 = timeout;
1129  int timer2 = timeout;
1130  int killed = 0;
1131
1132  broadcast_halt(HALT_SYSTEM1, 0);
1133  while (num_exited != mc_list.total_workers && (timer1-- > 0 || dont_fork))
1134     short_sleep(1000000);
1135  if (timer1 > 0 || dont_fork) {
1136     broadcast_halt(HALT_SYSTEM2, 0);
1137     while (closed_bports != mc_list.total_workers && (timer2-- > 0||dont_fork))
1138        short_sleep(1000000);
1139  }
1140
1141  if ((timer1 <= 0 || timer2 <= 0) && !dont_fork) {
1142     printf("Time out reached - Killing workers and exiting !!\n");
1143     kill_system();
1144     killed = 1;
1145  }
1146
1147  if (!killed) { /* dont try to deregister after an hard exit.
1148	            The MPS memory may be in an inconsistent state */
1149     /* release all name server ids */
1150     check_nsrv_soft(nsrv_aport_deregister(session_key,
1151			     wm_high_aport_name, wm_signature), __LINE__, 1);
1152     check_nsrv_soft(nsrv_aport_deregister(session_key,
1153			     wm_halt1_aport_name, wm_signature), __LINE__, 1);
1154     check_nsrv_soft(nsrv_aport_deregister(session_key,
1155			     wm_halt2_aport_name, wm_signature), __LINE__, 1);
1156     check_nsrv_soft(nsrv_aport_deregister(session_key,
1157			     wm_low_aport_name, wm_signature), __LINE__, 1);
1158     check_nsrv_soft(nsrv_bport_deregister(session_key,
1159			     wm_port_name, wm_signature), __LINE__, 1);
1160     check_nsrv_soft(nsrv_free_bport_id(wm_signature,
1161			     wm_bport_id), __LINE__, 2);
1162     check_nsrv_soft(nsrv_bdomain_deregister(bdomain_key,
1163			     domain_name, wm_signature), __LINE__, 1);
1164     check_nsrv_soft(nsrv_free_bdomain_id(wm_signature,
1165			     bdomain.bdomain_id), __LINE__, 2);
1166  }
1167  nsrv_exit();
1168  amsg_exit();
1169  bmsg_exit();
1170  exit(exit_code);
1171}
1172
1173/*------------------------------------------------------------*/
1174/* Low WM Port handling routines */
1175/*------------------------------------------------------------*/
1176void send_status(w)
1177worker_ptr w;
1178{
1179  amsg_t msg;
1180  amsg_data_t * msg_data;
1181  amsg_size_t size;
1182  status_msg_t *stat_msg;
1183  int i;
1184
1185  size = sizeof(status_msg_t);
1186
1187  check_amsg(amsg_alloc(size,
1188			&msg_data,
1189			&msg),__LINE__,2);
1190
1191  stat_msg = (status_msg_t *) msg_data;
1192  stat_msg->header.msg_type = STATUS_NOTIFY;
1193  stat_msg->header.wid = 0;
1194  stat_msg->num_machines = mc_list.num_machines;
1195  stat_msg->total_workers = mc_list.total_workers;
1196
1197  for(i = 0; i < mc_list.num_machines; i++)
1198    {
1199      stat_msg->machines[i].num_workers = mc_list.machines[i].num_workers;
1200      stat_msg->machines[i].num_awake = mc_list.machines[i].num_awake;
1201      strcpy(stat_msg->machines[i].hostname, mc_list.machines[i].hostname);
1202    }
1203  check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_status,1,0),__LINE__,3);
1204}
1205
1206/* send the host name of the worker manager machine to worker w */
1207void send_host_name(w)
1208worker_ptr w;
1209{
1210  amsg_t msg;
1211  amsg_data_t * msg_data;
1212  amsg_size_t size;
1213  host_name_msg_t *host_name_msg;
1214
1215  size = sizeof(host_name_msg_t);
1216
1217  check_amsg(amsg_alloc(size,
1218			&msg_data,
1219			&msg),__LINE__,2);
1220
1221  host_name_msg = (host_name_msg_t *) msg_data;
1222  host_name_msg->header.msg_type = WM_HOSTNAME;
1223  host_name_msg->header.wid = 0;
1224  strcpy(host_name_msg->hostname,wm_host);
1225  check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_nsrvname,1,0),
1226		  __LINE__,3);
1227}
1228
1229void send_host_status(w,hostname)
1230worker_ptr w;
1231char *hostname;
1232{
1233  amsg_t msg;
1234  amsg_data_t * msg_data;
1235  amsg_size_t size;
1236  host_status_msg_t *stat_msg;
1237  int asleep_count, awake_count;
1238  machine_t *mc;
1239  worker_ptr lw;
1240
1241  size = sizeof(host_status_msg_t);
1242
1243  check_amsg(amsg_alloc(size,
1244			&msg_data,
1245			&msg),__LINE__,2);
1246
1247  stat_msg = (host_status_msg_t *) msg_data;
1248  stat_msg->header.msg_type = HOST_STATUS_NOTIFY;
1249  stat_msg->header.wid = 0;
1250
1251  mc = get_mc(hostname);
1252  if(mc == NULL)
1253    {
1254      stat_msg->num_awake = 0;
1255      stat_msg->num_asleep = 0;
1256    }
1257  else
1258    {
1259      lw = mc->list;
1260
1261      awake_count = 0;
1262      asleep_count = 0;
1263      while(lw != NULL)
1264	{
1265	  if (lw->status == AWAKE)
1266	    {
1267	      stat_msg->awake_ids[awake_count] = lw->index;
1268	      awake_count++;
1269	    }
1270	  else if (lw->status == SLEEPING)
1271	    {
1272	      stat_msg->asleep_ids[asleep_count] = lw->index;
1273	      asleep_count++;
1274	    }
1275	  lw = lw->next;
1276	}
1277      stat_msg->num_awake = awake_count;
1278      stat_msg->num_asleep = asleep_count;
1279    }
1280  check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_hoststatus,1,0),
1281		  __LINE__,3);
1282}
1283
1284int send_to_sleep(w,mc)
1285worker_ptr w;
1286machine_t *mc;
1287
1288{
1289  amsg_ret_t aret;
1290
1291  aret = send_simple_wm_message(w->wm_aport_id, GOTO_SLEEP,0);
1292  if (aret != AMSG_OK)
1293    return(0);
1294  w->status = SLEEPING;
1295  mc->num_awake--;
1296  if (interactive) {
1297    tk_geval("remake_status");
1298  }
1299  return(1);
1300}
1301
1302int wakeup(w,mc)
1303worker_ptr w;
1304machine_t *mc;
1305
1306{
1307  amsg_ret_t aret;
1308
1309  aret = send_simple_wm_message(w->wm_aport_id, WAKEUP,0);
1310  if (aret != AMSG_OK)
1311    return(0);
1312  w->status = AWAKE;
1313  mc->num_awake++;
1314  if (interactive) {
1315    tk_geval("remake_status");
1316  }
1317  return(1);
1318}
1319
1320/*ARGSUSED*/
1321void
1322low_wm_notify(port_id)
1323aport_id_t port_id;
1324{
1325  amsg_ret_t ret;
1326  amsg_t msg;
1327  amsg_data_t * msg_data;
1328  amsg_count_t msg_count;
1329  amsg_type_t msg_type;
1330  wm_simple_msg_t * wm_simple_msg;
1331  int msg_value;
1332  config_msg_t *config_mess;
1333  int req_type;
1334  worker_info_msg_t * worker_info_msg;
1335  host_name_msg_t * host_name_msg;
1336  worker_ptr w;
1337  int i, worker_id;
1338  worker_ptr cur;
1339  int workers;
1340  char *hostname;
1341  machine_t *mc;
1342  int freemsg;
1343
1344
1345  freemsg = 1;
1346
1347  while (( ret = amsg_receive(port_id,
1348			      &msg,
1349			      &msg_data,
1350			      &msg_type,
1351			      &msg_count, 0)) == AMSG_OK) {
1352
1353    if(msg_type == mdt_wm_simple)
1354      {
1355	wm_simple_msg = (wm_simple_msg_t *) msg_data;
1356	worker_id = wm_simple_msg->header.wid;
1357	msg_value = wm_simple_msg->msg_value;
1358	switch(wm_simple_msg->header.msg_type) {
1359
1360	case START_INTERFACE:
1361	  if(!interactive)
1362	    {
1363	      start_tkwindow();
1364	      interactive = 1;
1365	    }
1366	  break;
1367	case REMOVE_INTERFACE:
1368	  if(interactive)
1369	    {
1370	      interactive = 0;
1371	      delete_tkwindow();
1372	    }
1373	  break;
1374	case SET_ONE_SLEEPING:
1375	  cur = get_worker(msg_value);
1376	  mc = get_mc_id(msg_value);
1377	  if (cur && mc)
1378	    send_to_sleep(cur,mc);
1379	  break;
1380
1381	case STATUS_REQ:
1382	  Notify("WM: Got STATUS_REQ message\n");
1383	  w = get_worker(worker_id);
1384	  send_status(w);
1385	  break;
1386
1387	case START_TRACING:
1388	  Notify("WM: Got START_TRACING message\n");
1389	  broadcast_awake(START_TRACING, 0);
1390	  break;
1391
1392	case STOP_TRACING:
1393	  Notify("WM: Got STOP_TRACING message\n");
1394	  broadcast_awake(STOP_TRACING, 0);
1395	  break;
1396
1397	default:
1398	  printf("Unknown simple Worker Manager message\n");
1399	  break;
1400	}
1401      }
1402    else if (msg_type == mdt_wm_config)
1403      {
1404	config_mess = (config_msg_t *) msg_data;
1405	req_type = config_mess->header.msg_type;
1406	worker_id = config_mess->header.wid;
1407	workers = config_mess->workers;
1408	hostname = config_mess->hostname;
1409	mc = get_mc(hostname);
1410	i = 0;
1411	if (mc != NULL)
1412	  {
1413	    if (req_type == ADD_WORKERS)
1414	      {
1415		Notify("WM: Got ADD_WORKERS message\n");
1416		for(i = 0; i < workers; i++)
1417		  if (!add_worker(Argc,Argv,mc))
1418		    break;
1419	      }
1420	    else if (req_type == SLEEP_WORKERS)
1421	      {
1422		Notify("WM: Got SLEEP_WORKERS message\n");
1423		if (workers < mc->num_awake)
1424		  {
1425		    cur = mc->list;
1426		    for (i = 0; i < workers; i++)
1427		      {
1428			while((cur->status == SLEEPING) || (cur->index == worker_id))
1429			  cur = cur->next;
1430			if(!send_to_sleep(cur,mc))    break;
1431		      }
1432		  }
1433	      }
1434	    else /* req_type == WAKEUP_WORKERS */
1435	      {
1436		Notify("WM: Got WAKEUP_WORKERS message\n");
1437		if (workers  <= mc->num_workers - mc->num_awake)
1438		  {
1439		    cur = mc->list;
1440		    for (i = 0; i < workers; i++)
1441		      {
1442			while(cur->status != SLEEPING)
1443			  cur = cur->next;
1444			if(!wakeup(cur,mc)) break;
1445		      }
1446		  }
1447	      }
1448	  }
1449	w = get_worker(worker_id);
1450	check_amsg_soft(send_simple_wm_message(w->wm_aport_id,
1451					       CONFIG_NOTIFY,i),
1452			__LINE__,3);
1453      }
1454    else if (msg_type == mdt_wm_nsrvname)
1455      {
1456
1457	host_name_msg = (host_name_msg_t *) msg_data;
1458	Notify("WM: Got HOST_STATUS_REQ message\n");
1459	w = get_worker(host_name_msg->header.wid);
1460	hostname = host_name_msg->hostname;
1461	send_host_status(w,hostname);
1462      }
1463    else if (msg_type == MDT_BYTE)
1464      {
1465	worker_info_msg = (worker_info_msg_t *) msg_data;
1466	if(worker_info_msg->header.msg_type == WORKER_INFO_SET)
1467	  {
1468	    Notify("WM: Got WORKER_INFO_SET message\n");
1469	    w = get_worker(worker_info_msg->pro_wid);
1470	    check_amsg_soft(amsg_send(w->wm_aport_id,msg,
1471				      MDT_BYTE,msg_count,0),__LINE__,3);
1472	  }
1473	else if(worker_info_msg->header.msg_type == WORKER_INFO_GET)
1474	  {
1475	    Notify("WM: Got WORKER_INFO_GET message\n");
1476	    w = get_worker(worker_info_msg->pro_wid);
1477	    check_amsg_soft(amsg_send(w->wm_aport_id,msg,
1478				      MDT_BYTE,msg_count,0),__LINE__,3);
1479	  }
1480	else if(worker_info_msg->header.msg_type == WORKER_INFO_NOTIFY)
1481	  {
1482	    Notify("WM: Got WORKER_INFO_NOTIFY message\n");
1483	    send_worker_info(worker_info_msg->req_wid, worker_info_msg->size,
1484			     (void_ptr) (worker_info_msg + 1));
1485	  }
1486	freemsg = 0;
1487      }
1488    if (freemsg)
1489      check_amsg_soft(amsg_free(msg),__LINE__,1);
1490    else
1491      freemsg = 1;
1492  }
1493  if (ret != AMSG_NOMESSAGE) {
1494    check_amsg_soft(ret,__LINE__, 2);
1495  }
1496  tk_doidledummy();
1497}
1498
1499/*------------------------------------------------------------*/
1500/* High WM port handling routines */
1501/*------------------------------------------------------------*/
1502int send_start_time(slave_no)
1503     int slave_no;
1504{
1505  amsg_t msg;
1506  amsg_data_t * msg_data;
1507  start_time_msg_t * start_time_msg;
1508  amsg_size_t size;
1509
1510  worker_ptr w;
1511
1512  size = sizeof(start_time_msg_t);
1513  check_amsg(amsg_alloc(size,
1514			&msg_data,
1515			&msg),__LINE__,2);
1516
1517  start_time_msg = (start_time_msg_t *) msg_data;
1518  start_time_msg->header.msg_type = SEND_START_TIME;
1519  start_time_msg->header.wid = 0;
1520  start_time_msg->start_time = start_time;
1521
1522  w = get_worker(slave_no);
1523  check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_starttime,1,0),
1524		  __LINE__,3);
1525}
1526
/* Clock helpers used to time the session.
   get_time() samples a platform clock; init_global_values() stores the
   sample in the global start_time.  relative_elapsed_time(start) returns
   the number of seconds since `start` as a double.  Which clock is used
   depends on the configuration:
     HAVE_GETHRTIME - gethrtime() nanoseconds, converted to seconds
     BSD_TIMES      - time() for the start, ftime() for the current time
     otherwise      - times() clock ticks, divided by clock_hz
   The parameter start_time shadows the global of the same name. */
#ifdef HAVE_GETHRTIME

hrtime_t get_time()
{
  return(gethrtime());
}

double relative_elapsed_time(start_time)
hrtime_t start_time;
{
  hrtime_t thr_time;
  double elapsed;

  thr_time = gethrtime();
  /* nanoseconds -> microseconds -> seconds */
  elapsed = (thr_time - start_time)/ 1000.0;
  elapsed = elapsed / 1000000.0;
  return(elapsed);
}

#else
#ifdef BSD_TIMES

/* whole seconds only */
time_t get_time()
{
  return(time((time_t *) 0));
}

/* NOTE(review): start_time has whole-second resolution while the current
   time includes milliseconds, so the result can overstate the elapsed
   time by up to the fractional part of the start second. */
double relative_elapsed_time(start_time)
time_t start_time;
{
  struct timeb	realtime;
  double elapsed;

  /* times() returns nothing useful in BSD, need ftime() for elapsed time */
  (void) ftime(&realtime);
  elapsed = (realtime.time - start_time) + (double)realtime.millitm/1000.0;
  return(elapsed);
}
#else

/* clock ticks as returned by times() */
clock_t get_time()
{
  struct tms dummy;
  return(times(&dummy));
}

/* Returns 0.0 if times() fails; ticks are converted with clock_hz
   (set in init_global_values()). */
double relative_elapsed_time(start_time)
clock_t start_time;
{
  clock_t		realtime;
  struct tms		dummy;
  double                elapsed;

  if ((realtime = times(&dummy)) == (clock_t) -1)
    return(0.0);
  elapsed = (double) (realtime - start_time) / clock_hz;
  return(elapsed);
}
#endif
#endif
1587
1588void send_elapsed_time(slave_no)
1589     int slave_no;
1590{
1591  amsg_t msg;
1592  amsg_data_t * msg_data;
1593  time_msg_t * time_msg;
1594  amsg_size_t size;
1595  double elapsed;
1596
1597  worker_ptr w;
1598
1599  elapsed = relative_elapsed_time(start_time);
1600
1601  size = sizeof(time_msg_t);
1602
1603  check_amsg(amsg_alloc(size,
1604			&msg_data,
1605			&msg),__LINE__,2);
1606
1607  time_msg = (time_msg_t *) msg_data;
1608  time_msg->header.msg_type = SEND_TIME;
1609  time_msg->header.wid = 0;
1610
1611  time_msg->cur_time = elapsed;
1612
1613  w = get_worker(slave_no);
1614  check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_time,1,0),__LINE__,3);
1615}
1616
1617
1618void
1619high_wm_notify(port_id)
1620   aport_id_t port_id;
1621
1622{
1623    amsg_ret_t ret;
1624    amsg_t msg;
1625    amsg_data_t * msg_data;
1626    amsg_type_t msg_type;
1627    amsg_count_t msg_count;
1628    wm_simple_msg_t * wm_simple_msg;
1629    node_msg_t * node_msg;
1630    worker_ptr w;
1631
1632    while (( ret = amsg_receive(port_id,
1633				&msg,
1634				&msg_data,
1635				&msg_type,
1636				&msg_count, 0)) == AMSG_OK) {
1637      if (msg_type == mdt_wm_simple)
1638	{
1639	  wm_simple_msg = (wm_simple_msg_t *) msg_data;
1640	  switch(wm_simple_msg->header.msg_type) {
1641
1642	  case DONE_INIT_OPENS:
1643	    Notify("WM: Got DONE_INIT_OPENS message\n");
1644	    init_acks++;
1645	    if (wm_verbose_startup)
1646	      printf("worker %d is up\n",wm_simple_msg->header.wid);
1647	    break;
1648
1649
1650	  case REQ_TIME:
1651	    Notify("WM: Got REQ_TIME message\n");
1652	    send_elapsed_time(wm_simple_msg->header.wid);
1653	    break;
1654
1655	  case REQ_START_TIME:
1656	    Notify("WM: Got REQ_START_TIME message\n");
1657	    send_start_time(wm_simple_msg->header.wid);
1658	    break;
1659
1660	  case REQ_WM_HOSTNAME:
1661	    Notify("WM: Got REQ_WM_HOSTNAME message\n");
1662	    w = get_worker(wm_simple_msg->header.wid);
1663	    send_host_name(w);
1664	    break;
1665
1666	  default:
1667	    break;
1668	  }
1669	}
1670      else if (msg_type == mdt_wm_node)
1671	{   /* ROOT_NODE_REGISTER message */
1672	  node_msg = (node_msg_t *) msg_data;
1673	  Notify("WM: Got ROOT_NODE_REGISTER message\n");
1674	  root_id = node_msg->node;
1675	  root_id_sender = node_msg->header.wid;
1676	  got_root_id = 1;
1677	}
1678      else if (msg_type == mdt_wm_wstat)
1679	{
1680	  /* WSTAT_RET message */
1681	  wstat_msg_t * wstat_msg;
1682	  Notify("WM: Got WSTAT_RET message\n");
1683	  wstat_msg = (wstat_msg_t *) msg_data;
1684	  get_worker(wstat_msg->header.wid)->cur_wstat =
1685	    wstat_msg->stat;
1686	  wstat_received = 1;
1687	}
1688
1689      check_amsg_soft(amsg_free(msg),__LINE__, 1);
1690    }
1691
1692    if (ret != AMSG_NOMESSAGE)
1693      check_amsg_soft(ret,__LINE__, 2);
1694}
1695
1696/*------------------------------------------------------------*/
1697
1698void
1699get_arguments(argv, argc, numworkers, nofork)
1700char *argv[];
1701int *argc;
1702int *numworkers;
1703int *nofork;
1704{
1705    int i, j, k;
1706    *numworkers = 1;
1707    *nofork = 0;
1708    for(i=1, j=1; i < *argc; i++)
1709    {
1710        if (strcmp(argv[i],"--") == 0)
1711	{
1712	    do {
1713		argv[j++] = argv[i++];
1714	    } while (i < *argc);
1715	}
1716	else if (strcmp(argv[i],"-wnf") == 0)
1717	{
1718	    *nofork = 1;
1719	}
1720	else if (strcmp(argv[i],"-wv") == 0)
1721	{
1722	    wm_verbose_startup = 1;
1723	}
1724	else if (strcmp(argv[i],"-wmi") == 0)
1725	{
1726	    interactive = 1;
1727	}
1728	else if (strcmp(argv[i],"-wf") == 0)
1729	{
1730	  usemcfile = 1;
1731	}
1732	else if (strcmp(argv[i],"-wh") == 0)
1733	{
1734	    if (++i < *argc)
1735	      init_host = argv[i];
1736	    else
1737	      fprintf(stderr,"Expected hostname after %s\n",argv[i-1]);
1738	}
1739	else if (strcmp(argv[i],"-wx") == 0)
1740	{
1741	    if (++i < *argc)
1742	      init_exec = argv[i];
1743	    else
1744	      fprintf(stderr,"Expected worker executable after %s\n",argv[i-1]);
1745	}
1746	else if (strcmp(argv[i],"-w") == 0)
1747	{
1748	    if (++i < *argc)
1749		*numworkers = atoi(argv[i]);
1750	    else
1751		fprintf(stderr,"Expected number of workers after %s\n",argv[i-1]);
1752	}
1753	else if (argv[i][0] == '-' && argv[i][1] == 'w' &&
1754		(k = atoi(&argv[i][2])) > 0)
1755	{
1756	    *numworkers = k;
1757	}
1758	else
1759	{
1760	    argv[j++] = argv[i];
1761	}
1762    }
1763    *argc = j;
1764    Argc = j;
1765    Argv = argv;
1766}
1767
1768/*----------------------------------------------------------------*/
1769/* read_mcfile()                                                  */
1770/*----------------------------------------------------------------*/
1771/* Read the file ".eclipse_machines" from the current directory,
1772   if one exists, otherwise from the lib directory.
1773
1774   Each line in the file should have the following format
1775   <host_name> <eclipse_exe_file> <num_workers> <auto_start>
1776
1777   where
1778   host_name is the name of the machine where workers are to be started.
1779   eclipse_exec_file is the full path name of the eclipse worker (on the
1780               specified host)
1781   num_workers is the number of workers to start initially
1782   auto_start can be 0 or 1 - if 1 the workers will be started
1783             via rsh. Otherwise, only the command line will be printed and
1784	     user must start the workers manually on the specified host.
1785*/
1786void read_mcfile()
1787{
1788  FILE * mcfile;
1789  char lib_mc_file[MAX_PATH_LEN], mcfilename[MAX_PATH_LEN],
1790       exec_file[MAX_PATH_LEN];
1791  struct stat buf;
1792  sstring hostname;
1793  int num_workers, i, done, val;
1794  int auto_start;
1795
1796  strcpy(mcfilename,".eclipse_machines");
1797
1798  if ((ec_stat(mcfilename,&buf) == -1) && errno == ENOENT)
1799    {
1800      sprintf(lib_mc_file,"%s/lib/%s",eclipsehome(),mcfilename);
1801      mcfile = fopen(lib_mc_file,"r");
1802    }
1803  else
1804    mcfile = fopen(mcfilename,"r");
1805
1806  if (mcfile != NULL) {
1807    done = 0;
1808    while (!done)  {
1809	val = fscanf(mcfile, "%s %s %d %d", hostname, exec_file, &num_workers,
1810		     &auto_start);
1811	if (val == EOF)
1812	  done = 1;
1813	else if (val == 4) {
1814/*	  printf("%s %s %d %d\n",hostname,exec_file,num_workers,auto_start);*/
1815	  add_mc(hostname,exec_file,auto_start);
1816	  if (usemcfile)
1817	    for(i = 0; i < num_workers; i++)
1818	      init_worker(hostname,0);
1819	}
1820	else  {
1821	  fprintf(stderr,"Illegal format in %s startup file\n", mcfilename);
1822	  exit(-1);
1823	}
1824      }
1825    fclose(mcfile);
1826  }
1827}
1828
1829void init_global_values()
1830{
1831  int i;
1832
1833  start_time = get_time();
1834
1835#ifdef HAVE_SYSCONF
1836	clock_hz = sysconf(_SC_CLK_TCK);
1837#else
1838#ifdef CLOCK_HZ
1839	clock_hz = CLOCK_HZ;
1840#else
1841	clock_hz = 60;
1842#endif
1843#endif
1844
1845  private_mem_init(mem_panic);
1846
1847  wm_pid = getpid();
1848
1849  sprintf(bdomain_key, WORKER_PORT_NAME);
1850/*  bdomain.bdomain_file = (char *) malloc(MAX_PATH_LEN);*/
1851  strcpy(wm_port_name, WM_PORT_NAME);
1852
1853  map_dir = getenv("ECLIPSETMP");
1854/* If not set use current directory - this will cause problems
1855   if the current directory is not writable */
1856  if (!map_dir)
1857    map_dir = getcwd(NULL, MAX_PATH_LEN);
1858
1859  sprintf(mps_map_file,"%s/%d.mps.map",map_dir,wm_pid);
1860
1861  wm_host = (char *) hp_alloc_size(MAXHOSTLEN);
1862  mygethostname(wm_host);
1863
1864  sprintf(session_key,"%s.%d",wm_host,wm_pid);
1865  sprintf(wm_signature,"%s.%d",wm_host,wm_pid);
1866
1867  nsrv_host = getenv("NSRV_HOST");
1868  if(!nsrv_host)
1869    nsrv_host = wm_host;
1870
1871  init_mc_list();
1872
1873  read_mcfile();
1874  if (!usemcfile)
1875    {
1876      if (!init_host)
1877	init_host = (char *) wm_host;
1878
1879      if (!init_exec) {
1880	init_exec = hp_alloc_size(sizeof(sstring));
1881	sprintf(init_exec,"%s/bin/%s/worker",eclipsehome(),HOSTARCH);
1882      }
1883
1884      add_mc(init_host,init_exec,1);
1885      for(i = 1; i <= num_workers; i++)
1886	init_worker(init_host,0);
1887    }
1888}
1889
1890mygethostname(host)
1891     char *host;
1892{
1893#if defined(HAVE_GETHOSTNAME)
1894  gethostname(host,MAXHOSTLEN);
1895#else
1896#  if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H)
1897  sysinfo(SI_HOSTNAME, host, MAXHOSTLEN);
1898#  else
1899  struct utsname ut;
1900  uname(&ut);
1901  strcpy(host,ut.nodename);
1902#  endif
1903#endif
1904}
1905
1906/*-----------------------------------------------------------*/
1907/* MPS system initialisation routines */
1908/*-----------------------------------------------------------*/
1909
1910/* Register the wm's high_aport, halt1/2_aport and low_aport*/
1911void register_aport_names()
1912{
1913  aport_t aport;
1914
1915  sprintf(wm_high_aport_name,"%s_high_aport",wm_port_name);
1916  aport.aport_id = local_aport_ids[HIGH_APORT_NUMBER];
1917  aport.bport_id = wm_bport_id;
1918  aport.bdomain_id = bdomain.bdomain_id;
1919  check_nsrv(nsrv_aport_register(session_key,wm_high_aport_name,
1920				 wm_signature,&aport),__LINE__, 2);
1921
1922  sprintf(wm_halt2_aport_name,"%s_halt2_aport",wm_port_name);
1923  aport.aport_id = local_aport_ids[HALT2_APORT_NUMBER];
1924  check_nsrv(nsrv_aport_register(session_key,wm_halt2_aport_name,
1925				 wm_signature,&aport),__LINE__, 2);
1926
1927  sprintf(wm_halt1_aport_name,"%s_halt1_aport",wm_port_name);
1928  aport.aport_id = local_aport_ids[HALT1_APORT_NUMBER];
1929  check_nsrv(nsrv_aport_register(session_key,wm_halt1_aport_name,
1930				 wm_signature,&aport),__LINE__, 2);
1931
1932  sprintf(wm_low_aport_name,"%s_low_aport",wm_port_name);
1933  aport.aport_id = local_aport_ids[LOW_APORT_NUMBER];
1934  check_nsrv(nsrv_aport_register(session_key,wm_low_aport_name,
1935				 wm_signature,&aport),__LINE__, 2);
1936  wm_types_init();  /* can only be done after aport registration
1937		       since MDT_NSRVNAME is only defined at this point */
1938}
1939
1940void fill_in_domain(ldomain_id, lbdomain)
1941bdomain_id_t ldomain_id;
1942bdomain_t *lbdomain;
1943
1944{
1945  lbdomain->bdomain_id = ldomain_id;
1946  lbdomain->bdomain_size = 0x00800000;  /* 8 MB */
1947  if (!shared_mem_base())
1948      lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base());
1949  else
1950      lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base() + 0x00800000);
1951  strcpy(lbdomain->bdomain_file,mps_map_file);
1952}
1953
1954/* initialise message-passing system */
1955void mps_init()
1956{
1957  bport_t my_bport;
1958  nsrv_ret_t nret;
1959  char hostname[MAXHOSTLEN];
1960
1961  /* first set up the b-layer */
1962  check_nsrv(nsrv_new_bport_id(wm_signature,&wm_bport_id),__LINE__,3);
1963
1964  gethostname(hostname,MAXHOSTLEN);
1965
1966  sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,session_key);
1967
1968  nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain);
1969
1970  if (nret == NSRV_NOT_REGISTERED)
1971    {
1972      /* This is not safe in case two wms try to create
1973       * the domain at the same time */
1974      check_nsrv(nsrv_new_bdomain_id(wm_signature,&domain_id),__LINE__,5);
1975      fill_in_domain(domain_id,&bdomain);
1976      check_bmsg(bmsg_init(wm_bport_id,&bdomain,BDOMAIN_CREATE |
1977#ifdef DEBUG_MPS
1978			   BMSG_ALOG_ON |
1979			   BMSG_ALOG_OPEN |
1980			   BMSG_ALOG_CLOSE |
1981			   BMSG_ALOG_MASTER |
1982#endif
1983			   BPORT_NOTIFICATION ),__LINE__, 1);
1984      check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name,wm_signature,&bdomain),__LINE__,2);
1985    }
1986  else if (nret == NSRV_OK)
1987      check_bmsg(bmsg_init(wm_bport_id,&bdomain, BPORT_NOTIFICATION),__LINE__,1);
1988  else
1989    {
1990      check_nsrv(nret, __LINE__, 4);
1991    }
1992  check_bmsg(bport_port(wm_bport_id,&my_bport),__LINE__,2);
1993  check_nsrv(nsrv_bport_register(session_key,wm_port_name,
1994				 wm_signature,&my_bport),__LINE__,2);
1995
1996  /* Now set up the a-layer ports */
1997  notify[HALT1_APORT_NUMBER] = halt1_notify;
1998  notify[HALT2_APORT_NUMBER] = halt2_notify;
1999  notify[HIGH_APORT_NUMBER] = high_wm_notify;
2000  notify[LOW_APORT_NUMBER] = low_wm_notify;
2001
2002#ifdef HAVE_SIGPROCMASK
2003  sigemptyset(&sigio_mask);
2004  sigaddset(&sigio_mask, SIGIO);
2005#ifdef SIGPOLL
2006  sigaddset(&sigio_mask, SIGPOLL);
2007#endif
2008#endif
2009#ifdef HAVE_SIGACTION
2010  {
2011    struct sigaction sa;
2012    sa.sa_handler = sigio_handler;
2013    sigemptyset(&sa.sa_mask);
2014    sa.sa_flags = 0;
2015    (void) sigaction(SIGIO, &sa, (struct sigaction *) 0);
2016#ifdef SIGPOLL
2017    (void) sigaction(SIGPOLL, &sa, (struct sigaction *) 0);
2018#endif
2019    sa.sa_handler = SIG_IGN;
2020    (void) sigaction(SIGINT,&sa, (struct sigaction *) 0);
2021  }
2022#else
2023  signal(SIGINT, SIG_IGN);
2024  signal(SIGIO,sigio_handler);
2025#ifdef SIGPOLL
2026  signal(SIGPOLL,sigio_handler);
2027#endif
2028#endif
2029
2030  check_amsg(amsg_init(STATIC_PORTS,notify,local_aport_ids,0), __LINE__,1);
2031
2032  check_amsg_soft(aport_set_option(local_aport_ids[LOW_APORT_NUMBER],
2033				   APORT_NOTIFY_LEVEL,
2034				   (aport_optval_t) 1),__LINE__, 4);
2035  check_amsg_soft(aport_set_option(local_aport_ids[HIGH_APORT_NUMBER],
2036				   APORT_NOTIFY_LEVEL,
2037				   (aport_optval_t) 2),__LINE__, 4);
2038  check_amsg_soft(aport_set_option(local_aport_ids[HALT1_APORT_NUMBER],
2039				   APORT_NOTIFY_LEVEL,
2040				   (aport_optval_t) 3),__LINE__, 4);
2041  check_amsg_soft(aport_set_option(local_aport_ids[HALT2_APORT_NUMBER],
2042				   APORT_NOTIFY_LEVEL,
2043				   (aport_optval_t) 4),__LINE__, 4);
2044  register_aport_names();
2045}
2046
2047/*-------------------------------------------------------------*/
2048/*          Worker startup Routines                            */
2049/*-------------------------------------------------------------*/
2050
2051/* Starting up a new worker involves
2052  i. Creating it : bproc_create(...)
2053 ii. Waiting until the new worker registers its bport with the name server:
2054     wait_for_bport_connections(...)
2055 iii. Waiting until the new worker registers its aports with the name server:
2056     wait_for_aport_connections.
2057 iv. Sending the new worker the port names of all other workers
2058     send_port_names(..)
2059  v. Sending all the other (old) workers the port name of the new worker
2060     send_new_port(...)
2061  vi. Waiting for an acknowledge message from the new worker:
2062      the DONE_INIT_OPENS message.
2063
2064This is the protocol implemented in add_worker().
2065
2066At initial startup, instead of starting workers one by one, we create
2067all of them (see create_workers()), wait for all the bport and aport
2068registrations and then send the all the portnames to each worker
2069(see initialise_workers()).
2070
2071*/
2072
2073/* bproc_create - start a worker */
2074
2075#define LOCAL_SLAVE_EXTRA_ARGS 7
2076#define REM_SLAVE_EXTRA_ARGS 10
2077
2078/*ARGSUSED*/
2079int bproc_create(hostname,slave_no,lsession_key,first,argc,argv)
2080char * hostname;
2081int slave_no;
2082nsrv_name_t lsession_key;
2083int first;
2084int argc;
2085char ** argv;
2086
2087{
2088  bmsg_ret_t ret;
2089  sstring s_slave_no;
2090  sstring s_nsrv_port;
2091  char ** slave_argv;
2092  int slave_argc;
2093  int i,j;
2094  machine_t *mc;
2095  worker_t *w;
2096
2097  if(gethostbyname(hostname) == NULL)
2098    {
2099      fprintf(stderr,"Invalid hostname %s\n",hostname);
2100      return(0);
2101    }
2102
2103  mc = get_mc(hostname);
2104  ret = fork();
2105
2106  if (ret == 0)  { /* child */
2107    sprintf(s_slave_no,"%d",slave_no);
2108    sprintf(s_nsrv_port,"%d",nsrv_port_number);
2109    if(strcmp(hostname,wm_host) == 0) { /* local worker */
2110
2111      slave_argc = argc + LOCAL_SLAVE_EXTRA_ARGS;
2112      slave_argv = (char **) hp_alloc_size(sizeof(char *) *
2113					   (slave_argc + 1));
2114
2115      /* client needs slave_no, session_key */
2116
2117      slave_argv[0] = mc->exec_file;
2118      slave_argv[1] = "-a";
2119      slave_argv[2] = s_slave_no;
2120      slave_argv[3] = lsession_key;
2121      slave_argv[4] = nsrv_host;
2122      slave_argv[5] = s_nsrv_port;
2123      slave_argv[6] = first == 1 ? "-m" : "-c";
2124      slave_argv[7] = mc->heap_map_file;
2125
2126      for(i = LOCAL_SLAVE_EXTRA_ARGS+1, j = 1 ; i < slave_argc ; i++, j++)
2127	slave_argv[i] = argv[j];
2128
2129      slave_argv[slave_argc + 1] = (char *) 0;
2130
2131      ret = execvp(mc->exec_file,slave_argv);
2132      /*NOTREACHED*/
2133      perror("Execvp failed");
2134      exit(4);
2135    }
2136
2137    else
2138      { /* Non-local worker to be created - use rsh */
2139
2140/*	fclose(stdout); */
2141	slave_argc =  argc + REM_SLAVE_EXTRA_ARGS;
2142	slave_argv = (char **) hp_alloc_size(sizeof(char *) *
2143					     (slave_argc + 1));
2144	slave_argv[0] = "rsh";
2145	slave_argv[1] = slave_no == 1? "": "-n";  /*slave worker needn't stdin*/
2146	slave_argv[2] = hostname;
2147	slave_argv[3] = mc->exec_file;
2148	slave_argv[4] = "-a";
2149	slave_argv[5] = s_slave_no;
2150	slave_argv[6] = lsession_key;
2151	slave_argv[7] = nsrv_host;
2152	slave_argv[8] = s_nsrv_port;
2153	slave_argv[9] = first == 1 ? "-m" : "-c";
2154	slave_argv[10] = mc->heap_map_file;
2155
2156	for(i = REM_SLAVE_EXTRA_ARGS+1, j = 1 ; i < slave_argc ; i++, j++)
2157	  slave_argv[i] = argv[j];
2158	slave_argv[slave_argc + 1] = (char *) 0;
2159
2160	ret = execvp(slave_argv[0],slave_argv);
2161
2162	perror("Execvp failed");
2163	exit(4);
2164    }
2165  }
2166  else if (ret > 0)
2167    {
2168      if (wm_verbose_startup)
2169	printf("created worker %d, pid = %d\n",slave_no,ret);
2170      w = get_worker(slave_no);
2171      w->pid = ret;
2172      return(1);
2173	}
2174  else
2175    {
2176      fprintf(stderr,"Could not fork slave %d\n",slave_no);
2177      perror("Could not fork");
2178      return(0);
2179    }
2180  /* should never be reached - lint warning fix */
2181   return(-1);
2182}
2183
2184
2185#define TIME_OUT 3000
2186#define BPORT 1
2187#define APORT 2
2188
2189void wait_for_port_connection(w,port_type,dont_time_out)
2190worker_ptr w;
2191int port_type;
2192int dont_time_out; /* do not time out if starting workers manually */
2193{
2194  nsrv_ret_t ret;
2195  int j=0, time_out=TIME_OUT; /* 30 seconds */
2196
2197  if (port_type == BPORT) {
2198     bport_t slave_bport;
2199     do {
2200	ret = nsrv_bport_look_up(session_key,w->bport_name,&slave_bport);
2201	if (ret == NSRV_OK) {
2202	   w->pid = slave_bport.bpid;
2203	   w->bport_id = slave_bport.bport_id;
2204	   return;
2205	}
2206	short_sleep(1000);
2207	j++;
2208     } while(ret != NSRV_OK && ((j < time_out) || dont_time_out));
2209  } else {
2210     nsrv_name_t w_aport_name;
2211     aport_t slave_aport;
2212     int wm_connected, halt1_connected, halt2_connected;
2213     wm_connected = halt1_connected = halt2_connected = 0;
2214     do {
2215        if (!wm_connected) {
2216	   sprintf(w_aport_name,"%s_wm_aport",w->bport_name);
2217	   ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport);
2218	   if (ret==NSRV_OK) {
2219	      w->wm_aport_id = slave_aport.aport_id;
2220	      wm_connected = 1;
2221	   }
2222	}
2223        if (!halt1_connected) {
2224	   sprintf(w_aport_name,"%s_halt1_aport",w->bport_name);
2225	   ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport);
2226	   if (ret==NSRV_OK) {
2227	      w->halt1_aport_id = slave_aport.aport_id;
2228	      halt1_connected = 1;
2229	   }
2230	}
2231        if (!halt2_connected) {
2232	   sprintf(w_aport_name,"%s_halt2_aport",w->bport_name);
2233	   ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport);
2234	   if (ret==NSRV_OK) {
2235	      w->halt2_aport_id = slave_aport.aport_id;
2236	      halt2_connected = 1;
2237	   }
2238	}
2239        if (wm_connected&&halt1_connected&&halt2_connected)
2240	   return;
2241	short_sleep(1000);
2242	j++;
2243     } while (j < time_out || dont_time_out);
2244  }
2245  wm_abort_error("Workers not responding.");
2246  kill_system();
2247  exit(5);
2248}
2249
2250void wait_for_bport_connections()
2251{
2252  int i;
2253  bmsg_ret_t bret;
2254  bport_t slave_bport;
2255  worker_ptr cur;
2256
2257  for(i = 0; i < mc_list.num_machines; i++)
2258    for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2259	wait_for_port_connection(cur,BPORT,dont_fork ||
2260				 mc_list.machines[i].auto_start);
2261
2262  /* wait until all workers have made their bport connections */
2263
2264  for(i = 0; i < mc_list.num_machines; i++)
2265    for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2266    {
2267      do {
2268	bret = bport_port(cur->bport_id,&slave_bport);
2269      } while(bret != BMSG_OK);
2270    }
2271  Notify("Slave bports opened\n");
2272}
2273
2274void wait_for_aport_connections()
2275{
2276  int i;
2277  worker_ptr cur;
2278
2279  for(i = 0; i < mc_list.num_machines; i++)
2280    for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2281      wait_for_port_connection(cur,APORT,dont_fork ||
2282				 mc_list.machines[i].auto_start);
2283
2284  Notify("Slave a-layer ready \n");
2285}
2286
2287void send_port(worker_aport_id,worker)
2288     aport_id_t worker_aport_id;
2289     worker_ptr worker;
2290{
2291  amsg_t msg;
2292  amsg_data_t * msg_data;
2293  port_name_msg_t * port_name_msg;
2294  amsg_size_t size;
2295
2296  size = sizeof(port_name_msg_t);
2297  check_amsg(amsg_alloc(size,
2298			&msg_data,
2299			&msg),__LINE__, 2);
2300
2301  port_name_msg = (port_name_msg_t *) msg_data;
2302  port_name_msg->header.msg_type = PORT_NAME;
2303  port_name_msg->header.wid = 0;
2304
2305  port_name_msg->index = worker->index;
2306  strcpy(port_name_msg->port_name,worker->bport_name);
2307  port_name_msg->wm_aport_id = worker->wm_aport_id;
2308
2309  check_amsg_soft(amsg_send(worker_aport_id,msg,mdt_wm_portname,1,0),
2310		  __LINE__, 3);
2311}
2312
2313void send_new_port(new_worker)
2314     worker_ptr new_worker;
2315{
2316  worker_ptr w;
2317  int i;
2318
2319  for(i = 0; i < mc_list.num_machines; i++)
2320    for (w = mc_list.machines[i].list; w != NULL; w = w->next)
2321    if (w != new_worker)
2322      send_port(w->wm_aport_id,new_worker);
2323}
2324
2325void send_port_names(worker_aport_id)
2326     aport_id_t worker_aport_id;
2327{
2328  amsg_t msg;
2329  amsg_data_t * msg_data;
2330  worker_ptr w;
2331  wm_simple_msg_t * wm_mess_hdr;
2332  amsg_size_t size;
2333  int i;
2334
2335  for(i = 0; i < mc_list.num_machines; i++)
2336    for (w = mc_list.machines[i].list; w != NULL; w = w->next)
2337      send_port(worker_aport_id,w);
2338
2339  send_simple_wm_message(worker_aport_id, SENT_INIT_PORT_NAMES,
2340			 mc_list.total_workers);
2341}
2342
2343
2344void send_init_port_names()
2345{
2346  worker_ptr cur;
2347  int i;
2348
2349  for(i = 0; i < mc_list.num_machines; i++)
2350    for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2351      send_port_names(cur->wm_aport_id);
2352}
2353
2354void synch_with_slaves()
2355{
2356  /* a barrier synchronisation, each slave sends an ack and
2357     once all have done, we send acks back to all slaves,
2358     After this point, all slaves are connected to each other */
2359
2360  worker_ptr cur;
2361  int i;
2362
2363  while (init_acks != (mc_list.total_workers)) short_sleep(1000);
2364
2365  for(i = 0; i < mc_list.num_machines; i++)
2366    for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2367    {
2368      check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,ALL_CONNECTED,0),
2369		 __LINE__, 3);
2370      cur->status = AWAKE;
2371    }
2372
2373}
2374
2375int print_worker_command(index,file_flag,argc,argv,hostname)
2376int index;
2377char * file_flag;
2378int argc;
2379char * argv[];
2380char * hostname;
2381{
2382  sstring slave_file;
2383  int i;
2384  machine_t *mc;
2385
2386  mc = get_mc(hostname);
2387
2388  if (init_exec != NULL)
2389    strcpy(slave_file,init_exec);
2390  else
2391    sprintf(slave_file,"%s/bin/%s/worker",eclipsehome(),HOSTARCH);
2392
2393  printf("Slave command is: \n run -a %d %s %s %d %s %s",
2394	 index, session_key, nsrv_host, nsrv_port_number,
2395	 file_flag,mc->heap_map_file);
2396
2397  for(i = 1; i < argc; i++)
2398    printf(" %s",argv[i]);
2399  printf("\non host %s\n", hostname);
2400
2401}
2402
2403int add_worker(argc,argv,mc)
2404     int argc;
2405     char * argv[];
2406     machine_t *mc;
2407{
2408  int worker_index;
2409  worker_ptr new_worker;
2410  int ret;
2411
2412  worker_index = init_worker(mc->hostname,1);
2413  new_worker = get_worker(worker_index);
2414
2415  if (dont_fork || !(mc->auto_start))  {
2416    printf("Worker manager (session %s) waiting for 1 worker\n",
2417	   session_key);
2418    print_worker_command(worker_index,(new_worker->first? "-m" : "-c"),
2419			 argc,argv,mc->hostname);
2420    ret = 1;
2421  }
2422  else  {
2423    ret = bproc_create(mc->hostname,worker_index,session_key,
2424		       new_worker->first,argc,argv);
2425  }
2426  if (ret)  {
2427    new_worker = get_worker(worker_index);
2428    wait_for_port_connection(new_worker,BPORT);
2429    wait_for_port_connection(new_worker,APORT);
2430
2431    send_port_names(new_worker->wm_aport_id);
2432    send_new_port(new_worker);
2433
2434    while (init_acks != (mc_list.total_workers)) short_sleep(1000);
2435    check_amsg_soft(send_simple_wm_message(new_worker->wm_aport_id,
2436					   ALL_CONNECTED,0),__LINE__, 3);
2437    new_worker->status = AWAKE;
2438    check_amsg_soft(send_simple_wm_message(new_worker->wm_aport_id,
2439					   ROOT_INITIALISED,0), __LINE__, 3);
2440    return(1);
2441  }
2442  else
2443    return(ret);
2444}
2445
2446/*-----------------------------------------------------------------*/
2447/* The worker manager event-handling loop */
2448/*-----------------------------------------------------------------*/
2449void wait_for_end()
2450{
2451  while (1)
2452    {
2453      if (interactive)
2454      {
2455	(void) aport_set_option(local_aport_ids[LOW_APORT_NUMBER],
2456				APORT_NOTIFY, (aport_optval_t) AMSG_OFF);
2457	tk_OneEvent(1);
2458	update_perf_window();
2459	(void) aport_set_option(local_aport_ids[LOW_APORT_NUMBER],
2460				APORT_NOTIFY, (aport_optval_t) AMSG_ON);
2461	short_sleep(10000);
2462      }
2463      else
2464      {
2465	short_sleep(1800000000);	/* half an hour :-) */
2466      }
2467    }
2468}
2469
2470void fork_nameserver()
2471{
2472  char nsrv_exec_file[MAX_PATH_LEN];
2473  char s_nsrv_port_number[128];
2474#ifdef HAVE_SYSINFO
2475  char mcname[128];
2476#endif
2477  int ret;
2478  int j = 60; /* number of seconds to wait for nameserver to be up */
2479
2480
2481  sprintf(nsrv_exec_file,"%s/bin/%s/nsrv",eclipsehome(),HOSTARCH);
2482  ret = fork();
2483  if (ret == 0) /* child */
2484    {
2485      char *nsrv_argv[6];
2486
2487      setsid();  /* create a new process group for the name server */
2488      nsrv_argv[0] = nsrv_exec_file;
2489      nsrv_argv[1] = "-p";
2490      nsrv_argv[2] = map_dir;
2491#if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H)
2492      sysinfo(SI_MACHINE, mcname, 128);
2493      if(strcmp("DRS 6000",mcname) == 0) {
2494	nsrv_argv[3] = (char *) 0;
2495	ret = execvp(nsrv_exec_file,nsrv_argv);
2496	perror("Could not start (exec) the name server");
2497	exit(6);
2498      }
2499      else
2500#endif /* HAVE_SYSINFO */
2501	{
2502	  nsrv_argv[3] = "-nshm"; /* no shared-memory interaction */
2503	  nsrv_argv[4] = "-npds"; /* no pds-mps based interaction */
2504	  nsrv_argv[5] = (char *) 0;
2505	  ret = execvp(nsrv_exec_file,nsrv_argv);
2506	  perror("Could not start (exec) the name server");
2507	  exit(6);
2508	}
2509    }
2510  else if (ret > 0)
2511    {
2512      fprintf(stderr,"Starting Name Server %s pid = %d...",nsrv_exec_file,ret);
2513      fflush(stderr);
2514      while((j > 0) && (nsrv_init(nsrv_host, &nsrv_port_number) != NSRV_OK))
2515	{
2516	  short_sleep(1000000);
2517	  j--;
2518	}
2519      if (j == 0)
2520	{
2521	  wm_abort_error("Name server not responding.\n Try starting a new name server (nsrv) manually.\n");
2522	  exit(6);
2523	}
2524      else
2525	fprintf(stderr," ... done\n");
2526    }
2527  else {
2528    perror("Could not fork name serever!");
2529    exit(6);
2530  }
2531}
2532
2533/* Creation of "initial" workers */
2534void create_workers(argc,argv)
2535     int argc;
2536     char * argv[];
2537{
2538  worker_ptr w;
2539  int i;
2540
2541  if (dont_fork)
2542    {
2543      printf("Worker manager (session %s) waiting for %d worker(s)\n",
2544	     session_key, mc_list.total_workers);
2545      for(i = 0; i < mc_list.num_machines; i++)
2546	for(w = mc_list.machines[i].list; w != NULL; w = w->next)
2547	  print_worker_command(w->index,w->first ? "-m" : "-c",argc,argv,
2548			       mc_list.machines[i].hostname);
2549    }
2550  else
2551    {
2552      for(i = 0; i < mc_list.num_machines; i++)
2553	if(mc_list.machines[i].auto_start)
2554	  {
2555	    for(w = mc_list.machines[i].list; w != NULL; w = w->next)
2556	      if (!bproc_create(mc_list.machines[i].hostname,w->index,
2557				session_key, w->first,argc,argv))
2558		{
2559		  fprintf(stderr,"Could not create slave: %d\n",w->index);
2560		  kill_system();
2561		  exit(7);
2562		}
2563	  }
2564	else
2565	  for(w = mc_list.machines[i].list; w != NULL; w = w->next)
2566	    print_worker_command(w->index,w->first ? "-m" : "-c",
2567				 argc,argv,mc_list.machines[i].hostname);
2568    }
2569}
2570
2571/* Implements the initial startup protocol -
2572   All the "initial" workers (specified via the -w command-line
2573   option or the .eclipse_machines file) have been started, before
2574   this routine is called */
2575
2576void initialise_workers()
2577{
2578 worker_ptr w;
2579 int i;
2580
2581 wait_for_bport_connections();
2582 wait_for_aport_connections();
2583
2584 send_init_port_names();
2585 Notify("Sent port names and Root Id\n");
2586
2587 synch_with_slaves();
2588
2589 while(!got_root_id) short_sleep(2000);
2590
2591 for(i = 0; i < mc_list.num_machines; i++)
2592   for(w = mc_list.machines[i].list; w != NULL; w = w->next)
2593     if(w->index != root_id_sender)
2594       check_amsg(send_simple_wm_message(w->wm_aport_id,
2595					 ROOT_INITIALISED, 0), __LINE__, 3);
2596
2597}
2598/*-------------------------------------------------------*/
2599int main(argc,argv)
2600     int argc;
2601     char * argv[];
2602{
2603  nsrv_ret_t nret;
2604  int i;
2605
2606
2607  get_arguments(argv, &argc, &num_workers, &dont_fork);
2608  irq_lock_init(delayed_signal_handler);
2609  init_global_values();
2610
2611  nret = nsrv_init(nsrv_host, &nsrv_port_number);
2612  if (nret != NSRV_OK)
2613    {
2614      if (nret == NSRV_NOSERVER)
2615	{
2616	  /* try starting a nameserver on the local machine */
2617	  nsrv_host = wm_host;
2618	  fork_nameserver();
2619	}
2620      else
2621	check_nsrv(nret,__LINE__, 1);
2622    }
2623
2624  mps_init();
2625
2626  create_workers(argc,argv);
2627
2628  initialise_workers();
2629
2630  if (interactive)
2631    start_tkwindow();
2632
2633  Notify("Waiting for end\n");
2634  wait_for_end();
2635}
2636
#ifndef HAVE_RANDOM
/* Fallbacks for platforms without random()/srandom(): delegate to the ISO C
   rand()/srand() generator.  The signatures match the POSIX declarations,
   so they do not conflict with a system header that declares them, and
   srandom() no longer relies on implicit int (invalid since C99). */
long
random(void)
{
	return (long) rand();
}

void
srandom(unsigned int x)
{
	srand(x);
}
#endif
2650
/*
 * Block for about `usec' microseconds, using select() with no file
 * descriptors as a sub-second timer.  select() may return early if a
 * signal arrives.  When HAVE_SELECT is not defined this does nothing.
 */
void
short_sleep(int usec)
{
#ifdef HAVE_SELECT
    struct timeval delay;

    delay.tv_usec = usec % 1000000;
    delay.tv_sec  = usec / 1000000;
    (void) select(0, NULL, NULL, NULL, &delay);
#endif
}
2662
2663/* worker info setting and getting functions */
2664
2665send_worker_info(reqwid, bufsize, buf)
2666int reqwid, bufsize;
2667void_ptr buf;
2668{
2669
2670  amsg_t msg;
2671  amsg_data_t * msg_data;
2672  worker_info_msg_t * worker_info_msg;
2673  amsg_size_t size;
2674  worker_ptr w;
2675
2676  w = get_worker(reqwid);
2677
2678  size = sizeof(worker_info_msg_t) + bufsize;
2679
2680  check_amsg(amsg_alloc(size,
2681			&msg_data,
2682			&msg),__LINE__, 2);
2683
2684  worker_info_msg = (worker_info_msg_t *) msg_data;
2685  worker_info_msg->header.msg_type = WORKER_INFO_NOTIFY;
2686  worker_info_msg->header.wid = 0;
2687  worker_info_msg->size = bufsize;
2688  memcpy((void_ptr) ((worker_info_msg_t *) msg_data + 1), buf, bufsize);
2689  check_amsg_soft(amsg_send(w->wm_aport_id,msg,MDT_BYTE,size,0),__LINE__,3);
2690}
2691
2692/*-----------------------------------------------------------------*/
2693/* Worker Statistics support */
2694/*-----------------------------------------------------------------*/
2695
2696
2697/* called from update_perf_window (in wm_interface.c) */
2698/* The return message (WSTAT_RET) updates the current workers
2699   cur_wstat field. This message is received on the high wm-aport
2700   since we are busy waiting on the reply. */
2701
2702get_worker_stats(flag)
2703int flag;
2704{
2705  worker_ptr cur;
2706  int i;
2707  struct worker_stat_ext wstat;
2708
2709  for(i = 0; i < mc_list.num_machines; i++)
2710    for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next)
2711      {
2712	if(flag == 0) /* top-like display */
2713	  cur->start_wstat = cur->cur_wstat;
2714	wstat_received = 0;
2715	check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,
2716					       WSTAT_REQ, 0),
2717			__LINE__, 3);
2718	while(!wstat_received)
2719	  short_sleep(1000);
2720
2721	if (flag == 1) /* reset */
2722	  cur->start_wstat = cur->cur_wstat;
2723      }
2724}
2725
2726/* mdt type definition for worker_stat_ext structure */
2727/* mirrored in trace.c */
2728wstat_types_init(mdt_wstat)
2729amsg_type_t * mdt_wstat;
2730
2731{
2732  amsg_typedef_t template[33];
2733
2734  /* mdt_wstat */
2735  template[0] = MDT_BEGIN;
2736  template[1] = MDT_STRUCT_OPEN;
2737  template[2] = MDT_INT32;
2738  template[3] = MDT_INT32;
2739  template[4] = MDT_INT32;
2740  template[5] = MDT_INT32;
2741  template[6] = MDT_INT32;
2742  template[7] = MDT_INT32;
2743  template[8] = MDT_INT32;
2744  template[9] = MDT_INT32;
2745  template[10] = MDT_INT32;
2746  template[11] = MDT_INT32;
2747  template[12] = MDT_INT32;
2748  template[13] = MDT_INT32;
2749  template[14] = MDT_INT32;
2750  template[15] = MDT_INT32;
2751  template[16] = MDT_INT32;
2752  template[17] = MDT_INT32;
2753  template[18] = MDT_INT32;
2754  template[19] = MDT_ARRAY_OF;
2755  template[20] = 3;
2756  template[21] = MDT_DP_FLOAT;
2757  template[22] = MDT_DP_FLOAT;
2758  template[23] = MDT_ARRAY_OF;
2759  template[24] = MAX_NUM_EVENTS;
2760  template[25] = MDT_DP_FLOAT;
2761  template[26] = MDT_INT32;
2762  template[27] = MDT_INT32;
2763  template[28] = MDT_ARRAY_OF;
2764  template[29] = MAX_NESTING;
2765  template[30] = MDT_INT32;
2766  template[31] = MDT_STRUCT_CLOSE;
2767  template[32] = MDT_END;
2768  check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 100,
2769		       template, mdt_wstat), __LINE__, 4);
2770}
2771
2772