/*	$NetBSD: rf_dagffwr.c,v 1.12 2003/12/29 03:33:47 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.12 2003/12/29 03:33:47 oster Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * does not make changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


78void
79rf_CreateNonRedundantWriteDAG(
80    RF_Raid_t * raidPtr,
81    RF_AccessStripeMap_t * asmap,
82    RF_DagHeader_t * dag_h,
83    void *bp,
84    RF_RaidAccessFlags_t flags,
85    RF_AllocListElem_t * allocList,
86    RF_IoType_t type)
87{
88	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
89	    RF_IO_TYPE_WRITE);
90}
91
92void
93rf_CreateRAID0WriteDAG(
94    RF_Raid_t * raidPtr,
95    RF_AccessStripeMap_t * asmap,
96    RF_DagHeader_t * dag_h,
97    void *bp,
98    RF_RaidAccessFlags_t flags,
99    RF_AllocListElem_t * allocList,
100    RF_IoType_t type)
101{
102	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
103	    RF_IO_TYPE_WRITE);
104}
105
106void
107rf_CreateSmallWriteDAG(
108    RF_Raid_t * raidPtr,
109    RF_AccessStripeMap_t * asmap,
110    RF_DagHeader_t * dag_h,
111    void *bp,
112    RF_RaidAccessFlags_t flags,
113    RF_AllocListElem_t * allocList)
114{
115	/* "normal" rollaway */
116	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
117	    &rf_xorFuncs, NULL);
118}
119
120void
121rf_CreateLargeWriteDAG(
122    RF_Raid_t * raidPtr,
123    RF_AccessStripeMap_t * asmap,
124    RF_DagHeader_t * dag_h,
125    void *bp,
126    RF_RaidAccessFlags_t flags,
127    RF_AllocListElem_t * allocList)
128{
129	/* "normal" rollaway */
130	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
131	    1, rf_RegularXorFunc, RF_TRUE);
132}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /          \  Wnp /
 *                             \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/

