1/* BEGIN LICENSE BLOCK 2 * Version: CMPL 1.1 3 * 4 * The contents of this file are subject to the Cisco-style Mozilla Public 5 * License Version 1.1 (the "License"); you may not use this file except 6 * in compliance with the License. You may obtain a copy of the License 7 * at www.eclipse-clp.org/license. 8 * 9 * Software distributed under the License is distributed on an "AS IS" 10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 11 * the License for the specific language governing rights and limitations 12 * under the License. 13 * 14 * The Original Code is The ECLiPSe Constraint Logic Programming System. 15 * The Initial Developer of the Original Code is Cisco Systems, Inc. 16 * Portions created by the Initial Developer are 17 * Copyright (C) 1994-2006 Cisco Systems, Inc. All Rights Reserved. 18 * 19 * Contributor(s): 20 * 21 * END LICENSE BLOCK */ 22 23/********************************************************************** 24** System: Parallel Distributed System 25** File: worker.c 26** Author: Shyam Mudambi 27** Liang-Liang Li 28** Description: Slave process mps setup and handling routines 29***********************************************************************/ 30 31#include "config.h" 32#include <sys/types.h> 33#include <sys/param.h> 34#include <signal.h> 35#include <stdlib.h> 36#include <stdio.h> 37#include <sys/time.h> 38#include <sys/times.h> 39 40#ifdef HAVE_UNISTD_H 41#include <unistd.h> 42#endif 43 44#if HAVE_STRING_H 45# include <string.h> 46# ifdef MEMCPY_STRING 47# define bcopy(s1, s2, n) (void) memcpy((void *)(s2),(void *)(s1), n) 48# endif 49#endif 50#ifdef MEMCPY_MEMORY 51# include <memory.h> 52# define bcopy(s1, s2, n) (void) memcpy((char *)(s2), (char *)(s1), n) 53#endif 54 55#ifdef PATH_IN_LIMITS 56# include <limits.h> 57# define MAX_PATH_LEN PATH_MAX 58#else 59# include <sys/param.h> 60# define MAX_PATH_LEN MAXPATHLEN 61#endif 62 63#ifdef BSD_TIMES 64#include <sys/timeb.h> 65#endif 66 67#if !defined(HAVE_GETHOSTID) 68#include <sys/utsname.h> /* for uname() */ 69#endif 70 71#ifdef HAVE_SYS_SYSTEMINFO_H 72#include <sys/systeminfo.h> 73#endif 74 75#include "sepia.h" 76#include <pds.h> /* pds.h has to be included before types.h since both 77 define machine word size dependent types */ 78#include <nsrv.h> 79#include "types.h" 80#include "embed.h" 81#include "error.h" 82#include "mem.h" 83#include "dict.h" 84#include "memman.h" 85#include "trace.h" 86#include "wm_msgs.h" 87#include "wm.h" 88 89 90#define ST_HANDLE_DS_DEFINED 1 91#include "sch_types.h" 92#include "sch_eng_interface.h" 93 94 95/*------------------------------------------------------*/ 96/* Types and Constant Definitions */ 97/*------------------------------------------------------*/ 98 99#define ParallelWorker (ec_options.parallel_worker) 100 101#define Notify(text) \ 102{ if (worker_verbose) { printf("%d: %s\n", ParallelWorker, text); }} 103 104typedef struct worker_struct * worker_ptr; 105struct worker_struct { 106 int index; 107 int pid; 108 nsrv_name_t bport_name; 109 bport_id_t bport_id; 110 aport_id_t wm_aport_id; 111 int status; 112 worker_ptr next; 113} ; 114 115typedef struct worker_struct worker_t; 116 117typedef struct { 118 int num_workers; 119 worker_ptr list; 120} hdr_worker_list_t; 121 122/*------------------------------------------------------*/ 123/* External Variables */ 124/*------------------------------------------------------*/ 125 126extern RETSIGTYPE sigmsg_handler(); 127 128extern void sch_wake_eng(), 129 sch_idle_eng(), 130 ec_worker_cleanup(), 131 unblock_signals(); 132 133/* extern void exit(); */ 134 135extern void short_sleep(); 136void poll_short_sleep(); 137#ifdef WORKER_TRACING 138extern void trace_types_init(); 139extern trace_header_t * get_trace_ptr(); 140#endif 141 142/*------------------------------------------------------*/ 143/* Global Variables */ 144/*------------------------------------------------------*/ 145 146aport_id_t wm_halt1_aport_id; 147aport_id_t wm_halt2_aport_id; 148aport_id_t wm_high_aport_id; 149aport_id_t wm_low_aport_id; /* used in trace.c */ 150 151static int worker_verbose = 0; 152static hdr_worker_list_t worker_list; 153 154st_handle_t root_id; 155 156nsrv_name_t bdomain_key; 157nsrv_name_t session_key; 158static nsrv_name_t my_port_name; 159static nsrv_name_t my_signature; 160bdomain_t bdomain; 161 162 163bdomain_id_t domain_id; /* only used if this worker creates a new 164 shared memory domain */ 165 166status_msg_t status; 167host_status_msg_t host_status; 168 169static bport_t wm_bport; 170aport_id_t aports[TOTAL_APORT_NUMBER]; 171static bport_t my_bport; 172static bport_id_t my_bport_id; 173static int my_pid; 174 175#if defined(__STDC__) 176static volatile int config_ret = 0; 177static volatile int status_ret = 0; 178static volatile int worker_info_ret = 0; 179static volatile int got_wm_hostname = 0; 180static volatile int all_connected = 0; 181static volatile int got_halt = 0; 182static volatile int got_root_id = 0; 183static volatile int root_initialised = 0; 184static volatile int sent_init_table = 0; 185static volatile int wm_b_connected = 0; 186static volatile int open_bports = 0; 187static volatile int closed_bports = 0; 188static volatile double cur_time = 0.0; 189#else 190static int config_ret = 0; 191static int status_ret = 0; 192static int worker_info_ret = 0; 193static int got_wm_hostname = 0; 194static int all_connected = 0; 195static int got_halt = 0; 196static int got_root_id = 0; 197static int root_initialised = 0; 198static int sent_init_table = 0; 199static int wm_b_connected = 0; 200static int open_bports = 0; 201static int closed_bports = 0; 202static double cur_time = 0.0; 203#endif 204static char * map_dir; 205nsrv_name_t wm_hostname; /* name of worker manager host */ 206int local_wm = 0; /* set if worker manager is on the same host */ 207 208/* for sending and rec. worker info */ 209void_ptr worker_info_buf, worker_info_bufin; 210int worker_info_bufsize, worker_info_bufinsize; 211 212/* for receiving worker statistics */ 213struct worker_stat_ext * wstat_rec_buf; 214 215#ifdef WORKER_TRACING 216/* for receiving trace header */ 217trace_header_t * trace_header_buf; 218#endif 219 220/* below used for calculating elapsed session time */ 221/* wm_start_time is only used if we on the same host as the 222 worker manager */ 223#ifdef HAVE_GETHRTIME 224static volatile hrtime_t wm_start_time; 225#else 226#ifdef BSD_TIMES 227static volatile time_t wm_start_time; 228#else 229static volatile clock_t wm_start_time; 230#endif 231#endif 232extern int clock_hz; 233 234/*-----------------------------------------------------------------*/ 235/* Port list handling routines */ 236/*-----------------------------------------------------------------*/ 237 238void init_worker_list() 239{ 240 worker_list.num_workers = 0; 241 worker_list.list = NULL; 242} 243 244void insert_port(index,port_name,wm_aport_id) 245int index; 246nsrv_name_t port_name; 247aport_id_t wm_aport_id; 248 249{ 250 worker_ptr worker; 251 252 worker = (worker_ptr) hp_alloc(sizeof(worker_t)); 253 worker->index = index; 254 (void) strcpy(worker->bport_name,port_name); 255 worker->status = 0; 256 worker->wm_aport_id = wm_aport_id; 257 258 if(worker_list.list == NULL) 259 worker->next = NULL; 260 else 261 worker->next = worker_list.list; 262 263 worker_list.list = worker; 264 worker_list.num_workers++; 265} 266 267worker_ptr get_worker(wid) 268int wid; 269{ 270 worker_ptr cur; 271 272 for(cur = worker_list.list; cur != NULL; cur = cur->next) 273 if (cur->index == wid) 274 return(cur); 275 276 return(0); 277} 278 279 280int valid_wid(wid) 281int wid; 282{ 283 worker_ptr cur; 284 285 for(cur = worker_list.list; cur != NULL; cur = cur->next) 286 if (cur->index == wid) 287 return(1); 288 289 return(0); 290} 291 292/*-----------------------------------------------------------------*/ 293 294void check_nsrv(nret,line,err) 295nsrv_ret_t nret; 296int line; 297int err; 298 299{ 300 if (nret != NSRV_OK) 301 { 302 printf("**Worker %d: Name Server (nsrv) Fatal Error**\n",ParallelWorker); 303 switch(err) 304 { 305 case 1: 306 printf("Looks like your nameserver has crashed.\n"); 307 printf("Kill all workers, remove all files in $ECLIPSETMP "); 308 printf("and restart peclipse.\n"); 309 break; 310 case 2: 311 printf("Could not register information with nsrv.\n"); 312 break; 313 case 3: 314 printf("nsrv_new_bport_id call failed.\n"); 315 break; 316 case 4: 317 printf("nsrv_bdomain_look_up failed.\n"); 318 break; 319 case 5: 320 printf("nsrv_bport_look_up failed.\n"); 321 break; 322 case 6: 323 printf("nsrv_aport_look_up failed.\n"); 324 break; 325 default: 326 break; 327 } 328 printf("pid = %d nret = %d at line %d in file %s\n", 329 getpid(),nret,line,__FILE__); 330 exit(1); 331 } 332} 333 334void check_nsrv_soft(nret,line,err) 335nsrv_ret_t nret; 336int line; 337int err; 338{ 339 if (nret != NSRV_OK) 340 { 341 printf("**Worker %d: Name Server (nsrv) Error** \n",ParallelWorker); 342 switch(err) 343 { 344 case 1: 345 printf("Could not deregister nameserver information"); 346 break; 347 case 2: 348 printf("Could not free nameserver ids"); 349 break; 350 default: 351 break; 352 } 353 printf("pid = %d nret = %d at line %d in file %s\n", 354 getpid(),nret,line,__FILE__); 355 printf("Trying to continue execution..\n"); 356 } 357} 358 359void check_bmsg(bret,line,err) 360bmsg_ret_t bret; 361int line; 362int err; 363{ 364 if (bret != BMSG_OK) 365 { 366 printf("**Worker %d: MPS B-layer Fatal Error: Aborting!\n",ParallelWorker); 367 switch(err) 368 { 369 case 1: 370 printf("Could not initialize Message Passing System.\n"); 371 break; 372 case 2: 373 printf("bport_port call failed.\n"); 374 break; 375 case 3: 376 printf("bport_open call failed.\n"); 377 break; 378 default: 379 break; 380 } 381 printf("pid = %d bret = %d at line %d in file %s\n", 382 getpid(),bret,line,__FILE__); 383 exit(2); 384 } 385} 386 387void check_amsg(aret,line,err) 388amsg_ret_t aret; 389int line; 390int err; 391{ 392 if (aret != AMSG_OK) 393 { 394 printf("**Worker %d: MPS A-Layer Fatal Error: Aborting!",ParallelWorker); 395 switch(err) 396 { 397 case 1: 398 printf("Could not initialize Message Passing System.\n"); 399 break; 400 case 2: 401 printf("Could not allocate message buffer."); 402 printf("Probably no memory left.\n"); 403 break; 404 case 3: 405 printf("Error in amsg_send.\n"); 406 default: 407 break; 408 } 409 printf("pid = %d aret = %d at line %d in file %s\n", 410 getpid(),aret,line,__FILE__); 411 exit(3); 412 } 413} 414 415void check_amsg_soft(aret,line,err) 416amsg_ret_t aret; 417int line; 418int err; 419{ 420 if (aret != AMSG_OK) 421 { 422 printf("**Worker %d: MPS A-Layer Error: Warning",ParallelWorker); 423 switch(err) 424 { 425 case 1: 426 printf("Could not free message buffer.\n"); 427 break; 428 case 2: 429 printf("Error in receiving messages.\n"); 430 break; 431 case 3: 432 printf("Error in amsg_send.\n"); 433 default: 434 break; 435 } 436 printf("pid = %d aret = %d at line %d in file %s\n", 437 getpid(),aret,line,__FILE__); 438 printf("Trying to continue execution..\n"); 439 } 440} 441 442 443/*-----------------------------------------------------------*/ 444/* Panic and warning routines */ 445/*-----------------------------------------------------------*/ 446 447static bport_id_t nsrv_port_id; 448 449 450/*ARGSUSED*/ 451void 452worker_bport_notify(port_id,port_primitive) 453 bport_id_t port_id; 454 bport_primitive_t port_primitive; 455{ 456 switch (port_primitive) { 457 case BPORT_OPEN : 458/* printf("%d: bport_notify: BPORT_OPEN:%d\n",ParallelWorker,port_id); */ 459 open_bports++; 460 break; 461 case BPORT_CLOSE : 462/* printf("%d: bport_notify: BPORT_CLOSE:%d\n",ParallelWorker,port_id); */ 463 closed_bports++; 464 break; 465 case BPORT_BLOCK : 466 break; 467 case BPORT_UNBLOCK : 468 break; 469 default: 470 break; 471 } 472} 473 474int mem_worker_list(bport_id) 475 bport_id_t bport_id; 476 477{ 478 worker_ptr cur; 479 480 for(cur = worker_list.list; cur != NULL; cur = cur->next) 481 if (cur->bport_id == bport_id) 482 return(1); 483 484 return(0); 485} 486 487void 488worker_bport_ack(port_id,port_primitive,ret) 489 bport_id_t port_id; 490 bport_primitive_t port_primitive; 491 bmsg_ret_t ret; 492{ 493 494 if (ret == BMSG_OK) { 495 switch (port_primitive) { 496 case BPORT_OPEN : 497/* printf("%d: bport_ack: BPORT_OPEN:%d\n",ParallelWorker,port_id); */ 498 if (port_id == wm_bport.bport_id) 499 wm_b_connected = 1; 500 else if (mem_worker_list(port_id)) 501 open_bports++; 502 else 503 nsrv_port_id = port_id; 504 break; 505 case BPORT_CLOSE : 506/* printf("%d: bport_ack: BPORT_CLOSE:%d\n",ParallelWorker,port_id); */ 507 closed_bports++; 508 break; 509 case BPORT_BLOCK : 510 break; 511 case BPORT_UNBLOCK : 512 break; 513 default: 514 break; 515 } 516 } 517 else 518 { 519 printf("%d: Bport_ack: error in ",ParallelWorker); 520 switch (port_primitive) { 521 case BPORT_OPEN : 522 printf("opening "); 523 break; 524 case BPORT_CLOSE : 525 printf("closing "); 526 break; 527 case BPORT_BLOCK : 528 printf("blocking "); 529 break; 530 case BPORT_UNBLOCK : 531 printf("unblocking "); 532 break; 533 default: 534 break; 535 } 536 printf(" bport_id %d\n", port_id); 537 } 538 return; 539} 540 541 542 543/*-----------------------------------------------------------------*/ 544/* Initialise Message Types */ 545/*-----------------------------------------------------------------*/ 546 547amsg_type_t mdt_wm_header; 548amsg_type_t mdt_wm_simple; 549amsg_type_t mdt_wm_nsrvname; 550amsg_type_t mdt_wm_config; 551amsg_type_t mdt_wm_node; 552amsg_type_t mdt_sthandlet; 553amsg_type_t mdt_wm_mcstatus; 554amsg_type_t mdt_wm_status; 555amsg_type_t mdt_wm_hoststatus; 556amsg_type_t mdt_wm_time; 557amsg_type_t mdt_wm_starttime; 558amsg_type_t mdt_wm_portname; 559amsg_type_t mdt_wm_wstat; 560#ifdef WORKER_TRACING 561amsg_type_t mdt_wm_trace; 562amsg_type_t mdt_trace; 563#endif 564amsg_type_t mdt_wstat; 565 566wm_types_init() 567{ 568 amsg_typedef_t template[20]; 569 570 /* wm_msg_header_t */ 571 template[0] = MDT_BEGIN; 572 template[1] = MDT_STRUCT_OPEN; 573 template[2] = MDT_INT32; 574 template[3] = MDT_INT32; 575 template[4] = MDT_STRUCT_CLOSE; 576 template[5] = MDT_END; 577 578 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 1, 579 template, &mdt_wm_header), __LINE__, 4); 580 581 /* wm_simple_msg_t */ 582 template[0] = MDT_BEGIN; 583 template[1] = MDT_STRUCT_OPEN; 584 template[2] = mdt_wm_header; 585 template[3] = MDT_INT32; 586 template[4] = MDT_STRUCT_CLOSE; 587 template[5] = MDT_END; 588 589 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 2, 590 template, &mdt_wm_simple), __LINE__, 4); 591 592 /* host_status_req_t structures */ 593 template[0] = MDT_BEGIN; 594 template[1] = MDT_STRUCT_OPEN; 595 template[2] = mdt_wm_header; 596 template[3] = MDT_NSRVNAME; 597 template[4] = MDT_STRUCT_CLOSE; 598 template[5] = MDT_END; 599 600 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 3, 601 template, &mdt_wm_nsrvname), __LINE__, 4); 602 603 /* st_handle_t */ 604 605 template[0] = MDT_BEGIN; 606 template[1] = MDT_STRUCT_OPEN; 607 template[2] = MDT_APORTID; 608 template[3] = MDT_UINT32; 609 template[4] = MDT_UINT32; 610 template[5] = MDT_STRUCT_CLOSE; 611 template[6] = MDT_END; 612 613 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 4, 614 template, &mdt_sthandlet), __LINE__, 4); 615 616 /* node_msg_t */ 617 618 template[0] = MDT_BEGIN; 619 template[1] = MDT_STRUCT_OPEN; 620 template[2] = mdt_wm_header; 621 template[3] = mdt_sthandlet; 622 template[4] = MDT_STRUCT_CLOSE; 623 template[5] = MDT_END; 624 625 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 5, 626 template, &mdt_wm_node), __LINE__, 4); 627 628 /* config_msg_t */ 629 template[0] = MDT_BEGIN; 630 template[1] = MDT_STRUCT_OPEN; 631 template[2] = mdt_wm_header; 632 template[3] = MDT_INT32; 633 template[4] = MDT_NSRVNAME; 634 template[5] = MDT_STRUCT_CLOSE; 635 template[6] = MDT_END; 636 637 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 6, 638 template, &mdt_wm_config), __LINE__, 4); 639 640 /* mc_status_t */ 641 template[0] = MDT_BEGIN; 642 template[1] = MDT_STRUCT_OPEN; 643 template[2] = MDT_INT32; 644 template[3] = MDT_INT32; 645 template[4] = MDT_NSRVNAME; 646 template[5] = MDT_STRUCT_CLOSE; 647 template[6] = MDT_END; 648 649 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 7, 650 template, &mdt_wm_mcstatus), __LINE__, 4); 651 652 /* mc_status_t */ 653 template[0] = MDT_BEGIN; 654 template[1] = MDT_STRUCT_OPEN; 655 template[2] = mdt_wm_header; 656 template[3] = MDT_INT32; 657 template[4] = MDT_INT32; 658 template[5] = MDT_ARRAY_OF; 659 template[6] = MAX_MACHINES; 660 template[7] = mdt_wm_mcstatus; 661 template[8] = MDT_STRUCT_CLOSE; 662 template[9] = MDT_END; 663 664 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 8, 665 template, &mdt_wm_status), __LINE__, 4); 666 667 /* host_status_msg_t */ 668 template[0] = MDT_BEGIN; 669 template[1] = MDT_STRUCT_OPEN; 670 template[2] = mdt_wm_header; 671 template[3] = MDT_INT32; 672 template[4] = MDT_INT32; 673 template[5] = MDT_ARRAY_OF; 674 template[6] = MAX_PROCS; 675 template[7] = MDT_INT32; 676 template[8] = MDT_ARRAY_OF; 677 template[9] = MAX_PROCS; 678 template[10] = MDT_INT32; 679 template[11] = MDT_STRUCT_CLOSE; 680 template[12] = MDT_END; 681 682 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 9, 683 template, &mdt_wm_hoststatus), __LINE__, 4); 684 685 686 /* start_time_msg_t */ 687 688 template[0] = MDT_BEGIN; 689 template[1] = MDT_STRUCT_OPEN; 690 template[2] = mdt_wm_header; 691#ifdef HAVE_GETHRTIME 692 template[3] = MDT_DWORD; 693#else 694 template[3] = MDT_UINT32; 695#endif 696 template[4] = MDT_STRUCT_CLOSE; 697 template[5] = MDT_END; 698 699 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 10, 700 template, &mdt_wm_starttime), __LINE__, 4); 701 702 /* time_msg_t */ 703 704 template[0] = MDT_BEGIN; 705 template[1] = MDT_STRUCT_OPEN; 706 template[2] = mdt_wm_header; 707 template[3] = MDT_DP_FLOAT; 708 template[4] = MDT_STRUCT_CLOSE; 709 template[5] = MDT_END; 710 711 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 11, 712 template, &mdt_wm_time), __LINE__, 4); 713 714 /* port_name_msg_t */ 715 template[0] = MDT_BEGIN; 716 template[1] = MDT_STRUCT_OPEN; 717 template[2] = mdt_wm_header; 718 template[3] = MDT_INT32; 719 template[4] = MDT_NSRVNAME; 720 template[5] = MDT_APORTID; 721 template[6] = MDT_STRUCT_CLOSE; 722 template[7] = MDT_END; 723 724 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 12, 725 template, &mdt_wm_portname), __LINE__, 4); 726 727 wstat_types_init(&mdt_wstat); 728 /* wstat_msg_t */ 729 template[0] = MDT_BEGIN; 730 template[1] = MDT_STRUCT_OPEN; 731 template[2] = mdt_wm_header; 732 template[3] = mdt_wstat; 733 template[4] = MDT_STRUCT_CLOSE; 734 template[5] = MDT_END; 735 736 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 13, 737 template, &mdt_wm_wstat), __LINE__, 4); 738 739 /* trace_msg_t */ 740#ifdef WORKER_TRACING 741 trace_types_init(&mdt_trace); 742 template[0] = MDT_BEGIN; 743 template[1] = MDT_STRUCT_OPEN; 744 template[2] = mdt_wm_header; 745 template[3] = mdt_trace; 746 template[4] = MDT_STRUCT_CLOSE; 747 template[5] = MDT_END; 748 749 check_amsg(amsg_type_define(ECLIPSE_WM_INTERFACE, 14, 750 template, &mdt_wm_trace), __LINE__, 4); 751#endif 752} 753 754void send_simple_wm_message(port_id,msg_type,msg_value) 755aport_id_t port_id; 756int msg_type; 757int msg_value; 758{ 759 amsg_t msg; 760 amsg_data_t * msg_data; 761 wm_simple_msg_t * wm_mess_hdr; 762 763 amsg_size_t size; 764 765 size = sizeof(wm_simple_msg_t); 766 767 check_amsg(amsg_alloc(size, 768 &msg_data, 769 &msg),__LINE__, 2); 770 771 wm_mess_hdr = (wm_simple_msg_t *) msg_data; 772 wm_mess_hdr->header.msg_type = msg_type; 773 wm_mess_hdr->header.wid = ParallelWorker; 774 wm_mess_hdr->msg_value = msg_value; 775 check_amsg_soft(amsg_send(port_id,msg,mdt_wm_simple,1,0),__LINE__, 3); 776} 777 778/*-----------------------------------------------------------------*/ 779/* Aport Notify Procedures */ 780/*-----------------------------------------------------------------*/ 781 782void 783halt1_notify(port_id) 784 aport_id_t port_id; 785{ 786 amsg_ret_t ret; 787 amsg_t msg; 788 amsg_data_t * msg_data; 789 amsg_type_t msg_type; 790 amsg_count_t msg_count; 791 792 ret = amsg_receive(port_id, 793 &msg, 794 &msg_data, 795 &msg_type, 796 &msg_count, 0); 797 if (ret == AMSG_OK) { 798 Notify("Got HALT_SYSTEM1 message"); 799 ec_worker_cleanup(); 800 exit(1); 801 } 802} 803 804/*ARGSUSED*/ 805void 806halt2_notify(port_id) 807 aport_id_t port_id; 808{ 809 Notify("Got HALT_SYSTEM2 message"); 810 got_halt = 1; 811} 812 813void 814wm_notify(port_id) 815 aport_id_t port_id; 816 817{ 818 amsg_ret_t ret; 819 amsg_t msg; 820 amsg_data_t * msg_data; 821 amsg_type_t msg_type; 822 amsg_count_t msg_count; 823 wm_simple_msg_t * wm_simple_msg; 824 825 while (( ret = amsg_receive(port_id, 826 &msg, 827 &msg_data, 828 &msg_type, 829 &msg_count,0)) == AMSG_OK) { 830 831 if (msg_type == mdt_wm_simple) 832 { 833 wm_simple_msg = (wm_simple_msg_t *) msg_data; 834 switch(wm_simple_msg->header.msg_type) { 835 836 case SENT_INIT_PORT_NAMES: 837 Notify("Got SENT_INIT_PORT_NAMES message"); 838 sent_init_table = 1; 839 break; 840 841 case ALL_CONNECTED: 842 Notify("Got ALL_CONNECTED message"); 843 all_connected = 1; 844 break; 845 846 case ROOT_INITIALISED: 847 Notify("Got ROOT_INITIALISED message"); 848 root_initialised = 1; 849 break; 850 851 case GOTO_SLEEP: 852 Notify("Got GOTO_SLEEP message"); 853 (void) sch_idle_eng(aports[SCH_APORT_NUMBER]); 854 break; 855 856 case WAKEUP: 857 Notify("Got WAKEUP message"); 858 (void) sch_wake_eng(aports[SCH_APORT_NUMBER]); 859 break; 860 861 case CONFIG_NOTIFY: 862 Notify("Got CONFIG_NOTIFY message"); 863 config_ret = wm_simple_msg->msg_value; 864 break; 865 866 case WSTAT_REQ: 867 { 868 Notify("Got WSTAT_REQ message"); 869 send_wstat(wm_simple_msg->header.wid); 870 } 871 break; 872 873 case WSTAT_RESET: 874 { 875 int wid = wm_simple_msg->header.wid; 876 877 Notify("Got WSTAT_RESET message"); 878 reset_worker_stat(); 879 send_simple_wm_message((get_worker(wid))->wm_aport_id, WSTAT_RET, 0); 880 break; 881 } 882 883 case WSTAT_RET: 884 Notify("Got WSTAT_RET message"); 885 worker_info_ret = 1; 886 break; 887 888#ifdef WORKER_TRACING 889 case START_TRACING: 890 Notify("Got START_TRACING message"); 891 start_tracing(); 892 break; 893 894 case STOP_TRACING: 895 Notify("Got STOP_TRACING message"); 896 stop_tracing(); 897 break; 898 899 case GET_TRACE_HEADER: 900 Notify("Got GET_TRACE_HEADER message"); 901 send_trace_header(wm_simple_msg->header.wid, 902 GET_TRACE_RET, get_trace_ptr()); 903 break; 904 905 case SET_TRACE_RET: 906 Notify("Got SET_TRACE_HEADER message"); 907 worker_info_ret = 1; 908 break; 909#endif 910 911 default: 912 break; 913 } 914 } 915 else if (msg_type == mdt_wm_portname) 916 { 917 /* PORT_NAME message */ 918 port_name_msg_t * port_name_msg; 919 Notify("Got PORT_NAME message"); 920 port_name_msg = (port_name_msg_t *) msg_data; 921 insert_port(port_name_msg->index,port_name_msg->port_name, 922 port_name_msg->wm_aport_id); 923 } 924 else if (msg_type == mdt_wm_nsrvname) 925 { 926 /* WM_HOSTNAME message */ 927 host_name_msg_t *host_name_msg; 928 host_name_msg = (host_name_msg_t *) msg_data; 929 (void) strcpy(wm_hostname,host_name_msg->hostname); 930 got_wm_hostname = 1; 931 } 932 else if (msg_type == mdt_wm_node) 933 { 934 /* SET_ROOT message */ 935 node_msg_t *node_msg; 936 node_msg = (node_msg_t *) msg_data; 937 if(node_msg->header.msg_type == SET_ROOT) 938 { 939 Notify("Got SET_ROOT message"); 940 root_id = node_msg->node; 941 got_root_id = 1; 942 } 943 else if (node_msg->header.msg_type == GET_ROOT) 944 { 945 st_handle_t local; 946 Notify("Got GET_ROOT message"); 947 local = node_msg->node; 948 sch_init_lodge(aports[SCH_APORT_NUMBER], node_msg->header.wid, 949 &local); 950 } 951 } 952 else if (msg_type == mdt_wm_time) 953 { 954 /* SEND_TIME message */ 955 time_msg_t * time_msg; 956 Notify("Got SEND_TIME message"); 957 time_msg = (time_msg_t *) msg_data; 958 cur_time = time_msg->cur_time; 959 } 960 961 else if (msg_type == mdt_wm_starttime) 962 { 963 /* SEND_START_TIME message */ 964 start_time_msg_t * start_time_msg; 965 start_time_msg = (start_time_msg_t *) msg_data; 966 Notify("Got SEND_START_TIME message"); 967 wm_start_time = start_time_msg->start_time; 968 } 969 else if (msg_type == mdt_wm_status) 970 { 971 /* STATUS_NOTIFY message */ 972 status_msg_t * status_msg; 973 Notify("Got STATUS_NOTIFY message"); 974 status_msg = (status_msg_t *) msg_data; 975 { 976 int i; 977 status.num_machines = status_msg->num_machines; 978 for(i = 0 ; i < status_msg->num_machines; i++) 979 status.machines[i] = status_msg->machines[i]; 980 } 981 status_ret = status_msg->total_workers; 982 } 983 984 else if (msg_type == mdt_wm_hoststatus) 985 { 986 /* HOST_STATUS_NOTIFY */ 987 host_status_msg_t * host_status_msg; 988 int i; 989 990 Notify("Got HOST_STATUS_NOTIFY message"); 991 host_status_msg = (host_status_msg_t *) msg_data; 992 host_status.num_awake = host_status_msg->num_awake; 993 host_status.num_asleep = host_status_msg->num_asleep; 994 for(i = 0; i < host_status.num_awake; i++) 995 host_status.awake_ids[i] = host_status_msg->awake_ids[i]; 996 for(i = 0; i < host_status.num_asleep; i++) 997 host_status.asleep_ids[i] = host_status_msg->asleep_ids[i]; 998 999 status_ret = host_status.num_awake + host_status.num_asleep; 1000 } 1001 else if (msg_type == mdt_wm_wstat) 1002 { 1003 /* WSTAT_RET */ 1004 wstat_msg_t * wstat_msg; 1005 Notify("Got WSTAT_RET message"); 1006 wstat_msg = (wstat_msg_t *) msg_data; 1007 *wstat_rec_buf = wstat_msg->stat; 1008 worker_info_ret = 1; 1009 break; 1010 } 1011#ifdef WORKER_TRACING 1012 else if (msg_type == mdt_wm_trace) 1013 { 1014 trace_msg_t * trace_msg; 1015 int wid; 1016 1017 trace_msg = (trace_msg_t *) msg_data; 1018 wid = trace_msg->header.wid; 1019 if(trace_msg->header.msg_type == GET_TRACE_RET) 1020 { 1021 *trace_header_buf = trace_msg->trace_header; 1022 worker_info_ret = 1; 1023 } 1024 else if (trace_msg->header.msg_type == SET_TRACE_HEADER) 1025 { 1026 print_trace(&(trace_msg->trace_header)); 1027 send_simple_wm_message(get_worker(wid)->wm_aport_id, 1028 SET_TRACE_RET, 0); 1029 } 1030 break; 1031 } 1032#endif 1033 else if (msg_type == MDT_BYTE) 1034 { 1035 worker_info_msg_t * worker_info_msg; 1036 worker_info_msg = (worker_info_msg_t *) msg_data; 1037 switch (worker_info_msg->header.msg_type) 1038 { 1039 case WORKER_INFO_SET: 1040 Notify("Got WORKER_INFO_SET message"); 1041 set_worker_info(worker_info_msg->req_wid, 1042 worker_info_msg->infotype, 1043 (void_ptr) (worker_info_msg + 1)); 1044 send_worker_info(worker_info_msg->req_wid, 1045 worker_info_msg->infotype, 1046 0, (void_ptr) NULL); 1047 break; 1048 1049 case WORKER_INFO_GET: 1050 Notify("Got WORKER_INFO_GET message"); 1051 get_worker_info(worker_info_msg->infotype, 1052 &worker_info_bufinsize, 1053 &worker_info_bufin); 1054 send_worker_info(worker_info_msg->req_wid, 1055 worker_info_msg->infotype, 1056 worker_info_bufinsize, worker_info_bufin); 1057 break; 1058 1059 case WORKER_INFO_NOTIFY: 1060 Notify("Got WORKER_INFO_NOTIFY message"); 1061 if(worker_info_bufsize >= worker_info_msg->size) 1062 bcopy((char *) (msg_data + sizeof(worker_info_msg_t)), 1063 (char *) worker_info_buf, 1064 worker_info_msg->size); 1065 else 1066 bcopy((char *) (msg_data + sizeof(worker_info_msg_t)), 1067 (char *) worker_info_buf, 1068 worker_info_bufsize); 1069 worker_info_ret = 1; 1070 break; 1071 } 1072 } 1073 check_amsg_soft(amsg_free(msg), __LINE__, 1); 1074 } 1075 if (ret != AMSG_NOMESSAGE) { 1076 check_amsg_soft(ret,__LINE__, 2); 1077 } 1078} 1079 1080void fill_in_domain(ldomain_id, lbdomain, hostname) 1081bdomain_id_t ldomain_id; 1082bdomain_t *lbdomain; 1083char *hostname; 1084 1085{ 1086 lbdomain->bdomain_id = ldomain_id; 1087 lbdomain->bdomain_size = 0x00800000; /* 8 MB */ 1088 if (!shared_mem_base()) 1089 lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base()); 1090 else 1091 lbdomain->bdomain_start = (bmem_address_t) (shared_mem_base() + 0x00800000); 1092 sprintf(lbdomain->bdomain_file,"%s/%d.%s.mps.map", 1093 map_dir, my_pid, hostname); 1094} 1095 1096void mps_init(create) 1097int create; 1098{ 1099 1100 nsrv_ret_t nret; 1101 char hostname[MAXHOSTLEN]; 1102 nsrv_name_t domain_name; 1103 bmsg_ret_t bret; 1104 1105 check_nsrv(nsrv_new_bport_id(my_signature,&my_bport_id),__LINE__, 3); 1106 1107 mygethostname(hostname); 1108 1109/* Turn on the define below if you want each worker to create its 1110 own shared memory domain. This means that all communication will 1111 take place via sockets i.e. no shared memory communication. */ 1112 1113/*#define TEST_INTER_DOMAIN 1*/ 1114#ifdef TEST_INTER_DOMAIN 1115 sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,my_signature); 1116 check_nsrv(nsrv_new_bdomain_id(my_signature,&domain_id),__LINE__,5); 1117 fill_in_domain(domain_id,&bdomain,hostname); 1118 check_bmsg(bmsg_init(my_bport_id,&bdomain, 1119 BDOMAIN_CREATE | 1120#ifdef DEBUG_MPS 1121 BMSG_ALOG_ON | 1122 BMSG_ALOG_OPEN | 1123 BMSG_ALOG_CLOSE | 1124 BMSG_ALOG_MASTER | 1125#endif 1126 BPORT_NOTIFICATION), __LINE__, 1); 1127 check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name, 1128 my_signature,&bdomain),__LINE__,2); 1129#else /* TEST_INTER_DOMAIN */ 1130 1131 sprintf(domain_name,"%s.%s.%s", WORKER_PORT_NAME, hostname,session_key); 1132 /* Check if domain exists - it will exist if we are on the 1133 same host as the worker manager */ 1134 nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain); 1135 1136 /* Loop until the domain is created - should have a timeout? */ 1137 while((nret != NSRV_OK) && !create) { 1138 short_sleep(1000); 1139 nret = nsrv_bdomain_look_up(bdomain_key,domain_name,&bdomain); 1140 } 1141 1142 if (nret == NSRV_OK) 1143 check_bmsg(bmsg_init(my_bport_id,&bdomain, 1144#ifdef DEBUG_MPS 1145 BMSG_ALOG_ON | 1146 BMSG_ALOG_OPEN | 1147 BMSG_ALOG_CLOSE | 1148 BMSG_ALOG_MASTER | 1149#endif 1150 BPORT_NOTIFICATION),__LINE__, 1); 1151 else if ((nret == NSRV_NOT_REGISTERED) && create) 1152 { /* we are the first worker on a new host, so create a new 1153 domain and register it */ 1154 check_nsrv(nsrv_new_bdomain_id(my_signature,&domain_id),__LINE__,5); 1155 fill_in_domain(domain_id,&bdomain,hostname); 1156 check_bmsg(bmsg_init(my_bport_id,&bdomain, 1157 BDOMAIN_CREATE | 1158#ifdef DEBUG_MPS 1159 BMSG_ALOG_ON | 1160 BMSG_ALOG_OPEN | 1161 BMSG_ALOG_CLOSE | 1162 BMSG_ALOG_MASTER | 1163#endif 1164 BPORT_NOTIFICATION), __LINE__, 1); 1165 check_nsrv(nsrv_bdomain_register(bdomain_key,domain_name, 1166 my_signature,&bdomain),__LINE__,2); 1167 } 1168 else 1169 { 1170 printf("nsrv_bdomain_look_up failed! nret = %d\n",nret); 1171 exit(1); 1172 } 1173#endif /* TEST_INTER_DOMAIN */ 1174 check_bmsg(bport_port(my_bport_id,&my_bport),__LINE__, 2); 1175 check_nsrv(nsrv_bport_register(session_key,my_port_name,my_signature,&my_bport),__LINE__,2); 1176 1177 bret = bport_open(&wm_bport); 1178 if (bret != BMSG_POPENING) 1179 check_bmsg(bret,__LINE__, 3); 1180} 1181 1182mygethostname(host) 1183 char *host; 1184{ 1185#if defined(HAVE_GETHOSTNAME) 1186 (void) gethostname(host,MAXHOSTLEN); 1187#else 1188# if defined(HAVE_SYSINFO) && defined(HAVE_SYS_SYSTEMINFO_H) 1189 sysinfo(SI_HOSTNAME, host, MAXHOSTLEN); 1190# else 1191 struct utsname ut; 1192 uname(&ut); 1193 (void) strcpy(host,ut.nodename); 1194# endif 1195#endif 1196} 1197 1198void wait_for_session_table() 1199{ 1200 while(!sent_init_table) 1201 short_sleep(1000); 1202} 1203 1204void open_worker_connections() 1205{ 1206 bport_t rem_bport; 1207 bmsg_ret_t bret; 1208 worker_ptr cur; 1209 1210 for(cur = worker_list.list; cur != NULL; cur = cur->next) 1211 { 1212 check_nsrv(nsrv_bport_look_up(session_key,cur->bport_name,&rem_bport), 1213 __LINE__,5); 1214 cur->bport_id = rem_bport.bport_id; 1215 if(cur->index < ParallelWorker) 1216 { 1217 bret = bport_open(&rem_bport); 1218 if (bret != BMSG_POPENING) 1219 check_bmsg(bret,__LINE__, 3); 1220 } 1221 } 1222 1223 while (open_bports != (worker_list.num_workers - 1)) 1224 short_sleep(1000); 1225} 1226 1227void register_std_aports(aport_ids) 1228 aport_id_t aport_ids[]; 1229{ 1230 nsrv_name_t aport_name; 1231 aport_t aport; 1232 1233 sprintf(aport_name,"%s_halt2_aport",my_port_name); 1234 aport.aport_id = aport_ids[HALT2_APORT_NUMBER]; 1235 aport.bport_id = my_bport_id; 1236 aport.bdomain_id = bdomain.bdomain_id; 1237 check_nsrv(nsrv_aport_register(session_key,aport_name, 1238 my_signature,&aport),__LINE__, 2); 1239 sprintf(aport_name,"%s_halt1_aport",my_port_name); 1240 aport.aport_id = aport_ids[HALT1_APORT_NUMBER]; 1241 aport.bport_id = my_bport_id; 1242 aport.bdomain_id = bdomain.bdomain_id; 1243 check_nsrv(nsrv_aport_register(session_key,aport_name, 1244 my_signature,&aport),__LINE__, 2); 1245 wm_types_init(); /* can only be done after an aport registration 1246 since MDT_NSRVNAME is only defined at this point */ 1247 sprintf(aport_name,"%s_wm_aport",my_port_name); 1248 aport.aport_id = aport_ids[WM_APORT_NUMBER]; 1249 aport.bport_id = my_bport_id; 1250 aport.bdomain_id = bdomain.bdomain_id; 1251 check_nsrv(nsrv_aport_register(session_key,aport_name, 1252 my_signature,&aport),__LINE__, 2); 1253 1254 1255} 1256 1257void deregister_std_aports() 1258{ 1259 nsrv_name_t aport_name; 1260 1261 sprintf(aport_name,"%s_wm_aport",my_port_name); 1262 check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature), 1263 __LINE__, 1); 1264 1265 sprintf(aport_name,"%s_halt1_aport",my_port_name); 1266 check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature), 1267 __LINE__, 1); 1268 sprintf(aport_name,"%s_halt2_aport",my_port_name); 1269 check_nsrv_soft(nsrv_aport_deregister(session_key,aport_name,my_signature), 1270 __LINE__, 1); 1271} 1272 1273 1274void setup_mps(slave_no,session, nsrv_host_name, nsrv_port_number, create) 1275 int slave_no; 1276 char *session; 1277 char *nsrv_host_name; 1278 unsigned nsrv_port_number; 1279 int create; /* used in mps_init */ 1280{ 1281 extern void eng_port_upcall(); 1282 extern void sch_port_upcall(); 1283 extern void io_port_upcall(); 1284 aport_t wm_aport; /* needed to lookup wm aportids in name server */ 1285 void (* notify_procs [TOTAL_APORT_NUMBER]) (); 1286 1287 /* preliminary setting of the SIGIO handler for message passing */ 1288#ifdef HAVE_SIGACTION 1289 { 1290 struct sigaction sa; 1291 sa.sa_handler = sigmsg_handler; 1292 (void) sigemptyset(&sa.sa_mask); 1293 sa.sa_flags = 0; 1294 (void) sigaction(SIGIO, &sa, (struct sigaction *) 0); 1295#ifdef SIGPOLL 1296 (void) sigaction(SIGPOLL, &sa, (struct sigaction *) 0); 1297#endif 1298 } 1299#else 1300 signal(SIGIO, sigmsg_handler); 1301#ifdef SIGPOLL 1302 signal(SIGPOLL, sigmsg_handler); 1303#endif 1304#endif 1305 1306 map_dir = getenv("ECLIPSETMP"); 1307 if(!map_dir) map_dir = "/tmp"; 1308 1309 check_nsrv(nsrv_init(nsrv_host_name, &nsrv_port_number),__LINE__, 1); 1310 1311 my_pid = getpid(); 1312 (void) strcpy(session_key,session); 1313 sprintf(my_port_name,"worker%d",slave_no); 1314 sprintf(my_signature,"%d",my_pid); 1315 sprintf(bdomain_key, WORKER_PORT_NAME); 1316 init_worker_list(); 1317 1318 check_nsrv(nsrv_bport_look_up(session_key, WM_PORT_NAME, &wm_bport), 1319 __LINE__, 5); 1320 1321 mps_init(create); 1322 1323 notify_procs[HALT1_APORT_NUMBER] = halt1_notify; 1324 notify_procs[HALT2_APORT_NUMBER] = halt2_notify; 1325 notify_procs[WM_APORT_NUMBER] = wm_notify; 1326 notify_procs[SCH_APORT_NUMBER] = sch_port_upcall; 1327 notify_procs[ENG_APORT_NUMBER] = eng_port_upcall; 1328 notify_procs[IO_APORT_NUMBER] = io_port_upcall; 1329 notify_procs[IO_REPLY_APORT_NUMBER] = 0; 1330 check_amsg(amsg_init(TOTAL_APORT_NUMBER,notify_procs,aports,0),__LINE__, 1); 1331 1332 /* set port notification levels */ 1333 check_amsg_soft(aport_set_option(aports[IO_APORT_NUMBER], 1334 APORT_NOTIFY_LEVEL, 1335 (aport_optval_t) 1),__LINE__, 4); 1336 check_amsg_soft(aport_set_option(aports[ENG_APORT_NUMBER], 1337 APORT_NOTIFY_LEVEL, 1338 (aport_optval_t) 2),__LINE__, 4); 1339 check_amsg_soft(aport_set_option(aports[SCH_APORT_NUMBER], 1340 APORT_NOTIFY_LEVEL, 1341 (aport_optval_t) 2),__LINE__, 4); 1342 check_amsg_soft(aport_set_option(aports[WM_APORT_NUMBER], 1343 APORT_NOTIFY_LEVEL, 1344 (aport_optval_t) 3),__LINE__, 4); 1345 check_amsg_soft(aport_set_option(aports[HALT1_APORT_NUMBER], 1346 APORT_NOTIFY_LEVEL, 1347 (aport_optval_t) 4),__LINE__, 4); 1348 check_amsg_soft(aport_set_option(aports[HALT2_APORT_NUMBER], 1349 APORT_NOTIFY_LEVEL, 1350 (aport_optval_t) 5),__LINE__, 4); 1351 1352 register_std_aports(aports); 1353 1354 check_nsrv(nsrv_aport_look_up(session_key, WM_HIGH_APORT_NAME, &wm_aport), 1355 __LINE__, 6); 1356 wm_high_aport_id = wm_aport.aport_id; 1357 1358 check_nsrv(nsrv_aport_look_up(session_key, WM_HALT1_APORT_NAME, &wm_aport), 1359 __LINE__, 6); 1360 wm_halt1_aport_id = wm_aport.aport_id; 1361 1362 check_nsrv(nsrv_aport_look_up(session_key, WM_HALT2_APORT_NAME, &wm_aport), 1363 __LINE__, 6); 1364 wm_halt2_aport_id = wm_aport.aport_id; 1365 1366 check_nsrv(nsrv_aport_look_up(session_key, WM_LOW_APORT_NAME, 1367 &wm_aport),__LINE__, 6); 1368 wm_low_aport_id = wm_aport.aport_id; 1369 1370 while (!wm_b_connected) poll_short_sleep(1000); 1371 1372 wait_for_session_table(); 1373 1374 open_worker_connections(); 1375 1376#ifdef WORKER_TRACING 1377 init_trace_file_names(); 1378#endif 1379 1380 set_local_wm_flag(); 1381 set_start_time(); 1382 1383 /* Tell wm that my mps setup is done */ 1384 send_simple_wm_message(wm_high_aport_id,DONE_INIT_OPENS,0); 1385 1386 /* Synchronize with all other workers since even after this point 1387 the worker is not completely set up - it still needs to lodge at 1388 the root node. This can cause workers not to exit if the system 1389 is halted immediately after adding this worker. Currently the 1390 worker manager just kills the workers and exits (using a timeout)*/ 1391 1392 while(!all_connected) 1393 poll_short_sleep(1000); 1394 1395} 1396 1397void block_wm_aports() 1398{ 1399 (void) aport_set_option(aports[WM_APORT_NUMBER], 1400 APORT_NOTIFY, (aport_optval_t) AMSG_OFF); 1401 (void) aport_set_option(aports[SCH_APORT_NUMBER], 1402 APORT_NOTIFY, (aport_optval_t) AMSG_OFF); 1403 (void) aport_set_option(aports[ENG_APORT_NUMBER], 1404 APORT_NOTIFY, (aport_optval_t) AMSG_OFF); 1405 (void) aport_set_option(aports[IO_APORT_NUMBER], 1406 APORT_NOTIFY, (aport_optval_t) AMSG_OFF); 1407 (void) bmsg_set_option(BMSG_INTRA_DOMAIN_TRIGGERING, BMSG_ON); 1408} 1409 1410/*ARGSUSED*/ 1411void halt_system(exit_code) 1412int exit_code; 1413{ 1414 (void) block_wm_aports(); 1415 unblock_signals(); 1416 send_simple_wm_message(wm_halt1_aport_id,HALT_SYSTEM_REQ,exit_code); 1417 while (1) 1418 poll_short_sleep(1000); 1419} 1420/*ARGSUSED*/ 1421void panic_halt_system(exit_code) 1422int exit_code; 1423{ 1424 halt_system(exit_code); 1425} 1426 1427void exit_mps() 1428{ 1429 worker_ptr cur; 1430 bmsg_ret_t bret; 1431 1432 send_simple_wm_message(wm_halt2_aport_id,EXITING,0); 1433 while(!got_halt) { 1434 poll_short_sleep(1000); 1435 } 1436 for(cur = worker_list.list; cur != NULL; cur = cur->next) 1437 { 1438 if (cur->index < ParallelWorker) 1439 { 1440 bret = bport_close(cur->bport_id); 1441 while (bret != BMSG_NOPORT) 1442 { 1443/* printf("slave %d: Cannot close port %d bret = %d\n", 1444 ParallelWorker,cur->bport_id,bret);*/ 1445 poll_short_sleep(100000); 1446 bret = bport_close(cur->bport_id); 1447 } 1448 } 1449 } 1450 1451 while (closed_bports != (worker_list.num_workers - 1)) 1452 poll_short_sleep(1000); 1453 1454 bret = bport_close(wm_bport.bport_id); 1455 while (bret != BMSG_NOPORT) 1456 { 1457/* printf("slave %d: Cannot close port %d bret = %d\n", 1458 ParallelWorker,wm_bport.bport_id,bret);*/ 1459 poll_short_sleep(100000); 1460 bret = bport_close(wm_bport.bport_id); 1461 } 1462 1463 while (closed_bports != (worker_list.num_workers)) 1464 poll_short_sleep(1000); 1465 1466 deregister_std_aports(); 1467 check_nsrv_soft(nsrv_bport_deregister(session_key,my_port_name,my_signature), 1468 __LINE__, 1); 1469 check_nsrv_soft(nsrv_free_bport_id(my_signature,my_bport_id),__LINE__, 2); 1470 1471 nsrv_exit(); 1472 amsg_exit(); 1473 bmsg_exit(); 1474} 1475 1476/*-------------------------------------------------------------------*/ 1477/* Scheduler initialization support functions */ 1478/*-------------------------------------------------------------------*/ 1479 1480void send_node_msg(wm_aport_id, msg_type, node) 1481aport_id_t wm_aport_id; 1482int msg_type; 1483st_handle_t * node; 1484{ 1485 amsg_t msg; 1486 amsg_data_t * msg_data; 1487 node_msg_t * node_msg; 1488 st_handle_t *local_node; 1489 1490 amsg_size_t size; 1491 1492 size = sizeof(node_msg_t); 1493 check_amsg(amsg_alloc(size, 1494 &msg_data, 1495 &msg),__LINE__, 2); 1496 1497 node_msg = (node_msg_t *) msg_data; 1498 node_msg->header.msg_type = msg_type; 1499 node_msg->header.wid = ParallelWorker; 1500 1501 local_node = (st_handle_t *) &(node_msg->node); 1502 *local_node = *node; 1503 check_amsg(amsg_send(wm_aport_id,msg,mdt_wm_node,1,0),__LINE__, 3); 1504} 1505 1506st_handle_t * 1507get_root_id(leaf) 1508st_handle_t * leaf; 1509{ 1510 /* wait until first worker has initialised the root */ 1511 while(!root_initialised) poll_short_sleep(1000); 1512 1513 /* Now send first worker my leaf id */ 1514 send_node_msg((get_worker(1))->wm_aport_id, GET_ROOT, leaf); 1515 1516 /* wait until I have received my root id */ 1517 while(!got_root_id) poll_short_sleep(1000); 1518 return &root_id; 1519} 1520 1521/* Only called by first worker. 1522 Send req_wid the root. 1523 Called in response to GET_ROOT message (see above) 1524*/ 1525void wm_init_lodged(req_wid, root) 1526int req_wid; 1527st_handle_t *root; 1528{ 1529 send_node_msg((get_worker(req_wid))->wm_aport_id,SET_ROOT, root); 1530} 1531 1532/* Called only by first worker. 1533 Inform wm that root node has been initialised 1534*/ 1535void root_node_register(aport, root) 1536aport_id_t aport; 1537st_handle_t *root; 1538 1539{ 1540 send_node_msg(aport, ROOT_NODE_REGISTER, root); 1541 root_id = *root; /* init locally as well */ 1542 got_root_id = 1; 1543} 1544 1545/*-----------------------------------------------------------*/ 1546/* Session_time support functions */ 1547/*-----------------------------------------------------------*/ 1548 1549/* get the worker manager hostname and set the local_wm flag 1550 if we are on the same host */ 1551int set_local_wm_flag() 1552{ 1553 char hostname[MAXHOSTLEN]; 1554 1555 mygethostname(hostname); 1556 got_wm_hostname = 0; 1557 send_simple_wm_message(wm_high_aport_id,REQ_WM_HOSTNAME,0); 1558 while (!got_wm_hostname) 1559 short_sleep(1000); 1560 if(strcmp(wm_hostname,hostname) == 0) 1561 local_wm = 1; 1562 else 1563 local_wm = 0; 1564} 1565 1566/* Set the start time if we on the same host as the worker manager */ 1567int set_start_time() 1568{ 1569 if(local_wm) 1570 { 1571 /* Only useful if the worker manager is on the same host */ 1572 wm_start_time = 0; 1573 send_simple_wm_message(wm_high_aport_id,REQ_START_TIME,0); 1574 while(wm_start_time == 0); 1575 } 1576} 1577 1578double calc_elapsed_time() 1579{ 1580 double elapsed; 1581 1582#ifdef HAVE_GETHRTIME 1583 1584 elapsed = (gethrtime() - wm_start_time) / 1000.0; 1585 elapsed = elapsed / 1000000.0; 1586 1587#else 1588#ifdef BSD_TIMES 1589 1590 struct timeb realtime; 1591 (void) ftime(&realtime); 1592 elapsed = (realtime.time - wm_start_time) + (double)realtime.millitm/1000.0; 1593 1594#else 1595 1596 struct tms dummy; 1597 clock_t realtime; 1598 1599 if ((realtime = times(&dummy)) == (clock_t) -1) 1600 { 1601 return(0.0); 1602 } 1603 elapsed = (double) (realtime - wm_start_time) / clock_hz; 1604 1605#endif 1606#endif 1607 return(elapsed); 1608} 1609 1610/*---------------------------------------------------------------*/ 1611/* Worker Manager builtins */ 1612/*---------------------------------------------------------------*/ 1613 1614/* If we are on the same host as the worker manager no message is 1615 sent */ 1616double elapsed_session_time() 1617{ 1618 if(local_wm) 1619 /* can get it locally */ 1620 cur_time = calc_elapsed_time(); 1621 else 1622 { 1623 /* have to sent a message */ 1624 cur_time = 0.0; 1625 send_simple_wm_message(wm_high_aport_id,REQ_TIME,0); 1626 while(cur_time <= 0.0); 1627 } 1628 return(cur_time); 1629} 1630 1631int wm_command(command,hostname,workers) 1632char * command; 1633char * hostname; 1634long workers; 1635{ 1636 amsg_t msg; 1637 amsg_data_t * msg_data; 1638 config_msg_t * config_msg; 1639 amsg_size_t size; 1640 int reduce_worker = 0; 1641 1642 size = sizeof(config_msg_t); 1643 1644 check_amsg(amsg_alloc(size, 1645 &msg_data, 1646 &msg),__LINE__, 2); 1647 1648 config_msg = (config_msg_t *) msg_data; 1649 1650 if(strcmp(command,"add") == 0) 1651 config_msg->header.msg_type = ADD_WORKERS; 1652 else if (strcmp(command,"sleep") == 0) { 1653 config_msg->header.msg_type = SLEEP_WORKERS; 1654 reduce_worker = 1; 1655 } 1656 else if (strcmp(command,"wake") == 0) 1657 config_msg->header.msg_type = WAKEUP_WORKERS; 1658 else 1659 return(0); 1660 config_msg->header.wid = ParallelWorker; 1661 1662 config_msg->workers = workers; 1663 (void) strcpy(config_msg->hostname,hostname); 1664 1665 config_ret = -1; 1666 check_amsg(amsg_send(wm_low_aport_id,msg,mdt_wm_config,1,0), 1667 __LINE__, 3); 1668 while (config_ret < 0) poll_short_sleep(1000); 1669 1670 if (reduce_worker) (void) sch_reduce_worker(aports[SCH_APORT_NUMBER]); 1671 1672 return(config_ret == workers); 1673} 1674 1675 1676int 1677wm_host_status(hostname) 1678char *hostname; 1679{ 1680 amsg_t msg; 1681 amsg_data_t * msg_data; 1682 host_name_msg_t * host_name_msg; 1683 amsg_size_t size; 1684 1685 size = sizeof(host_name_msg_t); 1686 1687 check_amsg(amsg_alloc(size, 1688 &msg_data, 1689 &msg),__LINE__, 2); 1690 1691 host_name_msg = (host_name_msg_t *) msg_data; 1692 host_name_msg->header.msg_type = HOST_STATUS_REQ; 1693 host_name_msg->header.wid = ParallelWorker; 1694 1695 (void) strcpy(host_name_msg->hostname,hostname); 1696 1697 status_ret = -1; 1698 check_amsg(amsg_send(wm_low_aport_id,msg,mdt_wm_nsrvname,1,0), 1699 __LINE__, 3); 1700 while (status_ret < 0) poll_short_sleep(1000); 1701 1702 return(status_ret); 1703} 1704 1705int wm_status() 1706 1707{ 1708 1709 status_ret = -1; 1710 send_simple_wm_message(wm_low_aport_id,STATUS_REQ,0); 1711 while (status_ret < 0) poll_short_sleep(1000); 1712 1713 return(status_ret); 1714} 1715 1716int 1717p_wm_interface(vcommand, tcommand) 1718value vcommand; 1719type tcommand; 1720{ 1721 char *command; 1722 1723 Get_Name(vcommand,tcommand,command); 1724 if(strcmp(command,"on") == 0) 1725 send_simple_wm_message(wm_low_aport_id,START_INTERFACE,0); 1726 else if (strcmp(command,"off") == 0) 1727 send_simple_wm_message(wm_low_aport_id,REMOVE_INTERFACE,0); 1728 Succeed_; 1729} 1730 1731 1732int 1733p_wm_set(vcommand, tcommand,vhostname,thostname,vworkers,tworkers) 1734value vcommand; 1735type tcommand; 1736value vhostname; 1737type thostname; 1738value vworkers; 1739type tworkers; 1740{ 1741 char * hostname; 1742 char *command; 1743 1744 Get_Name(vcommand,tcommand,command) 1745 Get_Name(vhostname,thostname,hostname) 1746 Check_Integer(tworkers); 1747 Succeed_If(wm_command(command,hostname,vworkers.nint)); 1748} 1749 1750int 1751p_wm_get(vstatus,tstatus) 1752value vstatus; 1753type tstatus; 1754 1755{ 1756 pword *cur_mc, *prev_mc, *cur_tail, *cur_head; 1757 int i; 1758 1759 if (wm_status()) 1760 { 1761 prev_mc = NULL; 1762 for(i = 0; i < status.num_machines; i++) 1763 { 1764 cur_mc = TG; 1765 Push_List_Frame() 1766 cur_head = TG; 1767 Push_List_Frame() 1768 Make_List(cur_mc,cur_head); 1769 Make_Integer(cur_head,status.machines[i].num_workers) 1770 cur_tail = cur_head + 1; 1771 cur_head = TG; 1772 Push_List_Frame() 1773 Make_List(cur_tail,cur_head); 1774 Make_Integer(cur_head,status.machines[i].num_awake) 1775 cur_tail = cur_head + 1; 1776 cur_head = TG; 1777 Push_List_Frame() 1778 Make_List(cur_tail,cur_head); 1779 Make_String(cur_head,status.machines[i].hostname); 1780 cur_tail = cur_head + 1; 1781 Make_Nil(cur_tail) 1782 cur_tail = cur_mc + 1; 1783 if (prev_mc == NULL) 1784 Make_Nil(cur_tail) 1785 else 1786 { 1787 Make_List(cur_tail, prev_mc); 1788 } 1789 prev_mc = cur_mc; 1790 } 1791 Return_Unify_List(vstatus,tstatus,cur_mc) 1792 } 1793 else 1794 Return_Unify_Nil(vstatus,tstatus) 1795} 1796 1797p_wm_get_ids(vhostname,thostname,vstatus, tstatus) 1798value vhostname, vstatus; 1799type thostname, tstatus; 1800{ 1801 pword *awake_list, *sleep_list, *tail, *stat_list, * prev; 1802 char * hostname; 1803 int i; 1804 1805 Get_Name(vhostname,thostname,hostname) 1806 if (wm_host_status(hostname)) 1807 { 1808 stat_list = TG; 1809 Push_List_Frame() 1810 tail = stat_list + 1; 1811 Make_Nil(tail) 1812 prev = NULL; 1813 if (host_status.num_asleep > 0) 1814 { 1815 for(i = 0; i < host_status.num_asleep ; i++) 1816 { 1817 sleep_list = TG; 1818 Push_List_Frame() 1819 Make_Integer(sleep_list,host_status.asleep_ids[i]) 1820 tail = sleep_list + 1; 1821 if (prev == NULL) 1822 Make_Nil(tail) 1823 else 1824 { 1825 Make_List(tail,prev); 1826 } 1827 prev = sleep_list; 1828 } 1829 Make_List(stat_list, sleep_list); 1830 } 1831 else 1832 { 1833 Make_Nil(stat_list); 1834 } 1835 1836 prev = stat_list; 1837 stat_list = TG; 1838 Push_List_Frame() 1839 tail = stat_list + 1; 1840 Make_List(tail, prev); 1841 1842 prev = NULL; 1843 if (host_status.num_awake > 0) 1844 { 1845 for(i = 0; i < host_status.num_awake; i++) 1846 { 1847 awake_list = TG; 1848 Push_List_Frame() 1849 Make_Integer(awake_list,host_status.awake_ids[i]) 1850 tail = awake_list + 1; 1851 if (prev == NULL) 1852 Make_Nil(tail) 1853 else 1854 { 1855 Make_List(tail,prev); 1856 } 1857 prev = awake_list; 1858 } 1859 Make_List(stat_list,awake_list); 1860 } 1861 else 1862 { 1863 Make_Nil(stat_list); 1864 } 1865 1866 Return_Unify_List(vstatus,tstatus,stat_list); 1867 } 1868 else 1869 Return_Unify_Nil(vstatus,tstatus) 1870} 1871 1872void wm_set_worker_info(wid, infotype, bufsize, buf) 1873int wid; 1874int infotype; 1875int bufsize; 1876void_ptr buf; 1877 1878{ 1879 amsg_t msg; 1880 amsg_data_t * msg_data; 1881 worker_info_msg_t * winfo_mess; 1882 amsg_size_t size; 1883 int dontack = 0; 1884 1885 if (wid < 0) /* dont send acknowledge */ 1886 { 1887 dontack = 1; 1888 wid = -wid; 1889 } 1890 size = sizeof(worker_info_msg_t) + bufsize; 1891 1892 check_amsg(amsg_alloc(size, 1893 &msg_data, 1894 &msg),__LINE__, 2); 1895 1896 winfo_mess = (worker_info_msg_t *) msg_data; 1897 winfo_mess->header.msg_type = WORKER_INFO_SET; 1898 winfo_mess->header.wid = ParallelWorker; 1899 if (dontack) 1900 winfo_mess->req_wid = -ParallelWorker; 1901 else 1902 winfo_mess->req_wid = ParallelWorker; 1903 winfo_mess->pro_wid = wid; 1904 winfo_mess->infotype = infotype; 1905 winfo_mess->size = bufsize; 1906 bcopy((char *) buf, (char *) (winfo_mess + 1), bufsize); 1907 1908 if (dontack) 1909 check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3); 1910 else { 1911 worker_info_ret = -1; 1912 check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3); 1913 while(worker_info_ret < 0) 1914 poll_short_sleep(100); 1915 } 1916} 1917 1918 1919void 1920wm_get_worker_info( wid, infotype, bufsize, buf) 1921int wid; 1922int infotype; 1923int bufsize; 1924void_ptr buf; 1925{ 1926 amsg_t msg; 1927 amsg_data_t * msg_data; 1928 amsg_size_t size; 1929 worker_info_msg_t * worker_info_mess; 1930 1931 size = sizeof(worker_info_msg_t); 1932 1933 check_amsg(amsg_alloc(size, 1934 &msg_data, 1935 &msg),__LINE__, 2); 1936 1937 worker_info_mess = (worker_info_msg_t *) msg_data; 1938 worker_info_mess->header.msg_type = WORKER_INFO_GET; 1939 worker_info_mess->header.wid = ParallelWorker; 1940 worker_info_mess->infotype = infotype; 1941 worker_info_mess->pro_wid = wid; 1942 worker_info_mess->req_wid = ParallelWorker; 1943 worker_info_buf = buf; 1944 worker_info_bufsize = bufsize; 1945 worker_info_ret = -1; 1946 check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3); 1947 1948 while (worker_info_ret < 0) 1949 poll_short_sleep(1000); 1950 1951} 1952 1953send_worker_info(recwid,infotype,bufsize,buf) 1954int recwid; 1955int infotype; 1956int bufsize; 1957void_ptr buf; 1958{ 1959 amsg_t msg; 1960 amsg_data_t * msg_data; 1961 amsg_size_t size; 1962 worker_info_msg_t * worker_info_mess; 1963 1964 if (recwid > 0) 1965 { 1966 size = sizeof(worker_info_msg_t) + bufsize; 1967 1968 check_amsg(amsg_alloc(size, 1969 &msg_data, 1970 &msg),__LINE__, 2); 1971 1972 worker_info_mess = (worker_info_msg_t *) msg_data; 1973 worker_info_mess->header.msg_type = WORKER_INFO_NOTIFY; 1974 worker_info_mess->header.wid = ParallelWorker; 1975 worker_info_mess->req_wid = recwid; 1976 worker_info_mess->pro_wid = ParallelWorker; 1977 worker_info_mess->infotype = infotype; 1978 worker_info_mess->size = bufsize; 1979 bcopy((char *) buf, (char *) (msg_data + sizeof(worker_info_msg_t)), 1980 bufsize); 1981 check_amsg_soft(amsg_send(wm_low_aport_id,msg,MDT_BYTE,size,0),__LINE__, 3); 1982 } 1983} 1984 1985 1986 1987get_worker_info(infotype,infosize,infoval) 1988int infotype; 1989int *infosize; 1990void_ptr * infoval; 1991 1992{ 1993 1994 switch (infotype) { 1995 1996 case SCHED_INFO: /* scheduler */ 1997 sch_get_info(aports[SCH_APORT_NUMBER], infosize, infoval); 1998 break; 1999 2000 default: 2001 printf("%d: Unimplemented infotype = %d\n", ParallelWorker, infotype); 2002 } 2003} 2004 2005set_worker_info(req_wid, infotype, infoval) 2006int req_wid; /* worker id of requesting worker */ 2007int infotype; 2008void_ptr infoval; 2009{ 2010 if (req_wid < 0) 2011 req_wid = - req_wid; 2012 switch (infotype) { 2013 2014 case SCHED_INFO: /* scheduler */ 2015 sch_set_info(aports[SCH_APORT_NUMBER],infoval); 2016 break; 2017 2018 default: 2019 printf("%d: Unimplemented infotype = %d\n", ParallelWorker, infotype); 2020 } 2021} 2022 2023/* set a specific worker to sleep */ 2024 2025p_set_sleeping(vwid, twid) 2026value vwid; 2027type twid; 2028 2029{ 2030 amsg_t msg; 2031 amsg_data_t * msg_data; 2032 wm_simple_msg_t * wm_simple_msg; 2033 amsg_size_t size; 2034 2035 Check_Integer(twid); 2036 2037 size = sizeof(wm_simple_msg_t); 2038 2039 check_amsg(amsg_alloc(size, 2040 &msg_data, 2041 &msg),__LINE__, 2); 2042 2043 wm_simple_msg = (wm_simple_msg_t *) msg_data; 2044 wm_simple_msg->header.msg_type = SET_ONE_SLEEPING; 2045 wm_simple_msg->header.wid = ParallelWorker; 2046 wm_simple_msg->msg_value = vwid.nint; 2047 check_amsg_soft(amsg_send(wm_low_aport_id,msg,mdt_wm_simple,1,0),__LINE__, 3); 2048 2049 Succeed_; 2050} 2051 2052/* Sleep for n usecs and poll for messages*/ 2053 2054void poll_short_sleep(n) 2055int n; 2056{ 2057 bmsg_optval_t optval; 2058 2059 short_sleep(n); 2060 bmsg_get_option(BMSG_INTRA_DOMAIN_TRIGGERING, &optval); 2061 if(optval == BMSG_OFF) 2062 bmsg_trigger(BMSG_INTRA_DOMAIN); 2063} 2064 2065/* Ask for worker statistics */ 2066 2067request_wstat(wid, stat) 2068int wid; 2069struct worker_stat_ext * stat; 2070{ 2071 worker_ptr w; 2072 2073 w = get_worker(wid); 2074 wstat_rec_buf = stat; 2075 worker_info_ret = -1; 2076 send_simple_wm_message(w->wm_aport_id, WSTAT_REQ, 0); 2077 while (worker_info_ret < 0) 2078 poll_short_sleep(1000); 2079} 2080 2081/* Ask remote worker (wid) to reset worker statistics and wait for ack*/ 2082reset_wstat(wid) 2083int wid; 2084{ 2085 worker_ptr w; 2086 2087 w = get_worker(wid); 2088 2089 worker_info_ret = -1; 2090 send_simple_wm_message(w->wm_aport_id, WSTAT_RESET, 0); 2091 while (worker_info_ret < 0) 2092 poll_short_sleep(1000); 2093 2094} 2095 2096/* reply to WSTAT_REQ message from worker wid */ 2097send_wstat(wid) 2098int wid; 2099{ 2100 amsg_t msg; 2101 amsg_data_t * msg_data; 2102 wstat_msg_t * wstat_msg; 2103 amsg_size_t size; 2104 struct worker_stat_ext * local_stat; 2105 worker_ptr w; 2106 2107 if (wid != 0) /* not a Worker Manager request */ 2108 w = get_worker(wid); 2109 size = sizeof(wstat_msg_t); 2110 2111 check_amsg(amsg_alloc(size, 2112 &msg_data, 2113 &msg),__LINE__, 2); 2114 2115 wstat_msg = (wstat_msg_t *) msg_data; 2116 wstat_msg->header.msg_type = WSTAT_RET; 2117 wstat_msg->header.wid = ParallelWorker; 2118 2119 local_stat = (struct worker_stat_ext *) &(wstat_msg->stat); 2120 get_worker_stat(local_stat); 2121 check_amsg(amsg_send(wid ? w->wm_aport_id : wm_high_aport_id, 2122 msg,mdt_wm_wstat,1,0),__LINE__, 3); 2123} 2124 2125/* tracing messages */ 2126#ifdef WORKER_TRACING 2127get_trace_header(wid, trace_header) 2128int wid; 2129trace_header_t * trace_header; 2130{ 2131 worker_ptr w; 2132 w = get_worker(wid); 2133 2134 trace_header_buf = trace_header; 2135 worker_info_ret = -1; 2136 send_simple_wm_message(w->wm_aport_id, GET_TRACE_HEADER, 0); 2137 2138 while(worker_info_ret < 0) 2139 poll_short_sleep(1000); 2140} 2141 2142set_trace_header(wid, theader) 2143int wid; 2144trace_header_t * theader; 2145{ 2146 worker_info_ret = -1; 2147 send_trace_header(wid, SET_TRACE_HEADER, theader); 2148 while(worker_info_ret < 0) 2149 poll_short_sleep(1000); 2150} 2151 2152send_trace_header(wid,msg_type, theader) 2153int wid; 2154int msg_type; 2155trace_header_t * theader; 2156{ 2157 worker_ptr w = get_worker(wid); 2158 amsg_t msg; 2159 amsg_data_t * msg_data; 2160 trace_msg_t * trace_msg; 2161 amsg_size_t size; 2162 2163 size = sizeof(trace_msg_t); 2164 2165 check_amsg(amsg_alloc(size, 2166 &msg_data, 2167 &msg),__LINE__, 2); 2168 2169 trace_msg = (trace_msg_t *) msg_data; 2170 trace_msg->header.msg_type = msg_type; 2171 trace_msg->header.wid = ParallelWorker; 2172 trace_msg->trace_header = *theader; 2173 check_amsg(amsg_send(w->wm_aport_id,msg,mdt_wm_trace,1,0),__LINE__, 3); 2174} 2175#endif 2176