1/* BEGIN LICENSE BLOCK 2 * Version: CMPL 1.1 3 * 4 * The contents of this file are subject to the Cisco-style Mozilla Public 5 * License Version 1.1 (the "License"); you may not use this file except 6 * in compliance with the License. You may obtain a copy of the License 7 * at www.eclipse-clp.org/license. 8 * 9 * Software distributed under the License is distributed on an "AS IS" 10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 11 * the License for the specific language governing rights and limitations 12 * under the License. 13 * 14 * The Original Code is The ECLiPSe Constraint Logic Programming System. 15 * The Initial Developer of the Original Code is Cisco Systems, Inc. 16 * Portions created by the Initial Developer are 17 * Copyright (C) 1994-2006 Cisco Systems, Inc. All Rights Reserved. 18 * 19 * Contributor(s): 20 * 21 * END LICENSE BLOCK */ 22 23/********************************************************************** 24** System: Parallel Eclipse 25** File: wm.c 26** Author: Shyam Mudambi 27** Liang-Liang Li 28** Description: Worker Manager for Parallel Eclipse 29***********************************************************************/ 30 31/*LINTLIBRARY*/ 32#include <sys/types.h> 33#include <sys/socket.h> 34#include <netdb.h> 35#include <netinet/in.h> 36#include <signal.h> 37#include <stdlib.h> 38#include <stdio.h> 39#include <sys/time.h> 40#include <sys/times.h> 41#include <sys/stat.h> 42#include <errno.h> 43#include <math.h> 44#include <fcntl.h> 45#include "config.h" 46 47#ifdef HAVE_STRING_H 48#include <string.h> 49#else 50extern char *strcpy(); 51extern void *memcpy(); 52#endif 53 54#ifdef BSD_TIMES 55#include <sys/timeb.h> 56#endif 57 58#ifdef HAVE_UNISTD_H 59#include <unistd.h> 60#endif 61 62#ifdef PATH_IN_LIMITS 63# include <limits.h> 64# define MAX_PATH_LEN PATH_MAX 65#else 66# include <sys/param.h> 67# define MAX_PATH_LEN MAXPATHLEN 68#endif 69 70#if !defined(HAVE_GETHOSTID) 71#include <sys/utsname.h> /* for uname() */ 72#endif 73 74#ifdef HAVE_SYS_SYSTEMINFO_H 75#include <sys/systeminfo.h> 76#endif 77 78#include "memman.h" 79#include "pds.h" 80#include "nsrv.h" 81 82#ifndef _PDS_TYPES_H_ 83typedef long *void_ptr; 84#endif /* _PDS_TYPES_H */ 85 86#include "sch_types.h" 87#include "trace.h" 88#include "wm_msgs.h" 89#include "wm_types.h" 90 91/* #define DEBUG_MPS*/ 92 93#if defined(__STDC__) 94 extern void short_sleep(int); 95 extern char *eclipsehome(void); 96#else /* __STDC__ */ 97 extern void short_sleep(); 98 extern char *eclipsehome(); 99#endif /* __STDC__ */ 100 101/*------------------------------------------------------*/ 102/* Types and Constant Definitions */ 103/*------------------------------------------------------*/ 104 105#define STATIC_PORTS 4 106 107#define WM_PORT_NAME "Worker_Manager" 108#define WORKER_PORT_NAME "parallel_eclipse" 109 110/* Static Port Numbers */ 111/* A note by LLL: why do we need four aports of different interrupt levels? 112** First we need two separate ports for halt-system with highest priority, 113** in order to implement a two-phase halting protocol; 114** Then for the service of adding a new worker, there are at least two 115** messages involved: work-creation request from somewhere (wm-window or 116** within a worker) and creation-done acknowledgment from the just created 117** worker. A sort of 'atomic adding' is desired so that messages requesting 118** worker-management status get well-defined info, i.e. avoiding such 119** wm-status as that with half-added workers. Then having a separate aport 120** with higher priority serves this purpose. I.e. adding process can loop 121** to wait for the acknowledgement to avoid handling any messages 122** (wm-status requests, adding a worker, etc.), until the correspoding 123** acknowledgement is received (via a separate but higher-prirority port). 124** Therefore come into being four ports: halt2, halt1, high and low. 125** Accordingly, we let messages of adding workers as well as those 126** wm-status requests or wm-status updates go through the low port (the 127** underlying message passing system decrees a mutually exclusive handling 128** of the messages from a common aport) while the acknowledgement and other 129** wm-status insensitive messages go through the higher one. For example, 130** requests of reading various clocks administered by the wm. 131** For the detailed grouping, see the high/low-port-notify functions. 132** Note also that the higher ports haves to be attached with an asynchronous 133** message handler. 134*/ 135 136#define HALT1_APORT_NUMBER 0 137#define HALT2_APORT_NUMBER 1 138#define HIGH_APORT_NUMBER 2 139#define LOW_APORT_NUMBER 3 140 141#define Notify(text) { if (wm_verbose) { printf(text); }} 142 143 144/*------------------------------------------------------*/ 145/* Global Variables */ 146/*------------------------------------------------------*/ 147 148/* Interface (Tk) functions */ 149int tk_OneEvent(), tk_geval(); 150 151char * map_dir; 152 153st_handle_t root_id; 154int root_id_sender; 155 156/* for accessing the command line in when asynchronously adding workers */ 157char **Argv; 158int Argc; 159 160/* Variables set by command-line arguments */ 161int dont_fork; /* set by -wnf flag */ 162int interactive = 0; /* set by -wmi flag */ 163int usemcfile = 0; /* set by -wf flag - if set we user .eclipse_machines 164 file for startup configuration */ 165static int wm_verbose_startup = 0; /* set by -wv flag */ 166static int num_workers = 0; /* set by -w flag */ 167static int wm_verbose = 0; /* will print each message recd if set to 1 */ 168 169/* Name server Global Ids */ 170nsrv_name_t domain_name; 171nsrv_name_t bdomain_key; 172nsrv_name_t session_key; 173nsrv_name_t wm_signature; 174nsrv_name_t wm_port_name; 175nsrv_name_t wm_halt1_aport_name, 176 wm_halt2_aport_name, 177 wm_high_aport_name, 178 wm_low_aport_name; 179 180/* MPS Globals */ 181bdomain_id_t domain_id; 182static bport_id_t wm_bport_id; 183static int wm_pid; 184bdomain_t bdomain; /* the domain data structure */ 185static aport_id_t local_aport_ids[STATIC_PORTS]; 186static void (* notify [STATIC_PORTS]) (); 187 188 189hdr_mc_list_t mc_list; /* Holds all the worker-specific information */ 190char *init_host = NULL; /* may be set by -h flag */ 191char *wm_host = NULL; /* machine on which worker_manager is running */ 192char * nsrv_host = NULL; /* machine on which name server is running */ 193unsigned nsrv_port_number = 0; /* port number of nsrv */ 194static char *init_exec = NULL; /* may be set by -wx flag */ 195 196static char mps_map_file[MAX_PATH_LEN] = "/tmp/mps.map"; 197 198/* Volatiles - used for waiting for particular events */ 199static volatile int init_acks = 0; 200static volatile int num_exited = 0; 201static volatile int closed_bports = 0; 202static volatile int got_root_id = 0; 203static volatile int wstat_received; 204 205/* Start time - used for sending session time calcuation */ 206#ifdef HAVE_GETHRTIME 207static hrtime_t start_time; 208#else 209#ifdef BSD_TIMES 210static time_t start_time; 211#else 212static clock_t start_time; 213#endif 214#endif 215int clock_hz; 216 217#ifdef HAVE_SIGPROCMASK 218sigset_t sigio_mask; 219#endif 220 221/*-----------------------------------------------------------------*/ 222/* Machine list handling routines */ 223/*-----------------------------------------------------------------*/ 224 225void init_mc_list() 226{ 227 mc_list.num_machines = 0; 228 mc_list.next_id = 1; 229 mc_list.total_workers = 0; 230} 231 232machine_t *get_mc(hostname) 233 char * hostname; 234{ 235 236 int i; 237 int found = 0; 238 239 for(i = 0; (i < mc_list.num_machines) && !found; i++) 240 if(strcmp(hostname,mc_list.machines[i].hostname) == 0) 241 return(&(mc_list.machines[i])); 242 243 return(NULL); 244} 245 246machine_t *add_mc(hostname,exec_file,auto_start) 247char * hostname; 248char * exec_file; 249int auto_start; 250{ 251 int mc_id; 252 machine_t * mc; 253 254 if((mc = get_mc(hostname)) == NULL) 255 { 256 Disable_Int(); 257 mc_id = mc_list.num_machines; 258 mc_list.machines[mc_id].num_workers = 0; 259 mc_list.machines[mc_id].num_awake = 0; 260 mc_list.machines[mc_id].auto_start = auto_start; 261 strcpy(mc_list.machines[mc_id].hostname, hostname); 262 strcpy(mc_list.machines[mc_id].exec_file, exec_file); 263 sprintf(mc_list.machines[mc_id].heap_map_file,"%s/%d.%s.heap.map", 264 map_dir,wm_pid,hostname); 265 mc_list.machines[mc_id].list = NULL; 266 mc_list.num_machines++; 267 Enable_Int(); 268 return(&(mc_list.machines[mc_id])); 269 } 270 else 271 { 272 Disable_Int(); 273 mc->auto_start = auto_start; 274 strcpy(mc->exec_file,exec_file); 275 Enable_Int(); 276 return(mc); 277 } 278} 279 280int init_worker(hostname,redraw) 281char *hostname; 282int redraw; 283 284{ 285 worker_ptr w; 286 machine_t *mc; 287 288 mc = get_mc(hostname); 289 290 if (mc == NULL) 291 { 292 fprintf(stderr, 293 "Error: host %s not in Worker Manager host list\n",hostname); 294 return(0); 295 } 296 297 w = (worker_ptr) malloc(sizeof(worker_t)); 298 w->index = mc_list.next_id; 299 sprintf(w->bport_name,"worker%d",w->index); 300 w->status = NOT_READY; 301 w->start_wstat.job_count = -1; 302 w->pid = 0; 303 Disable_Int(); 304 if(mc->list == NULL) 305 { 306 w->next = NULL; 307 w->first = 1; 308 } 309 else 310 { 311 w->next = mc->list; 312 w->first = 0; 313 } 314 315 mc->list = w; 316 mc->num_workers++; 317 mc_list.total_workers++; 318 mc->num_awake++; 319 mc_list.next_id++; 320 Enable_Int(); 321 if (interactive && redraw) { 322 tk_geval("remake_status"); 323 } 324 return(w->index); 325} 326 327worker_ptr get_worker(index) 328 int index; 329{ 330 worker_ptr w; 331 int i; 332 333 w = NULL; 334 for(i = 0; (i < mc_list.num_machines) && (w == NULL); i++) 335 for(w = mc_list.machines[i].list; (w != NULL) && (w->index != index); 336 w = w->next); 337 338 if (w == NULL) 339 fprintf(stderr, "WM: get_worker - no worker with index %d\n",index); 340 return(w); 341} 342 343machine_t * get_mc_id(index) 344int index; 345{ 346 worker_ptr w; 347 int i; 348 349 w = NULL; 350 for(i = 0; (i < mc_list.num_machines) && (w == NULL); i++) 351 for(w = mc_list.machines[i].list; w != NULL ; w = w->next) 352 if (w->index == index) 353 return(&(mc_list.machines[i])); 354 355 if (w == NULL) 356 { 357 fprintf(stderr, "WM: get_mc_id - no worker with index %d\n",index); 358 return(NULL); 359 } 360} 361 362 363/*-----------------------------------------------------------------*/ 364/* Error handling routines */ 365/*-----------------------------------------------------------------*/ 366 367void check_nsrv(nret,line,err) 368nsrv_ret_t nret; 369int line; 370int err; 371 372{ 373 if (nret != NSRV_OK) 374 { 375 fprintf(stderr,"**Worker Manager: Name Server (nsrv) Fatal Error**\n"); 376 switch(err) 377 { 378 case 1: 379 fprintf(stderr,"Looks like your nameserver has crashed.\n"); 380 fprintf(stderr,"Remove all files in $ECLIPSETMP "); 381 fprintf(stderr,"and restart peclipse.\n"); 382 break; 383 case 2: 384 fprintf(stderr,"Could not register information with nsrv.\n"); 385 break; 386 case 3: 387 fprintf(stderr,"nsrv_new_bport_id call failed.\n"); 388 break; 389 case 4: 390 fprintf(stderr,"nsrv_bdomain_look_up failed.\n"); 391 break; 392 case 5: 393 fprintf(stderr,"nsrv_new_bdomain_id call failed.\n"); 394 break; 395 default: 396 break; 397 } 398 fprintf(stderr,"pid = %d nret = %d at line %d in file %s\n", 399 getpid(),nret,line,__FILE__); 400 exit(1); 401 } 402} 403 404void check_nsrv_soft(nret,line,err) 405nsrv_ret_t nret; 406int line; 407int err; 408{ 409 if (nret != NSRV_OK) 410 { 411 fprintf(stderr,"**Worker Manager: Name Server (nsrv) Error** \n"); 412 switch(err) 413 { 414 case 1: 415 fprintf(stderr,"Could not deregister nameserver information"); 416 break; 417 case 2: 418 fprintf(stderr,"Could not free nameserver ids"); 419 break; 420 default: 421 break; 422 } 423 fprintf(stderr,"pid = %d nret = %d at line %d in file %s\n", 424 getpid(),nret,line,__FILE__); 425 fprintf(stderr,"Trying to continue execution..\n"); 426 } 427} 428 429void check_bmsg(bret,line,err) 430bmsg_ret_t bret; 431int line; 432int err; 433{ 434 if (bret != BMSG_OK) 435 { 436 fprintf(stderr,"**Worker Manager: MPS B-layer Fatal Error: Aborting!\n"); 437 switch(err) 438 { 439 case 1: 440 fprintf(stderr,"Could not initialize Message Passing System.\n"); 441 break; 442 case 2: 443 fprintf(stderr,"bport_port call failed.\n"); 444 break; 445 default: 446 break; 447 } 448 fprintf(stderr,"pid = %d bret = %d at line %d in file %s\n", 449 getpid(),bret,line,__FILE__); 450 exit(2); 451 } 452} 453 454void check_amsg(aret,line,err) 455amsg_ret_t aret; 456int line; 457int err; 458{ 459 if (aret != AMSG_OK) 460 { 461 fprintf(stderr,"**Worker Manager: MPS A-Layer Fatal Error: Aborting!"); 462 switch(err) 463 { 464 case 1: 465 fprintf(stderr,"Could not initialize Message Passing System.\n"); 466 break; 467 case 2: 468 fprintf(stderr,"Could not allocate message buffer."); 469 fprintf(stderr,"Probably no memory left.\n"); 470 break; 471 case 3: 472 fprintf(stderr,"Error in amsg_send.\n"); 473 case 4: 474 fprintf(stderr,"Error in amsg_type_define\n"); 475 default: 476 break; 477 } 478 fprintf(stderr,"pid = %d aret = %d at line %d in file %s\n", 479 getpid(),aret,line,__FILE__); 480 exit(3); 481 } 482} 483 484void check_amsg_soft(aret,line,err) 485amsg_ret_t aret; 486int line; 487int err; 488{ 489 if (aret != AMSG_OK) 490 { 491 fprintf(stderr,"**Worker Manager: MPS A-Layer Error: Warning"); 492 switch(err) 493 { 494 case 1: 495 fprintf(stderr,"Could not free message buffer.\n"); 496 break; 497 case 2: 498 fprintf(stderr,"Error in recieving messages.\n"); 499 break; 500 case 3: 501 fprintf(stderr,"Error in amsg_send.\n"); 502 default: 503 break; 504 } 505 fprintf(stderr,"pid = %d aret = %d at line %d in file %s\n", 506 getpid(),aret,line,__FILE__); 507 fprintf(stderr,"Trying to continue execution..\n"); 508 } 509} 510 511void wm_abort_error(msg) 512char * msg; 513{ 514 fprintf(stderr,"** Worker Manager : Fatal error**\n"); 515 fprintf(stderr,"%s\n",msg); 516} 517 518/*----------------------------------------------------------------*/ 519/* Signal Handling */ 520/*----------------------------------------------------------------*/ 521 522static void 523handle_sigio() 524{ 525 /* unblock SIGIO and SIGPOLL signals */ 526#ifdef HAVE_SIGPROCMASK 527 (void) sigprocmask(SIG_UNBLOCK, &sigio_mask, (sigset_t *) 0); 528#else 529 { 530 int mask; 531#ifdef SIGPOLL 532 mask = sigblock(sigmask(SIGIO) | sigmask(SIGPOLL)); 533 sigsetmask(mask & ~(sigmask(SIGIO) | sigmask(SIGPOLL))); 534#else 535 mask = sigblock(sigmask(SIGIO)); 536 sigsetmask(mask & ~sigmask(SIGIO)); 537#endif 538 } 539#endif 540 (void) bmsg_trigger((BMSG_INTER_DOMAIN | 541 BMSG_INTRA_DOMAIN )); 542} 543 544 545RETSIGTYPE 546sigio_handler() 547{ 548 if (InterruptsDisabled) { 549 Set_Interrupts_Pending(); 550 } else { 551 handle_sigio(); 552 } 553} 554 555static void 556delayed_signal_handler() 557{ 558 Clr_Interrupts_Pending(); 559 handle_sigio(); 560} 561 562 563/*-----------------------------------------------------------*/ 564/* Panic and warning routines */ 565/*-----------------------------------------------------------*/ 566 567void 568mem_panic(what, where) 569 char * what; 570 char * where; 571{ 572 fprintf(stderr,"Panic: %s in %s\n", what, where); 573 exit(-1); 574} 575 576/*ARGSUSED*/ 577void amsg_warn(msg_warn, culprit) 578 amsg_warn_t msg_warn; 579 aport_id_t culprit; 580{ 581/* printf("%d: amsg_warn: ...\n", bport_self());*/ 582} 583 584/*ARGSUSED*/ 585void 586amsg_panic(msg_panic,culprit) 587 amsg_panic_t msg_panic; 588 aport_id_t culprit; 589{ 590} 591 592/*ARGSUSED*/ 593void 594amsg_error(msg_error,culprit) 595 amsg_error_t msg_error; 596 aport_id_t culprit; 597{ 598} 599 600/*ARGSUSED*/ 601#if defined(__STDC__) 602void bmsg_warn(bmsg_warn_t msg_warn, 603 bport_id_t culprit) 604#else 605void 606bmsg_warn(msg_warn, culprit) 607 bmsg_warn_t msg_warn; 608 bport_id_t culprit; 609#endif 610{ 611/* 612 printf("Worker Manager: bmsg_warn: %d :error no: %d ...\n", 613 culprit,msg_warn); 614*/ 615} 616 617/*ARGSUSED*/ 618#if defined(__STDC__) 619void bmsg_panic(bmsg_warn_t msg_panic, 620 bport_id_t culprit) 621#else 622void 623bmsg_panic(msg_panic,culprit) 624 bmsg_panic_t msg_panic; 625 bport_id_t culprit; 626#endif 627{ 628 printf("Worker Manager: bmsg_panic: %d error no: %d\n", 629 culprit,msg_panic); 630} 631 632/*ARGSUSED*/ 633#if defined(__STDC__) 634void bmsg_error(bmsg_warn_t msg_error, 635 bport_id_t culprit) 636#else 637void 638bmsg_error(msg_error,culprit) 639 bmsg_error_t msg_error; 640 bport_id_t culprit; 641#endif 642{ 643 switch (msg_error) { 644 case BMSG_WEP_PDIED : 645 if (culprit == NSRV_BPORT_ID) 646 printf("Worker Manager: bmsg_error: name server died !\n"); 647 else 648 printf("Worker Manager: bmsg_error: bport %d died !\n", 649 culprit); 650 return; 651 default : 652 break; 653 } 654 printf("Worker Manager: unknown bmsg_error: culprit %d, error no %d\n", 655 culprit,msg_error); 656} 657 658/*ARGSUSED*/ 659void 660bmem_ack(mem_id,mem_primitive,ret) 661 bmem_id_t mem_id; 662 bmem_primitive_t mem_primitive; 663 bmsg_ret_t ret; 664{ 665 printf("%d: bmem_ack: ...\n", bport_self()); 666} 667 668/*ARGSUSED*/ 669#if defined(__STDC__) 670void 671bmem_notify(bport_id_t port_id, 672 bmem_primitive_t mem_primitive, 673 bmem_address_t mem_address, 674 bmem_size_t mem_data_size) 675#else 676void 677bmem_notify(port_id,mem_primitive,mem_address,mem_data_size) 678 bport_id_t port_id; 679 bmem_primitive_t mem_primitive; 680 bmem_address_t mem_address; 681 bmem_size_t mem_data_size; 682#endif 683{ 684 printf("%d: bmem_notify: ...\n", bport_self()); 685} 686 687/*----------------------------------------------------------------*/ 688/* MPS B-layer port notify and acknowledge routines */ 689/*----------------------------------------------------------------*/ 690 691/*ARGSUSED*/ 692#if defined(__STDC__) 693void 694bport_notify(bport_id_t port_id, 695 bport_primitive_t port_primitive) 696#else 697void 698bport_notify(port_id, port_primitive) 699 bport_id_t port_id; 700 bport_primitive_t port_primitive; 701#endif 702{ 703 switch (port_primitive) { 704 case BPORT_OPEN : 705 break; 706 case BPORT_CLOSE : 707 closed_bports++; 708 break; 709 case BPORT_BLOCK : 710 break; 711 case BPORT_UNBLOCK : 712 break; 713 default: 714 break; 715 } 716} 717 718/*ARGSUSED*/ 719#if defined(__STDC__) 720void 721bport_ack(bport_id_t port_id, 722 bport_primitive_t port_primitive, 723 bmsg_ret_t ret) 724#else 725void 726bport_ack(port_id, port_primitive, ret) 727 bport_id_t port_id; 728 bport_primitive_t port_primitive; 729 bmsg_ret_t ret; 730#endif 731{ 732 733 if (ret != BMSG_OK) { 734 switch (port_primitive) { 735 case BPORT_OPEN : 736 break; 737 case BPORT_CLOSE : 738 break; 739 case BPORT_BLOCK : 740 break; 741 case BPORT_UNBLOCK : 742 break; 743 default: 744 break; 745 } 746 return; 747 } 748} 749 750void 751bproc_trigger(port) 752 bport_t * port; 753{ 754 if (port->bpid == wm_pid) 755 { 756 (void) bmsg_trigger(BMSG_INTRA_DOMAIN); 757 } 758 else if (kill(port->bpid, SIGIO) != 0) 759 { 760 fprintf(stderr, "bport %d died\n", (int) port->bport_id); 761 } 762} 763 764/*-----------------------------------------------------------------*/ 765/* Initialise Message Types */ 766/*-----------------------------------------------------------------*/ 767 768amsg_type_t mdt_wm_header; 769amsg_type_t mdt_wm_simple; 770amsg_type_t mdt_wm_nsrvname; 771amsg_type_t mdt_wm_config; 772amsg_type_t mdt_wm_node; 773amsg_type_t mdt_sthandlet; 774amsg_type_t mdt_wm_mcstatus; 775amsg_type_t mdt_wm_status; 776amsg_type_t mdt_wm_hoststatus; 777amsg_type_t mdt_wm_workerinfo; 778amsg_type_t mdt_wm_time; 779amsg_type_t mdt_wm_starttime; 780amsg_type_t mdt_wm_portname; 781amsg_type_t mdt_wm_wstat; 782amsg_type_t mdt_wstat; 783 784 785wm_types_init() 786{ 787 amsg_typedef_t template[20]; 788 789 /* wm_msg_header_t */ 790 template[0] = MDT_BEGIN; 791 template[1] = MDT_STRUCT_OPEN; 792 template[2] = MDT_INT32; 793 template[3] = MDT_INT32; 794 template[4] = MDT_STRUCT_CLOSE; 795 template[5] = MDT_END; 796 797 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 1, 798 template, &mdt_wm_header), __LINE__, 4); 799 800 /* wm_simple_msg_t */ 801 template[0] = MDT_BEGIN; 802 template[1] = MDT_STRUCT_OPEN; 803 template[2] = mdt_wm_header; 804 template[3] = MDT_INT32; 805 template[4] = MDT_STRUCT_CLOSE; 806 template[5] = MDT_END; 807 808 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 2, 809 template, &mdt_wm_simple), __LINE__, 4); 810 811 /* host_name_msg_t structures */ 812 template[0] = MDT_BEGIN; 813 template[1] = MDT_STRUCT_OPEN; 814 template[2] = mdt_wm_header; 815 template[3] = MDT_NSRVNAME; 816 template[4] = MDT_STRUCT_CLOSE; 817 template[5] = MDT_END; 818 819 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 3, 820 template, &mdt_wm_nsrvname), __LINE__, 4); 821 822 /* st_handle_t */ 823 824 template[0] = MDT_BEGIN; 825 template[1] = MDT_STRUCT_OPEN; 826 template[2] = MDT_APORTID; 827 template[3] = MDT_UINT32; 828 template[4] = MDT_UINT32; 829 template[5] = MDT_STRUCT_CLOSE; 830 template[6] = MDT_END; 831 832 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 4, 833 template, &mdt_sthandlet), __LINE__, 4); 834 835 /* node_msg_t */ 836 837 template[0] = MDT_BEGIN; 838 template[1] = MDT_STRUCT_OPEN; 839 template[2] = mdt_wm_header; 840 template[3] = mdt_sthandlet; 841 template[4] = MDT_STRUCT_CLOSE; 842 template[5] = MDT_END; 843 844 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 5, 845 template, &mdt_wm_node), __LINE__, 4); 846 847 /* config_msg_t */ 848 template[0] = MDT_BEGIN; 849 template[1] = MDT_STRUCT_OPEN; 850 template[2] = mdt_wm_header; 851 template[3] = MDT_INT32; 852 template[4] = MDT_NSRVNAME; 853 template[5] = MDT_STRUCT_CLOSE; 854 template[6] = MDT_END; 855 856 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 6, 857 template, &mdt_wm_config), __LINE__, 4); 858 859 /* mc_status_t */ 860 template[0] = MDT_BEGIN; 861 template[1] = MDT_STRUCT_OPEN; 862 template[2] = MDT_INT32; 863 template[3] = MDT_INT32; 864 template[4] = MDT_NSRVNAME; 865 template[5] = MDT_STRUCT_CLOSE; 866 template[6] = MDT_END; 867 868 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 7, 869 template, &mdt_wm_mcstatus), __LINE__, 4); 870 871 /* mc_status_t */ 872 template[0] = MDT_BEGIN; 873 template[1] = MDT_STRUCT_OPEN; 874 template[2] = mdt_wm_header; 875 template[3] = MDT_INT32; 876 template[4] = MDT_INT32; 877 template[5] = MDT_ARRAY_OF; 878 template[6] = MAX_MACHINES; 879 template[7] = mdt_wm_mcstatus; 880 template[8] = MDT_STRUCT_CLOSE; 881 template[9] = MDT_END; 882 883 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 8, 884 template, &mdt_wm_status), __LINE__, 4); 885 886 /* host_status_msg_t */ 887 template[0] = MDT_BEGIN; 888 template[1] = MDT_STRUCT_OPEN; 889 template[2] = mdt_wm_header; 890 template[3] = MDT_INT32; 891 template[4] = MDT_INT32; 892 template[5] = MDT_ARRAY_OF; 893 template[6] = MAX_PROCS; 894 template[7] = MDT_INT32; 895 template[8] = MDT_ARRAY_OF; 896 template[9] = MAX_PROCS; 897 template[10] = MDT_INT32; 898 template[11] = MDT_STRUCT_CLOSE; 899 template[12] = MDT_END; 900 901 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 9, 902 template, &mdt_wm_hoststatus), __LINE__, 4); 903 904 /* start_time_msg_t */ 905 906 template[0] = MDT_BEGIN; 907 template[1] = MDT_STRUCT_OPEN; 908 template[2] = mdt_wm_header; 909#ifdef HAVE_GETHRTIME 910 template[3] = MDT_DWORD; 911#else 912 template[3] = MDT_UINT32; 913#endif 914 template[4] = MDT_STRUCT_CLOSE; 915 template[5] = MDT_END; 916 917 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 10, 918 template, &mdt_wm_starttime), __LINE__, 4); 919 920 /* time_msg_t */ 921 922 template[0] = MDT_BEGIN; 923 template[1] = MDT_STRUCT_OPEN; 924 template[2] = mdt_wm_header; 925 template[3] = MDT_DP_FLOAT; 926 template[4] = MDT_STRUCT_CLOSE; 927 template[5] = MDT_END; 928 929 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 11, 930 template, &mdt_wm_time), __LINE__, 4); 931 932 /* port_name_msg_t */ 933 template[0] = MDT_BEGIN; 934 template[1] = MDT_STRUCT_OPEN; 935 template[2] = mdt_wm_header; 936 template[3] = MDT_INT32; 937 template[4] = MDT_NSRVNAME; 938 template[5] = MDT_APORTID; 939 template[6] = MDT_STRUCT_CLOSE; 940 template[7] = MDT_END; 941 942 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 12, 943 template, &mdt_wm_portname), __LINE__, 4); 944 945 wstat_types_init(&mdt_wstat); 946 /* wstat_msg_t */ 947 template[0] = MDT_BEGIN; 948 template[1] = MDT_STRUCT_OPEN; 949 template[2] = mdt_wm_header; 950 template[3] = mdt_wstat; 951 template[4] = MDT_STRUCT_CLOSE; 952 template[5] = MDT_END; 953 954 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 13, 955 template, &mdt_wm_wstat), __LINE__, 4); 956} 957 958 959/*-----------------------------------------------------------------*/ 960/* Procedures for sending simple messages */ 961/*-----------------------------------------------------------------*/ 962 963 964amsg_ret_t send_simple_wm_message(port_id,msg_type,msg_value) 965 aport_id_t port_id; 966 int msg_type; 967 int msg_value; 968 969{ 970 amsg_t msg; 971 amsg_data_t * msg_data; 972 wm_simple_msg_t * wm_simple_msg; 973 amsg_ret_t aret; 974 975 amsg_size_t size; 976 977 size = sizeof(wm_simple_msg_t); 978 979 check_amsg(amsg_alloc(size, 980 &msg_data, 981 &msg),__LINE__,2); 982 983 wm_simple_msg = (wm_simple_msg_t *) msg_data; 984 wm_simple_msg->header.msg_type = msg_type; 985 wm_simple_msg->header.wid = 0; 986 wm_simple_msg->msg_value = msg_value; 987 aret = amsg_send(port_id,msg,mdt_wm_simple, 1, 0); 988 return(aret); 989} 990 991int broadcast_halt(mtype) 992int mtype; 993{ 994 worker_ptr cur; 995 int i; 996 997 for(i = 0; i < mc_list.num_machines; i++) 998 for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 999 if(cur->status != NOT_READY) { 1000 if (mtype == HALT_SYSTEM1) 1001 check_amsg_soft(send_simple_wm_message(cur->halt1_aport_id,mtype,0), 1002 __LINE__, 3); 1003 else 1004 check_amsg_soft(send_simple_wm_message(cur->halt2_aport_id,mtype,0), 1005 __LINE__, 3); 1006 } 1007} 1008 1009int broadcast(mtype,mmsg) 1010int mtype; 1011int mmsg; 1012{ 1013 worker_ptr cur; 1014 int i; 1015 1016 for(i = 0; i < mc_list.num_machines; i++) 1017 for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 1018 if(cur->status != NOT_READY) 1019 check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,mtype,mmsg), 1020 __LINE__, 3); 1021} 1022 1023int broadcast_awake(mtype,mmsg) 1024int mtype; 1025int mmsg; 1026{ 1027 worker_ptr cur; 1028 int i; 1029 1030 for(i = 0; i < mc_list.num_machines; i++) 1031 for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 1032 if(cur->status == AWAKE) 1033 check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,mtype,mmsg), 1034 __LINE__, 3); 1035} 1036 1037/* emergency cleanup - use rsh to kill remote workers */ 1038/* Try to remove mapped files */ 1039int 1040kill_system() 1041{ 1042 worker_ptr cur; 1043 int i, local; 1044 1045 for(i = 0; i < mc_list.num_machines; i++) 1046 { 1047 local = (strcmp(mc_list.machines[i].hostname,wm_host) == 0); 1048 for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 1049 if(cur->pid) 1050 if(local) 1051 { 1052 if(kill(cur->pid,SIGKILL) != 0) 1053 perror("Kill Failed"); 1054 } 1055 else 1056 { 1057 char command[1024]; 1058 sprintf(command,"rsh %s kill -9 %d", 1059 mc_list.machines[i].hostname, cur->pid); 1060 system(command); 1061 } 1062 if(ec_unlink(mc_list.machines[i].heap_map_file) != 0) 1063 perror("Could not remove heap_map_file\n"); 1064 } 1065 if(ec_unlink(mps_map_file) != 0) 1066 perror("Could not remove mps_map_file\n"); 1067} 1068 1069/*-----------------------------------------------------------------*/ 1070/*------------------ Notify Procedures ----------------------------*/ 1071/*-----------------------------------------------------------------*/ 1072 1073/* There are four aports used to communicate with the worker manager- 1074a. Halt1/2 aports - to halt the system. Either a normal exit or 1075 a panic will cause a halt request. Two separate ports 1076 to implement a two-phase exit protocol. 1077 1078b. High wm-aport - used for start up, and some messages which does not rely 1079 the well-being of the wm status. 1080 1081c. Low wm-aport - used for all other messages, in particular those 1082 requesting the wm status. Therefore this low port messages 1083 should not be handled when the manager is in the middle of 1084 a worker startup or exiting protocols. I.e. this port 1085 should have lower priority. 1086*/ 1087 1088void 1089halt2_notify(port_id) 1090 aport_id_t port_id; 1091{ 1092 amsg_ret_t ret; 1093 amsg_t msg; 1094 amsg_data_t * msg_data; 1095 amsg_type_t msg_type; 1096 amsg_count_t msg_count; 1097 1098 while (( ret = amsg_receive(port_id, 1099 &msg, 1100 &msg_data, 1101 &msg_type, 1102 &msg_count, 0)) == AMSG_OK) { 1103 Notify("WM: Got EXITING message\n"); 1104 num_exited++; 1105 } 1106} 1107 1108/*ARGSUSED*/ 1109void 1110halt1_notify(port_id) 1111 aport_id_t port_id; 1112{ 1113 void exit_system(); 1114 1115 Notify("WM: Got HALT_SYSTEM_REQ message\n"); 1116 exit_system(0,60); 1117} 1118 1119/*-----------------------------------------------------------------*/ 1120/* Implements a two-phase exit protocol and each phase uses 1121 the supplied timeout */ 1122/*-----------------------------------------------------------------*/ 1123void exit_system(exit_code, timeout) 1124int exit_code; 1125int timeout; /* in seconds */ 1126 1127{ 1128 int timer1 = timeout; 1129 int timer2 = timeout; 1130 int killed = 0; 1131 1132 broadcast_halt(HALT_SYSTEM1, 0); 1133 while (num_exited != mc_list.total_workers && (timer1-- > 0 || dont_fork)) 1134 short_sleep(1000000); 1135 if (timer1 > 0 || dont_fork) { 1136 broadcast_halt(HALT_SYSTEM2, 0); 1137 while (closed_bports != mc_list.total_workers && (timer2-- > 0||dont_fork)) 1138 short_sleep(1000000); 1139 } 1140 1141 if ((timer1 <= 0 || timer2 <= 0) && !dont_fork) { 1142 printf("Time out reached - Killing workers and exiting !!\n"); 1143 kill_system(); 1144 killed = 1; 1145 } 1146 1147 if (!killed) { /* dont try to deregister after an hard exit. 1148 The MPS memory may be in an inconsistent state */ 1149 /* release all name server ids */ 1150 check_nsrv_soft(nsrv_aport_deregister(session_key, 1151 wm_high_aport_name, wm_signature), __LINE__, 1); 1152 check_nsrv_soft(nsrv_aport_deregister(session_key, 1153 wm_halt1_aport_name, wm_signature), __LINE__, 1); 1154 check_nsrv_soft(nsrv_aport_deregister(session_key, 1155 wm_halt2_aport_name, wm_signature), __LINE__, 1); 1156 check_nsrv_soft(nsrv_aport_deregister(session_key, 1157 wm_low_aport_name, wm_signature), __LINE__, 1); 1158 check_nsrv_soft(nsrv_bport_deregister(session_key, 1159 wm_port_name, wm_signature), __LINE__, 1); 1160 check_nsrv_soft(nsrv_free_bport_id(wm_signature, 1161 wm_bport_id), __LINE__, 2); 1162 check_nsrv_soft(nsrv_bdomain_deregister(bdomain_key, 1163 domain_name, wm_signature), __LINE__, 1); 1164 check_nsrv_soft(nsrv_free_bdomain_id(wm_signature, 1165 bdomain.bdomain_id), __LINE__, 2); 1166 } 1167 nsrv_exit(); 1168 amsg_exit(); 1169 bmsg_exit(); 1170 exit(exit_code); 1171} 1172 1173/*------------------------------------------------------------*/ 1174/* Low WM Port handling routines */ 1175/*------------------------------------------------------------*/ 1176void send_status(w) 1177worker_ptr w; 1178{ 1179 amsg_t msg; 1180 amsg_data_t * msg_data; 1181 amsg_size_t size; 1182 status_msg_t *stat_msg; 1183 int i; 1184 1185 size = sizeof(status_msg_t); 1186 1187 check_amsg(amsg_alloc(size, 1188 &msg_data, 1189 &msg),__LINE__,2); 1190 1191 stat_msg = (status_msg_t *) msg_data; 1192 stat_msg->header.msg_type = STATUS_NOTIFY; 1193 stat_msg->header.wid = 0; 1194 stat_msg->num_machines = mc_list.num_machines; 1195 stat_msg->total_workers = mc_list.total_workers; 1196 1197 for(i = 0; i < mc_list.num_machines; i++) 1198 { 1199 stat_msg->machines[i].num_workers = mc_list.machines[i].num_workers; 1200 stat_msg->machines[i].num_awake = mc_list.machines[i].num_awake; 1201 strcpy(stat_msg->machines[i].hostname, mc_list.machines[i].hostname); 1202 } 1203 check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_status,1,0),__LINE__,3); 1204} 1205 1206/* send the host name of the worker manager machine to worker w */ 1207void send_host_name(w) 1208worker_ptr w; 1209{ 1210 amsg_t msg; 1211 amsg_data_t * msg_data; 1212 amsg_size_t size; 1213 host_name_msg_t *host_name_msg; 1214 1215 size = sizeof(host_name_msg_t); 1216 1217 check_amsg(amsg_alloc(size, 1218 &msg_data, 1219 &msg),__LINE__,2); 1220 1221 host_name_msg = (host_name_msg_t *) msg_data; 1222 host_name_msg->header.msg_type = WM_HOSTNAME; 1223 host_name_msg->header.wid = 0; 1224 strcpy(host_name_msg->hostname,wm_host); 1225 check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_nsrvname,1,0), 1226 __LINE__,3); 1227} 1228 1229void send_host_status(w,hostname) 1230worker_ptr w; 1231char *hostname; 1232{ 1233 amsg_t msg; 1234 amsg_data_t * msg_data; 1235 amsg_size_t size; 1236 host_status_msg_t *stat_msg; 1237 int asleep_count, awake_count; 1238 machine_t *mc; 1239 worker_ptr lw; 1240 1241 size = sizeof(host_status_msg_t); 1242 1243 check_amsg(amsg_alloc(size, 1244 &msg_data, 1245 &msg),__LINE__,2); 1246 1247 stat_msg = (host_status_msg_t *) msg_data; 1248 stat_msg->header.msg_type = HOST_STATUS_NOTIFY; 1249 stat_msg->header.wid = 0; 1250 1251 mc = get_mc(hostname); 1252 if(mc == NULL) 1253 { 1254 stat_msg->num_awake = 0; 1255 stat_msg->num_asleep = 0; 1256 } 1257 else 1258 { 1259 lw = mc->list; 1260 1261 awake_count = 0; 1262 asleep_count = 0; 1263 while(lw != NULL) 1264 { 1265 if (lw->status == AWAKE) 1266 { 1267 stat_msg->awake_ids[awake_count] = lw->index; 1268 awake_count++; 1269 } 1270 else if (lw->status == SLEEPING) 1271 { 1272 stat_msg->asleep_ids[asleep_count] = lw->index; 1273 asleep_count++; 1274 } 1275 lw = lw->next; 1276 } 1277 stat_msg->num_awake = awake_count; 1278 stat_msg->num_asleep = asleep_count; 1279 } 1280 check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_hoststatus,1,0), 1281 __LINE__,3); 1282} 1283 1284int send_to_sleep(w,mc) 1285worker_ptr w; 1286machine_t *mc; 1287 1288{ 1289 amsg_ret_t aret; 1290 1291 aret = send_simple_wm_message(w->wm_aport_id, GOTO_SLEEP,0); 1292 if (aret != AMSG_OK) 1293 return(0); 1294 w->status = SLEEPING; 1295 mc->num_awake--; 1296 if (interactive) { 1297 tk_geval("remake_status"); 1298 } 1299 return(1); 1300} 1301 1302int wakeup(w,mc) 1303worker_ptr w; 1304machine_t *mc; 1305 1306{ 1307 amsg_ret_t aret; 1308 1309 aret = send_simple_wm_message(w->wm_aport_id, WAKEUP,0); 1310 if (aret != AMSG_OK) 1311 return(0); 1312 w->status = AWAKE; 1313 mc->num_awake++; 1314 if (interactive) { 1315 tk_geval("remake_status"); 1316 } 1317 return(1); 1318} 1319 1320/*ARGSUSED*/ 1321void 1322low_wm_notify(port_id) 1323aport_id_t port_id; 1324{ 1325 amsg_ret_t ret; 1326 amsg_t msg; 1327 amsg_data_t * msg_data; 1328 amsg_count_t msg_count; 1329 amsg_type_t msg_type; 1330 wm_simple_msg_t * wm_simple_msg; 1331 int msg_value; 1332 config_msg_t *config_mess; 1333 int req_type; 1334 worker_info_msg_t * worker_info_msg; 1335 host_name_msg_t * host_name_msg; 1336 worker_ptr w; 1337 int i, worker_id; 1338 worker_ptr cur; 1339 int workers; 1340 char *hostname; 1341 machine_t *mc; 1342 int freemsg; 1343 1344 1345 freemsg = 1; 1346 1347 while (( ret = amsg_receive(port_id, 1348 &msg, 1349 &msg_data, 1350 &msg_type, 1351 &msg_count, 0)) == AMSG_OK) { 1352 1353 if(msg_type == mdt_wm_simple) 1354 { 1355 wm_simple_msg = (wm_simple_msg_t *) msg_data; 1356 worker_id = wm_simple_msg->header.wid; 1357 msg_value = wm_simple_msg->msg_value; 1358 switch(wm_simple_msg->header.msg_type) { 1359 1360 case START_INTERFACE: 1361 if(!interactive) 1362 { 1363 start_tkwindow(); 1364 interactive = 1; 1365 } 1366 break; 1367 case REMOVE_INTERFACE: 1368 if(interactive) 1369 { 1370 interactive = 0; 1371 delete_tkwindow(); 1372 } 1373 break; 1374 case SET_ONE_SLEEPING: 1375 cur = get_worker(msg_value); 1376 mc = get_mc_id(msg_value); 1377 if (cur && mc) 1378 send_to_sleep(cur,mc); 1379 break; 1380 1381 case STATUS_REQ: 1382 Notify("WM: Got STATUS_REQ message\n"); 1383 w = get_worker(worker_id); 1384 send_status(w); 1385 break; 1386 1387 case START_TRACING: 1388 Notify("WM: Got START_TRACING message\n"); 1389 broadcast_awake(START_TRACING, 0); 1390 break; 1391 1392 case STOP_TRACING: 1393 Notify("WM: Got STOP_TRACING message\n"); 1394 broadcast_awake(STOP_TRACING, 0); 1395 break; 1396 1397 default: 1398 printf("Unknown simple Worker Manager message\n"); 1399 break; 1400 } 1401 } 1402 else if (msg_type == mdt_wm_config) 1403 { 1404 config_mess = (config_msg_t *) msg_data; 1405 req_type = config_mess->header.msg_type; 1406 worker_id = config_mess->header.wid; 1407 workers = config_mess->workers; 1408 hostname = config_mess->hostname; 1409 mc = get_mc(hostname); 1410 i = 0; 1411 if (mc != NULL) 1412 { 1413 if (req_type == ADD_WORKERS) 1414 { 1415 Notify("WM: Got ADD_WORKERS message\n"); 1416 for(i = 0; i < workers; i++) 1417 if (!add_worker(Argc,Argv,mc)) 1418 break; 1419 } 1420 else if (req_type == SLEEP_WORKERS) 1421 { 1422 Notify("WM: Got SLEEP_WORKERS message\n"); 1423 if (workers < mc->num_awake) 1424 { 1425 cur = mc->list; 1426 for (i = 0; i < workers; i++) 1427 { 1428 while((cur->status == SLEEPING) || (cur->index == worker_id)) 1429 cur = cur->next; 1430 if(!send_to_sleep(cur,mc)) break; 1431 } 1432 } 1433 } 1434 else /* req_type == WAKEUP_WORKERS */ 1435 { 1436 Notify("WM: Got WAKEUP_WORKERS message\n"); 1437 if (workers <= mc->num_workers - mc->num_awake) 1438 { 1439 cur = mc->list; 1440 for (i = 0; i < workers; i++) 1441 { 1442 while(cur->status != SLEEPING) 1443 cur = cur->next; 1444 if(!wakeup(cur,mc)) break; 1445 } 1446 } 1447 } 1448 } 1449 w = get_worker(worker_id); 1450 check_amsg_soft(send_simple_wm_message(w->wm_aport_id, 1451 CONFIG_NOTIFY,i), 1452 __LINE__,3); 1453 } 1454 else if (msg_type == mdt_wm_nsrvname) 1455 { 1456 1457 host_name_msg = (host_name_msg_t *) msg_data; 1458 Notify("WM: Got HOST_STATUS_REQ message\n"); 1459 w = get_worker(host_name_msg->header.wid); 1460 hostname = host_name_msg->hostname; 1461 send_host_status(w,hostname); 1462 } 1463 else if (msg_type == MDT_BYTE) 1464 { 1465 worker_info_msg = (worker_info_msg_t *) msg_data; 1466 if(worker_info_msg->header.msg_type == WORKER_INFO_SET) 1467 { 1468 Notify("WM: Got WORKER_INFO_SET message\n"); 1469 w = get_worker(worker_info_msg->pro_wid); 1470 check_amsg_soft(amsg_send(w->wm_aport_id,msg, 1471 MDT_BYTE,msg_count,0),__LINE__,3); 1472 } 1473 else if(worker_info_msg->header.msg_type == WORKER_INFO_GET) 1474 { 1475 Notify("WM: Got WORKER_INFO_GET message\n"); 1476 w = get_worker(worker_info_msg->pro_wid); 1477 check_amsg_soft(amsg_send(w->wm_aport_id,msg, 1478 MDT_BYTE,msg_count,0),__LINE__,3); 1479 } 1480 else if(worker_info_msg->header.msg_type == WORKER_INFO_NOTIFY) 1481 { 1482 Notify("WM: Got WORKER_INFO_NOTIFY message\n"); 1483 send_worker_info(worker_info_msg->req_wid, worker_info_msg->size, 1484 (void_ptr) (worker_info_msg + 1)); 1485 } 1486 freemsg = 0; 1487 } 1488 if (freemsg) 1489 check_amsg_soft(amsg_free(msg),__LINE__,1); 1490 else 1491 freemsg = 1; 1492 } 1493 if (ret != AMSG_NOMESSAGE) { 1494 check_amsg_soft(ret,__LINE__, 2); 1495 } 1496 tk_doidledummy(); 1497} 1498 1499/*------------------------------------------------------------*/ 1500/* High WM port handling routines */ 1501/*------------------------------------------------------------*/ 1502int send_start_time(slave_no) 1503 int slave_no; 1504{ 1505 amsg_t msg; 1506 amsg_data_t * msg_data; 1507 start_time_msg_t * start_time_msg; 1508 amsg_size_t size; 1509 1510 worker_ptr w; 1511 1512 size = sizeof(start_time_msg_t); 1513 check_amsg(amsg_alloc(size, 1514 &msg_data, 1515 &msg),__LINE__,2); 1516 1517 start_time_msg = (start_time_msg_t *) msg_data; 1518 start_time_msg->header.msg_type = SEND_START_TIME; 1519 start_time_msg->header.wid = 0; 1520 start_time_msg->start_time = start_time; 1521 1522 w = get_worker(slave_no); 1523 check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_starttime,1,0), 1524 __LINE__,3); 1525} 1526 1527#ifdef HAVE_GETHRTIME 1528 1529hrtime_t get_time() 1530{ 1531 return(gethrtime()); 1532} 1533 1534double relative_elapsed_time(start_time) 1535hrtime_t start_time; 1536{ 1537 hrtime_t thr_time; 1538 double elapsed; 1539 1540 thr_time = gethrtime(); 1541 elapsed = (thr_time - start_time)/ 1000.0; 1542 elapsed = elapsed / 1000000.0; 1543 return(elapsed); 1544} 1545 1546#else 1547#ifdef BSD_TIMES 1548 1549time_t get_time() 1550{ 1551 return(time((time_t *) 0)); 1552} 1553 1554double relative_elapsed_time(start_time) 1555time_t start_time; 1556{ 1557 struct timeb realtime; 1558 double elapsed; 1559 1560 /* times() returns nothing useful in BSD, need ftime() for elapsed time */ 1561 (void) ftime(&realtime); 1562 elapsed = (realtime.time - start_time) + (double)realtime.millitm/1000.0; 1563 return(elapsed); 1564} 1565#else 1566 1567clock_t get_time() 1568{ 1569 struct tms dummy; 1570 return(times(&dummy)); 1571} 1572 1573double relative_elapsed_time(start_time) 1574clock_t start_time; 1575{ 1576 clock_t realtime; 1577 struct tms dummy; 1578 double elapsed; 1579 1580 if ((realtime = times(&dummy)) == (clock_t) -1) 1581 return(0.0); 1582 elapsed = (double) (realtime - start_time) / clock_hz; 1583 return(elapsed); 1584} 1585#endif 1586#endif 1587 1588void send_elapsed_time(slave_no) 1589 int slave_no; 1590{ 1591 amsg_t msg; 1592 amsg_data_t * msg_data; 1593 time_msg_t * time_msg; 1594 amsg_size_t size; 1595 double elapsed; 1596 1597 worker_ptr w; 1598 1599 elapsed = relative_elapsed_time(start_time); 1600 1601 size = sizeof(time_msg_t); 1602 1603 check_amsg(amsg_alloc(size, 1604 &msg_data, 1605 &msg),__LINE__,2); 1606 1607 time_msg = (time_msg_t *) msg_data; 1608 time_msg->header.msg_type = SEND_TIME; 1609 time_msg->header.wid = 0; 1610 1611 time_msg->cur_time = elapsed; 1612 1613 w = get_worker(slave_no); 1614 check_amsg_soft(amsg_send(w->wm_aport_id,msg,mdt_wm_time,1,0),__LINE__,3); 1615} 1616 1617 1618void 1619high_wm_notify(port_id) 1620 aport_id_t port_id; 1621 1622{ 1623 amsg_ret_t ret; 1624 amsg_t msg; 1625 amsg_data_t * msg_data; 1626 amsg_type_t msg_type; 1627 amsg_count_t msg_count; 1628 wm_simple_msg_t * wm_simple_msg; 1629 node_msg_t * node_msg; 1630 worker_ptr w; 1631 1632 while (( ret = amsg_receive(port_id, 1633 &msg, 1634 &msg_data, 1635 &msg_type, 1636 &msg_count, 0)) == AMSG_OK) { 1637 if (msg_type == mdt_wm_simple) 1638 { 1639 wm_simple_msg = (wm_simple_msg_t *) msg_data; 1640 switch(wm_simple_msg->header.msg_type) { 1641 1642 case DONE_INIT_OPENS: 1643 Notify("WM: Got DONE_INIT_OPENS message\n"); 1644 init_acks++; 1645 if (wm_verbose_startup) 1646 printf("worker %d is up\n",wm_simple_msg->header.wid); 1647 break; 1648 1649 1650 case REQ_TIME: 1651 Notify("WM: Got REQ_TIME message\n"); 1652 send_elapsed_time(wm_simple_msg->header.wid); 1653 break; 1654 1655 case REQ_START_TIME: 1656 Notify("WM: Got REQ_START_TIME message\n"); 1657 send_start_time(wm_simple_msg->header.wid); 1658 break; 1659 1660 case REQ_WM_HOSTNAME: 1661 Notify("WM: Got REQ_WM_HOSTNAME message\n"); 1662 w = get_worker(wm_simple_msg->header.wid); 1663 send_host_name(w); 1664 break; 1665 1666 default: 1667 break; 1668 } 1669 } 1670 else if (msg_type == mdt_wm_node) 1671 { /* ROOT_NODE_REGISTER message */ 1672 node_msg = (node_msg_t *) msg_data; 1673 Notify("WM: Got ROOT_NODE_REGISTER message\n"); 1674 root_id = node_msg->node; 1675 root_id_sender = node_msg->header.wid; 1676 got_root_id = 1; 1677 } 1678 else if (msg_type == mdt_wm_wstat) 1679 { 1680 /* WSTAT_RET message */ 1681 wstat_msg_t * wstat_msg; 1682 Notify("WM: Got WSTAT_RET message\n"); 1683 wstat_msg = (wstat_msg_t *) msg_data; 1684 get_worker(wstat_msg->header.wid)->cur_wstat = 1685 wstat_msg->stat; 1686 wstat_received = 1; 1687 } 1688 1689 check_amsg_soft(amsg_free(msg),__LINE__, 1); 1690 } 1691 1692 if (ret != AMSG_NOMESSAGE) 1693 check_amsg_soft(ret,__LINE__, 2); 1694} 1695 1696/*------------------------------------------------------------*/ 1697 1698void 1699get_arguments(argv, argc, numworkers, nofork) 1700char *argv[]; 1701int *argc; 1702int *numworkers; 1703int *nofork; 1704{ 1705 int i, j, k; 1706 *numworkers = 1; 1707 *nofork = 0; 1708 for(i=1, j=1; i < *argc; i++) 1709 { 1710 if (strcmp(argv[i],"--") == 0) 1711 { 1712 do { 1713 argv[j++] = argv[i++]; 1714 } while (i < *argc); 1715 } 1716 else if (strcmp(argv[i],"-wnf") == 0) 1717 { 1718 *nofork = 1; 1719 } 1720 else if (strcmp(argv[i],"-wv") == 0) 1721 { 1722 wm_verbose_startup = 1; 1723 } 1724 else if (strcmp(argv[i],"-wmi") == 0) 1725 { 1726 interactive = 1; 1727 } 1728 else if (strcmp(argv[i],"-wf") == 0) 1729 { 1730 usemcfile = 1; 1731 } 1732 else if (strcmp(argv[i],"-wh") == 0) 1733 { 1734 if (++i < *argc) 1735 init_host = argv[i]; 1736 else 1737 fprintf(stderr,"Expected hostname after %s\n",argv[i-1]); 1738 } 1739 else if (strcmp(argv[i],"-wx") == 0) 1740 { 1741 if (++i < *argc) 1742 init_exec = argv[i]; 1743 else 1744 fprintf(stderr,"Expected worker executable after %s\n",argv[i-1]); 1745 } 1746 else if (strcmp(argv[i],"-w") == 0) 1747 { 1748 if (++i < *argc) 1749 *numworkers = atoi(argv[i]); 1750 else 1751 fprintf(stderr,"Expected number of workers after %s\n",argv[i-1]); 1752 } 1753 else if (argv[i][0] == '-' && argv[i][1] == 'w' && 1754 (k = atoi(&argv[i][2])) > 0) 1755 { 1756 *numworkers = k; 1757 } 1758 else 1759 { 1760 argv[j++] = argv[i]; 1761 } 1762 } 1763 *argc = j; 1764 Argc = j; 1765 Argv = argv; 1766} 1767 1768/*----------------------------------------------------------------*/ 1769/* read_mcfile() */ 1770/*----------------------------------------------------------------*/ 1771/* Read the file ".eclipse_machines" from the current directory, 1772 if one exists, otherwise from the lib directory. 1773 1774 Each line in the file should have the following format 1775 <host_name> <eclipse_exe_file> <num_workers> <auto_start> 1776 1777 where 1778 host_name is the name of the machine where workers are to be started. 1779 eclipse_exec_file is the full path name of the eclipse worker (on the 1780 specified host) 1781 num_workers is the number of workers to start initially 1782 auto_start can be 0 or 1 - if 1 the workers will be started 1783 via rsh. Otherwise, only the command line will be printed and 1784 user must start the workers manually on the specified host. 1785*/ 1786void read_mcfile() 1787{ 1788 FILE * mcfile; 1789 char lib_mc_file[MAX_PATH_LEN], mcfilename[MAX_PATH_LEN], 1790 exec_file[MAX_PATH_LEN]; 1791 struct stat buf; 1792 sstring hostname; 1793 int num_workers, i, done, val; 1794 int auto_start; 1795 1796 strcpy(mcfilename,".eclipse_machines"); 1797 1798 if ((ec_stat(mcfilename,&buf) == -1) && errno == ENOENT) 1799 { 1800 sprintf(lib_mc_file,"%s/lib/%s",eclipsehome(),mcfilename); 1801 mcfile = fopen(lib_mc_file,"r"); 1802 } 1803 else 1804 mcfile = fopen(mcfilename,"r"); 1805 1806 if (mcfile != NULL) { 1807 done = 0; 1808 while (!done) { 1809 val = fscanf(mcfile, "%s %s %d %d", hostname, exec_file, &num_workers, 1810 &auto_start); 1811 if (val == EOF) 1812 done = 1; 1813 else if (val == 4) { 1814/* printf("%s %s %d %d\n",hostname,exec_file,num_workers,auto_start);*/ 1815 add_mc(hostname,exec_file,auto_start); 1816 if (usemcfile) 1817 for(i = 0; i < num_workers; i++) 1818 init_worker(hostname,0); 1819 } 1820 else { 1821 fprintf(stderr,"Illegal format in %s startup file\n", mcfilename); 1822 exit(-1); 1823 } 1824 } 1825 fclose(mcfile); 1826 } 1827} 1828 1829void init_global_values() 1830{ 1831 int i; 1832 1833 start_time = get_time(); 1834 1835#ifdef HAVE_SYSCONF 1836 clock_hz = sysconf(_SC_CLK_TCK); 1837#else 1838#ifdef CLOCK_HZ 1839 clock_hz = CLOCK_HZ; 1840#else 1841 clock_hz = 60; 1842#endif 1843#endif 1844 1845 private_mem_init(mem_panic); 1846 1847 wm_pid = getpid(); 1848 1849 sprintf(bdomain_key, WORKER_PORT_NAME); 1850/* bdomain.bdomain_file = (char *) malloc(MAX_PATH_LEN);*/ 1851 strcpy(wm_port_name, WM_PORT_NAME); 1852 1853 map_dir = getenv("ECLIPSETMP"); 1854/* If not set use current directory - this will cause problems 1855 if the current directory is not writable */ 1856 if (!map_dir) 1857 map_dir = getcwd(NULL, MAX_PATH_LEN); 1858 1859 sprintf(mps_map_file,"%s/%d.mps.map",map_dir,wm_pid); 1860 1861 wm_host = (char *) hp_alloc_size(MAXHOSTLEN); 1862 mygethostname(wm_host); 1863 1864 sprintf(session_key,"%s.%d",wm_host,wm_pid); 1865 sprintf(wm_signature,"%s.%d",wm_host,wm_pid); 1866 1867 nsrv_host = getenv("NSRV_HOST"); 1868 if(!nsrv_host) 1869 nsrv_host = wm_host; 1870 1871 init_mc_list(); 1872 1873 read_mcfile(); 1874 if (!usemcfile) 1875 { 1876 if (!init_host) 1877 init_host = (char *) wm_host; 1878 1879 if (!init_exec) { 1880 init_exec = hp_alloc_size(sizeof(sstring)); 1881 sprintf(init_exec,"%s/bin/%s/worker",eclipsehome(),HOSTARCH); 1882 } 1883 1884 add_mc(init_host,init_exec,1); 1885 for(i = 1; i <= num_workers; i++) 1886 init_worker(init_host,0); 1887 } 1888} 1889 1890mygethostname(host) 1891 char *host; 1892{ 1893#if defined(HAVE_GETHOSTNAME) 1894 gethostname(host,MAXHOSTLEN); 1895#else 1896# if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H) 1897 sysinfo(SI_HOSTNAME, host, MAXHOSTLEN); 1898# else 1899 struct utsname ut; 1900 uname(&ut); 1901 strcpy(host,ut.nodename); 1902# endif 1903#endif 1904} 1905 1906/*-----------------------------------------------------------*/ 1907/* MPS system initialisation routines */ 1908/*-----------------------------------------------------------*/ 1909 1910/* Register the wm's high_aport, halt1/2_aport and low_aport*/ 1911void register_aport_names() 1912{ 1913 aport_t aport; 1914 1915 sprintf(wm_high_aport_name,"%s_high_aport",wm_port_name); 1916 aport.aport_id = local_aport_ids[HIGH_APORT_NUMBER]; 1917 aport.bport_id = wm_bport_id; 1918 aport.bdomain_id = bdomain.bdomain_id; 1919 check_nsrv(nsrv_aport_register(session_key,wm_high_aport_name, 1920 wm_signature,&aport),__LINE__, 2); 1921 1922 sprintf(wm_halt2_aport_name,"%s_halt2_aport",wm_port_name); 1923 aport.aport_id = local_aport_ids[HALT2_APORT_NUMBER]; 1924 check_nsrv(nsrv_aport_register(session_key,wm_halt2_aport_name, 1925 wm_signature,&aport),__LINE__, 2); 1926 1927 sprintf(wm_halt1_aport_name,"%s_halt1_aport",wm_port_name); 1928 aport.aport_id = local_aport_ids[HALT1_APORT_NUMBER]; 1929 check_nsrv(nsrv_aport_register(session_key,wm_halt1_aport_name, 1930 wm_signature,&aport),__LINE__, 2); 1931 1932 sprintf(wm_low_aport_name,"%s_low_aport",wm_port_name); 1933 aport.aport_id = local_aport_ids[LOW_APORT_NUMBER]; 1934 check_nsrv(nsrv_aport_register(session_key,wm_low_aport_name, 1935 wm_signature,&aport),__LINE__, 2); 1936 wm_types_init(); /* can only be done after aport registration 1937 since MDT_NSRVNAME is only defined at this point */ 1938} 1939 1940void fill_in_domain(ldomain_id, lbdomain) 1941bdomain_id_t ldomain_id; 1942bdomain_t *lbdomain; 1943 1944{ 1945 lbdomain->bdomain_id = ldomain_id; 1946 lbdomain->bdomain_size = 0x00800000; /* 8 MB */ 1947 if (!shared_mem_base()) 1948 lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base()); 1949 else 1950 lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base() + 0x00800000); 1951 strcpy(lbdomain->bdomain_file,mps_map_file); 1952} 1953 1954/* initialise message-passing system */ 1955void mps_init() 1956{ 1957 bport_t my_bport; 1958 nsrv_ret_t nret; 1959 char hostname[MAXHOSTLEN]; 1960 1961 /* first set up the b-layer */ 1962 check_nsrv(nsrv_new_bport_id(wm_signature,&wm_bport_id),__LINE__,3); 1963 1964 gethostname(hostname,MAXHOSTLEN); 1965 1966 sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,session_key); 1967 1968 nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain); 1969 1970 if (nret == NSRV_NOT_REGISTERED) 1971 { 1972 /* This is not safe in case two wms try to create 1973 * the domain at the same time */ 1974 check_nsrv(nsrv_new_bdomain_id(wm_signature,&domain_id),__LINE__,5); 1975 fill_in_domain(domain_id,&bdomain); 1976 check_bmsg(bmsg_init(wm_bport_id,&bdomain,BDOMAIN_CREATE | 1977#ifdef DEBUG_MPS 1978 BMSG_ALOG_ON | 1979 BMSG_ALOG_OPEN | 1980 BMSG_ALOG_CLOSE | 1981 BMSG_ALOG_MASTER | 1982#endif 1983 BPORT_NOTIFICATION ),__LINE__, 1); 1984 check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name,wm_signature,&bdomain),__LINE__,2); 1985 } 1986 else if (nret == NSRV_OK) 1987 check_bmsg(bmsg_init(wm_bport_id,&bdomain, BPORT_NOTIFICATION),__LINE__,1); 1988 else 1989 { 1990 check_nsrv(nret, __LINE__, 4); 1991 } 1992 check_bmsg(bport_port(wm_bport_id,&my_bport),__LINE__,2); 1993 check_nsrv(nsrv_bport_register(session_key,wm_port_name, 1994 wm_signature,&my_bport),__LINE__,2); 1995 1996 /* Now set up the a-layer ports */ 1997 notify[HALT1_APORT_NUMBER] = halt1_notify; 1998 notify[HALT2_APORT_NUMBER] = halt2_notify; 1999 notify[HIGH_APORT_NUMBER] = high_wm_notify; 2000 notify[LOW_APORT_NUMBER] = low_wm_notify; 2001 2002#ifdef HAVE_SIGPROCMASK 2003 sigemptyset(&sigio_mask); 2004 sigaddset(&sigio_mask, SIGIO); 2005#ifdef SIGPOLL 2006 sigaddset(&sigio_mask, SIGPOLL); 2007#endif 2008#endif 2009#ifdef HAVE_SIGACTION 2010 { 2011 struct sigaction sa; 2012 sa.sa_handler = sigio_handler; 2013 sigemptyset(&sa.sa_mask); 2014 sa.sa_flags = 0; 2015 (void) sigaction(SIGIO, &sa, (struct sigaction *) 0); 2016#ifdef SIGPOLL 2017 (void) sigaction(SIGPOLL, &sa, (struct sigaction *) 0); 2018#endif 2019 sa.sa_handler = SIG_IGN; 2020 (void) sigaction(SIGINT,&sa, (struct sigaction *) 0); 2021 } 2022#else 2023 signal(SIGINT, SIG_IGN); 2024 signal(SIGIO,sigio_handler); 2025#ifdef SIGPOLL 2026 signal(SIGPOLL,sigio_handler); 2027#endif 2028#endif 2029 2030 check_amsg(amsg_init(STATIC_PORTS,notify,local_aport_ids,0), __LINE__,1); 2031 2032 check_amsg_soft(aport_set_option(local_aport_ids[LOW_APORT_NUMBER], 2033 APORT_NOTIFY_LEVEL, 2034 (aport_optval_t) 1),__LINE__, 4); 2035 check_amsg_soft(aport_set_option(local_aport_ids[HIGH_APORT_NUMBER], 2036 APORT_NOTIFY_LEVEL, 2037 (aport_optval_t) 2),__LINE__, 4); 2038 check_amsg_soft(aport_set_option(local_aport_ids[HALT1_APORT_NUMBER], 2039 APORT_NOTIFY_LEVEL, 2040 (aport_optval_t) 3),__LINE__, 4); 2041 check_amsg_soft(aport_set_option(local_aport_ids[HALT2_APORT_NUMBER], 2042 APORT_NOTIFY_LEVEL, 2043 (aport_optval_t) 4),__LINE__, 4); 2044 register_aport_names(); 2045} 2046 2047/*-------------------------------------------------------------*/ 2048/* Worker startup Routines */ 2049/*-------------------------------------------------------------*/ 2050 2051/* Starting up a new worker involves 2052 i. Creating it : bproc_create(...) 2053 ii. Waiting until the new worker registers its bport with the name server: 2054 wait_for_bport_connections(...) 2055 iii. Waiting until the new worker registers its aports with the name server: 2056 wait_for_aport_connections. 2057 iv. Sending the new worker the port names of all other workers 2058 send_port_names(..) 2059 v. Sending all the other (old) workers the port name of the new worker 2060 send_new_port(...) 2061 vi. Waiting for an acknowledge message from the new worker: 2062 the DONE_INIT_OPENS message. 2063 2064This is the protocol implemented in add_worker(). 2065 2066At initial startup, instead of starting workers one by one, we create 2067all of them (see create_workers()), wait for all the bport and aport 2068registrations and then send the all the portnames to each worker 2069(see initialise_workers()). 2070 2071*/ 2072 2073/* bproc_create - start a worker */ 2074 2075#define LOCAL_SLAVE_EXTRA_ARGS 7 2076#define REM_SLAVE_EXTRA_ARGS 10 2077 2078/*ARGSUSED*/ 2079int bproc_create(hostname,slave_no,lsession_key,first,argc,argv) 2080char * hostname; 2081int slave_no; 2082nsrv_name_t lsession_key; 2083int first; 2084int argc; 2085char ** argv; 2086 2087{ 2088 bmsg_ret_t ret; 2089 sstring s_slave_no; 2090 sstring s_nsrv_port; 2091 char ** slave_argv; 2092 int slave_argc; 2093 int i,j; 2094 machine_t *mc; 2095 worker_t *w; 2096 2097 if(gethostbyname(hostname) == NULL) 2098 { 2099 fprintf(stderr,"Invalid hostname %s\n",hostname); 2100 return(0); 2101 } 2102 2103 mc = get_mc(hostname); 2104 ret = fork(); 2105 2106 if (ret == 0) { /* child */ 2107 sprintf(s_slave_no,"%d",slave_no); 2108 sprintf(s_nsrv_port,"%d",nsrv_port_number); 2109 if(strcmp(hostname,wm_host) == 0) { /* local worker */ 2110 2111 slave_argc = argc + LOCAL_SLAVE_EXTRA_ARGS; 2112 slave_argv = (char **) hp_alloc_size(sizeof(char *) * 2113 (slave_argc + 1)); 2114 2115 /* client needs slave_no, session_key */ 2116 2117 slave_argv[0] = mc->exec_file; 2118 slave_argv[1] = "-a"; 2119 slave_argv[2] = s_slave_no; 2120 slave_argv[3] = lsession_key; 2121 slave_argv[4] = nsrv_host; 2122 slave_argv[5] = s_nsrv_port; 2123 slave_argv[6] = first == 1 ? "-m" : "-c"; 2124 slave_argv[7] = mc->heap_map_file; 2125 2126 for(i = LOCAL_SLAVE_EXTRA_ARGS+1, j = 1 ; i < slave_argc ; i++, j++) 2127 slave_argv[i] = argv[j]; 2128 2129 slave_argv[slave_argc + 1] = (char *) 0; 2130 2131 ret = execvp(mc->exec_file,slave_argv); 2132 /*NOTREACHED*/ 2133 perror("Execvp failed"); 2134 exit(4); 2135 } 2136 2137 else 2138 { /* Non-local worker to be created - use rsh */ 2139 2140/* fclose(stdout); */ 2141 slave_argc = argc + REM_SLAVE_EXTRA_ARGS; 2142 slave_argv = (char **) hp_alloc_size(sizeof(char *) * 2143 (slave_argc + 1)); 2144 slave_argv[0] = "rsh"; 2145 slave_argv[1] = slave_no == 1? "": "-n"; /*slave worker needn't stdin*/ 2146 slave_argv[2] = hostname; 2147 slave_argv[3] = mc->exec_file; 2148 slave_argv[4] = "-a"; 2149 slave_argv[5] = s_slave_no; 2150 slave_argv[6] = lsession_key; 2151 slave_argv[7] = nsrv_host; 2152 slave_argv[8] = s_nsrv_port; 2153 slave_argv[9] = first == 1 ? "-m" : "-c"; 2154 slave_argv[10] = mc->heap_map_file; 2155 2156 for(i = REM_SLAVE_EXTRA_ARGS+1, j = 1 ; i < slave_argc ; i++, j++) 2157 slave_argv[i] = argv[j]; 2158 slave_argv[slave_argc + 1] = (char *) 0; 2159 2160 ret = execvp(slave_argv[0],slave_argv); 2161 2162 perror("Execvp failed"); 2163 exit(4); 2164 } 2165 } 2166 else if (ret > 0) 2167 { 2168 if (wm_verbose_startup) 2169 printf("created worker %d, pid = %d\n",slave_no,ret); 2170 w = get_worker(slave_no); 2171 w->pid = ret; 2172 return(1); 2173 } 2174 else 2175 { 2176 fprintf(stderr,"Could not fork slave %d\n",slave_no); 2177 perror("Could not fork"); 2178 return(0); 2179 } 2180 /* should never be reached - lint warning fix */ 2181 return(-1); 2182} 2183 2184 2185#define TIME_OUT 3000 2186#define BPORT 1 2187#define APORT 2 2188 2189void wait_for_port_connection(w,port_type,dont_time_out) 2190worker_ptr w; 2191int port_type; 2192int dont_time_out; /* do not time out if starting workers manually */ 2193{ 2194 nsrv_ret_t ret; 2195 int j=0, time_out=TIME_OUT; /* 30 seconds */ 2196 2197 if (port_type == BPORT) { 2198 bport_t slave_bport; 2199 do { 2200 ret = nsrv_bport_look_up(session_key,w->bport_name,&slave_bport); 2201 if (ret == NSRV_OK) { 2202 w->pid = slave_bport.bpid; 2203 w->bport_id = slave_bport.bport_id; 2204 return; 2205 } 2206 short_sleep(1000); 2207 j++; 2208 } while(ret != NSRV_OK && ((j < time_out) || dont_time_out)); 2209 } else { 2210 nsrv_name_t w_aport_name; 2211 aport_t slave_aport; 2212 int wm_connected, halt1_connected, halt2_connected; 2213 wm_connected = halt1_connected = halt2_connected = 0; 2214 do { 2215 if (!wm_connected) { 2216 sprintf(w_aport_name,"%s_wm_aport",w->bport_name); 2217 ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport); 2218 if (ret==NSRV_OK) { 2219 w->wm_aport_id = slave_aport.aport_id; 2220 wm_connected = 1; 2221 } 2222 } 2223 if (!halt1_connected) { 2224 sprintf(w_aport_name,"%s_halt1_aport",w->bport_name); 2225 ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport); 2226 if (ret==NSRV_OK) { 2227 w->halt1_aport_id = slave_aport.aport_id; 2228 halt1_connected = 1; 2229 } 2230 } 2231 if (!halt2_connected) { 2232 sprintf(w_aport_name,"%s_halt2_aport",w->bport_name); 2233 ret = nsrv_aport_look_up(session_key,w_aport_name,&slave_aport); 2234 if (ret==NSRV_OK) { 2235 w->halt2_aport_id = slave_aport.aport_id; 2236 halt2_connected = 1; 2237 } 2238 } 2239 if (wm_connected&&halt1_connected&&halt2_connected) 2240 return; 2241 short_sleep(1000); 2242 j++; 2243 } while (j < time_out || dont_time_out); 2244 } 2245 wm_abort_error("Workers not responding."); 2246 kill_system(); 2247 exit(5); 2248} 2249 2250void wait_for_bport_connections() 2251{ 2252 int i; 2253 bmsg_ret_t bret; 2254 bport_t slave_bport; 2255 worker_ptr cur; 2256 2257 for(i = 0; i < mc_list.num_machines; i++) 2258 for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2259 wait_for_port_connection(cur,BPORT,dont_fork || 2260 mc_list.machines[i].auto_start); 2261 2262 /* wait until all workers have made their bport connections */ 2263 2264 for(i = 0; i < mc_list.num_machines; i++) 2265 for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2266 { 2267 do { 2268 bret = bport_port(cur->bport_id,&slave_bport); 2269 } while(bret != BMSG_OK); 2270 } 2271 Notify("Slave bports opened\n"); 2272} 2273 2274void wait_for_aport_connections() 2275{ 2276 int i; 2277 worker_ptr cur; 2278 2279 for(i = 0; i < mc_list.num_machines; i++) 2280 for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2281 wait_for_port_connection(cur,APORT,dont_fork || 2282 mc_list.machines[i].auto_start); 2283 2284 Notify("Slave a-layer ready \n"); 2285} 2286 2287void send_port(worker_aport_id,worker) 2288 aport_id_t worker_aport_id; 2289 worker_ptr worker; 2290{ 2291 amsg_t msg; 2292 amsg_data_t * msg_data; 2293 port_name_msg_t * port_name_msg; 2294 amsg_size_t size; 2295 2296 size = sizeof(port_name_msg_t); 2297 check_amsg(amsg_alloc(size, 2298 &msg_data, 2299 &msg),__LINE__, 2); 2300 2301 port_name_msg = (port_name_msg_t *) msg_data; 2302 port_name_msg->header.msg_type = PORT_NAME; 2303 port_name_msg->header.wid = 0; 2304 2305 port_name_msg->index = worker->index; 2306 strcpy(port_name_msg->port_name,worker->bport_name); 2307 port_name_msg->wm_aport_id = worker->wm_aport_id; 2308 2309 check_amsg_soft(amsg_send(worker_aport_id,msg,mdt_wm_portname,1,0), 2310 __LINE__, 3); 2311} 2312 2313void send_new_port(new_worker) 2314 worker_ptr new_worker; 2315{ 2316 worker_ptr w; 2317 int i; 2318 2319 for(i = 0; i < mc_list.num_machines; i++) 2320 for (w = mc_list.machines[i].list; w != NULL; w = w->next) 2321 if (w != new_worker) 2322 send_port(w->wm_aport_id,new_worker); 2323} 2324 2325void send_port_names(worker_aport_id) 2326 aport_id_t worker_aport_id; 2327{ 2328 amsg_t msg; 2329 amsg_data_t * msg_data; 2330 worker_ptr w; 2331 wm_simple_msg_t * wm_mess_hdr; 2332 amsg_size_t size; 2333 int i; 2334 2335 for(i = 0; i < mc_list.num_machines; i++) 2336 for (w = mc_list.machines[i].list; w != NULL; w = w->next) 2337 send_port(worker_aport_id,w); 2338 2339 send_simple_wm_message(worker_aport_id, SENT_INIT_PORT_NAMES, 2340 mc_list.total_workers); 2341} 2342 2343 2344void send_init_port_names() 2345{ 2346 worker_ptr cur; 2347 int i; 2348 2349 for(i = 0; i < mc_list.num_machines; i++) 2350 for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2351 send_port_names(cur->wm_aport_id); 2352} 2353 2354void synch_with_slaves() 2355{ 2356 /* a barrier synchronisation, each slave sends an ack and 2357 once all have done, we send acks back to all slaves, 2358 After this point, all slaves are connected to each other */ 2359 2360 worker_ptr cur; 2361 int i; 2362 2363 while (init_acks != (mc_list.total_workers)) short_sleep(1000); 2364 2365 for(i = 0; i < mc_list.num_machines; i++) 2366 for (cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2367 { 2368 check_amsg_soft(send_simple_wm_message(cur->wm_aport_id,ALL_CONNECTED,0), 2369 __LINE__, 3); 2370 cur->status = AWAKE; 2371 } 2372 2373} 2374 2375int print_worker_command(index,file_flag,argc,argv,hostname) 2376int index; 2377char * file_flag; 2378int argc; 2379char * argv[]; 2380char * hostname; 2381{ 2382 sstring slave_file; 2383 int i; 2384 machine_t *mc; 2385 2386 mc = get_mc(hostname); 2387 2388 if (init_exec != NULL) 2389 strcpy(slave_file,init_exec); 2390 else 2391 sprintf(slave_file,"%s/bin/%s/worker",eclipsehome(),HOSTARCH); 2392 2393 printf("Slave command is: \n run -a %d %s %s %d %s %s", 2394 index, session_key, nsrv_host, nsrv_port_number, 2395 file_flag,mc->heap_map_file); 2396 2397 for(i = 1; i < argc; i++) 2398 printf(" %s",argv[i]); 2399 printf("\non host %s\n", hostname); 2400 2401} 2402 2403int add_worker(argc,argv,mc) 2404 int argc; 2405 char * argv[]; 2406 machine_t *mc; 2407{ 2408 int worker_index; 2409 worker_ptr new_worker; 2410 int ret; 2411 2412 worker_index = init_worker(mc->hostname,1); 2413 new_worker = get_worker(worker_index); 2414 2415 if (dont_fork || !(mc->auto_start)) { 2416 printf("Worker manager (session %s) waiting for 1 worker\n", 2417 session_key); 2418 print_worker_command(worker_index,(new_worker->first? "-m" : "-c"), 2419 argc,argv,mc->hostname); 2420 ret = 1; 2421 } 2422 else { 2423 ret = bproc_create(mc->hostname,worker_index,session_key, 2424 new_worker->first,argc,argv); 2425 } 2426 if (ret) { 2427 new_worker = get_worker(worker_index); 2428 wait_for_port_connection(new_worker,BPORT); 2429 wait_for_port_connection(new_worker,APORT); 2430 2431 send_port_names(new_worker->wm_aport_id); 2432 send_new_port(new_worker); 2433 2434 while (init_acks != (mc_list.total_workers)) short_sleep(1000); 2435 check_amsg_soft(send_simple_wm_message(new_worker->wm_aport_id, 2436 ALL_CONNECTED,0),__LINE__, 3); 2437 new_worker->status = AWAKE; 2438 check_amsg_soft(send_simple_wm_message(new_worker->wm_aport_id, 2439 ROOT_INITIALISED,0), __LINE__, 3); 2440 return(1); 2441 } 2442 else 2443 return(ret); 2444} 2445 2446/*-----------------------------------------------------------------*/ 2447/* The worker manager event-handling loop */ 2448/*-----------------------------------------------------------------*/ 2449void wait_for_end() 2450{ 2451 while (1) 2452 { 2453 if (interactive) 2454 { 2455 (void) aport_set_option(local_aport_ids[LOW_APORT_NUMBER], 2456 APORT_NOTIFY, (aport_optval_t) AMSG_OFF); 2457 tk_OneEvent(1); 2458 update_perf_window(); 2459 (void) aport_set_option(local_aport_ids[LOW_APORT_NUMBER], 2460 APORT_NOTIFY, (aport_optval_t) AMSG_ON); 2461 short_sleep(10000); 2462 } 2463 else 2464 { 2465 short_sleep(1800000000); /* half an hour :-) */ 2466 } 2467 } 2468} 2469 2470void fork_nameserver() 2471{ 2472 char nsrv_exec_file[MAX_PATH_LEN]; 2473 char s_nsrv_port_number[128]; 2474#ifdef HAVE_SYSINFO 2475 char mcname[128]; 2476#endif 2477 int ret; 2478 int j = 60; /* number of seconds to wait for nameserver to be up */ 2479 2480 2481 sprintf(nsrv_exec_file,"%s/bin/%s/nsrv",eclipsehome(),HOSTARCH); 2482 ret = fork(); 2483 if (ret == 0) /* child */ 2484 { 2485 char *nsrv_argv[6]; 2486 2487 setsid(); /* create a new process group for the name server */ 2488 nsrv_argv[0] = nsrv_exec_file; 2489 nsrv_argv[1] = "-p"; 2490 nsrv_argv[2] = map_dir; 2491#if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H) 2492 sysinfo(SI_MACHINE, mcname, 128); 2493 if(strcmp("DRS 6000",mcname) == 0) { 2494 nsrv_argv[3] = (char *) 0; 2495 ret = execvp(nsrv_exec_file,nsrv_argv); 2496 perror("Could not start (exec) the name server"); 2497 exit(6); 2498 } 2499 else 2500#endif /* HAVE_SYSINFO */ 2501 { 2502 nsrv_argv[3] = "-nshm"; /* no shared-memory interaction */ 2503 nsrv_argv[4] = "-npds"; /* no pds-mps based interaction */ 2504 nsrv_argv[5] = (char *) 0; 2505 ret = execvp(nsrv_exec_file,nsrv_argv); 2506 perror("Could not start (exec) the name server"); 2507 exit(6); 2508 } 2509 } 2510 else if (ret > 0) 2511 { 2512 fprintf(stderr,"Starting Name Server %s pid = %d...",nsrv_exec_file,ret); 2513 fflush(stderr); 2514 while((j > 0) && (nsrv_init(nsrv_host, &nsrv_port_number) != NSRV_OK)) 2515 { 2516 short_sleep(1000000); 2517 j--; 2518 } 2519 if (j == 0) 2520 { 2521 wm_abort_error("Name server not responding.\n Try starting a new name server (nsrv) manually.\n"); 2522 exit(6); 2523 } 2524 else 2525 fprintf(stderr," ... done\n"); 2526 } 2527 else { 2528 perror("Could not fork name serever!"); 2529 exit(6); 2530 } 2531} 2532 2533/* Creation of "initial" workers */ 2534void create_workers(argc,argv) 2535 int argc; 2536 char * argv[]; 2537{ 2538 worker_ptr w; 2539 int i; 2540 2541 if (dont_fork) 2542 { 2543 printf("Worker manager (session %s) waiting for %d worker(s)\n", 2544 session_key, mc_list.total_workers); 2545 for(i = 0; i < mc_list.num_machines; i++) 2546 for(w = mc_list.machines[i].list; w != NULL; w = w->next) 2547 print_worker_command(w->index,w->first ? "-m" : "-c",argc,argv, 2548 mc_list.machines[i].hostname); 2549 } 2550 else 2551 { 2552 for(i = 0; i < mc_list.num_machines; i++) 2553 if(mc_list.machines[i].auto_start) 2554 { 2555 for(w = mc_list.machines[i].list; w != NULL; w = w->next) 2556 if (!bproc_create(mc_list.machines[i].hostname,w->index, 2557 session_key, w->first,argc,argv)) 2558 { 2559 fprintf(stderr,"Could not create slave: %d\n",w->index); 2560 kill_system(); 2561 exit(7); 2562 } 2563 } 2564 else 2565 for(w = mc_list.machines[i].list; w != NULL; w = w->next) 2566 print_worker_command(w->index,w->first ? "-m" : "-c", 2567 argc,argv,mc_list.machines[i].hostname); 2568 } 2569} 2570 2571/* Implements the initial startup protocol - 2572 All the "initial" workers (specified via the -w command-line 2573 option or the .eclipse_machines file) have been started, before 2574 this routine is called */ 2575 2576void initialise_workers() 2577{ 2578 worker_ptr w; 2579 int i; 2580 2581 wait_for_bport_connections(); 2582 wait_for_aport_connections(); 2583 2584 send_init_port_names(); 2585 Notify("Sent port names and Root Id\n"); 2586 2587 synch_with_slaves(); 2588 2589 while(!got_root_id) short_sleep(2000); 2590 2591 for(i = 0; i < mc_list.num_machines; i++) 2592 for(w = mc_list.machines[i].list; w != NULL; w = w->next) 2593 if(w->index != root_id_sender) 2594 check_amsg(send_simple_wm_message(w->wm_aport_id, 2595 ROOT_INITIALISED, 0), __LINE__, 3); 2596 2597} 2598/*-------------------------------------------------------*/ 2599int main(argc,argv) 2600 int argc; 2601 char * argv[]; 2602{ 2603 nsrv_ret_t nret; 2604 int i; 2605 2606 2607 get_arguments(argv, &argc, &num_workers, &dont_fork); 2608 irq_lock_init(delayed_signal_handler); 2609 init_global_values(); 2610 2611 nret = nsrv_init(nsrv_host, &nsrv_port_number); 2612 if (nret != NSRV_OK) 2613 { 2614 if (nret == NSRV_NOSERVER) 2615 { 2616 /* try starting a nameserver on the local machine */ 2617 nsrv_host = wm_host; 2618 fork_nameserver(); 2619 } 2620 else 2621 check_nsrv(nret,__LINE__, 1); 2622 } 2623 2624 mps_init(); 2625 2626 create_workers(argc,argv); 2627 2628 initialise_workers(); 2629 2630 if (interactive) 2631 start_tkwindow(); 2632 2633 Notify("Waiting for end\n"); 2634 wait_for_end(); 2635} 2636 2637#ifndef HAVE_RANDOM 2638long 2639random() 2640{ 2641 return (long) rand(); 2642} 2643 2644srandom(x) 2645int x; 2646{ 2647 srand(x); 2648} 2649#endif 2650 2651void 2652short_sleep(usec) 2653int usec; 2654{ 2655#ifdef HAVE_SELECT 2656 struct timeval sleep_time; 2657 sleep_time.tv_sec = usec / 1000000; 2658 sleep_time.tv_usec = usec % 1000000; 2659 (void) select(0, 0, 0, 0, &sleep_time); 2660#endif 2661} 2662 2663/* worker info setting and getting functions */ 2664 2665send_worker_info(reqwid, bufsize, buf) 2666int reqwid, bufsize; 2667void_ptr buf; 2668{ 2669 2670 amsg_t msg; 2671 amsg_data_t * msg_data; 2672 worker_info_msg_t * worker_info_msg; 2673 amsg_size_t size; 2674 worker_ptr w; 2675 2676 w = get_worker(reqwid); 2677 2678 size = sizeof(worker_info_msg_t) + bufsize; 2679 2680 check_amsg(amsg_alloc(size, 2681 &msg_data, 2682 &msg),__LINE__, 2); 2683 2684 worker_info_msg = (worker_info_msg_t *) msg_data; 2685 worker_info_msg->header.msg_type = WORKER_INFO_NOTIFY; 2686 worker_info_msg->header.wid = 0; 2687 worker_info_msg->size = bufsize; 2688 memcpy((void_ptr) ((worker_info_msg_t *) msg_data + 1), buf, bufsize); 2689 check_amsg_soft(amsg_send(w->wm_aport_id,msg,MDT_BYTE,size,0),__LINE__,3); 2690} 2691 2692/*-----------------------------------------------------------------*/ 2693/* Worker Statistics support */ 2694/*-----------------------------------------------------------------*/ 2695 2696 2697/* called from update_perf_window (in wm_interface.c) */ 2698/* The return message (WSTAT_RET) updates the current workers 2699 cur_wstat field. This message is received on the high wm-aport 2700 since we are busy waiting on the reply. */ 2701 2702get_worker_stats(flag) 2703int flag; 2704{ 2705 worker_ptr cur; 2706 int i; 2707 struct worker_stat_ext wstat; 2708 2709 for(i = 0; i < mc_list.num_machines; i++) 2710 for(cur = mc_list.machines[i].list; cur != NULL; cur = cur->next) 2711 { 2712 if(flag == 0) /* top-like display */ 2713 cur->start_wstat = cur->cur_wstat; 2714 wstat_received = 0; 2715 check_amsg_soft(send_simple_wm_message(cur->wm_aport_id, 2716 WSTAT_REQ, 0), 2717 __LINE__, 3); 2718 while(!wstat_received) 2719 short_sleep(1000); 2720 2721 if (flag == 1) /* reset */ 2722 cur->start_wstat = cur->cur_wstat; 2723 } 2724} 2725 2726/* mdt type definition for worker_stat_ext structure */ 2727/* mirrored in trace.c */ 2728wstat_types_init(mdt_wstat) 2729amsg_type_t * mdt_wstat; 2730 2731{ 2732 amsg_typedef_t template[33]; 2733 2734 /* mdt_wstat */ 2735 template[0] = MDT_BEGIN; 2736 template[1] = MDT_STRUCT_OPEN; 2737 template[2] = MDT_INT32; 2738 template[3] = MDT_INT32; 2739 template[4] = MDT_INT32; 2740 template[5] = MDT_INT32; 2741 template[6] = MDT_INT32; 2742 template[7] = MDT_INT32; 2743 template[8] = MDT_INT32; 2744 template[9] = MDT_INT32; 2745 template[10] = MDT_INT32; 2746 template[11] = MDT_INT32; 2747 template[12] = MDT_INT32; 2748 template[13] = MDT_INT32; 2749 template[14] = MDT_INT32; 2750 template[15] = MDT_INT32; 2751 template[16] = MDT_INT32; 2752 template[17] = MDT_INT32; 2753 template[18] = MDT_INT32; 2754 template[19] = MDT_ARRAY_OF; 2755 template[20] = 3; 2756 template[21] = MDT_DP_FLOAT; 2757 template[22] = MDT_DP_FLOAT; 2758 template[23] = MDT_ARRAY_OF; 2759 template[24] = MAX_NUM_EVENTS; 2760 template[25] = MDT_DP_FLOAT; 2761 template[26] = MDT_INT32; 2762 template[27] = MDT_INT32; 2763 template[28] = MDT_ARRAY_OF; 2764 template[29] = MAX_NESTING; 2765 template[30] = MDT_INT32; 2766 template[31] = MDT_STRUCT_CLOSE; 2767 template[32] = MDT_END; 2768 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 100, 2769 template, mdt_wstat), __LINE__, 4); 2770} 2771 2772