172void
173rf_CommonCreateLargeWriteDAG(
174    RF_Raid_t * raidPtr,
175    RF_AccessStripeMap_t * asmap,
176    RF_DagHeader_t * dag_h,
177    void *bp,
178    RF_RaidAccessFlags_t flags,
179    RF_AllocListElem_t * allocList,
180    int nfaults,
181    int (*redFunc) (RF_DagNode_t *),
182    int allowBufferRecycle)
183{
184	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
185	RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
186	int     nWndNodes, nRodNodes, i, nodeNum, asmNum;
187	RF_AccessStripeMapHeader_t *new_asm_h[2];
188	RF_StripeNum_t parityStripeID;
189	char   *sosBuffer, *eosBuffer;
190	RF_ReconUnitNum_t which_ru;
191	RF_RaidLayout_t *layoutPtr;
192	RF_PhysDiskAddr_t *pda;
193
194	layoutPtr = &(raidPtr->Layout);
195	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress,
196	    &which_ru);
197
198	if (rf_dagDebug) {
199		printf("[Creating large-write DAG]\n");
200	}
201	dag_h->creator = "LargeWriteDAG";
202
203	dag_h->numCommitNodes = 1;
204	dag_h->numCommits = 0;
205	dag_h->numSuccedents = 1;
206
207	/* alloc the nodes: Wnd, xor, commit, block, term, and  Wnp */
208	nWndNodes = asmap->numStripeUnitsAccessed;
209	RF_MallocAndAdd(nodes,
210			(nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t),
211			(RF_DagNode_t *), allocList);
212	i = 0;
213	wndNodes = &nodes[i];
214	i += nWndNodes;
215	xorNode = &nodes[i];
216	i += 1;
217	wnpNode = &nodes[i];
218	i += 1;
219	blockNode = &nodes[i];
220	i += 1;
221	commitNode = &nodes[i];
222	i += 1;
223	termNode = &nodes[i];
224	i += 1;
225	if (nfaults == 2) {
226		wnqNode = &nodes[i];
227		i += 1;
228	} else {
229		wnqNode = NULL;
230	}
231	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h,
232	    &nRodNodes, &sosBuffer, &eosBuffer, allocList);
233	if (nRodNodes > 0) {
234		RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t),
235				(RF_DagNode_t *), allocList);
236	} else {
237		rodNodes = NULL;
238	}
239
240	/* begin node initialization */
241	if (nRodNodes > 0) {
242		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
243		    NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
244	} else {
245		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
246		    NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
247	}
248
249	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL,
250	    nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList);
251	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL,
252	    0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList);
253
254	/* initialize the Rod nodes */
255	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
256		if (new_asm_h[asmNum]) {
257			pda = new_asm_h[asmNum]->stripeMap->physInfo;
258			while (pda) {
259				rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc,
260				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
261				    "Rod", allocList);
262				rodNodes[nodeNum].params[0].p = pda;
263				rodNodes[nodeNum].params[1].p = pda->bufPtr;
264				rodNodes[nodeNum].params[2].v = parityStripeID;
265				rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
266				    0, 0, which_ru);
267				nodeNum++;
268				pda = pda->next;
269			}
270		}
271	}
272	RF_ASSERT(nodeNum == nRodNodes);
273
274	/* initialize the wnd nodes */
275	pda = asmap->physInfo;
276	for (i = 0; i < nWndNodes; i++) {
277		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
278		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
279		RF_ASSERT(pda != NULL);
280		wndNodes[i].params[0].p = pda;
281		wndNodes[i].params[1].p = pda->bufPtr;
282		wndNodes[i].params[2].v = parityStripeID;
283		wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
284		pda = pda->next;
285	}
286
287	/* initialize the redundancy node */
288	if (nRodNodes > 0) {
289		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
290		    nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
291		    "Xr ", allocList);
292	} else {
293		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1,
294		    1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList);
295	}
296	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
297	for (i = 0; i < nWndNodes; i++) {
298		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
299		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
300	}
301	for (i = 0; i < nRodNodes; i++) {
302		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
303		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
304	}
305	/* xor node needs to get at RAID information */
306	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
307
308	/*
309         * Look for an Rod node that reads a complete SU. If none, alloc a buffer
310         * to receive the parity info. Note that we can't use a new data buffer
311         * because it will not have gotten written when the xor occurs.
312         */
313	if (allowBufferRecycle) {
314		for (i = 0; i < nRodNodes; i++) {
315			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
316				break;
317		}
318	}
319	if ((!allowBufferRecycle) || (i == nRodNodes)) {
320		RF_MallocAndAdd(xorNode->results[0],
321				rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
322				(void *), allocList);
323	} else {
324		xorNode->results[0] = rodNodes[i].params[1].p;
325	}
326
327	/* initialize the Wnp node */
328	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
329	    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList);
330	wnpNode->params[0].p = asmap->parityInfo;
331	wnpNode->params[1].p = xorNode->results[0];
332	wnpNode->params[2].v = parityStripeID;
333	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
334	/* parityInfo must describe entire parity unit */
335	RF_ASSERT(asmap->parityInfo->next == NULL);
336
337	if (nfaults == 2) {
338		/*
339	         * We never try to recycle a buffer for the Q calcuation
340	         * in addition to the parity. This would cause two buffers
341	         * to get smashed during the P and Q calculation, guaranteeing
342	         * one would be wrong.
343	         */
344		RF_MallocAndAdd(xorNode->results[1],
345				rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
346				(void *), allocList);
347		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
348		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList);
349		wnqNode->params[0].p = asmap->qInfo;
350		wnqNode->params[1].p = xorNode->results[1];
351		wnqNode->params[2].v = parityStripeID;
352		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
353		/* parityInfo must describe entire parity unit */
354		RF_ASSERT(asmap->parityInfo->next == NULL);
355	}
356	/*
357         * Connect nodes to form graph.
358         */
359
360	/* connect dag header to block node */
361	RF_ASSERT(blockNode->numAntecedents == 0);
362	dag_h->succedents[0] = blockNode;
363
364	if (nRodNodes > 0) {
365		/* connect the block node to the Rod nodes */
366		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
367		RF_ASSERT(xorNode->numAntecedents == nRodNodes);
368		for (i = 0; i < nRodNodes; i++) {
369			RF_ASSERT(rodNodes[i].numAntecedents == 1);
370			blockNode->succedents[i] = &rodNodes[i];
371			rodNodes[i].antecedents[0] = blockNode;
372			rodNodes[i].antType[0] = rf_control;
373
374			/* connect the Rod nodes to the Xor node */
375			RF_ASSERT(rodNodes[i].numSuccedents == 1);
376			rodNodes[i].succedents[0] = xorNode;
377			xorNode->antecedents[i] = &rodNodes[i];
378			xorNode->antType[i] = rf_trueData;
379		}
380	} else {
381		/* connect the block node to the Xor node */
382		RF_ASSERT(blockNode->numSuccedents == 1);
383		RF_ASSERT(xorNode->numAntecedents == 1);
384		blockNode->succedents[0] = xorNode;
385		xorNode->antecedents[0] = blockNode;
386		xorNode->antType[0] = rf_control;
387	}
388
389	/* connect the xor node to the commit node */
390	RF_ASSERT(xorNode->numSuccedents == 1);
391	RF_ASSERT(commitNode->numAntecedents == 1);
392	xorNode->succedents[0] = commitNode;
393	commitNode->antecedents[0] = xorNode;
394	commitNode->antType[0] = rf_control;
395
396	/* connect the commit node to the write nodes */
397	RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
398	for (i = 0; i < nWndNodes; i++) {
399		RF_ASSERT(wndNodes->numAntecedents == 1);
400		commitNode->succedents[i] = &wndNodes[i];
401		wndNodes[i].antecedents[0] = commitNode;
402		wndNodes[i].antType[0] = rf_control;
403	}
404	RF_ASSERT(wnpNode->numAntecedents == 1);
405	commitNode->succedents[nWndNodes] = wnpNode;
406	wnpNode->antecedents[0] = commitNode;
407	wnpNode->antType[0] = rf_trueData;
408	if (nfaults == 2) {
409		RF_ASSERT(wnqNode->numAntecedents == 1);
410		commitNode->succedents[nWndNodes + 1] = wnqNode;
411		wnqNode->antecedents[0] = commitNode;
412		wnqNode->antType[0] = rf_trueData;
413	}
414	/* connect the write nodes to the term node */
415	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
416	RF_ASSERT(termNode->numSuccedents == 0);
417	for (i = 0; i < nWndNodes; i++) {
418		RF_ASSERT(wndNodes->numSuccedents == 1);
419		wndNodes[i].succedents[0] = termNode;
420		termNode->antecedents[i] = &wndNodes[i];
421		termNode->antType[i] = rf_control;
422	}
423	RF_ASSERT(wnpNode->numSuccedents == 1);
424	wnpNode->succedents[0] = termNode;
425	termNode->antecedents[nWndNodes] = wnpNode;
426	termNode->antType[nWndNodes] = rf_control;
427	if (nfaults == 2) {
428		RF_ASSERT(wnqNode->numSuccedents == 1);
429		wnqNode->succedents[0] = termNode;
430		termNode->antecedents[nWndNodes + 1] = wnqNode;
431		termNode->antType[nWndNodes + 1] = rf_control;
432	}
433}
/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und]-/
 *           [\- Rod X     /       \---> Wnd [Und]-/]
 *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates single fault tolerant
 *****************************************************************************/

