mdmn_commd_server.c revision 8452:89d32dfdae6e
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27#include <unistd.h>
28#include <sys/types.h>
29#include <sys/stat.h>
30#include <sys/statvfs.h>
31#include <sys/uadmin.h>
32#include <sys/resource.h>
33#include <fcntl.h>
34#include <stdio.h>
35#include <thread.h>
36#include <meta.h>
37#include <sdssc.h>
38#include <mdmn_changelog.h>
39#include "mdmn_subr.h"
40
41/*
42 * This is the communication daemon for SVM Multi Node Disksets.
43 * It runs on every node and provides the following rpc services:
44 *  - mdmn_send_svc_2
45 *  - mdmn_work_svc_2
46 *  - mdmn_wakeup_initiator_svc_2
47 *  - mdmn_wakeup_master_svc_2
48 *  - mdmn_comm_lock_svc_2
49 *  - mdmn_comm_unlock_svc_2
50 *  - mdmn_comm_suspend_svc_2
51 *  - mdmn_comm_resume_svc_2
52 *  - mdmn_comm_reinit_set_svc_2
53 * where send, lock, unlock and reinit are meant for external use,
54 * work and the two wakeups are for internal use only.
55 *
56 * NOTE:
57 * On every node only one of those xxx_2 functions can be active at the
58 * same time because the daemon is single threaded.
59 *
60 * (not quite true, as mdmn_send_svc_2 and mdmn_work_svc_2 do thr_create()s
61 * as part of their handlers, so those aspects are multi-threaded)
62 *
63 * In case an event occurs that has to be propagated to all the nodes...
64 *
65 * One node (the initiator)
66 *	calls the libmeta function mdmn_send_message()
67 *	This function calls the local daemon thru mdmn_send_svc_2.
68 *
69 * On the initiator:
70 *	mdmn_send_svc_2()
71 *	    - starts a thread -> mdmn_send_to_work() and returns.
72 *	mdmn_send_to_work()
73 *	    - sends this message over to the master of the diskset.
74 *	      This is done by calling mdmn_work_svc_2 on the master.
75 *	    - registers to the initiator_table
76 *	    - exits without doing a svc_sendreply() for the call to
77 *	      mdmn_send_svc_2. This means that call is blocked until somebody
78 *	      (see end of this comment) does a svc_sendreply().
79 *	      This means mdmn_send_message() does not yet return.
80 *	    - A timeout surveillance is started at this point.
81 *	      This means in case the master doesn't reply at all in an
 *	      appropriate time, an error condition is returned
83 *	      to the caller.
84 *
85 * On the master:
86 *	mdmn_work_svc_2()
87 *	    - starts a thread -> mdmn_master_process_msg() and returns
88 *	mdmn_master_process_msg()
89 *	    - logs the message to the change log
90 *	    - executes the message locally
91 *	    - flags the message in the change log
92 *	    - sends the message to mdmn_work_svc_2() on all the
93 *	      other nodes (slaves)
94 *	      after each call to mdmn_work_svc_2 the thread goes to sleep and
95 *	      will be woken up by mdmn_wakeup_master_svc_2() as soon as the
96 *	      slave node is done with this message.
 *	    - In case the slave doesn't respond in an appropriate time, an
 *	      error is assumed to ensure the master doesn't wait forever.
99 *
100 * On a slave:
101 *	mdmn_work_svc_2()
102 *	    - starts a thread -> mdmn_slave_process_msg() and returns
103 *	mdmn_slave_process_msg()
104 *	    - processes this message locally by calling the appropriate message
105 *	      handler, that creates some result.
106 *	    - sends that result thru a call to mdmn_wakeup_master_svc_2() to
107 *	      the master.
108 *
109 * Back on the master:
110 *	mdmn_wakeup_master_svc_2()
111 *	    - stores the result into the master_table.
112 *	    - signals the mdmn_master_process_msg-thread.
113 *	    - returns
114 *	mdmn_master_process_msg()
115 *	    - after getting the results from all nodes
116 *	    - sends them back to the initiating node thru a call to
117 *	      mdmn_wakeup_initiator_svc_2.
118 *
119 * Back on the initiator:
120 *	mdmn_wakeup_initiator_svc_2()
121 *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_2()
122 *	      return.
123 *	      which allows the initial mdmn_send_message() call to return.
124 */
125
FILE *commdout;		/* debug output for the commd */
char *commdoutfile;	/* file name for the above output */
/* want at least 10 MB free space when logging into a file */
#define	MIN_FS_SPACE	(10LL * 1024 * 1024)

/*
 * Number of outstanding messages that were initiated by this node.
 * If zero, check_timeouts goes to sleep
 */
uint_t	messages_on_their_way;
mutex_t	check_timeout_mutex;	/* need mutex to protect above */
cond_t	check_timeout_cv;	/* trigger for check_timeouts */

/* for printing out time stamps; set from gethrtime() in global_init() */
hrtime_t __savetime;

/* RPC clients for every set and every node and their protecting locks */
CLIENT	*client[MD_MAXSETS][NNODES];
rwlock_t client_rwlock[MD_MAXSETS];

/* the descriptors of all possible sets and their protectors */
struct md_set_desc *set_descriptor[MD_MAXSETS];
rwlock_t set_desc_rwlock[MD_MAXSETS];

/*
 * the daemon to daemon communication has to timeout quickly;
 * this value is installed as CLSET_TIMEOUT on every client handle we create
 */
static struct timeval FOUR_SECS = { 4, 0 };

/*
 * These indicate if a set has already been setup.
 * Each entry is a mask of the MDMN_SET_* flags or'ed in by mdmn_init_set().
 */
int md_mn_set_inited[MD_MAXSETS];

/*
 * For every set we have a message completion table and protecting mutexes.
 * mct[setno] is the memory mapped table file set up by mdmn_init_set().
 */
md_mn_mct_t *mct[MD_MAXSETS];
mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];

/* Stuff to describe the global status of the commd on one node */
#define	MD_CGS_INITED		0x0001	/* global_init() has completed */
#define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
uint_t md_commd_global_state = 0;	/* No state when starting up */

/*
 * Global verbosity level for the daemon.
 * Read from the config file by setup_debug(); forced to MD_MMV_NULL by
 * flush_fcout() when the log filesystem runs short of space.
 */
uint_t md_commd_global_verb;

/*
 * libmeta doesn't like multiple threads in metaget_setdesc().
 * So we must protect access to it with a global lock
 */
mutex_t get_setdesc_mutex;

/*
 * Need a way to block single message types,
 * hence an array with a status for every message type
 */
uint_t msgtype_lock_state[MD_MN_NMESSAGES];

/* for reading in the config file */
#define	MAX_LINE_SIZE 1024

/* config file accessors, defined outside of this file */
extern char *commd_get_outfile(void);
extern uint_t commd_get_verbosity(void);
187
188/*
189 * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
190 * merely needs to call clnt_create_timed, and meta_client_create_retry
191 * will take care of the rest.
192 */
193/* ARGSUSED */
194static CLIENT *
195mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
196{
197	md_mnnode_desc	*node = (md_mnnode_desc *)data;
198
199	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, TWO, "tcp",
200	    time_out));
201}
202
/*
 * Push buffered debug output to the debug file and sync the file to disk.
 * Does nothing when no debug output file is open (commdout is NULL).
 *
 * The body is wrapped in do/while (0) so that "FLUSH_DEBUGFILE();" is
 * exactly one statement and can be used as an unbraced if/else body.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (0)
208
209static void
210panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
211    md_mn_result_t *slave_result)
212{
213	md_mn_commd_err_t	commd_err;
214	md_error_t		mne = mdnullerror;
215	char			*msg_buf;
216
217	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
218
219	FLUSH_DEBUGFILE();
220
221	if (master_err != MDMNE_ACK) {
222		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on master "
223		    "when processing message type %d\n", type);
224	} else if (slave_result == NULL) {
225		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on node "
226		    "%d when processing message type %d\n", nid, type);
227	} else {
228		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: Inconsistent "
229		    "return value from node %d when processing message "
230		    "type %d. Master exitval = %d, Slave exitval = %d\n",
231		    nid, type, master_exitval, slave_result->mmr_exitval);
232	}
233	commd_err.size = strlen(msg_buf);
234	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
235
236	metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
237	(void) uadmin(A_DUMP, AD_BOOT, NULL);
238}
239
240static void
241flush_fcout()
242{
243	struct statvfs64 vfsbuf;
244	long long avail_bytes;
245	int warned = 0;
246
247	for (; ; ) {
248		sleep(10);
249		/* No output file, nothing to do */
250		if (commdout == (FILE *)NULL)
251			continue;
252
253		/*
254		 * stat the appropriate filesystem to check for available space.
255		 */
256		if (statvfs64(commdoutfile, &vfsbuf)) {
257			continue;
258		}
259
260		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
261		/*
262		 * If we don't have enough space, we print out a warning.
263		 * And we drop the verbosity level to NULL
264		 * In case the condtion doesn't go away, we don't repeat
265		 * the warning.
266		 */
267		if (avail_bytes < MIN_FS_SPACE) {
268			if (warned) {
269				continue;
270			}
271			commd_debug(MD_MMV_SYSLOG,
272			    "NOT enough space available for logging\n");
273			commd_debug(MD_MMV_SYSLOG,
274			    "Have %lld bytes, need %lld bytes\n",
275			    avail_bytes, MIN_FS_SPACE);
276			warned = 1;
277			md_commd_global_verb = MD_MMV_NULL;
278		} else {
279			warned = 0;
280		}
281
282		fflush(commdout);
283	}
284}
285
/*
 * Safer version of clnt_destroy. If clnt is NULL don't do anything.
 *
 * Expands to a single statement (do/while (0)), so that
 * "mdmn_clnt_destroy(x);" can be used as an unbraced if/else body.
 * Note that the argument is evaluated more than once.
 */
#define	mdmn_clnt_destroy(clnt) \
	do { \
		if ((clnt) != NULL) \
			clnt_destroy(clnt); \
	} while (0)