467void
468rf_CommonCreateSmallWriteDAG(
469    RF_Raid_t * raidPtr,
470    RF_AccessStripeMap_t * asmap,
471    RF_DagHeader_t * dag_h,
472    void *bp,
473    RF_RaidAccessFlags_t flags,
474    RF_AllocListElem_t * allocList,
475    const RF_RedFuncs_t * pfuncs,
476    const RF_RedFuncs_t * qfuncs)
477{
478	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
479	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
480	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
481	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
482	int     i, j, nNodes, totalNumNodes, lu_flag;
483	RF_ReconUnitNum_t which_ru;
484	int     (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
485	int     (*qfunc) (RF_DagNode_t *);
486	int     numDataNodes, numParityNodes;
487	RF_StripeNum_t parityStripeID;
488	RF_PhysDiskAddr_t *pda;
489	char   *name, *qname;
490	long    nfaults;
491
492	nfaults = qfuncs ? 2 : 1;
493	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */
494
495	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
496	    asmap->raidAddress, &which_ru);
497	pda = asmap->physInfo;
498	numDataNodes = asmap->numStripeUnitsAccessed;
499	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
500
501	if (rf_dagDebug) {
502		printf("[Creating small-write DAG]\n");
503	}
504	RF_ASSERT(numDataNodes > 0);
505	dag_h->creator = "SmallWriteDAG";
506
507	dag_h->numCommitNodes = 1;
508	dag_h->numCommits = 0;
509	dag_h->numSuccedents = 1;
510
511	/*
512         * DAG creation occurs in four steps:
513         * 1. count the number of nodes in the DAG
514         * 2. create the nodes
515         * 3. initialize the nodes
516         * 4. connect the nodes
517         */
518
519	/*
520         * Step 1. compute number of nodes in the graph
521         */
522
523	/* number of nodes: a read and write for each data unit a redundancy
524	 * computation node for each parity node (nfaults * nparity) a read
525	 * and write for each parity unit a block and commit node (2) a
526	 * terminate node if atomic RMW an unlock node for each data unit,
527	 * redundancy unit */
528	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
529	    + (nfaults * 2 * numParityNodes) + 3;
530	if (lu_flag) {
531		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
532	}
533	/*
534         * Step 2. create the nodes
535         */
536	RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t),
537			(RF_DagNode_t *), allocList);
538	i = 0;
539	blockNode = &nodes[i];
540	i += 1;
541	commitNode = &nodes[i];
542	i += 1;
543	readDataNodes = &nodes[i];
544	i += numDataNodes;
545	readParityNodes = &nodes[i];
546	i += numParityNodes;
547	writeDataNodes = &nodes[i];
548	i += numDataNodes;
549	writeParityNodes = &nodes[i];
550	i += numParityNodes;
551	xorNodes = &nodes[i];
552	i += numParityNodes;
553	termNode = &nodes[i];
554	i += 1;
555	if (lu_flag) {
556		unlockDataNodes = &nodes[i];
557		i += numDataNodes;
558		unlockParityNodes = &nodes[i];
559		i += numParityNodes;
560	} else {
561		unlockDataNodes = unlockParityNodes = NULL;
562	}
563	if (nfaults == 2) {
564		readQNodes = &nodes[i];
565		i += numParityNodes;
566		writeQNodes = &nodes[i];
567		i += numParityNodes;
568		qNodes = &nodes[i];
569		i += numParityNodes;
570		if (lu_flag) {
571			unlockQNodes = &nodes[i];
572			i += numParityNodes;
573		} else {
574			unlockQNodes = NULL;
575		}
576	} else {
577		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
578	}
579	RF_ASSERT(i == totalNumNodes);
580
581	/*
582         * Step 3. initialize the nodes
583         */
584	/* initialize block node (Nil) */
585	nNodes = numDataNodes + (nfaults * numParityNodes);
586	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
587	    NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
588
589	/* initialize commit node (Cmt) */
590	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
591	    NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);
592
593	/* initialize terminate node (Trm) */
594	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
595	    NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
596
597	/* initialize nodes which read old data (Rod) */
598	for (i = 0; i < numDataNodes; i++) {
599		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
600		    rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h,
601		    "Rod", allocList);
602		RF_ASSERT(pda != NULL);
603		/* physical disk addr desc */
604		readDataNodes[i].params[0].p = pda;
605		/* buffer to hold old data */
606		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
607		    dag_h, pda, allocList);
608		readDataNodes[i].params[2].v = parityStripeID;
609		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
610		    lu_flag, 0, which_ru);
611		pda = pda->next;
612		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
613			readDataNodes[i].propList[j] = NULL;
614		}
615	}
616
617	/* initialize nodes which read old parity (Rop) */
618	pda = asmap->parityInfo;
619	i = 0;
620	for (i = 0; i < numParityNodes; i++) {
621		RF_ASSERT(pda != NULL);
622		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc,
623		    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4,
624		    0, dag_h, "Rop", allocList);
625		readParityNodes[i].params[0].p = pda;
626		/* buffer to hold old parity */
627		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
628		    dag_h, pda, allocList);
629		readParityNodes[i].params[2].v = parityStripeID;
630		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
631		    lu_flag, 0, which_ru);
632		pda = pda->next;
633		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
634			readParityNodes[i].propList[0] = NULL;
635		}
636	}
637
638	/* initialize nodes which read old Q (Roq) */
639	if (nfaults == 2) {
640		pda = asmap->qInfo;
641		for (i = 0; i < numParityNodes; i++) {
642			RF_ASSERT(pda != NULL);
643			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc,
644			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
645			readQNodes[i].params[0].p = pda;
646			/* buffer to hold old Q */
647			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda,
648			    allocList);
649			readQNodes[i].params[2].v = parityStripeID;
650			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
651			    lu_flag, 0, which_ru);
652			pda = pda->next;
653			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
654				readQNodes[i].propList[0] = NULL;
655			}
656		}
657	}
658	/* initialize nodes which write new data (Wnd) */
659	pda = asmap->physInfo;
660	for (i = 0; i < numDataNodes; i++) {
661		RF_ASSERT(pda != NULL);
662		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
663		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
664		    "Wnd", allocList);
665		/* physical disk addr desc */
666		writeDataNodes[i].params[0].p = pda;
667		/* buffer holding new data to be written */
668		writeDataNodes[i].params[1].p = pda->bufPtr;
669		writeDataNodes[i].params[2].v = parityStripeID;
670		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
671		    0, 0, which_ru);
672		if (lu_flag) {
673			/* initialize node to unlock the disk queue */
674			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
675			    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
676			    "Und", allocList);
677			/* physical disk addr desc */
678			unlockDataNodes[i].params[0].p = pda;
679			unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
680			    0, lu_flag, which_ru);
681		}
682		pda = pda->next;
683	}
684
685	/*
686         * Initialize nodes which compute new parity and Q.
687         */
688	/*
689         * We use the simple XOR func in the double-XOR case, and when
690         * we're accessing only a portion of one stripe unit. The distinction
691         * between the two is that the regular XOR func assumes that the targbuf
692         * is a full SU in size, and examines the pda associated with the buffer
693         * to decide where within the buffer to XOR the data, whereas
694         * the simple XOR func just XORs the data into the start of the buffer.
695         */
696	if ((numParityNodes == 2) || ((numDataNodes == 1)
697		&& (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
698		func = pfuncs->simple;
699		undoFunc = rf_NullNodeUndoFunc;
700		name = pfuncs->SimpleName;
701		if (qfuncs) {
702			qfunc = qfuncs->simple;
703			qname = qfuncs->SimpleName;
704		} else {
705			qfunc = NULL;
706			qname = NULL;
707		}
708	} else {
709		func = pfuncs->regular;
710		undoFunc = rf_NullNodeUndoFunc;
711		name = pfuncs->RegularName;
712		if (qfuncs) {
713			qfunc = qfuncs->regular;
714			qname = qfuncs->RegularName;
715		} else {
716			qfunc = NULL;
717			qname = NULL;
718		}
719	}
720	/*
721         * Initialize the xor nodes: params are {pda,buf}
722         * from {Rod,Wnd,Rop} nodes, and raidPtr
723         */
724	if (numParityNodes == 2) {
725		/* double-xor case */
726		for (i = 0; i < numParityNodes; i++) {
727			/* note: no wakeup func for xor */
728			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL,
729			    1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList);
730			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
731			xorNodes[i].params[0] = readDataNodes[i].params[0];
732			xorNodes[i].params[1] = readDataNodes[i].params[1];
733			xorNodes[i].params[2] = readParityNodes[i].params[0];
734			xorNodes[i].params[3] = readParityNodes[i].params[1];
735			xorNodes[i].params[4] = writeDataNodes[i].params[0];
736			xorNodes[i].params[5] = writeDataNodes[i].params[1];
737			xorNodes[i].params[6].p = raidPtr;
738			/* use old parity buf as target buf */
739			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
740			if (nfaults == 2) {
741				/* note: no wakeup func for qor */
742				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
743				    (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList);
744				qNodes[i].params[0] = readDataNodes[i].params[0];
745				qNodes[i].params[1] = readDataNodes[i].params[1];
746				qNodes[i].params[2] = readQNodes[i].params[0];
747				qNodes[i].params[3] = readQNodes[i].params[1];
748				qNodes[i].params[4] = writeDataNodes[i].params[0];
749				qNodes[i].params[5] = writeDataNodes[i].params[1];
750				qNodes[i].params[6].p = raidPtr;
751				/* use old Q buf as target buf */
752				qNodes[i].results[0] = readQNodes[i].params[1].p;
753			}
754		}
755	} else {
756		/* there is only one xor node in this case */
757		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1,
758		    (numDataNodes + numParityNodes),
759		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList);
760		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
761		for (i = 0; i < numDataNodes + 1; i++) {
762			/* set up params related to Rod and Rop nodes */
763			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
764			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
765		}
766		for (i = 0; i < numDataNodes; i++) {
767			/* set up params related to Wnd and Wnp nodes */
768			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
769			    writeDataNodes[i].params[0];
770			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
771			    writeDataNodes[i].params[1];
772		}
773		/* xor node needs to get at RAID information */
774		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
775		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
776		if (nfaults == 2) {
777			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1,
778			    (numDataNodes + numParityNodes),
779			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
780			    qname, allocList);
781			for (i = 0; i < numDataNodes; i++) {
782				/* set up params related to Rod */
783				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
784				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
785			}
786			/* and read old q */
787			qNodes[0].params[2 * numDataNodes + 0] =	/* pda */
788			    readQNodes[0].params[0];
789			qNodes[0].params[2 * numDataNodes + 1] =	/* buffer ptr */
790			    readQNodes[0].params[1];
791			for (i = 0; i < numDataNodes; i++) {
792				/* set up params related to Wnd nodes */
793				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
794				    writeDataNodes[i].params[0];
795				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
796				    writeDataNodes[i].params[1];
797			}
798			/* xor node needs to get at RAID information */
799			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
800			qNodes[0].results[0] = readQNodes[0].params[1].p;
801		}
802	}
803
804	/* initialize nodes which write new parity (Wnp) */
805	pda = asmap->parityInfo;
806	for (i = 0; i < numParityNodes; i++) {
807		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
808		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
809		    "Wnp", allocList);
810		RF_ASSERT(pda != NULL);
811		writeParityNodes[i].params[0].p = pda;	/* param 1 (bufPtr)
812							 * filled in by xor node */
813		writeParityNodes[i].params[1].p = xorNodes[i].results[0];	/* buffer pointer for
814										 * parity write
815										 * operation */
816		writeParityNodes[i].params[2].v = parityStripeID;
817		writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
818		    0, 0, which_ru);
819		if (lu_flag) {
820			/* initialize node to unlock the disk queue */
821			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
822			    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
823			    "Unp", allocList);
824			unlockParityNodes[i].params[0].p = pda;	/* physical disk addr
825								 * desc */
826			unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
827			    0, lu_flag, which_ru);
828		}
829		pda = pda->next;
830	}
831
832	/* initialize nodes which write new Q (Wnq) */
833	if (nfaults == 2) {
834		pda = asmap->qInfo;
835		for (i = 0; i < numParityNodes; i++) {
836			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
837			    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
838			    "Wnq", allocList);
839			RF_ASSERT(pda != NULL);
840			writeQNodes[i].params[0].p = pda;	/* param 1 (bufPtr)
841								 * filled in by xor node */
842			writeQNodes[i].params[1].p = qNodes[i].results[0];	/* buffer pointer for
843										 * parity write
844										 * operation */
845			writeQNodes[i].params[2].v = parityStripeID;
846			writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
847			    0, 0, which_ru);
848			if (lu_flag) {
849				/* initialize node to unlock the disk queue */
850				rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc,
851				    rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
852				    "Unq", allocList);
853				unlockQNodes[i].params[0].p = pda;	/* physical disk addr
854									 * desc */
855				unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
856				    0, lu_flag, which_ru);
857			}
858			pda = pda->next;
859		}
860	}
861	/*
862         * Step 4. connect the nodes.
863         */
864
865	/* connect header to block node */
866	dag_h->succedents[0] = blockNode;
867
868	/* connect block node to read old data nodes */
869	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
870	for (i = 0; i < numDataNodes; i++) {
871		blockNode->succedents[i] = &readDataNodes[i];
872		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
873		readDataNodes[i].antecedents[0] = blockNode;
874		readDataNodes[i].antType[0] = rf_control;
875	}
876
877	/* connect block node to read old parity nodes */
878	for (i = 0; i < numParityNodes; i++) {
879		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
880		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
881		readParityNodes[i].antecedents[0] = blockNode;
882		readParityNodes[i].antType[0] = rf_control;
883	}
884
885	/* connect block node to read old Q nodes */
886	if (nfaults == 2) {
887		for (i = 0; i < numParityNodes; i++) {
888			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
889			RF_ASSERT(readQNodes[i].numAntecedents == 1);
890			readQNodes[i].antecedents[0] = blockNode;
891			readQNodes[i].antType[0] = rf_control;
892		}
893	}
894	/* connect read old data nodes to xor nodes */
895	for (i = 0; i < numDataNodes; i++) {
896		RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
897		for (j = 0; j < numParityNodes; j++) {
898			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
899			readDataNodes[i].succedents[j] = &xorNodes[j];
900			xorNodes[j].antecedents[i] = &readDataNodes[i];
901			xorNodes[j].antType[i] = rf_trueData;
902		}
903	}
904
905	/* connect read old data nodes to q nodes */
906	if (nfaults == 2) {
907		for (i = 0; i < numDataNodes; i++) {
908			for (j = 0; j < numParityNodes; j++) {
909				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
910				readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
911				qNodes[j].antecedents[i] = &readDataNodes[i];
912				qNodes[j].antType[i] = rf_trueData;
913			}
914		}
915	}
916	/* connect read old parity nodes to xor nodes */
917	for (i = 0; i < numParityNodes; i++) {
918		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
919		for (j = 0; j < numParityNodes; j++) {
920			readParityNodes[i].succedents[j] = &xorNodes[j];
921			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
922			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
923		}
924	}
925
926	/* connect read old q nodes to q nodes */
927	if (nfaults == 2) {
928		for (i = 0; i < numParityNodes; i++) {
929			RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
930			for (j = 0; j < numParityNodes; j++) {
931				readQNodes[i].succedents[j] = &qNodes[j];
932				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
933				qNodes[j].antType[numDataNodes + i] = rf_trueData;
934			}
935		}
936	}
937	/* connect xor nodes to commit node */
938	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
939	for (i = 0; i < numParityNodes; i++) {
940		RF_ASSERT(xorNodes[i].numSuccedents == 1);
941		xorNodes[i].succedents[0] = commitNode;
942		commitNode->antecedents[i] = &xorNodes[i];
943		commitNode->antType[i] = rf_control;
944	}
945
946	/* connect q nodes to commit node */
947	if (nfaults == 2) {
948		for (i = 0; i < numParityNodes; i++) {
949			RF_ASSERT(qNodes[i].numSuccedents == 1);
950			qNodes[i].succedents[0] = commitNode;
951			commitNode->antecedents[i + numParityNodes] = &qNodes[i];
952			commitNode->antType[i + numParityNodes] = rf_control;
953		}
954	}
955	/* connect commit node to write nodes */
956	RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
957	for (i = 0; i < numDataNodes; i++) {
958		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
959		commitNode->succedents[i] = &writeDataNodes[i];
960		writeDataNodes[i].antecedents[0] = commitNode;
961		writeDataNodes[i].antType[0] = rf_trueData;
962	}
963	for (i = 0; i < numParityNodes; i++) {
964		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
965		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
966		writeParityNodes[i].antecedents[0] = commitNode;
967		writeParityNodes[i].antType[0] = rf_trueData;
968	}
969	if (nfaults == 2) {
970		for (i = 0; i < numParityNodes; i++) {
971			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
972			commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
973			writeQNodes[i].antecedents[0] = commitNode;
974			writeQNodes[i].antType[0] = rf_trueData;
975		}
976	}
977	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
978	RF_ASSERT(termNode->numSuccedents == 0);
979	for (i = 0; i < numDataNodes; i++) {
980		if (lu_flag) {
981			/* connect write new data nodes to unlock nodes */
982			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
983			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
984			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
985			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
986			unlockDataNodes[i].antType[0] = rf_control;
987
988			/* connect unlock nodes to term node */
989			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
990			unlockDataNodes[i].succedents[0] = termNode;
991			termNode->antecedents[i] = &unlockDataNodes[i];
992			termNode->antType[i] = rf_control;
993		} else {
994			/* connect write new data nodes to term node */
995			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
996			RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
997			writeDataNodes[i].succedents[0] = termNode;
998			termNode->antecedents[i] = &writeDataNodes[i];
999			termNode->antType[i] = rf_control;
1000		}
1001	}
1002
1003	for (i = 0; i < numParityNodes; i++) {
1004		if (lu_flag) {
1005			/* connect write new parity nodes to unlock nodes */
1006			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1007			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1008			writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
1009			unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
1010			unlockParityNodes[i].antType[0] = rf_control;
1011
1012			/* connect unlock nodes to term node */
1013			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1014			unlockParityNodes[i].succedents[0] = termNode;
1015			termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
1016			termNode->antType[numDataNodes + i] = rf_control;
1017		} else {
1018			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1019			writeParityNodes[i].succedents[0] = termNode;
1020			termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
1021			termNode->antType[numDataNodes + i] = rf_control;
1022		}
1023	}
1024
1025	if (nfaults == 2) {
1026		for (i = 0; i < numParityNodes; i++) {
1027			if (lu_flag) {
1028				/* connect write new Q nodes to unlock nodes */
1029				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1030				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1031				writeQNodes[i].succedents[0] = &unlockQNodes[i];
1032				unlockQNodes[i].antecedents[0] = &writeQNodes[i];
1033				unlockQNodes[i].antType[0] = rf_control;
1034
1035				/* connect unlock nodes to unblock node */
1036				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1037				unlockQNodes[i].succedents[0] = termNode;
1038				termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1039				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1040			} else {
1041				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1042				writeQNodes[i].succedents[0] = termNode;
1043				termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1044				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1045			}
1046		}
1047	}
1048}
1049
1050
1051/******************************************************************************
1052 * create a write graph (fault-free or degraded) for RAID level 1
1053 *
1054 * Hdr -> Commit -> Wpd -> Nil -> Trm
1055 *               -> Wsd ->
1056 *
1057 * The "Wpd" node writes data to the primary copy in the mirror pair
1058 * The "Wsd" node writes data to the secondary copy in the mirror pair
1059 *
1060 * Parameters:  raidPtr   - description of the physical array
1061 *              asmap     - logical & physical addresses for this access
1062 *              bp        - buffer ptr (holds write data)
1063 *              flags     - general flags (e.g. disk locking)
1064 *              allocList - list of memory allocated in DAG creation
1065 *****************************************************************************/
1066
1067void
1068rf_CreateRaidOneWriteDAG(
1069    RF_Raid_t * raidPtr,
1070    RF_AccessStripeMap_t * asmap,
1071    RF_DagHeader_t * dag_h,
1072    void *bp,
1073    RF_RaidAccessFlags_t flags,
1074    RF_AllocListElem_t * allocList)
1075{
1076	RF_DagNode_t *unblockNode, *termNode, *commitNode;
1077	RF_DagNode_t *nodes, *wndNode, *wmirNode;
1078	int     nWndNodes, nWmirNodes, i;
1079	RF_ReconUnitNum_t which_ru;
1080	RF_PhysDiskAddr_t *pda, *pdaP;
1081	RF_StripeNum_t parityStripeID;
1082
1083	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1084	    asmap->raidAddress, &which_ru);
1085	if (rf_dagDebug) {
1086		printf("[Creating RAID level 1 write DAG]\n");
1087	}
1088	dag_h->creator = "RaidOneWriteDAG";
1089
1090	/* 2 implies access not SU aligned */
1091	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1092	nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1093
1094	/* alloc the Wnd nodes and the Wmir node */
1095	if (asmap->numDataFailed == 1)
1096		nWndNodes--;
1097	if (asmap->numParityFailed == 1)
1098		nWmirNodes--;
1099
1100	/* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
1101	 * + terminator) */
1102	RF_MallocAndAdd(nodes,
1103			(nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t),
1104			(RF_DagNode_t *), allocList);
1105	i = 0;
1106	wndNode = &nodes[i];
1107	i += nWndNodes;
1108	wmirNode = &nodes[i];
1109	i += nWmirNodes;
1110	commitNode = &nodes[i];
1111	i += 1;
1112	unblockNode = &nodes[i];
1113	i += 1;
1114	termNode = &nodes[i];
1115	i += 1;
1116	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1117
1118	/* this dag can commit immediately */
1119	dag_h->numCommitNodes = 1;
1120	dag_h->numCommits = 0;
1121	dag_h->numSuccedents = 1;
1122
1123	/* initialize the commit, unblock, and term nodes */
1124	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1125	    NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
1126	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
1127	    NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
1128	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
1129	    NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1130
1131	/* initialize the wnd nodes */
1132	if (nWndNodes > 0) {
1133		pda = asmap->physInfo;
1134		for (i = 0; i < nWndNodes; i++) {
1135			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1136			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
1137			RF_ASSERT(pda != NULL);
1138			wndNode[i].params[0].p = pda;
1139			wndNode[i].params[1].p = pda->bufPtr;
1140			wndNode[i].params[2].v = parityStripeID;
1141			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1142			pda = pda->next;
1143		}
1144		RF_ASSERT(pda == NULL);
1145	}
1146	/* initialize the mirror nodes */
1147	if (nWmirNodes > 0) {
1148		pda = asmap->physInfo;
1149		pdaP = asmap->parityInfo;
1150		for (i = 0; i < nWmirNodes; i++) {
1151			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1152			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
1153			RF_ASSERT(pda != NULL);
1154			wmirNode[i].params[0].p = pdaP;
1155			wmirNode[i].params[1].p = pda->bufPtr;
1156			wmirNode[i].params[2].v = parityStripeID;
1157			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1158			pda = pda->next;
1159			pdaP = pdaP->next;
1160		}
1161		RF_ASSERT(pda == NULL);
1162		RF_ASSERT(pdaP == NULL);
1163	}
1164	/* link the header node to the commit node */
1165	RF_ASSERT(dag_h->numSuccedents == 1);
1166	RF_ASSERT(commitNode->numAntecedents == 0);
1167	dag_h->succedents[0] = commitNode;
1168
1169	/* link the commit node to the write nodes */
1170	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1171	for (i = 0; i < nWndNodes; i++) {
1172		RF_ASSERT(wndNode[i].numAntecedents == 1);
1173		commitNode->succedents[i] = &wndNode[i];
1174		wndNode[i].antecedents[0] = commitNode;
1175		wndNode[i].antType[0] = rf_control;
1176	}
1177	for (i = 0; i < nWmirNodes; i++) {
1178		RF_ASSERT(wmirNode[i].numAntecedents == 1);
1179		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1180		wmirNode[i].antecedents[0] = commitNode;
1181		wmirNode[i].antType[0] = rf_control;
1182	}
1183
1184	/* link the write nodes to the unblock node */
1185	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1186	for (i = 0; i < nWndNodes; i++) {
1187		RF_ASSERT(wndNode[i].numSuccedents == 1);
1188		wndNode[i].succedents[0] = unblockNode;
1189		unblockNode->antecedents[i] = &wndNode[i];
1190		unblockNode->antType[i] = rf_control;
1191	}
1192	for (i = 0; i < nWmirNodes; i++) {
1193		RF_ASSERT(wmirNode[i].numSuccedents == 1);
1194		wmirNode[i].succedents[0] = unblockNode;
1195		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1196		unblockNode->antType[i + nWndNodes] = rf_control;
1197	}
1198
1199	/* link the unblock node to the term node */
1200	RF_ASSERT(unblockNode->numSuccedents == 1);
1201	RF_ASSERT(termNode->numAntecedents == 1);
1202	RF_ASSERT(termNode->numSuccedents == 0);
1203	unblockNode->succedents[0] = termNode;
1204	termNode->antecedents[0] = unblockNode;
1205	termNode->antType[0] = rf_control;
1206}
1207