291
292/*
293 * Own version of svc_sendreply that checks the integrity of the transport
294 * handle and so prevents us from core dumps in the real svc_sendreply()
295 */
296void
297mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
298{
299	if (SVC_STAT(transp) == XPRT_DIED) {
300		commd_debug(MD_MMV_MISC,
301		    "mdmn_svc_sendreply: XPRT_DIED\n");
302		return;
303	}
304	(void) svc_sendreply(transp, xdr, data);
305}
306
307/*
308 * timeout_initiator(set, class)
309 *
310 * Alas, I sent a message and didn't get a response back in aproppriate time.
311 *
312 * timeout_initiator() takes care for doing the needed svc_sendreply() to the
313 * calling mdmn_send_message, so that guy doesn't wait forever
314 * What is done here is pretty much the same as what is done in
315 * wakeup initiator. The difference is that we cannot provide for any results,
316 * of course and we set the comm_state to MDMNE_TIMEOUT.
317 *
318 * By doing so, mdmn_send_message can decide if a retry would make sense or not.
319 * It's not our's to decide that here.
320 */
321void
322timeout_initiator(set_t setno, md_mn_msgclass_t class)
323{
324	SVCXPRT		*transp;
325	md_mn_msgid_t	mid;
326	md_mn_result_t *resultp;
327
328	resultp = Zalloc(sizeof (md_mn_result_t));
329	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
330
331	commd_debug(MD_MMV_MISC,
332	    "timeout_initiator set = %d, class = %d\n", setno, class);
333
334	transp = mdmn_get_initiator_table_transp(setno, class);
335	mdmn_get_initiator_table_id(setno, class, &mid);
336
337	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
338	    MSGID_ELEMS(mid));
339	/*
340	 * Give the result the corresponding msgid from the failed message.
341	 */
342	MSGID_COPY(&mid, &(resultp->mmr_msgid));
343
344	/* return to mdmn_send_message() and let it deal with the situation */
345	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
346
347	free(resultp);
348	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
349	svc_done(transp);
350	mdmn_unregister_initiator_table(setno, class);
351}
352
353
354/*
355 * check_timeouts - thread
356 *
357 * This implements a timeout surveillance for messages sent from the
358 * initiator to the master.
359 *
360 * If a message is started, this thread is triggered thru
361 * cond_signal(&check_timeout_cv) and we keep track of the numbers of
362 * messages that are outstanding (messages_on_their_way).
363 *
364 * As long as there are messages on their way, this thread never goes to sleep.
365 * It'll keep checking all class/set combinations for outstanding messages.
366 * If one is found, it's checked if this message is overdue. In that case,
367 * timeout_initiator() is called to wakeup the calling mdmn_send_message and
368 * to clean up the mess.
369 *
370 * If the result from the master arrives later, this message is considered
371 * to be unsolicited. And will be ignored.
372 */
373
374void
375check_timeouts()
376{
377	set_t			setno;
378	time_t			now, then;
379	mutex_t			*mx;
380	md_mn_msgclass_t	class;
381
382	for (; ; ) {
383		now = time((time_t *)NULL);
384		for (setno = 1; setno < MD_MAXSETS; setno++) {
385			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
386				continue;
387			}
388			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
389			    class++) {
390				mx = mdmn_get_initiator_table_mx(setno, class);
391				mutex_lock(mx);
392
393				/* then is the registered time */
394				then =
395				    mdmn_get_initiator_table_time(setno, class);
396				if ((then != 0) && (now > then)) {
397					timeout_initiator(setno, class);
398				}
399				mutex_unlock(mx);
400			}
401		}
402		/* it's ok to check only once per second */
403		sleep(1);
404
405		/* is there work to do? */
406		mutex_lock(&check_timeout_mutex);
407		if (messages_on_their_way == 0) {
408			cond_wait(&check_timeout_cv, &check_timeout_mutex);
409		}
410		mutex_unlock(&check_timeout_mutex);
411	}
412}
413
414void
415setup_debug(void)
416{
417	char	*tmp_dir;
418
419	/* Read in the debug-controlling tokens from runtime.cf */
420	md_commd_global_verb = commd_get_verbosity();
421	/*
422	 * If the user didn't specify a verbosity level in runtime.cf
423	 * we can safely return here. As we don't intend to printout
424	 * debug messages, we don't need to check for the output file.
425	 */
426	if (md_commd_global_verb == 0) {
427		return;
428	}
429
430	/* if commdout is non-NULL it is an open FILE, we'd better close it */
431	if (commdout != (FILE *)NULL) {
432		fclose(commdout);
433	}
434
435	commdoutfile = commd_get_outfile();
436
437	/* setup the debug output */
438	if (commdoutfile == (char *)NULL) {
439		/* if no valid file was specified, use the default */
440		commdoutfile = "/var/run/commd.out";
441		commdout = fopen(commdoutfile, "a");
442	} else {
443		/* check if the directory exists and is writable */
444		tmp_dir = strdup(commdoutfile);
445		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
446		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
447			syslog(LOG_ERR,
448			    "Can't write to specified output file %s,\n"
449			    "using /var/run/commd.out instead\n", commdoutfile);
450			free(commdoutfile);
451			commdoutfile = "/var/run/commd.out";
452			commdout = fopen(commdoutfile, "a");
453		}
454		free(tmp_dir);
455	}
456
457	if (commdout == (FILE *)NULL) {
458		syslog(LOG_ERR, "Can't write to debug output file %s\n",
459		    commdoutfile);
460	}
461}
462
463/*
464 * mdmn_is_node_dead checks to see if a node is dead using
465 * the SunCluster infrastructure which is a stable interface.
466 * If unable to contact SunCuster the node is assumed to be alive.
467 * Return values:
468 *	1 - node is dead
469 *	0 - node is alive
470 */
471int
472mdmn_is_node_dead(md_mnnode_desc *node)
473{
474	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
475	char	*cmd;
476	size_t	size;
477	char	buf[10];
478	FILE	*ptr;
479	int	retval = 0;
480
481	/* I know that I'm alive */
482	if (strcmp(node->nd_nodename, mynode()) == 0)
483		return (retval);
484
485	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
486	cmd = Zalloc(size);
487	(void) strlcat(cmd, fmt, size);
488	(void) strlcat(cmd, node->nd_nodename, size);
489
490	if ((ptr = popen(cmd, "r")) != NULL) {
491		if (fgets(buf, sizeof (buf), ptr) != NULL) {
492			/* If scha_cluster_get returned DOWN - return dead */
493			if (strncmp(buf, "DOWN", 4) == 0)
494				retval = 1;
495		}
496		(void) pclose(ptr);
497	}
498	Free(cmd);
499	return (retval);
500}
501
502/*
503 * global_init()
504 *
505 * Perform some global initializations.
506 *
507 * the following routines have to call this before operation can start:
508 *  - mdmn_send_svc_2
509 *  - mdmn_work_svc_2
510 *  - mdmn_comm_lock_svc_2
511 *  - mdmn_comm_unlock_svc_2
512 *  - mdmn_comm_suspend_svc_2
513 *  - mdmn_comm_resume_svc_2
514 *  - mdmn_comm_reinit_set_svc_2
515 *
516 * This is a single threaded daemon, so it can only be in one of the above
517 * routines at the same time.
518 * This means, global_init() cannot be called more than once at the same time.
519 * Hence, no lock is needed.
520 */
521void
522global_init(void)
523{
524	set_t			set;
525	md_mn_msgclass_t	class;
526	struct sigaction	sighandler;
527	time_t			clock_val;
528	struct rlimit		commd_limit;
529
530
531
532	/* Do these global initializations only once */
533	if (md_commd_global_state & MD_CGS_INITED) {
534		return;
535	}
536	(void) sdssc_bind_library();
537
538	/* setup the debug options from the config file */
539	setup_debug();
540
541	/* make sure that we don't run out of file descriptors */
542	commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
543	if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
544		syslog(LOG_WARNING, gettext("setrlimit failed."
545		    "Could not increase the max file descriptors"));
546	}
547
548	/* Make setup_debug() be the action in case of SIGHUP */
549	sighandler.sa_flags = 0;
550	sigfillset(&sighandler.sa_mask);
551	sighandler.sa_handler = (void (*)(int)) setup_debug;
552	sigaction(SIGHUP, &sighandler, NULL);
553
554	__savetime = gethrtime();
555	(void) time(&clock_val);
556	commd_debug(MD_MMV_MISC, "global init called %s\n", ctime(&clock_val));
557
558	/* start a thread that flushes out the debug on a regular basis */
559	thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
560	    (void *) NULL, THR_DETACHED, NULL);
561
562	/* global rwlock's / mutex's / cond_t's go here */
563	mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
564	cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
565	mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
566
567	/* Make sure the initiator table is initialized correctly */
568	for (set = 0; set < MD_MAXSETS; set++) {
569		for (class = 0; class < MD_MN_NCLASSES; class++) {
570			mdmn_unregister_initiator_table(set, class);
571		}
572	}
573
574
575	/* setup the check for timeouts */
576	thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
577	    (void *) NULL, THR_DETACHED, NULL);
578
579	md_commd_global_state |= MD_CGS_INITED;
580}
581
582
583/*
584 * mdmn_init_client(setno, nodeid)
585 * called if client[setno][nodeid] is NULL
586 *
587 * NOTE: Must be called with set_desc_rwlock held as a reader
588 * NOTE: Must be called with client_rwlock held as a writer
589 *
590 * If the rpc client for this node has not been setup for any set, we do it now.
591 *
592 * Returns	0 on success (node found in set, rpc client setup)
593 *		-1 if metaget_setdesc failed,
594 *		-2 if node not part of set
595 *		-3 if clnt_create fails
596 */
597static int
598mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
599{
600	md_error_t	ep = mdnullerror;
601	md_mnnode_desc	*node;
602	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
603
604	sd = set_descriptor[setno];
605
606	/*
607	 * Is the appropriate set_descriptor already initialized ?
608	 * Can't think of a scenario where this is not the case, but we'd better
609	 * check for it anyway.
610	 */
611	if (sd == NULL) {
612		mdsetname_t	*sp;
613
614		rw_unlock(&set_desc_rwlock[setno]); /* readlock -> writelock */
615		rw_wrlock(&set_desc_rwlock[setno]);
616		sp = metasetnosetname(setno, &ep);
617		/* Only one thread is supposed to be in metaget_setdesc() */
618		mutex_lock(&get_setdesc_mutex);
619		sd = metaget_setdesc(sp, &ep);
620		mutex_unlock(&get_setdesc_mutex);
621		if (sd == NULL) {
622			rw_unlock(&set_desc_rwlock[setno]); /* back to ... */
623			rw_rdlock(&set_desc_rwlock[setno]); /* ... readlock */
624			return (-1);
625		}
626		set_descriptor[setno] = sd;
627		rw_unlock(&set_desc_rwlock[setno]); /* back to readlock */
628		rw_rdlock(&set_desc_rwlock[setno]);
629	}
630
631	/* first we have to find the node name for this node id */
632	for (node = sd->sd_nodelist; node; node = node->nd_next) {
633		if (node->nd_nodeid == nid)
634			break; /* we found our node in this set */
635	}
636
637
638	if (node == (md_mnnode_desc *)NULL) {
639		commd_debug(MD_MMV_SYSLOG,
640		    "FATAL: node %d not found in set %d\n", nid, setno);
641		rw_unlock(&set_desc_rwlock[setno]);
642		return (-2);
643	}
644
645	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
646	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
647
648	/* Did this node join the diskset?  */
649	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
650		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
651		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
652		rw_unlock(&set_desc_rwlock[setno]);
653		return (-2);
654	}
655
656	/* if clnt_create has not been done for that node, do it now */
657	if (client[setno][nid] == (CLIENT *) NULL) {
658		time_t	tout = 0;
659
660		/*
661		 * While trying to create a connection to a node,
662		 * periodically check to see if the node has been marked
663		 * dead by the SunCluster infrastructure.
664		 * This periodic check is needed since a non-responsive
665		 * rpc.mdcommd (while it is attempting to create a connection
666		 * to a dead node) can lead to large delays and/or failures
667		 * in the reconfig steps.
668		 */
669		while ((client[setno][nid] == (CLIENT *) NULL) &&
670		    (tout < MD_CLNT_CREATE_TOUT)) {
671			client[setno][nid] = meta_client_create_retry(
672			    node->nd_nodename, mdmn_clnt_create,
673			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
674			/* Is the node dead? */
675			if (mdmn_is_node_dead(node) == 1) {
676				commd_debug(MD_MMV_SYSLOG,
677				    "rpc.mdcommd: no client for dead node %s\n",
678				    node->nd_nodename);
679				break;
680			} else
681				tout += MD_CLNT_CREATE_SUBTIMEOUT;
682		}
683
684		if (client[setno][nid] == (CLIENT *) NULL) {
685			clnt_pcreateerror(node->nd_nodename);
686			rw_unlock(&set_desc_rwlock[setno]);
687			return (-3);
688		}
689		/* this node has the license to send */
690		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
691		add_license(node);
692
693		/* set the timeout value */
694		clnt_control(client[setno][nid], CLSET_TIMEOUT,
695		    (char *)&FOUR_SECS);
696
697	}
698	rw_unlock(&set_desc_rwlock[setno]);
699	return (0);
700}
701
702/*
703 * check_client(setno, nodeid)
704 *
705 * must be called with reader lock held for set_desc_rwlock[setno]
706 * and must be called with reader lock held for client_rwlock[setno]
707 * Checks if the client for this set/node combination is already setup
708 * if not it upgrades the lock to a writer lock
709 * and tries to initialize the client.
710 * Finally it's checked if the client nulled out again due to some race
711 *
712 * returns 0 if there is a usable client
713 * returns MDMNE_RPC_FAIL otherwise
714 */
715static int
716check_client(set_t setno, md_mn_nodeid_t nodeid)
717{
718	int ret = 0;
719
720	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
721		rw_unlock(&client_rwlock[setno]); /* upgrade reader ... */
722		rw_wrlock(&client_rwlock[setno]); /* ... to writer lock. */
723		if (mdmn_init_client(setno, nodeid) != 0) {
724			ret = MDMNE_RPC_FAIL;
725		}
726		rw_unlock(&client_rwlock[setno]); /* downgrade writer ... */
727		rw_rdlock(&client_rwlock[setno]); /* ... back to reader lock. */
728	}
729	return (ret);
730}
731
732/*
733 * mdmn_init_set(setno, todo)
734 * setno is the number of the set to be initialized.
735 * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
736 * If called with MDMN_SET_READY everything is initialized.
737 *
738 * If the set mutexes are already initialized, the caller has to hold
739 * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
740 * calling mdmn_init_set()
741 */
742int
743mdmn_init_set(set_t setno, int todo)
744{
745	int class;
746	md_mnnode_desc	*node;
747	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
748	mdsetname_t	*sp;
749	md_error_t	ep = mdnullerror;
750	md_mn_nodeid_t	nid;
751
752	/*
753	 * Check if we are told to setup the mutexes and
754	 * if these are not yet setup
755	 */
756	if ((todo & MDMN_SET_MUTEXES) &&
757	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
758		mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
759		cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
760		rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
761		rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
762
763		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
764			mutex_init(mdmn_get_master_table_mx(setno, class),
765			    USYNC_THREAD, NULL);
766			cond_init(mdmn_get_master_table_cv(setno, class),
767			    USYNC_THREAD, NULL);
768			mutex_init(mdmn_get_initiator_table_mx(setno, class),
769			    USYNC_THREAD, NULL);
770		}
771		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
772	}
773	if ((todo & MDMN_SET_MCT) &&
774	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
775		int	fd;
776		size_t	filesize;
777		caddr_t	addr;
778		char table_name[32];
779
780		filesize = (sizeof (md_mn_mct_t));
781		(void) snprintf(table_name, sizeof (table_name), "%s%d",
782		    MD_MN_MSG_COMP_TABLE, setno);
783		/*
784		 * If the mct file exists we map it into memory.
785		 * Otherwise we create an empty file of appropriate
786		 * size and map that into memory.
787		 * The mapped areas are stored in mct[setno].
788		 */
789		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
790		if (fd < 0) {
791			commd_debug(MD_MMV_MISC,
792			    "init_set: Can't open MCT\n");
793			return (-1);
794		}
795		/*
796		 * To ensure that the file has the appropriate size,
797		 * we write a byte at the end of the file.
798		 */
799		lseek(fd, filesize + 1, SEEK_SET);
800		write(fd, "\0", 1);
801
802		/* at this point we have a file in place that we can mmap */
803		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
804		    MAP_SHARED, fd, (off_t)0);
805		if (addr == MAP_FAILED) {
806			commd_debug(MD_MMV_INIT,
807			    "init_set: mmap mct error %d\n",
808			    errno);
809			return (-1);
810		}
811		/* LINTED pointer alignment */
812		mct[setno] = (md_mn_mct_t *)addr;
813
814		/* finally we initialize the mutexes that protect the mct */
815		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
816			mutex_init(&(mct_mutex[setno][class]),
817			    USYNC_THREAD, NULL);
818		}
819
820		md_mn_set_inited[setno] |= MDMN_SET_MCT;
821	}
822	/*
823	 * Check if we are told to setup the nodes and
824	 * if these are not yet setup
825	 * (Attention: negative logic here compared to above!)
826	 */
827	if (((todo & MDMN_SET_NODES) == 0) ||
828	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
829		return (0); /* success */
830	}
831
832	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
833		commd_debug(MD_MMV_SYSLOG,
834		    "metasetnosetname(%d) returned NULL\n", setno);
835		return (MDMNE_NOT_JOINED);
836	}
837
838	/* flush local copy of rpc.metad data */
839	metaflushsetname(sp);
840
841	mutex_lock(&get_setdesc_mutex);
842	sd = metaget_setdesc(sp, &ep);
843	mutex_unlock(&get_setdesc_mutex);
844
845	if (sd == NULL) {
846		commd_debug(MD_MMV_SYSLOG,
847		    "metaget_setdesc(%d) returned NULL\n", setno);
848		return (MDMNE_NOT_JOINED);
849	}
850
851	/*
852	 * if this set is not a multinode set or
853	 * this node didn't join yet the diskset, better don't do anything
854	 */
855	if ((MD_MNSET_DESC(sd) == 0) ||
856	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
857		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
858		return (MDMNE_NOT_JOINED);
859	}
860
861	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
862		time_t	tout = 0;
863		nid = node->nd_nodeid;
864
865		commd_debug(MD_MMV_INIT,
866		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
867		    node->nd_nodename ? node->nd_nodename : "NULL",
868		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
869		    node->nd_flags);
870
871		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
872			commd_debug(MD_MMV_INIT,
873			    "init: %s didn't join set %d\n",
874			    node->nd_nodename ? node->nd_nodename : "NULL",
875			    setno);
876			continue;
877		}
878
879		if (client[setno][nid] != (CLIENT *) NULL) {
880			/* already inited */
881			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
882			    node->nd_nodename ? node->nd_nodename : "NULL");
883			continue;
884		}
885
886		/*
887		 * While trying to create a connection to a node,
888		 * periodically check to see if the node has been marked
889		 * dead by the SunCluster infrastructure.
890		 * This periodic check is needed since a non-responsive
891		 * rpc.mdcommd (while it is attempting to create a connection
892		 * to a dead node) can lead to large delays and/or failures
893		 * in the reconfig steps.
894		 */
895		while ((client[setno][nid] == (CLIENT *) NULL) &&
896		    (tout < MD_CLNT_CREATE_TOUT)) {
897			client[setno][nid] = meta_client_create_retry(
898			    node->nd_nodename, mdmn_clnt_create,
899			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
900			/* Is the node dead? */
901			if (mdmn_is_node_dead(node) == 1) {
902				commd_debug(MD_MMV_SYSLOG,
903				    "rpc.mdcommd: no client for dead node %s\n",
904				    node->nd_nodename);
905				break;
906			} else
907				tout += MD_CLNT_CREATE_SUBTIMEOUT;
908		}
909
910		if (client[setno][nid] == (CLIENT *) NULL) {
911			clnt_pcreateerror(node->nd_nodename);
912			/*
913			 * If we cannot connect to a single node
914			 * (maybe because it is down) we mark this node as not
915			 * owned and continue with the next node in the list.
916			 * This is better than failing the entire starting up
917			 * of the commd system.
918			 */
919			node->nd_flags &= ~MD_MN_NODE_OWN;
920			commd_debug(MD_MMV_SYSLOG,
921			    "WARNING couldn't create client for %s\n"
922			    "Reconfig cycle required\n",
923			    node->nd_nodename);
924			commd_debug(MD_MMV_INIT,
925			    "WARNING couldn't create client for %s\n"
926			    "Reconfig cycle required\n",
927			    node->nd_nodename);
928			continue;
929		}
930		/* this node has the license to send */
931		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
932		add_license(node);
933
934		/* set the timeout value */
935		clnt_control(client[setno][nid], CLSET_TIMEOUT,
936		    (char *)&FOUR_SECS);
937
938		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
939		    node->nd_nodename ? node->nd_nodename : "NULL");
940	}
941
942	set_descriptor[setno] = sd;
943	md_mn_set_inited[setno] |= MDMN_SET_NODES;
944	return (0); /* success */
945}
946
947void *
948mdmn_send_to_work(void *arg)
949{
950	int			*rpc_err = NULL;
951	int			success;
952	int			try_master;
953	set_t			setno;
954	mutex_t			*mx;	/* protection for initiator_table */
955	SVCXPRT			*transp;
956	md_mn_msg_t		*msg;
957	md_mn_nodeid_t		set_master;
958	md_mn_msgclass_t	class;
959	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
960
961	msg			= matp->mat_msg;
962	transp			= matp->mat_transp;
963
964	class = mdmn_get_message_class(msg->msg_type);
965	setno = msg->msg_setno;
966
967	/* set the sender, so the master knows who to send the results */
968	rw_rdlock(&set_desc_rwlock[setno]);
969	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
970	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
971
972	mx = mdmn_get_initiator_table_mx(setno, class);
973	mutex_lock(mx);
974
975	/*
976	 * Here we check, if the initiator table slot for this set/class
977	 * combination is free to use.
978	 * If this is not the case, we return CLASS_BUSY forcing the
979	 * initiating send_message call to retry
980	 */
981	success = mdmn_check_initiator_table(setno, class);
982	if (success == MDMNE_CLASS_BUSY) {
983		md_mn_msgid_t		active_mid;
984
985		mdmn_get_initiator_table_id(setno, class, &active_mid);
986
987		commd_debug(MD_MMV_SEND,
988		    "send_to_work: received but locally busy "
989		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
990		    "active msg=(%d, 0x%llx-%d)\n",
991		    MSGID_ELEMS(msg->msg_msgid), setno, class,
992		    msg->msg_type, MSGID_ELEMS(active_mid));
993	} else {
994		commd_debug(MD_MMV_SEND,
995		    "send_to_work: received (%d, 0x%llx-%d), "
996		    "set=%d, class=%d, type=%d\n",
997		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
998	}
999
1000	try_master = 2; /* return failure after two retries */
1001	while ((success == MDMNE_ACK) && (try_master--)) {
1002		rw_rdlock(&client_rwlock[setno]);
1003		/* is the rpc client to the master still around ? */
1004		if (check_client(setno, set_master)) {
1005			success = MDMNE_RPC_FAIL;
1006			FLUSH_DEBUGFILE();
1007			rw_unlock(&client_rwlock[setno]);
1008			break; /* out of try_master-loop */
1009		}
1010
1011		/*
1012		 * Send the request to the work function on the master
1013		 * this call will return immediately
1014		 */
1015		rpc_err = mdmn_work_2(msg, client[setno][set_master],
1016		    set_master);
1017
1018		/* Everything's Ok? */
1019		if (rpc_err == NULL) {
1020			success = MDMNE_RPC_FAIL;
1021			/*
1022			 * Probably something happened to the daemon on the
1023			 * master. Kill the client, and try again...
1024			 */
1025			rw_unlock(&client_rwlock[setno]);
1026			rw_wrlock(&client_rwlock[setno]);
1027			mdmn_clnt_destroy(client[setno][set_master]);
1028			if (client[setno][set_master] != (CLIENT *)NULL) {
1029				client[setno][set_master] = (CLIENT *)NULL;
1030			}
1031			rw_unlock(&client_rwlock[setno]);
1032			continue;
1033
1034		} else  if (*rpc_err != MDMNE_ACK) {
1035			/* something went wrong, break out */
1036			success = *rpc_err;
1037			free(rpc_err);
1038			rw_unlock(&client_rwlock[setno]);
1039			break; /* out of try_master-loop */
1040		}
1041
1042		rw_unlock(&client_rwlock[setno]);
1043		free(rpc_err);
1044
1045		/*
1046		 * If we are here, we sucessfully delivered the message.
1047		 * We register the initiator_table, so that
1048		 * wakeup_initiator_2 can do the sendreply with the
1049		 * results for us.
1050		 */
1051		success = MDMNE_ACK;
1052		mdmn_register_initiator_table(setno, class, msg, transp);
1053
1054		/* tell check_timeouts, there's work to do */
1055		mutex_lock(&check_timeout_mutex);
1056		messages_on_their_way++;
1057		cond_signal(&check_timeout_cv);
1058		mutex_unlock(&check_timeout_mutex);
1059		break; /* out of try_master-loop */
1060	}
1061
1062	rw_unlock(&set_desc_rwlock[setno]);
1063
1064	if (success == MDMNE_ACK) {
1065		commd_debug(MD_MMV_SEND,
1066		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1067		    MSGID_ELEMS(msg->msg_msgid));
1068	} else {
1069		/* In case of failure do the sendreply now */
1070		md_mn_result_t *resultp;
1071		resultp = Zalloc(sizeof (md_mn_result_t));
1072		resultp->mmr_comm_state = success;
1073		/*
1074		 * copy the MSGID so that we know _which_ message
1075		 * failed (if the transp has got mangled)
1076		 */
1077		MSGID_COPY(&(msg->msg_msgid), &(resultp->mmr_msgid));
1078		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1079		commd_debug(MD_MMV_SEND,
1080		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1081		    MSGID_ELEMS(msg->msg_msgid), success);
1082		free_result(resultp);
1083		/*
1084		 * We don't have a timeout registered to wake us up, so we're
1085		 * now done with this handle. Release it back to the pool.
1086		 */
1087		svc_done(transp);
1088
1089	}
1090
1091	free_msg(msg);
1092	/* the alloc was done in mdmn_send_svc_2 */
1093	Free(matp);
1094	mutex_unlock(mx);
1095	return (NULL);
1096
1097}
1098
1099/*
1100 * do_message_locally(msg, result)
1101 * Process a message locally on the master
1102 * Lookup the MCT if the message has already been processed.
1103 * If not, call the handler and store the result
1104 * If yes, retrieve the result from the MCT.
1105 * Return:
1106 *	MDMNE_ACK in case of success
1107 *	MDMNE_LOG_FAIL if the MCT could not be checked
1108 */
1109static int
1110do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1111{
1112	int			completed;
1113	set_t			setno;
1114	md_mn_msgtype_t		msgtype = msg->msg_type;
1115	md_mn_msgclass_t	class;
1116
1117	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1118
1119	handler = mdmn_get_handler(msgtype);
1120	if (handler == NULL) {
1121		result->mmr_exitval = 0;
1122		/* let the sender decide if this is an error or not */
1123		result->mmr_comm_state = MDMNE_NO_HANDLER;
1124		return (MDMNE_NO_HANDLER);
1125	}
1126
1127	class = mdmn_get_message_class(msg->msg_type);
1128	setno = msg->msg_setno;
1129
1130	result->mmr_msgtype	= msgtype;
1131	result->mmr_flags	= msg->msg_flags;
1132	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1133
1134	mutex_lock(&mct_mutex[setno][class]);
1135	completed = mdmn_check_completion(msg, result);
1136	if (completed == MDMN_MCT_NOT_DONE) {
1137		/* message not yet processed locally */
1138		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1139		    "calling handler for (%d,0x%llx-%d) type %d\n",
1140		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1141
1142		/*
1143		 * Mark the message as being currently processed,
1144		 * so we won't start a second handler for it
1145		 */
1146		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1147		mutex_unlock(&mct_mutex[setno][class]);
1148
1149		/* here we actually process the message on the master */
1150		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1151
1152		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1153		    "finished handler for (%d,0x%llx-%d) type %d\n",
1154		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1155
1156		/* Mark the message as fully processed, store the result */
1157		mutex_lock(&mct_mutex[setno][class]);
1158		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1159	} else if (completed == MDMN_MCT_DONE) {
1160		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1161		    "result for (%d, 0x%llx-%d) from MCT\n",
1162		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1163	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1164		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1165		    "(%d, 0x%llx-%d) is currently being processed\n",
1166		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1167	} else {
1168		/* MCT error occurred (should never happen) */
1169		mutex_unlock(&mct_mutex[setno][class]);
1170		result->mmr_comm_state = MDMNE_LOG_FAIL;
1171		commd_debug(MD_MMV_SYSLOG, "WARNING "
1172		    "mdmn_check_completion returned %d "
1173		    "for (%d,0x%llx-%d)\n", completed,
1174		    MSGID_ELEMS(msg->msg_msgid));
1175		return (MDMNE_LOG_FAIL);
1176	}
1177	mutex_unlock(&mct_mutex[setno][class]);
1178	return (MDMNE_ACK);
1179
1180}
1181
1182/*
1183 * do_send_message(msg, node)
1184 *
1185 * Send a message to a given node and wait for a acknowledgment, that the
1186 * message has arrived on the remote node.
1187 * Make sure that the client for the set is setup correctly.
1188 * If no ACK arrives, destroy and recreate the RPC client and retry the
1189 * message one time
1190 * After actually sending wait no longer than the appropriate number of
1191 * before timing out the message.
1192 *
1193 * Note must be called with set_desc_wrlock held in reader mode
1194 */
1195static int
1196do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1197{
1198	int			err;
1199	int			rpc_retries;
1200	int			timeout_retries = 0;
1201	int			*ret = NULL;
1202	set_t			setno;
1203	cond_t			*cv;	/* see mdmn_wakeup_master_svc_2 */
1204	mutex_t			*mx;	/* protection for class_busy */
1205	timestruc_t		timeout; /* surveillance for remote daemon */
1206	md_mn_nodeid_t		nid;
1207	md_mn_msgtype_t		msgtype;
1208	md_mn_msgclass_t	class;
1209
1210	nid	= node->nd_nodeid;
1211	msgtype = msg->msg_type;
1212	setno	= msg->msg_setno;
1213	class	= mdmn_get_message_class(msgtype);
1214	mx	= mdmn_get_master_table_mx(setno, class);
1215	cv	= mdmn_get_master_table_cv(setno, class);
1216
1217retry_rpc:
1218
1219	/* We try two times to send the message */
1220	rpc_retries = 2;
1221
1222	/*
1223	 * if sending the message doesn't succeed the first time due to a
1224	 * RPC problem, we retry one time
1225	 */
1226	while ((rpc_retries != 0) && (ret == NULL)) {
1227		/*  in abort state, we error out immediately */
1228		if (md_commd_global_state & MD_CGS_ABORTED) {
1229			return (MDMNE_ABORT);
1230		}
1231
1232		rw_rdlock(&client_rwlock[setno]);
1233		/* unable to create client? Ignore it */
1234		if (check_client(setno, nid)) {
1235			/*
1236			 * In case we cannot establish an RPC client, we
1237			 * take this node out of our considerations.
1238			 * This will be reset by a reconfig
1239			 * cycle that should come pretty soon.
1240			 * MNISSUE: Should a reconfig cycle
1241			 * be forced on SunCluster?
1242			 */
1243			node->nd_flags &= ~MD_MN_NODE_OWN;
1244			commd_debug(MD_MMV_SYSLOG,
1245			    "WARNING couldn't create client for %s\n"
1246			    "Reconfig cycle required\n",
1247			    node->nd_nodename);
1248			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1249			    "WARNING couldn't create client for %s\n",
1250			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1251			rw_unlock(&client_rwlock[setno]);
1252			return (MDMNE_IGNORE_NODE);
1253		}
1254		/* let's be paranoid and check again before sending */
1255		if (client[setno][nid] == NULL) {
1256			/*
1257			 * if this is true, strange enough, we catch our breath,
1258			 * and then continue, so that the client is set up
1259			 * once again.
1260			 */
1261			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1262			rw_unlock(&client_rwlock[setno]);
1263			sleep(1);
1264			continue;
1265		}
1266
1267		/* send it over, it will return immediately */
1268		ret = mdmn_work_2(msg, client[setno][nid], nid);
1269
1270		rw_unlock(&client_rwlock[setno]);
1271
1272		if (ret != NULL) {
1273			commd_debug(MD_MMV_PROC_M,
1274			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1275			    " 0x%x\n",
1276			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1277		} else {
1278			commd_debug(MD_MMV_PROC_M,
1279			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1280			    " NULL \n",
1281			    MSGID_ELEMS(msg->msg_msgid), nid);
1282		}
1283
1284		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1285		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1286			/*
1287			 * Something happened to the daemon on the other side.
1288			 * Kill the client, and try again.
1289			 * check_client() will create a new client
1290			 */
1291			rw_wrlock(&client_rwlock[setno]);
1292			mdmn_clnt_destroy(client[setno][nid]);
1293			if (client[setno][nid] != (CLIENT *)NULL) {
1294				client[setno][nid] = (CLIENT *)NULL;
1295			}
1296			rw_unlock(&client_rwlock[setno]);
1297
1298			/* ... but don't try infinitely */
1299			--rpc_retries;
1300			continue;
1301		}
1302		/*
1303		 * If the class is locked on the other node, keep trying.
1304		 * This situation will go away automatically,
1305		 * if we wait long enough
1306		 */
1307		if (*ret == MDMNE_CLASS_LOCKED) {
1308			sleep(1);
1309			free(ret);
1310			ret = NULL;
1311			continue;
1312		}
1313	}
1314	if (ret == NULL) {
1315		return (MDMNE_RPC_FAIL);
1316	}
1317
1318
1319	/* if the slave is in abort state, we just ignore it. */
1320	if (*ret == MDMNE_ABORT) {
1321		commd_debug(MD_MMV_PROC_M,
1322		    "proc_mas: work(%d,0x%llx-%d) returned "
1323		    "MDMNE_ABORT\n",
1324		    MSGID_ELEMS(msg->msg_msgid));
1325		free(ret);
1326		return (MDMNE_IGNORE_NODE);
1327	}
1328
1329	/* Did the remote processing succeed? */
1330	if (*ret != MDMNE_ACK) {
1331		/*
1332		 * Some commd failure in the middle of sending the msg
1333		 * to the nodes. We don't continue here.
1334		 */
1335		commd_debug(MD_MMV_PROC_M,
1336		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1337		    MSGID_ELEMS(msg->msg_msgid), *ret);
1338		free(ret);
1339		return (MDMNE_RPC_FAIL);
1340	}
1341	free(ret);
1342	ret = NULL;
1343
1344	/*
1345	 * When we are here, we have sent the message to the other node and
1346	 * we know that node has accepted it.
1347	 * We go to sleep and have trust to be woken up by wakeup.
1348	 * If we wakeup due to a timeout, or a signal, no result has been
1349	 * placed in the appropriate slot.
1350	 * If we timeout, it is likely that this is because the node has
1351	 * gone away, so we will destroy the client and try it again in the
1352	 * expectation that the rpc will fail and we will return
1353	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1354	 * be being processed on the slave. In this case just timeout for 4
1355	 * more seconds and then return RPC_FAIL if the message is not complete.
1356	 */
1357	timeout.tv_nsec = 0;
1358	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1359	    FOUR_SECS.tv_sec;
1360	err = cond_reltimedwait(cv, mx, &timeout);
1361
1362	if (err == 0) {
1363		/* everything's fine, return success */
1364		return (MDMNE_ACK);
1365	}
1366
1367	if (err == ETIME) {
1368		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1369		    "timeout occured, set=%d, class=%d, "
1370		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1371		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1372		if (timeout_retries == 0) {
1373			timeout_retries++;
1374			/*
1375			 * Destroy the client and try the rpc call again
1376			 */
1377			rw_wrlock(&client_rwlock[setno]);
1378			mdmn_clnt_destroy(client[setno][nid]);
1379			client[setno][nid] = (CLIENT *)NULL;
1380			rw_unlock(&client_rwlock[setno]);
1381			goto retry_rpc;
1382		}
1383	} else if (err == EINTR) {
1384		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1385		    "commd signalled, set=%d, class=%d, "
1386		    "msgid=(%d, 0x%llx-%d)\n",
1387		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1388	} else {
1389		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1390		    "cond_reltimedwait err=%d, set=%d, "
1391		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1392		    err, setno, class,
1393		    MSGID_ELEMS(msg->msg_msgid));
1394	}
1395
1396	/* some failure happened */
1397	return (MDMNE_RPC_FAIL);
1398}
1399
1400/*
1401 * before we return we have to
1402 * free_msg(msg); because we are working on a copied message
1403 */
1404void
1405mdmn_master_process_msg(md_mn_msg_t *msg)
1406{
1407	int		*ret;
1408	int		err;
1409	int		nmsgs;		/* total number of msgs */
1410	int		curmsg;		/* index of current msg */
1411	set_t		setno;
1412	uint_t		inherit_flags = 0;
1413	uint_t		secdiff, usecdiff; /* runtime of this message */
1414	md_error_t	mde = mdnullerror;
1415	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1416	md_mn_msg_t	*cmsg;		/* current msg */
1417	md_mn_msgid_t	dummyid;
1418	md_mn_result_t	*result;
1419	md_mn_result_t	*slave_result;
1420	md_mn_nodeid_t	sender;
1421	md_mn_nodeid_t	set_master;
1422	md_mnnode_desc	*node;
1423	md_mn_msgtype_t	orig_type;	/* type of the original message */
1424	md_mn_msgtype_t	msgtype;	/* type of the current message */
1425	md_mn_msgclass_t orig_class;	/* class of the original message */
1426	md_mn_msgclass_t class;		/* class of the current message */
1427
1428	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1429
1430	orig_type = msgtype = msg->msg_type;
1431	sender	= msg->msg_sender;
1432	setno	= msg->msg_setno;
1433
1434	result = Zalloc(sizeof (md_mn_result_t));
1435	result->mmr_setno	= setno;
1436	result->mmr_msgtype	= msgtype;
1437	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1438
1439	orig_class = mdmn_get_message_class(msgtype);
1440
1441	commd_debug(MD_MMV_PROC_M,
1442	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1443	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1444
1445	rw_rdlock(&set_desc_rwlock[setno]);
1446	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1447	result->mmr_sender	= set_master;
1448	/*
1449	 * Put message into the change log unless told otherwise
1450	 * Note that we only log original messages.
1451	 * If they are generated by some smgen, we don't log them!
1452	 * Replay messages aren't logged either.
1453	 * Note, that replay messages are unlogged on completion.
1454	 */
1455	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1456		commd_debug(MD_MMV_PROC_M,
1457		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1458		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1459		err = mdmn_log_msg(msg);
1460		if (err == MDMNE_NULL) {
1461			/* msg logged successfully */
1462			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1463			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1464			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1465			goto proceed;
1466		}
1467		if (err == MDMNE_ACK) {
1468			/* Same msg in the slot, proceed */
1469			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1470			    "already logged (%d,0x%llx-%d) type %d\n",
1471			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1472			goto proceed;
1473		}
1474		if (err == MDMNE_LOG_FAIL) {
1475			/* Oh, bad, the log is non functional. */
1476			result->mmr_comm_state = MDMNE_LOG_FAIL;
1477			/*
1478			 * Note that the mark_busy was already done by
1479			 * mdmn_work_svc_2()
1480			 */
1481			mutex_lock(&mdmn_busy_mutex[setno]);
1482			mdmn_mark_class_unbusy(setno, orig_class);
1483			mutex_unlock(&mdmn_busy_mutex[setno]);
1484
1485		}
1486		if (err == MDMNE_CLASS_BUSY) {
1487			/*
1488			 * The log is occupied with a different message
1489			 * that needs to be played first.
1490			 * We reject the current message with MDMNE_CLASS_BUSY
1491			 * to the initiator and do not unbusy the set/class,
1492			 * because we will proceed with the logged message,
1493			 * which has the same set/class combination
1494			 */
1495			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1496		}
1497		ret = (int *)NULL;
1498		rw_rdlock(&client_rwlock[setno]);
1499
1500		if (check_client(setno, sender)) {
1501			commd_debug(MD_MMV_SYSLOG,
1502			    "proc_mas: No client for initiator \n");
1503		} else {
1504			ret = mdmn_wakeup_initiator_2(result,
1505			    client[setno][sender], sender);
1506		}
1507		rw_unlock(&client_rwlock[setno]);
1508
1509		if (ret == (int *)NULL) {
1510			commd_debug(MD_MMV_SYSLOG,
1511			    "proc_mas: couldn't wakeup_initiator \n");
1512		} else {
1513			if (*ret != MDMNE_ACK) {
1514				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1515				    "wakeup_initiator returned %d\n", *ret);
1516			}
1517			free(ret);
1518		}
1519		free_msg(msg);
1520
1521		if (err == MDMNE_LOG_FAIL) {
1522			/* we can't proceed here */
1523			free_result(result);
1524			rw_unlock(&set_desc_rwlock[setno]);
1525			return;
1526		} else if (err == MDMNE_CLASS_BUSY) {
1527			mdmn_changelog_record_t *lr;
1528			lr = mdmn_get_changelogrec(setno, orig_class);
1529			assert(lr != NULL);
1530
1531			/* proceed with the logged message */
1532			msg = copy_msg(&(lr->lr_msg), NULL);
1533
1534			/*
1535			 * The logged message has to have the same class but
1536			 * type and sender can be different
1537			 */
1538			orig_type = msgtype = msg->msg_type;
1539			sender	= msg->msg_sender;
1540
1541			commd_debug(MD_MMV_PROC_M,
1542			    "proc_mas: Got new message from change log: "
1543			    "(%d,0x%llx-%d) type %d\n",
1544			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1545
1546			/* continue normal operation with this message */
1547		}
1548	}
1549
1550proceed:
1551	smgen = mdmn_get_submessage_generator(msgtype);
1552	if (smgen == NULL) {
1553		/* no submessages to create, just use the original message */
1554		msglist[0] = msg;
1555		nmsgs = 1;
1556	} else {
1557		/* some bits are passed on to submessages */
1558		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1559
1560		nmsgs = smgen(msg, msglist);
1561
1562		/* some settings for the submessages */
1563		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1564			cmsg    = msglist[curmsg];
1565
1566			/* Apply the inherited flags */
1567			cmsg->msg_flags |= inherit_flags;
1568
1569			/*
1570			 * Make sure the submessage ID is set correctly
1571			 * Note: first submessage has mid_smid of 1 (not 0)
1572			 */
1573			cmsg->msg_msgid.mid_smid = curmsg + 1;
1574
1575			/* need the original class set in msgID (for MCT) */
1576			cmsg->msg_msgid.mid_oclass = orig_class;
1577		}
1578
1579		commd_debug(MD_MMV_PROC_M,
1580		    "smgen generated %d submsgs, origclass = %d\n",
1581		    nmsgs, orig_class);
1582	}
1583	/*
1584	 * This big loop does the following.
1585	 * For all messages:
1586	 *	process message on the master first (a message completion
1587	 *		table MCT ensures a message is not processed twice)
1588	 *	in case of an error break out of message loop
1589	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1590	 *		send message to node until that succeeds
1591	 *		merge result -- not yet implemented
1592	 *		respect MD_MSGF_STOP_ON_ERROR
1593	 */
1594	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1595		int	break_msg_loop = 0;
1596		mutex_t	*mx;		/* protection for class_busy */
1597		int	master_err;
1598		int	master_exitval = -1;
1599
1600		cmsg	= msglist[curmsg];
1601		msgtype = cmsg->msg_type;
1602		class	= mdmn_get_message_class(msgtype);
1603		node	= NULL;
1604		mx	= mdmn_get_master_table_mx(setno, class);
1605
1606		/* If we are in the abort state, we error out immediately */
1607		if (md_commd_global_state & MD_CGS_ABORTED) {
1608			break; /* out of the message loop */
1609		}
1610
1611		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1612		    class, orig_class);
1613		/*
1614		 * If the current class is different from the original class,
1615		 * we have to lock it down.
1616		 * The original class is already marked busy.
1617		 * At this point we cannot refuse the message because the
1618		 * class is busy right now, so we wait until the class becomes
1619		 * available again. As soon as something changes for this set
1620		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1621		 *
1622		 * Granularity could be finer (setno/class)
1623		 */
1624		if (class != orig_class) {
1625			mutex_lock(&mdmn_busy_mutex[setno]);
1626			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1627				cond_wait(&mdmn_busy_cv[setno],
1628				    &mdmn_busy_mutex[setno]);
1629			}
1630			mutex_unlock(&mdmn_busy_mutex[setno]);
1631		}
1632
1633		master_err = do_message_locally(cmsg, result);
1634
1635		if ((master_err != MDMNE_ACK) ||
1636		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1637			result->mmr_failing_node = set_master;
1638			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1639				/*
1640				 * if appropriate, unbusy the class and
1641				 * break out of the message loop
1642				 */
1643				if (class != orig_class) {
1644					mutex_lock(&mdmn_busy_mutex[setno]);
1645					mdmn_mark_class_unbusy(setno, class);
1646					mutex_unlock(&mdmn_busy_mutex[setno]);
1647				}
1648				break;
1649			}
1650		}
1651
1652		if (master_err == MDMNE_ACK)
1653			master_exitval = result->mmr_exitval;
1654
1655		/* No broadcast? => next message */
1656		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1657			/* if appropriate, unbusy the class */
1658			if (class != orig_class) {
1659				mutex_lock(&mdmn_busy_mutex[setno]);
1660				mdmn_mark_class_unbusy(setno, class);
1661				mutex_unlock(&mdmn_busy_mutex[setno]);
1662			}
1663			continue;
1664		}
1665
1666
1667		/* fake sender, so we get notified when the results are avail */
1668		cmsg->msg_sender = set_master;
1669		/*
1670		 * register to the master_table. It's needed by wakeup_master to
1671		 * wakeup the sleeping thread.
1672		 * Access is protected by the class lock: mdmn_mark_class_busy()
1673		 */
1674		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1675
1676
1677
1678		rw_rdlock(&set_desc_rwlock[setno]);
1679		/* Send the message  to all other nodes */
1680		for (node = set_descriptor[setno]->sd_nodelist; node;
1681		    node = node->nd_next) {
1682			md_mn_nodeid_t nid = node->nd_nodeid;
1683
1684			/* We are master and have already processed the msg */
1685			if (node == set_descriptor[setno]->sd_mn_masternode) {
1686				continue;
1687			}
1688
1689			/* If this node didn't join the disk set, ignore it */
1690			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1691				continue;
1692			}
1693
1694			/* If a DIRECTED message, skip non-recipient nodes */
1695			if ((cmsg->msg_flags & MD_MSGF_DIRECTED) &&
1696			    nid != cmsg->msg_recipient) {
1697				continue;
1698			}
1699
1700			mutex_lock(mx);
1701			/*
1702			 * Register the node that is addressed,
1703			 * so we can detect unsolicited messages
1704			 */
1705			mdmn_set_master_table_addr(setno, class, nid);
1706			slave_result = (md_mn_result_t *)NULL;
1707
1708			/*
1709			 * Now send it. do_send_message() will return if
1710			 *	a failure occurs or
1711			 *	the results are available
1712			 */
1713			err = do_send_message(cmsg, node);
1714
1715			/*  in abort state, we error out immediately */
1716			if (md_commd_global_state & MD_CGS_ABORTED) {
1717				break;
1718			}
1719
1720			if (err == MDMNE_ACK) {
1721				slave_result =
1722				    mdmn_get_master_table_res(setno, class);
1723				commd_debug(MD_MMV_PROC_M,
1724				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1725				    MSGID_ELEMS(cmsg->msg_msgid));
1726			} else if (err == MDMNE_IGNORE_NODE) {
1727				mutex_unlock(mx);
1728				continue; /* send to next node */
1729			}
1730			mutex_unlock(mx);
1731
1732
1733			/*
1734			 * If the result is NULL, or err doesn't show success,
1735			 * something went wrong with this RPC call.
1736			 */
1737			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1738				/*
1739				 * If PANIC_WHEN_INCONSISTENT set,
1740				 * panic if the master succeeded while
1741				 * this node failed
1742				 */
1743				if ((cmsg->msg_flags &
1744				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1745				    (master_err == MDMNE_ACK))
1746					panic_system(nid, cmsg->msg_type,
1747					    master_err, master_exitval,
1748					    slave_result);
1749
1750				result->mmr_failing_node = nid;
1751				/* are we supposed to stop in case of error? */
1752				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1753					result->mmr_exitval = MDMNE_RPC_FAIL;
1754					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1755					    "result (%d,0x%llx-%d) is NULL\n",
1756					    MSGID_ELEMS(cmsg->msg_msgid));
1757					FLUSH_DEBUGFILE();
1758					break_msg_loop = 1;
1759					break; /* out of node loop first */
1760				} else {
1761					/* send msg to the next node */
1762					continue;
1763				}
1764
1765			}
1766
1767			/*
1768			 * Message processed on remote node.
1769			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1770			 * result is different on this node from the result
1771			 * on the master
1772			 */
1773			if ((cmsg->msg_flags &
1774			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1775			    ((master_err != MDMNE_ACK) ||
1776			    (slave_result->mmr_exitval != master_exitval)))
1777				panic_system(nid, cmsg->msg_type, master_err,
1778				    master_exitval, slave_result);
1779
1780			/*
1781			 * At this point we know we have a message that was
1782			 * processed on the remote node.
1783			 * We now check if the exitval is non zero.
1784			 * In that case we discard the previous result and
1785			 * rather use the current.
1786			 * This means: If a message fails on no node,
1787			 * the result from the master will be returned.
1788			 * There's currently no such thing as merge of results
1789			 * If additionally STOP_ON_ERROR is set, we bail out
1790			 */
1791			if (slave_result->mmr_exitval != 0) {
1792				/* throw away the previously allocated result */
1793				free_result(result);
1794
1795				/* copy_result() allocates new memory */
1796				result = copy_result(slave_result);
1797				free_result(slave_result);
1798
1799				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1800
1801				result->mmr_failing_node = nid;
1802				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1803					break_msg_loop = 1;
1804					break; /* out of node loop */
1805				}
1806				continue; /* try next node */
1807
1808			} else {
1809				/*
1810				 * MNIssue: may want to merge the results
1811				 * from all slaves.  Currently only report
1812				 * the results from the master.
1813				 */
1814				free_result(slave_result);
1815			}
1816
1817		} /* End of loop over the nodes */
1818		rw_unlock(&set_desc_rwlock[setno]);
1819
1820
1821		/* release the current class again */
1822		if (class != orig_class) {
1823			mutex_lock(&mdmn_busy_mutex[setno]);
1824			mdmn_mark_class_unbusy(setno, class);
1825			mutex_unlock(&mdmn_busy_mutex[setno]);
1826		}
1827
1828		/* are we supposed to quit entirely ? */
1829		if (break_msg_loop ||
1830		    (md_commd_global_state & MD_CGS_ABORTED)) {
1831			break; /* out of msg loop */
1832		}
1833
1834	} /* End of loop over the messages */
1835	/*
1836	 * If we are here, there's two possibilities:
1837	 * 	- we processed all messages on all nodes without an error.
1838	 *	    In this case we return the result from the master.
1839	 *	    (to be implemented: return the merged result)
1840	 *	- we encountered an error in which case result has been
1841	 *	    set accordingly already.
1842	 */
1843
1844	if (md_commd_global_state & MD_CGS_ABORTED) {
1845		result->mmr_comm_state = MDMNE_ABORT;
1846	}
1847
1848	/*
1849	 * This message has been processed completely.
1850	 * Remove it from the changelog.
1851	 * Do this for replay messages too.
1852	 * Note that the message is unlogged before waking up the
1853	 * initiator.  This is done for two reasons.
1854	 * 1. Remove a race condition that occurs when back to back
1855	 *   messages are sent for the same class, the registeration is
1856	 *   is lost.
1857	 * 2. If the initiator died but the action was completed on all the
1858	 *   the nodes, we want that to be marked "done" quickly.
1859	 */
1860
1861	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1862		commd_debug(MD_MMV_PROC_M,
1863		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1864		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1865		mdmn_unlog_msg(msg);
1866		commd_debug(MD_MMV_PROC_M,
1867		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1868		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1869	}
1870
1871	/*
1872	 * In case of submessages, we increased the submessage ID in the
1873	 * result structure. We restore the message ID to the value that
1874	 * the initiator is waiting for.
1875	 */
1876	result->mmr_msgid.mid_smid	= 0;
1877	result->mmr_msgtype		= orig_type;
1878	result->mmr_sender		= set_master;
1879
1880	/* if we have an inited client, send result */
1881	ret = (int *)NULL;
1882
1883	rw_rdlock(&client_rwlock[setno]);
1884	if (check_client(setno, sender)) {
1885		commd_debug(MD_MMV_SYSLOG,
1886		    "proc_mas: unable to create client for initiator\n");
1887	} else {
1888		ret = mdmn_wakeup_initiator_2(result, client[setno][sender],
1889		    sender);
1890	}
1891	rw_unlock(&client_rwlock[setno]);
1892
1893	if (ret == (int *)NULL) {
1894		commd_debug(MD_MMV_PROC_M,
1895		    "proc_mas: couldn't wakeup initiator\n");
1896	} else {
1897		if (*ret != MDMNE_ACK) {
1898			commd_debug(MD_MMV_PROC_M,
1899			    "proc_mas: wakeup_initiator returned %d\n",
1900			    *ret);
1901		}
1902		free(ret);
1903	}
1904
1905	rw_unlock(&set_desc_rwlock[setno]);
1906	/* Free all submessages, if there were any */
1907	if (nmsgs > 1) {
1908		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1909			free_msg(msglist[curmsg]);
1910		}
1911	}
1912	/* Free the result */
1913	free_result(result);
1914
1915	mutex_lock(&mdmn_busy_mutex[setno]);
1916	mdmn_mark_class_unbusy(setno, orig_class);
1917	mutex_unlock(&mdmn_busy_mutex[setno]);
1918
1919
1920	/*
1921	 * We use this ioctl just to get the time in the same format as used in
1922	 * the messageID. If it fails, all we get is a bad runtime output.
1923	 */
1924	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1925	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1926	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1927
1928	/* catching possible overflow */
1929	if (usecdiff >= 1000000) {
1930		usecdiff -= 1000000;
1931		secdiff++;
1932	}
1933
1934
1935	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1936	    "%5d.%06d secs runtime\n",
1937	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1938
1939	/* Free the original message */
1940	free_msg(msg);
1941}
1942
1943void
1944mdmn_slave_process_msg(md_mn_msg_t *msg)
1945{
1946	int			*ret = NULL;
1947	int			completed;
1948	int			retries;
1949	int			successfully_returned;
1950	set_t			setno;
1951	md_mn_result_t		*result;
1952	md_mn_nodeid_t		sender;
1953	md_mn_nodeid_t		whoami;
1954	md_mn_msgtype_t		msgtype;
1955	md_mn_msgclass_t	class;
1956
1957	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1958
1959	setno	= msg->msg_setno;
1960	sender	= msg->msg_sender; /* this is always the master of the set */
1961	msgtype	= msg->msg_type;
1962
1963	rw_rdlock(&set_desc_rwlock[setno]);
1964	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1965	rw_unlock(&set_desc_rwlock[setno]);
1966
1967	result = Zalloc(sizeof (md_mn_result_t));
1968	result->mmr_flags	= msg->msg_flags;
1969	result->mmr_setno	= setno;
1970	result->mmr_msgtype	= msgtype;
1971	result->mmr_sender	= whoami;
1972	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1973	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1974	class = mdmn_get_message_class(msgtype);
1975
1976	commd_debug(MD_MMV_PROC_S,
1977	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1978	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1979
1980	handler = mdmn_get_handler(msgtype);
1981
1982	if (handler == NULL) {
1983		result->mmr_exitval = 0;
1984		/* let the sender decide if this is an error or not */
1985		result->mmr_comm_state = MDMNE_NO_HANDLER;
1986		commd_debug(MD_MMV_PROC_S,
1987		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
1988		    MSGID_ELEMS(msg->msg_msgid));
1989	} else {
1990
1991		/* Did we already process this message ? */
1992		mutex_lock(&mct_mutex[setno][class]);
1993		completed = mdmn_check_completion(msg, result);
1994
1995		if (completed == MDMN_MCT_NOT_DONE) {
1996			/* message not yet processed locally */
1997			commd_debug(MD_MMV_PROC_S,
1998			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
1999			    MSGID_ELEMS(msg->msg_msgid));
2000
2001			/*
2002			 * Mark the message as being currently processed,
2003			 * so we won't start a second handler for it
2004			 */
2005			(void) mdmn_mark_completion(msg, NULL,
2006			    MDMN_MCT_IN_PROGRESS);
2007
2008			mutex_unlock(&mct_mutex[setno][class]);
2009			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
2010
2011			commd_debug(MD_MMV_PROC_S,
2012			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
2013			    MSGID_ELEMS(msg->msg_msgid));
2014
2015			mutex_lock(&mct_mutex[setno][class]);
2016			/* Mark the message as fully done, store the result */
2017			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
2018
2019		} else if (completed == MDMN_MCT_DONE) {
2020			/* message processed previously, got result from MCT */
2021			commd_debug(MD_MMV_PROC_S,
2022			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2023			    MSGID_ELEMS(msg->msg_msgid));
2024		} else if (completed == MDMN_MCT_IN_PROGRESS) {
2025			/*
2026			 * If the message is curruntly being processed,
2027			 * we can return here, without sending a result back.
2028			 * This will be done by the initial message handling
2029			 * thread
2030			 */
2031			mutex_unlock(&mct_mutex[setno][class]);
2032			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2033			    "(%d, 0x%llx-%d) is currently being processed\n",
2034			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2035
2036			free_msg(msg);
2037			free_result(result);
2038			return;
2039		} else {
2040			/* MCT error occurred (should never happen) */
2041			result->mmr_comm_state = MDMNE_LOG_FAIL;
2042			commd_debug(MD_MMV_PROC_S,
2043			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2044			    MSGID_ELEMS(msg->msg_msgid));
2045		}
2046		mutex_unlock(&mct_mutex[setno][class]);
2047	}
2048
2049	/*
2050	 * At this point we have a result (even in an error case)
2051	 * that we return to the master.
2052	 */
2053	rw_rdlock(&set_desc_rwlock[setno]);
2054	retries = 2; /* we will try two times to send the results */
2055	successfully_returned = 0;
2056
2057	while (!successfully_returned && (retries != 0)) {
2058		ret = (int *)NULL;
2059		rw_rdlock(&client_rwlock[setno]);
2060		if (check_client(setno, sender)) {
2061			/*
2062			 * If we cannot setup the rpc connection to the master,
2063			 * we can't do anything besides logging this fact.
2064			 */
2065			commd_debug(MD_MMV_SYSLOG,
2066			    "proc_mas: unable to create client for master\n");
2067			rw_unlock(&client_rwlock[setno]);
2068			break;
2069		} else {
2070			ret = mdmn_wakeup_master_2(result,
2071			    client[setno][sender], sender);
2072			/*
2073			 * if mdmn_wakeup_master_2 returns NULL, it can be that
2074			 * the master (or the commd on the master) had died.
2075			 * In that case, we destroy the client to the master
2076			 * and retry.
2077			 * If mdmn_wakeup_master_2 doesn't return MDMNE_ACK,
2078			 * the commd on the master is alive but
2079			 * something else is wrong,
2080			 * in that case a retry doesn't make sense => break out
2081			 */
2082			if (ret == (int *)NULL) {
2083				commd_debug(MD_MMV_PROC_S,
2084				    "proc_sla: wakeup_master returned NULL\n");
2085				/* release reader lock, grab writer lock */
2086				rw_unlock(&client_rwlock[setno]);
2087				rw_wrlock(&client_rwlock[setno]);
2088				mdmn_clnt_destroy(client[setno][sender]);
2089				if (client[setno][sender] != (CLIENT *)NULL) {
2090					client[setno][sender] = (CLIENT *)NULL;
2091				}
2092				rw_unlock(&client_rwlock[setno]);
2093				retries--;
2094				commd_debug(MD_MMV_PROC_S,
2095				    "retries = %d\n", retries);
2096				continue;
2097			}
2098			if (*ret != MDMNE_ACK) {
2099				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2100				    "wakeup_master returned %d\n", *ret);
2101				rw_unlock(&client_rwlock[setno]);
2102				break;
2103			} else { /* Good case */
2104				successfully_returned = 1;
2105				rw_unlock(&client_rwlock[setno]);
2106			}
2107		}
2108	}
2109
2110	rw_unlock(&set_desc_rwlock[setno]);
2111	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2112	    MSGID_ELEMS(msg->msg_msgid));
2113
2114	if (ret != (int *)NULL)
2115		free(ret);
2116	free_msg(msg);
2117	free_result(result);
2118}
2119
2120
2121/*
2122 * mdmn_send_svc_2:
2123 * ---------------
2124 * Check that the issuing node is a legitimate one (i.e. is licensed to send
2125 * messages to us), that the RPC request can be staged.
2126 *
2127 * Returns:
2128 *	0	=> no RPC request is in-flight, no deferred svc_sendreply()
2129 *	1	=> queued RPC request in-flight. Completion will be made (later)
2130 *		   by a wakeup_initiator_2() [hopefully]
2131 */
2132int
2133mdmn_send_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2134{
2135	int			err;
2136	set_t			setno;
2137	SVCXPRT			*transp = rqstp->rq_xprt;
2138	md_mn_msg_t		*msg;
2139	md_mn_result_t		*resultp;
2140	md_mn_msgclass_t	class;
2141	md_mn_msg_and_transp_t	*matp;
2142
2143	msg = copy_msg(omsg, NULL);
2144	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2145
2146	setno = msg->msg_setno;
2147	class = mdmn_get_message_class(msg->msg_type);
2148
2149	/* If we are in the abort state, we error out immediately */
2150	if (md_commd_global_state & MD_CGS_ABORTED) {
2151		resultp = Zalloc(sizeof (md_mn_result_t));
2152		resultp->mmr_comm_state = MDMNE_ABORT;
2153		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2154		free_result(resultp);
2155		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2156		return (0);
2157	}
2158
2159	/* check if the global initialization is done */
2160	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2161		global_init();
2162	}
2163
2164	commd_debug(MD_MMV_SEND,
2165	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2166	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2167
2168	/* Check for verbosity related message */
2169	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2170		md_mn_verbose_t *d;
2171
2172		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2173		md_commd_global_verb = d->mmv_what;
2174		/* everytime the bitmask is set, we reset the timer */
2175		__savetime = gethrtime();
2176		/*
2177		 * If local-only-flag is set, we are done here,
2178		 * otherwise we pass that message on to the master.
2179		 */
2180		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2181			resultp = Zalloc(sizeof (md_mn_result_t));
2182			resultp->mmr_comm_state = MDMNE_ACK;
2183			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2184			    (char *)resultp);
2185			free_result(resultp);
2186			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2187			return (0);
2188		}
2189	}
2190
2191	/*
2192	 * Are we entering the abort state?
2193	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2194	 * this message cannot be distributed anyway.
2195	 * So, it's safe to return immediately.
2196	 */
2197	if (msg->msg_type == MD_MN_MSG_ABORT) {
2198		md_commd_global_state |= MD_CGS_ABORTED;
2199		resultp = Zalloc(sizeof (md_mn_result_t));
2200		resultp->mmr_comm_state = MDMNE_ACK;
2201		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2202		free_result(resultp);
2203		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2204		return (0);
2205	}
2206
2207
2208	/*
2209	 * Is this message type blocked?
2210	 * If so we return MDMNE_CLASS_LOCKED, immediately
2211	 */
2212	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2213		resultp = Zalloc(sizeof (md_mn_result_t));
2214		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2215		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2216		free_result(resultp);
2217		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2218		commd_debug(MD_MMV_SEND,
2219		    "send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2220		    "type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2221		    msg->msg_type);
2222		return (0);
2223	}
2224
2225
2226	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2227		/* Can only use the appropriate mutexes if they are inited */
2228		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2229			rw_wrlock(&set_desc_rwlock[setno]);
2230			rw_wrlock(&client_rwlock[setno]);
2231			err = mdmn_init_set(setno, MDMN_SET_READY);
2232			rw_unlock(&client_rwlock[setno]);
2233			rw_unlock(&set_desc_rwlock[setno]);
2234		} else {
2235			err = mdmn_init_set(setno, MDMN_SET_READY);
2236		}
2237
2238		if (err) {
2239			/* couldn't initialize connections, cannot proceed */
2240			resultp = Zalloc(sizeof (md_mn_result_t));
2241			resultp->mmr_comm_state = err;
2242			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2243			    (char *)resultp);
2244			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2245			free_result(resultp);
2246			commd_debug(MD_MMV_SEND,
2247			    "send: init err = %d\n", err);
2248			return (0);
2249		}
2250	}
2251
2252	mutex_lock(&mdmn_busy_mutex[setno]);
2253	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2254	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2255		mutex_unlock(&mdmn_busy_mutex[setno]);
2256		resultp = Zalloc(sizeof (md_mn_result_t));
2257		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2258		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2259		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2260		free_result(resultp);
2261		commd_debug(MD_MMV_SEND,
2262		    "send: class suspended (%d, 0x%llx-%d), set=%d, "
2263		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2264		    setno, class, msg->msg_type);
2265		return (0);
2266	}
2267	mutex_unlock(&mdmn_busy_mutex[setno]);
2268
2269	/* is this rpc request coming from the local node? */
2270	if (check_license(rqstp, 0) == FALSE) {
2271		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2272		commd_debug(MD_MMV_SEND,
2273		    "send: check licence fail(%d, 0x%llx-%d), set=%d, "
2274		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2275		    setno, class, msg->msg_type);
2276		return (0);
2277	}
2278
2279
2280	/*
2281	 * We allocate a structure that can take two pointers in order to pass
2282	 * both the message and the transp into thread_create.
2283	 * The free for this alloc is done in mdmn_send_to_work()
2284	 */
2285	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2286	matp->mat_msg = msg;
2287	matp->mat_transp = transp;
2288
2289	/*
2290	 * create a thread here that calls work on the master.
2291	 * If we are already on the master, this would block if running
2292	 * in the same context. (our service is single threaded)(
2293	 * Make it a detached thread because it will not communicate with
2294	 * anybody thru thr_* mechanisms
2295	 */
2296	thr_create(NULL, 0, mdmn_send_to_work, (void *) matp, THR_DETACHED,
2297	    NULL);
2298
2299	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2300	    MSGID_ELEMS(msg->msg_msgid));
2301	/*
2302	 * We return here without sending results. This will be done by
2303	 * mdmn_wakeup_initiator_svc_2() as soon as the results are available.
2304	 * Until then the calling send_message will be blocked, while we
2305	 * are able to take calls.
2306	 */
2307
2308	return (1);
2309}
2310
2311/* ARGSUSED */
2312int *
2313mdmn_work_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2314{
2315	int		err;
2316	set_t		setno;
2317	thread_t	tid;
2318	int		*retval;
2319	md_mn_msg_t	*msg;
2320	md_mn_msgclass_t class;
2321
2322	retval = Malloc(sizeof (int));
2323
2324	/* If we are in the abort state, we error out immediately */
2325	if (md_commd_global_state & MD_CGS_ABORTED) {
2326	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2327		*retval = MDMNE_ABORT;
2328		return (retval);
2329	}
2330
2331	msg = copy_msg(omsg, NULL);
2332	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2333
2334	/*
2335	 * Is this message type blocked?
2336	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2337	 * This check is performed on master and slave.
2338	 */
2339	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2340		*retval = MDMNE_CLASS_LOCKED;
2341		return (retval);
2342	}
2343
2344	/* check if the global initialization is done */
2345	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2346		global_init();
2347	}
2348
2349	class = mdmn_get_message_class(msg->msg_type);
2350	setno = msg->msg_setno;
2351
2352	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2353		/* Can only use the appropriate mutexes if they are inited */
2354		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2355			rw_wrlock(&set_desc_rwlock[setno]);
2356			rw_wrlock(&client_rwlock[setno]);
2357			err = mdmn_init_set(setno, MDMN_SET_READY);
2358			rw_unlock(&client_rwlock[setno]);
2359			rw_unlock(&set_desc_rwlock[setno]);
2360		} else {
2361			err = mdmn_init_set(setno, MDMN_SET_READY);
2362		}
2363
2364		if (err) {
2365			*retval = MDMNE_CANNOT_CONNECT;
2366			free_msg(msg);
2367			return (retval);
2368		}
2369	}
2370
2371	/* is this rpc request coming from a licensed node? */
2372	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2373		free_msg(msg);
2374		*retval = MDMNE_RPC_FAIL;
2375		return (retval);
2376	}
2377
2378	commd_debug(MD_MMV_WORK,
2379	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2380	    "flags=0x%x\n",
2381	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2382	    msg->msg_flags);
2383
2384	/* Check for various CLASS0 message types */
2385	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2386		md_mn_verbose_t *d;
2387
2388		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2389		/* for now we ignore set / class in md_mn_verbose_t */
2390		md_commd_global_verb = d->mmv_what;
2391		/* everytime the bitmask is set, we reset the timer */
2392		__savetime = gethrtime();
2393	}
2394
2395	mutex_lock(&mdmn_busy_mutex[setno]);
2396
2397	/* check if class is locked via a call to mdmn_comm_lock_svc_2 */
2398	if (mdmn_is_class_locked(setno, class) == TRUE) {
2399		mutex_unlock(&mdmn_busy_mutex[setno]);
2400		*retval = MDMNE_CLASS_LOCKED;
2401		free_msg(msg);
2402		return (retval);
2403	}
2404	mutex_unlock(&mdmn_busy_mutex[setno]);
2405
2406	/* Check if the class is busy right now. Do it only on the master */
2407	rw_rdlock(&set_desc_rwlock[setno]);
2408	if (set_descriptor[setno]->sd_mn_am_i_master) {
2409		rw_unlock(&set_desc_rwlock[setno]);
2410		/*
2411		 * If the class is currently suspended, don't accept new
2412		 * messages, unless they are flagged with an override bit.
2413		 */
2414		mutex_lock(&mdmn_busy_mutex[setno]);
2415		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2416		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2417			mutex_unlock(&mdmn_busy_mutex[setno]);
2418			*retval = MDMNE_SUSPENDED;
2419			commd_debug(MD_MMV_SEND,
2420			    "send: set %d is suspended\n", setno);
2421			free_msg(msg);
2422			return (retval);
2423		}
2424		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2425			mutex_unlock(&mdmn_busy_mutex[setno]);
2426			*retval = MDMNE_CLASS_BUSY;
2427			free_msg(msg);
2428			return (retval);
2429		}
2430		mutex_unlock(&mdmn_busy_mutex[setno]);
2431		/*
2432		 * Because the real processing of the message takes time we
2433		 * create a thread for it. So the master thread can continue
2434		 * to run and accept further messages.
2435		 */
2436		*retval = thr_create(NULL, 0,
2437		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2438		    THR_DETACHED|THR_SUSPENDED, &tid);
2439	} else {
2440		rw_unlock(&set_desc_rwlock[setno]);
2441		*retval = thr_create(NULL, 0,
2442		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2443		    THR_DETACHED|THR_SUSPENDED, &tid);
2444	}
2445
2446	if (*retval != 0) {
2447		*retval = MDMNE_THR_CREATE_FAIL;
2448		free_msg(msg);
2449		return (retval);
2450	}
2451
2452	/* Now run the new thread */
2453	thr_continue(tid);
2454
2455	commd_debug(MD_MMV_WORK,
2456	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2457	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2458
2459	*retval = MDMNE_ACK; /* this means success */
2460	return (retval);
2461}
2462
2463/* ARGSUSED */
2464int *
2465mdmn_wakeup_initiator_svc_2(md_mn_result_t *res, struct svc_req *rqstp)
2466{
2467
2468	int		*retval;
2469	int		err;
2470	set_t		setno;
2471	mutex_t		*mx;   /* protection of initiator_table */
2472	SVCXPRT		*transp = NULL;
2473	md_mn_msgid_t	initiator_table_id;
2474	md_mn_msgclass_t class;
2475
2476	retval = Malloc(sizeof (int));
2477
2478	/* check if the global initialization is done */
2479	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2480		global_init();
2481	}
2482
2483	setno	= res->mmr_setno;
2484
2485	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2486		/* set not ready means we just crashed are restarted now */
2487		/* Can only use the appropriate mutexes if they are inited */
2488		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2489			rw_wrlock(&set_desc_rwlock[setno]);
2490			rw_wrlock(&client_rwlock[setno]);
2491			err = mdmn_init_set(setno, MDMN_SET_READY);
2492			rw_unlock(&client_rwlock[setno]);
2493			rw_unlock(&set_desc_rwlock[setno]);
2494		} else {
2495			err = mdmn_init_set(setno, MDMN_SET_READY);
2496		}
2497
2498		if (err) {
2499			*retval = MDMNE_CANNOT_CONNECT;
2500			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2501			return (retval);
2502		}
2503	}
2504
2505	/* is this rpc request coming from a licensed node? */
2506	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2507		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2508		*retval = MDMNE_RPC_FAIL;
2509		return (retval);
2510	}
2511
2512
2513	class	= mdmn_get_message_class(res->mmr_msgtype);
2514	mx	= mdmn_get_initiator_table_mx(setno, class);
2515
2516	commd_debug(MD_MMV_WAKE_I,
2517	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2518	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2519
2520	mutex_lock(mx);
2521
2522	/*
2523	 * Search the initiator wakeup table.
2524	 * If we find an entry here (which should always be true)
2525	 * we are on the initiating node and we wakeup the original
2526	 * local rpc call.
2527	 */
2528	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2529
2530	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2531		transp = mdmn_get_initiator_table_transp(setno, class);
2532		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2533		svc_done(transp);
2534		mdmn_unregister_initiator_table(setno, class);
2535		*retval = MDMNE_ACK;
2536
2537		commd_debug(MD_MMV_WAKE_I,
2538		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2539		    MSGID_ELEMS(res->mmr_msgid));
2540	} else {
2541		commd_debug(MD_MMV_WAKE_I,
2542		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2543		    MSGID_ELEMS(res->mmr_msgid));
2544		*retval = MDMNE_NO_WAKEUP_ENTRY;
2545	}
2546	mutex_unlock(mx);
2547	/* less work for check_timeouts */
2548	mutex_lock(&check_timeout_mutex);
2549	if (messages_on_their_way == 0) {
2550		commd_debug(MD_MMV_WAKE_I,
2551		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2552		    MSGID_ELEMS(res->mmr_msgid));
2553	} else {
2554		messages_on_their_way--;
2555	}
2556	mutex_unlock(&check_timeout_mutex);
2557	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2558
2559	return (retval);
2560}
2561
2562
2563/*
2564 * res must be free'd by the thread we wake up
2565 */
2566/* ARGSUSED */
2567int *
2568mdmn_wakeup_master_svc_2(md_mn_result_t *ores, struct svc_req *rqstp)
2569{
2570
2571	int		*retval;
2572	int		err;
2573	set_t		setno;
2574	cond_t		*cv;
2575	mutex_t		*mx;
2576	md_mn_msgid_t	master_table_id;
2577	md_mn_nodeid_t	sender;
2578	md_mn_result_t	*res;
2579	md_mn_msgclass_t class;
2580
2581	retval = Malloc(sizeof (int));
2582
2583	/* check if the global initialization is done */
2584	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2585		global_init();
2586	}
2587
2588	/* Need to copy the results here, as they are static for RPC */
2589	res = copy_result(ores);
2590	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2591
2592	class = mdmn_get_message_class(res->mmr_msgtype);
2593	setno = res->mmr_setno;
2594
2595	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2596		/* set not ready means we just crashed are restarted now */
2597		/* Can only use the appropriate mutexes if they are inited */
2598		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2599			rw_wrlock(&set_desc_rwlock[setno]);
2600			rw_wrlock(&client_rwlock[setno]);
2601			err = mdmn_init_set(setno, MDMN_SET_READY);
2602			rw_unlock(&client_rwlock[setno]);
2603			rw_unlock(&set_desc_rwlock[setno]);
2604		} else {
2605			err = mdmn_init_set(setno, MDMN_SET_READY);
2606		}
2607
2608		if (err) {
2609			*retval = MDMNE_CANNOT_CONNECT;
2610			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2611			return (retval);
2612		}
2613	}
2614
2615	/* is this rpc request coming from a licensed node? */
2616	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2617		*retval = MDMNE_RPC_FAIL;
2618		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2619		return (retval);
2620	}
2621
2622
2623	commd_debug(MD_MMV_WAKE_M,
2624	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2625	    "from %d\n",
2626	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2627	    res->mmr_sender);
2628	/*
2629	 * The mutex and cv are needed for waking up the thread
2630	 * sleeping in mdmn_master_process_msg()
2631	 */
2632	mx = mdmn_get_master_table_mx(setno, class);
2633	cv = mdmn_get_master_table_cv(setno, class);
2634
2635	/*
2636	 * lookup the master wakeup table
2637	 * If we find our message, we are on the master and
2638	 * called by a slave that finished processing a message.
2639	 * We store the results in the appropriate slot and
2640	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2641	 */
2642	mutex_lock(mx);
2643	mdmn_get_master_table_id(setno, class, &master_table_id);
2644	sender = mdmn_get_master_table_addr(setno, class);
2645
2646	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2647		if (sender == res->mmr_sender) {
2648			mdmn_set_master_table_res(setno, class, res);
2649			cond_signal(cv);
2650			*retval = MDMNE_ACK;
2651		} else {
2652			/* id is correct but wrong sender (I smell a timeout) */
2653			commd_debug(MD_MMV_WAKE_M,
2654			    "wakeup master got unsolicited message: "
2655			    "(%d, 0x%llx-%d) from %d\n",
2656			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2657			free_result(res);
2658			*retval = MDMNE_TIMEOUT;
2659		}
2660	} else {
2661		/* id is wrong, smells like a very late timeout */
2662		commd_debug(MD_MMV_WAKE_M,
2663		    "wakeup master got unsolicited message: "
2664		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2665		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2666		    MSGID_ELEMS(master_table_id));
2667		free_result(res);
2668		*retval = MDMNE_NO_WAKEUP_ENTRY;
2669	}
2670
2671	mutex_unlock(mx);
2672
2673	return (retval);
2674}
2675
2676/*
2677 * Lock a set/class combination.
2678 * This is mainly done for debug purpose.
2679 * This set/class combination immediately is blocked,
2680 * even in the middle of sending messages to multiple slaves.
2681 * This remains until the user issues a mdmn_comm_unlock_svc_2 for the same
2682 * set/class combination.
2683 *
2684 * Special messages of class MD_MSG_CLASS0 can never be locked.
2685 * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2686 *
2687 * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2688 * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2689 *
2690 * set must be between 1 and MD_MAXSETS
2691 * class can be:
2692 *	MD_MSG_CLASS0 which means all other classes in this case
2693 *	or one specific class (< MD_MN_NCLASSES)
2694 *
2695 * Returns:
2696 *	MDMNE_ACK on sucess (locking a locked class is Ok)
2697 *	MDMNE_EINVAL if a parameter is out of range
2698 */
2699
2700/* ARGSUSED */
2701int *
2702mdmn_comm_lock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2703{
2704	int			*retval;
2705	set_t			setno = msc->msc_set;
2706	md_mn_msgclass_t	class = msc->msc_class;
2707
2708	retval = Malloc(sizeof (int));
2709
2710	/* check if the global initialization is done */
2711	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2712		global_init();
2713	}
2714
2715	/* is this rpc request coming from the local node ? */
2716	if (check_license(rqstp, 0) == FALSE) {
2717		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2718		*retval = MDMNE_RPC_FAIL;
2719		return (retval);
2720	}
2721
2722	/* Perform some range checking */
2723	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2724	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2725		*retval = MDMNE_EINVAL;
2726		return (retval);
2727	}
2728
2729	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2730	mutex_lock(&mdmn_busy_mutex[setno]);
2731	if (class != MD_MSG_CLASS0) {
2732		mdmn_mark_class_locked(setno, class);
2733	} else {
2734		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2735		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2736			mdmn_mark_class_locked(setno, class);
2737		}
2738	}
2739	mutex_unlock(&mdmn_busy_mutex[setno]);
2740
2741	*retval = MDMNE_ACK;
2742	return (retval);
2743}
2744
2745/*
2746 * Unlock a set/class combination.
2747 * set must be between 1 and MD_MAXSETS
2748 * class can be:
2749 *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2750 *	or one specific class (< MD_MN_NCLASSES)
2751 *
2752 * Returns:
2753 *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2754 *	MDMNE_EINVAL if a parameter is out of range
2755 */
2756/* ARGSUSED */
2757int *
2758mdmn_comm_unlock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2759{
2760	int			*retval;
2761	set_t			setno  = msc->msc_set;
2762	md_mn_msgclass_t	class  = msc->msc_class;
2763
2764	retval = Malloc(sizeof (int));
2765
2766	/* check if the global initialization is done */
2767	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2768		global_init();
2769	}
2770
2771	/* is this rpc request coming from the local node ? */
2772	if (check_license(rqstp, 0) == FALSE) {
2773		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2774		*retval = MDMNE_RPC_FAIL;
2775		return (retval);
2776	}
2777
2778	/* Perform some range checking */
2779	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2780	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2781		*retval = MDMNE_EINVAL;
2782		return (retval);
2783	}
2784	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2785
2786	mutex_lock(&mdmn_busy_mutex[setno]);
2787	if (class != MD_MSG_CLASS0) {
2788		mdmn_mark_class_unlocked(setno, class);
2789	} else {
2790		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2791		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2792			mdmn_mark_class_unlocked(setno, class);
2793		}
2794	}
2795	mutex_unlock(&mdmn_busy_mutex[setno]);
2796
2797	*retval = MDMNE_ACK;
2798	return (retval);
2799}
2800
2801/*
2802 * mdmn_comm_suspend_svc_2(setno, class)
2803 *
2804 * Drain all outstanding messages for a given set/class combination
2805 * and don't allow new messages to be processed.
2806 *
2807 * Special messages of class MD_MSG_CLASS0 can never be locked.
2808 * 	e.g. MD_MN_MSG_VERBOSITY
2809 *
2810 * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2811 * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2812 *
2813 * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2814 * one class as being suspended.
2815 * If messages for this class are currently on their way,
2816 * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2817 *
2818 * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2819 * Messages must be generated in ascending order.
2820 * This means, a message cannot create submessages with the same or lower class.
2821 * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2822 * generate a hanging situation here.
2823 * We mark class 1 as being suspended.
2824 * if the class is not busy, we proceed with class 2
2825 * and so on
2826 * if a class *is* busy, we cannot continue here, but return
2827 * MDMNE_SET_NOT_DRAINED.
2828 * We expect the caller to hold on for some seconds and try again.
2829 * When the message that held the class busy is done in
2830 * mdmn_master_process_msg(), mdmn_mark_class_unbusy() is called.
2831 * There it is checked if the class is about to drain.
2832 * In that case it tries to drain all higher classes there.
2833 *
2834 * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2835 * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2836 * completely drained.
2837 *
2838 * Returns:
2839 *	MDMNE_ACK on success (set is drained, no outstanding messages)
2840 *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2841 *		still outstanding messages for this set(s)
2842 *	MDMNE_EINVAL if setno is out of range
2843 *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2844 */
2845
2846/* ARGSUSED */
2847int *
2848mdmn_comm_suspend_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2849{
2850	int			*retval;
2851	int			failure = 0;
2852	set_t			startset, endset;
2853	set_t			setno  = msc->msc_set;
2854	md_mn_msgclass_t	oclass = msc->msc_class;
2855#ifdef NOT_YET_NEEDED
2856	uint_t			flags  = msc->msc_flags;
2857#endif /* NOT_YET_NEEDED */
2858	md_mn_msgclass_t	class;
2859
2860	retval = Malloc(sizeof (int));
2861
2862	/* check if the global initialization is done */
2863	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2864		global_init();
2865	}
2866
2867	/* is this rpc request coming from the local node ? */
2868	if (check_license(rqstp, 0) == FALSE) {
2869		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2870		*retval = MDMNE_RPC_FAIL;
2871		return (retval);
2872	}
2873
2874	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2875	    setno, oclass);
2876
2877	/* Perform some range checking */
2878	if (setno >= MD_MAXSETS) {
2879		*retval = MDMNE_EINVAL;
2880		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2881		return (retval);
2882	}
2883
2884	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2885	if (setno == MD_COMM_ALL_SETS) {
2886		startset = 1;
2887		endset = MD_MAXSETS - 1;
2888	} else {
2889		startset = setno;
2890		endset = setno;
2891	}
2892
2893	for (setno = startset; setno <= endset; setno++) {
2894		/* Here we need the mutexes for the set to be setup */
2895		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2896			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2897		}
2898
2899		mutex_lock(&mdmn_busy_mutex[setno]);
2900		/* shall we drain all classes of this set? */
2901		if (oclass == MD_COMM_ALL_CLASSES) {
2902			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2903				commd_debug(MD_MMV_MISC,
2904				    "suspend: suspending set %d, class %d\n",
2905				    setno, class);
2906				*retval = mdmn_mark_class_suspended(setno,
2907				    class, MDMN_SUSPEND_ALL);
2908				if (*retval == MDMNE_SET_NOT_DRAINED) {
2909					failure++;
2910				}
2911			}
2912		} else {
2913			/* only drain one specific class */
2914			commd_debug(MD_MMV_MISC,
2915			    "suspend: suspending set=%d class=%d\n",
2916			    setno, oclass);
2917			*retval = mdmn_mark_class_suspended(setno, oclass,
2918			    MDMN_SUSPEND_1);
2919			if (*retval == MDMNE_SET_NOT_DRAINED) {
2920				failure++;
2921			}
2922		}
2923		mutex_unlock(&mdmn_busy_mutex[setno]);
2924	}
2925	/* If one or more sets are not entirely drained, failure is non-zero */
2926	if (failure != 0) {
2927		*retval = MDMNE_SET_NOT_DRAINED;
2928		commd_debug(MD_MMV_MISC,
2929		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2930	} else {
2931		*retval = MDMNE_ACK;
2932	}
2933
2934	return (retval);
2935}
2936
2937/*
2938 * mdmn_comm_resume_svc_2(setno, class)
2939 *
2940 * Resume processing messages for a given set.
2941 * This incorporates the repeal of a previous suspend operation.
2942 *
2943 * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2944 * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2945 *
2946 * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2947 * one class as being resumed.
2948 *
2949 * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2950 *
2951 * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2952 *
2953 * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2954 * reset any ABORT flag from the global state.
2955 *
2956 * Returns:
2957 *	MDMNE_ACK on sucess (resuming an unlocked set is Ok)
2958 *	MDMNE_EINVAL if setno is out of range
2959 *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2960 */
2961/* ARGSUSED */
2962int *
2963mdmn_comm_resume_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2964{
2965	int			*retval;
2966	set_t			startset, endset;
2967	set_t			setno  = msc->msc_set;
2968	md_mn_msgclass_t	oclass = msc->msc_class;
2969	uint_t			flags  = msc->msc_flags;
2970	md_mn_msgclass_t	class;
2971
2972	retval = Malloc(sizeof (int));
2973
2974	/* check if the global initialization is done */
2975	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2976		global_init();
2977	}
2978
2979	/* is this rpc request coming from the local node ? */
2980	if (check_license(rqstp, 0) == FALSE) {
2981		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2982		*retval = MDMNE_RPC_FAIL;
2983		return (retval);
2984	}
2985
2986	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2987	    setno, oclass);
2988
2989	/* Perform some range checking */
2990	if (setno > MD_MAXSETS) {
2991		*retval = MDMNE_EINVAL;
2992		return (retval);
2993	}
2994
2995	if (setno == MD_COMM_ALL_SETS) {
2996		startset = 1;
2997		endset = MD_MAXSETS - 1;
2998		if (oclass == MD_COMM_ALL_CLASSES) {
2999			/* This is the point where we "unabort" the commd */
3000			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
3001			md_commd_global_state &= ~MD_CGS_ABORTED;
3002		}
3003	} else {
3004		startset = setno;
3005		endset = setno;
3006	}
3007
3008	for (setno = startset; setno <= endset; setno++) {
3009
3010		/* Here we need the mutexes for the set to be setup */
3011		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
3012			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
3013		}
3014
3015		mutex_lock(&mdmn_busy_mutex[setno]);
3016
3017		if (oclass == MD_COMM_ALL_CLASSES) {
3018			int end_class = 1;
3019			/*
3020			 * When SUSPENDing all classes, we go
3021			 * from 1 to MD_MN_NCLASSES-1
3022			 * The correct reverse action is RESUMing
3023			 * from MD_MN_NCLASSES-1 to 1 (or 2)
3024			 */
3025
3026			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
3027				end_class = 2;
3028			}
3029
3030			/*
3031			 * Then mark all classes of this set as no longer
3032			 * suspended. This supersedes any previous suspend(1)
3033			 * calls and resumes the set entirely.
3034			 */
3035			for (class = MD_MN_NCLASSES - 1; class >= end_class;
3036			    class --) {
3037				commd_debug(MD_MMV_MISC,
3038				    "resume: resuming set=%d class=%d\n",
3039				    setno, class);
3040				mdmn_mark_class_resumed(setno, class,
3041				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3042			}
3043		} else {
3044			/*
3045			 * In this case only one class is marked as not
3046			 * suspended. If a suspend(all) is currently active for
3047			 * this set, this class will still be suspended.
3048			 * That state will be cleared by a suspend(all)
3049			 * (see above)
3050			 */
3051			commd_debug(MD_MMV_MISC,
3052			    "resume: resuming set=%d class=%d\n",
3053			    setno, oclass);
3054			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3055		}
3056
3057		mutex_unlock(&mdmn_busy_mutex[setno]);
3058	}
3059
3060	*retval = MDMNE_ACK;
3061	return (retval);
3062}
3063/* ARGSUSED */
3064int *
3065mdmn_comm_reinit_set_svc_2(set_t *setnop, struct svc_req *rqstp)
3066{
3067	int		*retval;
3068	md_mnnode_desc	*node;
3069	set_t		 setno = *setnop;
3070
3071	retval = Malloc(sizeof (int));
3072
3073	/* check if the global initialization is done */
3074	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3075		global_init();
3076	}
3077
3078	/* is this rpc request coming from the local node ? */
3079	if (check_license(rqstp, 0) == FALSE) {
3080		xdr_free(xdr_set_t, (caddr_t)setnop);
3081		*retval = MDMNE_RPC_FAIL;
3082		return (retval);
3083	}
3084
3085	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3086
3087	rw_rdlock(&set_desc_rwlock[setno]);
3088	/*
3089	 * We assume, that all messages have been suspended previously.
3090	 *
3091	 * As we are modifying lots of clients here we grab the client_rwlock
3092	 * in writer mode. This ensures, no new messages come in.
3093	 */
3094	rw_wrlock(&client_rwlock[setno]);
3095	/* This set is no longer initialized */
3096
3097	if ((set_descriptor[setno] != NULL) &&
3098	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3099		/* destroy all rpc clients from this set */
3100		for (node = set_descriptor[setno]->sd_nodelist; node;
3101		    node = node->nd_next) {
3102			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3103			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3104				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3105			}
3106		}
3107	md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3108	}
3109
3110	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3111
3112	rw_unlock(&client_rwlock[setno]);
3113	rw_unlock(&set_desc_rwlock[setno]);
3114	*retval = MDMNE_ACK;
3115	return (retval);
3116}
3117
3118/*
3119 * This is just an interface for testing purpose.
3120 * Here we can disable single message types.
3121 * If we block a message type, this is valid for all MN sets.
3122 * If a message arrives later, and  it's message type is blocked, it will
3123 * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3124 * resend this message over and over again.
3125 */
3126
3127/* ARGSUSED */
3128int *
3129mdmn_comm_msglock_svc_2(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3130{
3131	int			*retval;
3132	md_mn_msgtype_t		type = mmtl->mmtl_type;
3133	uint_t			lock = mmtl->mmtl_lock;
3134
3135	retval = Malloc(sizeof (int));
3136
3137	/* check if the global initialization is done */
3138	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3139		global_init();
3140	}
3141
3142	/* is this rpc request coming from the local node ? */
3143	if (check_license(rqstp, 0) == FALSE) {
3144		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3145		*retval = MDMNE_RPC_FAIL;
3146		return (retval);
3147	}
3148
3149	/* Perform some range checking */
3150	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3151		*retval = MDMNE_EINVAL;
3152		return (retval);
3153	}
3154
3155	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3156	msgtype_lock_state[type] = lock;
3157
3158	*retval = MDMNE_ACK;
3159	return (retval);
3160}
3161