rf_dagffwr.c revision 1.16
1/* $NetBSD: rf_dagffwr.c,v 1.16 2004/01/09 23:35:59 oster Exp $ */ 2/* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29/* 30 * rf_dagff.c 31 * 32 * code for creating fault-free DAGs 33 * 34 */ 35 36#include <sys/cdefs.h> 37__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.16 2004/01/09 23:35:59 oster Exp $"); 38 39#include <dev/raidframe/raidframevar.h> 40 41#include "rf_raid.h" 42#include "rf_dag.h" 43#include "rf_dagutils.h" 44#include "rf_dagfuncs.h" 45#include "rf_debugMem.h" 46#include "rf_dagffrd.h" 47#include "rf_general.h" 48#include "rf_dagffwr.h" 49 50/****************************************************************************** 51 * 52 * General comments on DAG creation: 53 * 54 * All DAGs in this file use roll-away error recovery. Each DAG has a single 55 * commit node, usually called "Cmt." If an error occurs before the Cmt node 56 * is reached, the execution engine will halt forward execution and work 57 * backward through the graph, executing the undo functions. Assuming that 58 * each node in the graph prior to the Cmt node are undoable and atomic - or - 59 * does not make changes to permanent state, the graph will fail atomically. 60 * If an error occurs after the Cmt node executes, the engine will roll-forward 61 * through the graph, blindly executing nodes until it reaches the end. 62 * If a graph reaches the end, it is assumed to have completed successfully. 63 * 64 * A graph has only 1 Cmt node. 65 * 66 */ 67 68 69/****************************************************************************** 70 * 71 * The following wrappers map the standard DAG creation interface to the 72 * DAG creation routines. Additionally, these wrappers enable experimentation 73 * with new DAG structures by providing an extra level of indirection, allowing 74 * the DAG creation routines to be replaced at this single point. 75 */ 76 77 78void 79rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 80 RF_DagHeader_t *dag_h, void *bp, 81 RF_RaidAccessFlags_t flags, 82 RF_AllocListElem_t *allocList, 83 RF_IoType_t type) 84{ 85 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 86 RF_IO_TYPE_WRITE); 87} 88 89void 90rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 91 RF_DagHeader_t *dag_h, void *bp, 92 RF_RaidAccessFlags_t flags, 93 RF_AllocListElem_t *allocList, 94 RF_IoType_t type) 95{ 96 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 97 RF_IO_TYPE_WRITE); 98} 99 100void 101rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 102 RF_DagHeader_t *dag_h, void *bp, 103 RF_RaidAccessFlags_t flags, 104 RF_AllocListElem_t *allocList) 105{ 106 /* "normal" rollaway */ 107 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, 108 allocList, &rf_xorFuncs, NULL); 109} 110 111void 112rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 113 RF_DagHeader_t *dag_h, void *bp, 114 RF_RaidAccessFlags_t flags, 115 RF_AllocListElem_t *allocList) 116{ 117 /* "normal" rollaway */ 118 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, 119 allocList, 1, rf_RegularXorFunc, RF_TRUE); 120} 121 122 123/****************************************************************************** 124 * 125 * DAG creation code begins here 126 */ 127 128 129/****************************************************************************** 130 * 131 * creates a DAG to perform a large-write operation: 132 * 133 * / Rod \ / Wnd \ 134 * H -- block- Rod - Xor - Cmt - Wnd --- T 135 * \ Rod / \ Wnp / 136 * \[Wnq]/ 137 * 138 * The XOR node also does the Q calculation in the P+Q architecture. 139 * All nodes are before the commit node (Cmt) are assumed to be atomic and 140 * undoable - or - they make no changes to permanent state. 141 * 142 * Rod = read old data 143 * Cmt = commit node 144 * Wnp = write new parity 145 * Wnd = write new data 146 * Wnq = write new "q" 147 * [] denotes optional segments in the graph 148 * 149 * Parameters: raidPtr - description of the physical array 150 * asmap - logical & physical addresses for this access 151 * bp - buffer ptr (holds write data) 152 * flags - general flags (e.g. disk locking) 153 * allocList - list of memory allocated in DAG creation 154 * nfaults - number of faults array can tolerate 155 * (equal to # redundancy units in stripe) 156 * redfuncs - list of redundancy generating functions 157 * 158 *****************************************************************************/ 159 160void 161rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 162 RF_DagHeader_t *dag_h, void *bp, 163 RF_RaidAccessFlags_t flags, 164 RF_AllocListElem_t *allocList, 165 int nfaults, int (*redFunc) (RF_DagNode_t *), 166 int allowBufferRecycle) 167{ 168 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 169 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 170 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 171 RF_AccessStripeMapHeader_t *new_asm_h[2]; 172 RF_StripeNum_t parityStripeID; 173 char *sosBuffer, *eosBuffer; 174 RF_ReconUnitNum_t which_ru; 175 RF_RaidLayout_t *layoutPtr; 176 RF_PhysDiskAddr_t *pda; 177 178 layoutPtr = &(raidPtr->Layout); 179 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, 180 asmap->raidAddress, 181 &which_ru); 182 183 if (rf_dagDebug) { 184 printf("[Creating large-write DAG]\n"); 185 } 186 dag_h->creator = "LargeWriteDAG"; 187 188 dag_h->numCommitNodes = 1; 189 dag_h->numCommits = 0; 190 dag_h->numSuccedents = 1; 191 192 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 193 nWndNodes = asmap->numStripeUnitsAccessed; 194 RF_MallocAndAdd(nodes, 195 (nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t), 196 (RF_DagNode_t *), allocList); 197 i = 0; 198 wndNodes = &nodes[i]; 199 i += nWndNodes; 200 xorNode = &nodes[i]; 201 i += 1; 202 wnpNode = &nodes[i]; 203 i += 1; 204 blockNode = &nodes[i]; 205 i += 1; 206 commitNode = &nodes[i]; 207 i += 1; 208 termNode = &nodes[i]; 209 i += 1; 210 if (nfaults == 2) { 211 wnqNode = &nodes[i]; 212 i += 1; 213 } else { 214 wnqNode = NULL; 215 } 216 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, 217 new_asm_h, &nRodNodes, &sosBuffer, 218 &eosBuffer, allocList); 219 if (nRodNodes > 0) { 220 RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t), 221 (RF_DagNode_t *), allocList); 222 } else { 223 rodNodes = NULL; 224 } 225 226 /* begin node initialization */ 227 if (nRodNodes > 0) { 228 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 229 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, 230 dag_h, "Nil", allocList); 231 } else { 232 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 233 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, 234 dag_h, "Nil", allocList); 235 } 236 237 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 238 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0, 239 dag_h, "Cmt", allocList); 240 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 241 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, 242 dag_h, "Trm", allocList); 243 244 /* initialize the Rod nodes */ 245 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 246 if (new_asm_h[asmNum]) { 247 pda = new_asm_h[asmNum]->stripeMap->physInfo; 248 while (pda) { 249 rf_InitNode(&rodNodes[nodeNum], rf_wait, 250 RF_FALSE, rf_DiskReadFunc, 251 rf_DiskReadUndoFunc, 252 rf_GenericWakeupFunc, 253 1, 1, 4, 0, dag_h, 254 "Rod", allocList); 255 rodNodes[nodeNum].params[0].p = pda; 256 rodNodes[nodeNum].params[1].p = pda->bufPtr; 257 rodNodes[nodeNum].params[2].v = parityStripeID; 258 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 259 0, 0, which_ru); 260 nodeNum++; 261 pda = pda->next; 262 } 263 } 264 } 265 RF_ASSERT(nodeNum == nRodNodes); 266 267 /* initialize the wnd nodes */ 268 pda = asmap->physInfo; 269 for (i = 0; i < nWndNodes; i++) { 270 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, 271 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 272 rf_GenericWakeupFunc, 1, 1, 4, 0, 273 dag_h, "Wnd", allocList); 274 RF_ASSERT(pda != NULL); 275 wndNodes[i].params[0].p = pda; 276 wndNodes[i].params[1].p = pda->bufPtr; 277 wndNodes[i].params[2].v = parityStripeID; 278 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 279 pda = pda->next; 280 } 281 282 /* initialize the redundancy node */ 283 if (nRodNodes > 0) { 284 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 285 rf_NullNodeUndoFunc, NULL, 1, 286 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, 287 nfaults, dag_h, "Xr ", allocList); 288 } else { 289 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 290 rf_NullNodeUndoFunc, NULL, 1, 291 1, 2 * (nWndNodes + nRodNodes) + 1, 292 nfaults, dag_h, "Xr ", allocList); 293 } 294 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 295 for (i = 0; i < nWndNodes; i++) { 296 /* pda */ 297 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; 298 /* buf ptr */ 299 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; 300 } 301 for (i = 0; i < nRodNodes; i++) { 302 /* pda */ 303 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; 304 /* buf ptr */ 305 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; 306 } 307 /* xor node needs to get at RAID information */ 308 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 309 310 /* 311 * Look for an Rod node that reads a complete SU. If none, 312 * alloc a buffer to receive the parity info. Note that we 313 * can't use a new data buffer because it will not have gotten 314 * written when the xor occurs. */ 315 if (allowBufferRecycle) { 316 for (i = 0; i < nRodNodes; i++) { 317 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 318 break; 319 } 320 } 321 if ((!allowBufferRecycle) || (i == nRodNodes)) { 322 RF_MallocAndAdd(xorNode->results[0], 323 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 324 (void *), allocList); 325 } else { 326 xorNode->results[0] = rodNodes[i].params[1].p; 327 } 328 329 /* initialize the Wnp node */ 330 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 331 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, 332 dag_h, "Wnp", allocList); 333 wnpNode->params[0].p = asmap->parityInfo; 334 wnpNode->params[1].p = xorNode->results[0]; 335 wnpNode->params[2].v = parityStripeID; 336 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 337 /* parityInfo must describe entire parity unit */ 338 RF_ASSERT(asmap->parityInfo->next == NULL); 339 340 if (nfaults == 2) { 341 /* 342 * We never try to recycle a buffer for the Q calcuation 343 * in addition to the parity. This would cause two buffers 344 * to get smashed during the P and Q calculation, guaranteeing 345 * one would be wrong. 346 */ 347 RF_MallocAndAdd(xorNode->results[1], 348 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 349 (void *), allocList); 350 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 351 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 352 1, 1, 4, 0, dag_h, "Wnq", allocList); 353 wnqNode->params[0].p = asmap->qInfo; 354 wnqNode->params[1].p = xorNode->results[1]; 355 wnqNode->params[2].v = parityStripeID; 356 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 357 /* parityInfo must describe entire parity unit */ 358 RF_ASSERT(asmap->parityInfo->next == NULL); 359 } 360 /* 361 * Connect nodes to form graph. 362 */ 363 364 /* connect dag header to block node */ 365 RF_ASSERT(blockNode->numAntecedents == 0); 366 dag_h->succedents[0] = blockNode; 367 368 if (nRodNodes > 0) { 369 /* connect the block node to the Rod nodes */ 370 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 371 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 372 for (i = 0; i < nRodNodes; i++) { 373 RF_ASSERT(rodNodes[i].numAntecedents == 1); 374 blockNode->succedents[i] = &rodNodes[i]; 375 rodNodes[i].antecedents[0] = blockNode; 376 rodNodes[i].antType[0] = rf_control; 377 378 /* connect the Rod nodes to the Xor node */ 379 RF_ASSERT(rodNodes[i].numSuccedents == 1); 380 rodNodes[i].succedents[0] = xorNode; 381 xorNode->antecedents[i] = &rodNodes[i]; 382 xorNode->antType[i] = rf_trueData; 383 } 384 } else { 385 /* connect the block node to the Xor node */ 386 RF_ASSERT(blockNode->numSuccedents == 1); 387 RF_ASSERT(xorNode->numAntecedents == 1); 388 blockNode->succedents[0] = xorNode; 389 xorNode->antecedents[0] = blockNode; 390 xorNode->antType[0] = rf_control; 391 } 392 393 /* connect the xor node to the commit node */ 394 RF_ASSERT(xorNode->numSuccedents == 1); 395 RF_ASSERT(commitNode->numAntecedents == 1); 396 xorNode->succedents[0] = commitNode; 397 commitNode->antecedents[0] = xorNode; 398 commitNode->antType[0] = rf_control; 399 400 /* connect the commit node to the write nodes */ 401 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 402 for (i = 0; i < nWndNodes; i++) { 403 RF_ASSERT(wndNodes->numAntecedents == 1); 404 commitNode->succedents[i] = &wndNodes[i]; 405 wndNodes[i].antecedents[0] = commitNode; 406 wndNodes[i].antType[0] = rf_control; 407 } 408 RF_ASSERT(wnpNode->numAntecedents == 1); 409 commitNode->succedents[nWndNodes] = wnpNode; 410 wnpNode->antecedents[0] = commitNode; 411 wnpNode->antType[0] = rf_trueData; 412 if (nfaults == 2) { 413 RF_ASSERT(wnqNode->numAntecedents == 1); 414 commitNode->succedents[nWndNodes + 1] = wnqNode; 415 wnqNode->antecedents[0] = commitNode; 416 wnqNode->antType[0] = rf_trueData; 417 } 418 /* connect the write nodes to the term node */ 419 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 420 RF_ASSERT(termNode->numSuccedents == 0); 421 for (i = 0; i < nWndNodes; i++) { 422 RF_ASSERT(wndNodes->numSuccedents == 1); 423 wndNodes[i].succedents[0] = termNode; 424 termNode->antecedents[i] = &wndNodes[i]; 425 termNode->antType[i] = rf_control; 426 } 427 RF_ASSERT(wnpNode->numSuccedents == 1); 428 wnpNode->succedents[0] = termNode; 429 termNode->antecedents[nWndNodes] = wnpNode; 430 termNode->antType[nWndNodes] = rf_control; 431 if (nfaults == 2) { 432 RF_ASSERT(wnqNode->numSuccedents == 1); 433 wnqNode->succedents[0] = termNode; 434 termNode->antecedents[nWndNodes + 1] = wnqNode; 435 termNode->antType[nWndNodes + 1] = rf_control; 436 } 437} 438/****************************************************************************** 439 * 440 * creates a DAG to perform a small-write operation (either raid 5 or pq), 441 * which is as follows: 442 * 443 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 444 * \- Rod X / \----> Wnd [Und]-/ 445 * [\- Rod X / \---> Wnd [Und]-/] 446 * [\- Roq -> Q / \--> Wnq [Unq]-/] 447 * 448 * Rop = read old parity 449 * Rod = read old data 450 * Roq = read old "q" 451 * Cmt = commit node 452 * Und = unlock data disk 453 * Unp = unlock parity disk 454 * Unq = unlock q disk 455 * Wnp = write new parity 456 * Wnd = write new data 457 * Wnq = write new "q" 458 * [ ] denotes optional segments in the graph 459 * 460 * Parameters: raidPtr - description of the physical array 461 * asmap - logical & physical addresses for this access 462 * bp - buffer ptr (holds write data) 463 * flags - general flags (e.g. disk locking) 464 * allocList - list of memory allocated in DAG creation 465 * pfuncs - list of parity generating functions 466 * qfuncs - list of q generating functions 467 * 468 * A null qfuncs indicates single fault tolerant 469 *****************************************************************************/ 470 471void 472rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 473 RF_DagHeader_t *dag_h, void *bp, 474 RF_RaidAccessFlags_t flags, 475 RF_AllocListElem_t *allocList, 476 const RF_RedFuncs_t *pfuncs, 477 const RF_RedFuncs_t *qfuncs) 478{ 479 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 480 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes; 481 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes; 482 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 483 int i, j, nNodes, totalNumNodes; 484 RF_ReconUnitNum_t which_ru; 485 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 486 int (*qfunc) (RF_DagNode_t *); 487 int numDataNodes, numParityNodes; 488 RF_StripeNum_t parityStripeID; 489 RF_PhysDiskAddr_t *pda; 490 char *name, *qname; 491 long nfaults; 492 493 nfaults = qfuncs ? 2 : 1; 494 495 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 496 asmap->raidAddress, &which_ru); 497 pda = asmap->physInfo; 498 numDataNodes = asmap->numStripeUnitsAccessed; 499 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 500 501 if (rf_dagDebug) { 502 printf("[Creating small-write DAG]\n"); 503 } 504 RF_ASSERT(numDataNodes > 0); 505 dag_h->creator = "SmallWriteDAG"; 506 507 dag_h->numCommitNodes = 1; 508 dag_h->numCommits = 0; 509 dag_h->numSuccedents = 1; 510 511 /* 512 * DAG creation occurs in four steps: 513 * 1. count the number of nodes in the DAG 514 * 2. create the nodes 515 * 3. initialize the nodes 516 * 4. connect the nodes 517 */ 518 519 /* 520 * Step 1. compute number of nodes in the graph 521 */ 522 523 /* number of nodes: a read and write for each data unit a 524 * redundancy computation node for each parity node (nfaults * 525 * nparity) a read and write for each parity unit a block and 526 * commit node (2) a terminate node if atomic RMW an unlock 527 * node for each data unit, redundancy unit */ 528 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) 529 + (nfaults * 2 * numParityNodes) + 3; 530 /* 531 * Step 2. create the nodes 532 */ 533 RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t), 534 (RF_DagNode_t *), allocList); 535 i = 0; 536 blockNode = &nodes[i]; 537 i += 1; 538 commitNode = &nodes[i]; 539 i += 1; 540 readDataNodes = &nodes[i]; 541 i += numDataNodes; 542 readParityNodes = &nodes[i]; 543 i += numParityNodes; 544 writeDataNodes = &nodes[i]; 545 i += numDataNodes; 546 writeParityNodes = &nodes[i]; 547 i += numParityNodes; 548 xorNodes = &nodes[i]; 549 i += numParityNodes; 550 termNode = &nodes[i]; 551 i += 1; 552 unlockDataNodes = unlockParityNodes = NULL; 553 554 if (nfaults == 2) { 555 readQNodes = &nodes[i]; 556 i += numParityNodes; 557 writeQNodes = &nodes[i]; 558 i += numParityNodes; 559 qNodes = &nodes[i]; 560 i += numParityNodes; 561 unlockQNodes = NULL; 562 } else { 563 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 564 } 565 RF_ASSERT(i == totalNumNodes); 566 567 /* 568 * Step 3. initialize the nodes 569 */ 570 /* initialize block node (Nil) */ 571 nNodes = numDataNodes + (nfaults * numParityNodes); 572 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 573 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, 574 dag_h, "Nil", allocList); 575 576 /* initialize commit node (Cmt) */ 577 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 578 rf_NullNodeUndoFunc, NULL, nNodes, 579 (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 580 581 /* initialize terminate node (Trm) */ 582 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 583 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, 584 dag_h, "Trm", allocList); 585 586 /* initialize nodes which read old data (Rod) */ 587 for (i = 0; i < numDataNodes; i++) { 588 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, 589 rf_DiskReadFunc, rf_DiskReadUndoFunc, 590 rf_GenericWakeupFunc, (nfaults * numParityNodes), 591 1, 4, 0, dag_h, "Rod", allocList); 592 RF_ASSERT(pda != NULL); 593 /* physical disk addr desc */ 594 readDataNodes[i].params[0].p = pda; 595 /* buffer to hold old data */ 596 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 597 dag_h, pda, allocList); 598 readDataNodes[i].params[2].v = parityStripeID; 599 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 600 0, 0, which_ru); 601 pda = pda->next; 602 for (j = 0; j < readDataNodes[i].numSuccedents; j++) { 603 readDataNodes[i].propList[j] = NULL; 604 } 605 } 606 607 /* initialize nodes which read old parity (Rop) */ 608 pda = asmap->parityInfo; 609 i = 0; 610 for (i = 0; i < numParityNodes; i++) { 611 RF_ASSERT(pda != NULL); 612 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, 613 rf_DiskReadFunc, rf_DiskReadUndoFunc, 614 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, 615 dag_h, "Rop", allocList); 616 readParityNodes[i].params[0].p = pda; 617 /* buffer to hold old parity */ 618 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 619 dag_h, pda, allocList); 620 readParityNodes[i].params[2].v = parityStripeID; 621 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 622 0, 0, which_ru); 623 pda = pda->next; 624 for (j = 0; j < readParityNodes[i].numSuccedents; j++) { 625 readParityNodes[i].propList[0] = NULL; 626 } 627 } 628 629 /* initialize nodes which read old Q (Roq) */ 630 if (nfaults == 2) { 631 pda = asmap->qInfo; 632 for (i = 0; i < numParityNodes; i++) { 633 RF_ASSERT(pda != NULL); 634 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, 635 rf_DiskReadFunc, rf_DiskReadUndoFunc, 636 rf_GenericWakeupFunc, numParityNodes, 637 1, 4, 0, dag_h, "Roq", allocList); 638 readQNodes[i].params[0].p = pda; 639 /* buffer to hold old Q */ 640 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 641 dag_h, pda, 642 allocList); 643 readQNodes[i].params[2].v = parityStripeID; 644 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 645 0, 0, which_ru); 646 pda = pda->next; 647 for (j = 0; j < readQNodes[i].numSuccedents; j++) { 648 readQNodes[i].propList[0] = NULL; 649 } 650 } 651 } 652 /* initialize nodes which write new data (Wnd) */ 653 pda = asmap->physInfo; 654 for (i = 0; i < numDataNodes; i++) { 655 RF_ASSERT(pda != NULL); 656 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, 657 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 658 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 659 "Wnd", allocList); 660 /* physical disk addr desc */ 661 writeDataNodes[i].params[0].p = pda; 662 /* buffer holding new data to be written */ 663 writeDataNodes[i].params[1].p = pda->bufPtr; 664 writeDataNodes[i].params[2].v = parityStripeID; 665 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 666 0, 0, which_ru); 667 pda = pda->next; 668 } 669 670 /* 671 * Initialize nodes which compute new parity and Q. 672 */ 673 /* 674 * We use the simple XOR func in the double-XOR case, and when 675 * we're accessing only a portion of one stripe unit. The 676 * distinction between the two is that the regular XOR func 677 * assumes that the targbuf is a full SU in size, and examines 678 * the pda associated with the buffer to decide where within 679 * the buffer to XOR the data, whereas the simple XOR func 680 * just XORs the data into the start of the buffer. */ 681 if ((numParityNodes == 2) || ((numDataNodes == 1) 682 && (asmap->totalSectorsAccessed < 683 raidPtr->Layout.sectorsPerStripeUnit))) { 684 func = pfuncs->simple; 685 undoFunc = rf_NullNodeUndoFunc; 686 name = pfuncs->SimpleName; 687 if (qfuncs) { 688 qfunc = qfuncs->simple; 689 qname = qfuncs->SimpleName; 690 } else { 691 qfunc = NULL; 692 qname = NULL; 693 } 694 } else { 695 func = pfuncs->regular; 696 undoFunc = rf_NullNodeUndoFunc; 697 name = pfuncs->RegularName; 698 if (qfuncs) { 699 qfunc = qfuncs->regular; 700 qname = qfuncs->RegularName; 701 } else { 702 qfunc = NULL; 703 qname = NULL; 704 } 705 } 706 /* 707 * Initialize the xor nodes: params are {pda,buf} 708 * from {Rod,Wnd,Rop} nodes, and raidPtr 709 */ 710 if (numParityNodes == 2) { 711 /* double-xor case */ 712 for (i = 0; i < numParityNodes; i++) { 713 /* note: no wakeup func for xor */ 714 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, 715 undoFunc, NULL, 1, 716 (numDataNodes + numParityNodes), 717 7, 1, dag_h, name, allocList); 718 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 719 xorNodes[i].params[0] = readDataNodes[i].params[0]; 720 xorNodes[i].params[1] = readDataNodes[i].params[1]; 721 xorNodes[i].params[2] = readParityNodes[i].params[0]; 722 xorNodes[i].params[3] = readParityNodes[i].params[1]; 723 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 724 xorNodes[i].params[5] = writeDataNodes[i].params[1]; 725 xorNodes[i].params[6].p = raidPtr; 726 /* use old parity buf as target buf */ 727 xorNodes[i].results[0] = readParityNodes[i].params[1].p; 728 if (nfaults == 2) { 729 /* note: no wakeup func for qor */ 730 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, 731 qfunc, undoFunc, NULL, 1, 732 (numDataNodes + numParityNodes), 733 7, 1, dag_h, qname, allocList); 734 qNodes[i].params[0] = readDataNodes[i].params[0]; 735 qNodes[i].params[1] = readDataNodes[i].params[1]; 736 qNodes[i].params[2] = readQNodes[i].params[0]; 737 qNodes[i].params[3] = readQNodes[i].params[1]; 738 qNodes[i].params[4] = writeDataNodes[i].params[0]; 739 qNodes[i].params[5] = writeDataNodes[i].params[1]; 740 qNodes[i].params[6].p = raidPtr; 741 /* use old Q buf as target buf */ 742 qNodes[i].results[0] = readQNodes[i].params[1].p; 743 } 744 } 745 } else { 746 /* there is only one xor node in this case */ 747 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, 748 undoFunc, NULL, 1, (numDataNodes + numParityNodes), 749 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 750 dag_h, name, allocList); 751 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 752 for (i = 0; i < numDataNodes + 1; i++) { 753 /* set up params related to Rod and Rop nodes */ 754 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 755 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 756 } 757 for (i = 0; i < numDataNodes; i++) { 758 /* set up params related to Wnd and Wnp nodes */ 759 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 760 writeDataNodes[i].params[0]; 761 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 762 writeDataNodes[i].params[1]; 763 } 764 /* xor node needs to get at RAID information */ 765 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 766 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 767 if (nfaults == 2) { 768 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, 769 undoFunc, NULL, 1, 770 (numDataNodes + numParityNodes), 771 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 772 dag_h, qname, allocList); 773 for (i = 0; i < numDataNodes; i++) { 774 /* set up params related to Rod */ 775 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 776 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 777 } 778 /* and read old q */ 779 qNodes[0].params[2 * numDataNodes + 0] = /* pda */ 780 readQNodes[0].params[0]; 781 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */ 782 readQNodes[0].params[1]; 783 for (i = 0; i < numDataNodes; i++) { 784 /* set up params related to Wnd nodes */ 785 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 786 writeDataNodes[i].params[0]; 787 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 788 writeDataNodes[i].params[1]; 789 } 790 /* xor node needs to get at RAID information */ 791 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 792 qNodes[0].results[0] = readQNodes[0].params[1].p; 793 } 794 } 795 796 /* initialize nodes which write new parity (Wnp) */ 797 pda = asmap->parityInfo; 798 for (i = 0; i < numParityNodes; i++) { 799 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, 800 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 801 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 802 "Wnp", allocList); 803 RF_ASSERT(pda != NULL); 804 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 805 * filled in by xor node */ 806 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 807 * parity write 808 * operation */ 809 writeParityNodes[i].params[2].v = parityStripeID; 810 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 811 0, 0, which_ru); 812 pda = pda->next; 813 } 814 815 /* initialize nodes which write new Q (Wnq) */ 816 if (nfaults == 2) { 817 pda = asmap->qInfo; 818 for (i = 0; i < numParityNodes; i++) { 819 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, 820 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 821 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 822 "Wnq", allocList); 823 RF_ASSERT(pda != NULL); 824 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 825 * filled in by xor node */ 826 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 827 * parity write 828 * operation */ 829 writeQNodes[i].params[2].v = parityStripeID; 830 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 831 0, 0, which_ru); 832 pda = pda->next; 833 } 834 } 835 /* 836 * Step 4. connect the nodes. 837 */ 838 839 /* connect header to block node */ 840 dag_h->succedents[0] = blockNode; 841 842 /* connect block node to read old data nodes */ 843 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 844 for (i = 0; i < numDataNodes; i++) { 845 blockNode->succedents[i] = &readDataNodes[i]; 846 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 847 readDataNodes[i].antecedents[0] = blockNode; 848 readDataNodes[i].antType[0] = rf_control; 849 } 850 851 /* connect block node to read old parity nodes */ 852 for (i = 0; i < numParityNodes; i++) { 853 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 854 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 855 readParityNodes[i].antecedents[0] = blockNode; 856 readParityNodes[i].antType[0] = rf_control; 857 } 858 859 /* connect block node to read old Q nodes */ 860 if (nfaults == 2) { 861 for (i = 0; i < numParityNodes; i++) { 862 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 863 RF_ASSERT(readQNodes[i].numAntecedents == 1); 864 readQNodes[i].antecedents[0] = blockNode; 865 readQNodes[i].antType[0] = rf_control; 866 } 867 } 868 /* connect read old data nodes to xor nodes */ 869 for (i = 0; i < numDataNodes; i++) { 870 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes)); 871 for (j = 0; j < numParityNodes; j++) { 872 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 873 readDataNodes[i].succedents[j] = &xorNodes[j]; 874 xorNodes[j].antecedents[i] = &readDataNodes[i]; 875 xorNodes[j].antType[i] = rf_trueData; 876 } 877 } 878 879 /* connect read old data nodes to q nodes */ 880 if (nfaults == 2) { 881 for (i = 0; i < numDataNodes; i++) { 882 for (j = 0; j < numParityNodes; j++) { 883 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 884 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j]; 885 qNodes[j].antecedents[i] = &readDataNodes[i]; 886 qNodes[j].antType[i] = rf_trueData; 887 } 888 } 889 } 890 /* connect read old parity nodes to xor nodes */ 891 for (i = 0; i < numParityNodes; i++) { 892 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 893 for (j = 0; j < numParityNodes; j++) { 894 readParityNodes[i].succedents[j] = &xorNodes[j]; 895 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 896 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 897 } 898 } 899 900 /* connect read old q nodes to q nodes */ 901 if (nfaults == 2) { 902 for (i = 0; i < numParityNodes; i++) { 903 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 904 for (j = 0; j < numParityNodes; j++) { 905 readQNodes[i].succedents[j] = &qNodes[j]; 906 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 907 qNodes[j].antType[numDataNodes + i] = rf_trueData; 908 } 909 } 910 } 911 /* connect xor nodes to commit node */ 912 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 913 for (i = 0; i < numParityNodes; i++) { 914 RF_ASSERT(xorNodes[i].numSuccedents == 1); 915 xorNodes[i].succedents[0] = commitNode; 916 commitNode->antecedents[i] = &xorNodes[i]; 917 commitNode->antType[i] = rf_control; 918 } 919 920 /* connect q nodes to commit node */ 921 if (nfaults == 2) { 922 for (i = 0; i < numParityNodes; i++) { 923 RF_ASSERT(qNodes[i].numSuccedents == 1); 924 qNodes[i].succedents[0] = commitNode; 925 commitNode->antecedents[i + numParityNodes] = &qNodes[i]; 926 commitNode->antType[i + numParityNodes] = rf_control; 927 } 928 } 929 /* connect commit node to write nodes */ 930 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 931 for (i = 0; i < numDataNodes; i++) { 932 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 933 commitNode->succedents[i] = &writeDataNodes[i]; 934 writeDataNodes[i].antecedents[0] = commitNode; 935 writeDataNodes[i].antType[0] = rf_trueData; 936 } 937 for (i = 0; i < numParityNodes; i++) { 938 RF_ASSERT(writeParityNodes[i].numAntecedents == 1); 939 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i]; 940 writeParityNodes[i].antecedents[0] = commitNode; 941 writeParityNodes[i].antType[0] = rf_trueData; 942 } 943 if (nfaults == 2) { 944 for (i = 0; i < numParityNodes; i++) { 945 RF_ASSERT(writeQNodes[i].numAntecedents == 1); 946 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i]; 947 writeQNodes[i].antecedents[0] = commitNode; 948 writeQNodes[i].antType[0] = rf_trueData; 949 } 950 } 951 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 952 RF_ASSERT(termNode->numSuccedents == 0); 953 for (i = 0; i < numDataNodes; i++) { 954 /* connect write new data nodes to term node */ 955 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 956 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 957 writeDataNodes[i].succedents[0] = termNode; 958 termNode->antecedents[i] = &writeDataNodes[i]; 959 termNode->antType[i] = rf_control; 960 } 961 962 for (i = 0; i < numParityNodes; i++) { 963 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 964 writeParityNodes[i].succedents[0] = termNode; 965 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 966 termNode->antType[numDataNodes + i] = rf_control; 967 } 968 969 if (nfaults == 2) { 970 for (i = 0; i < numParityNodes; i++) { 971 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 972 writeQNodes[i].succedents[0] = termNode; 973 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 974 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 975 } 976 } 977} 978 979 980/****************************************************************************** 981 * create a write graph (fault-free or degraded) for RAID level 1 982 * 983 * Hdr -> Commit -> Wpd -> Nil -> Trm 984 * -> Wsd -> 985 * 986 * The "Wpd" node writes data to the primary copy in the mirror pair 987 * The "Wsd" node writes data to the secondary copy in the mirror pair 988 * 989 * Parameters: raidPtr - description of the physical array 990 * asmap - logical & physical addresses for this access 991 * bp - buffer ptr (holds write data) 992 * flags - general flags (e.g. disk locking) 993 * allocList - list of memory allocated in DAG creation 994 *****************************************************************************/ 995 996void 997rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 998 RF_DagHeader_t *dag_h, void *bp, 999 RF_RaidAccessFlags_t flags, 1000 RF_AllocListElem_t *allocList) 1001{ 1002 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1003 RF_DagNode_t *nodes, *wndNode, *wmirNode; 1004 int nWndNodes, nWmirNodes, i; 1005 RF_ReconUnitNum_t which_ru; 1006 RF_PhysDiskAddr_t *pda, *pdaP; 1007 RF_StripeNum_t parityStripeID; 1008 1009 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1010 asmap->raidAddress, &which_ru); 1011 if (rf_dagDebug) { 1012 printf("[Creating RAID level 1 write DAG]\n"); 1013 } 1014 dag_h->creator = "RaidOneWriteDAG"; 1015 1016 /* 2 implies access not SU aligned */ 1017 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1018 nWndNodes = (asmap->physInfo->next) ? 2 : 1; 1019 1020 /* alloc the Wnd nodes and the Wmir node */ 1021 if (asmap->numDataFailed == 1) 1022 nWndNodes--; 1023 if (asmap->numParityFailed == 1) 1024 nWmirNodes--; 1025 1026 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1027 * + terminator) */ 1028 RF_MallocAndAdd(nodes, 1029 (nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t), 1030 (RF_DagNode_t *), allocList); 1031 i = 0; 1032 wndNode = &nodes[i]; 1033 i += nWndNodes; 1034 wmirNode = &nodes[i]; 1035 i += nWmirNodes; 1036 commitNode = &nodes[i]; 1037 i += 1; 1038 unblockNode = &nodes[i]; 1039 i += 1; 1040 termNode = &nodes[i]; 1041 i += 1; 1042 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3)); 1043 1044 /* this dag can commit immediately */ 1045 dag_h->numCommitNodes = 1; 1046 dag_h->numCommits = 0; 1047 dag_h->numSuccedents = 1; 1048 1049 /* initialize the commit, unblock, and term nodes */ 1050 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 1051 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 1052 0, 0, 0, dag_h, "Cmt", allocList); 1053 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 1054 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 1055 0, 0, dag_h, "Nil", allocList); 1056 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 1057 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, 1058 dag_h, "Trm", allocList); 1059 1060 /* initialize the wnd nodes */ 1061 if (nWndNodes > 0) { 1062 pda = asmap->physInfo; 1063 for (i = 0; i < nWndNodes; i++) { 1064 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, 1065 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1066 rf_GenericWakeupFunc, 1, 1, 4, 0, 1067 dag_h, "Wpd", allocList); 1068 RF_ASSERT(pda != NULL); 1069 wndNode[i].params[0].p = pda; 1070 wndNode[i].params[1].p = pda->bufPtr; 1071 wndNode[i].params[2].v = parityStripeID; 1072 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1073 pda = pda->next; 1074 } 1075 RF_ASSERT(pda == NULL); 1076 } 1077 /* initialize the mirror nodes */ 1078 if (nWmirNodes > 0) { 1079 pda = asmap->physInfo; 1080 pdaP = asmap->parityInfo; 1081 for (i = 0; i < nWmirNodes; i++) { 1082 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, 1083 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1084 rf_GenericWakeupFunc, 1, 1, 4, 0, 1085 dag_h, "Wsd", allocList); 1086 RF_ASSERT(pda != NULL); 1087 wmirNode[i].params[0].p = pdaP; 1088 wmirNode[i].params[1].p = pda->bufPtr; 1089 wmirNode[i].params[2].v = parityStripeID; 1090 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1091 pda = pda->next; 1092 pdaP = pdaP->next; 1093 } 1094 RF_ASSERT(pda == NULL); 1095 RF_ASSERT(pdaP == NULL); 1096 } 1097 /* link the header node to the commit node */ 1098 RF_ASSERT(dag_h->numSuccedents == 1); 1099 RF_ASSERT(commitNode->numAntecedents == 0); 1100 dag_h->succedents[0] = commitNode; 1101 1102 /* link the commit node to the write nodes */ 1103 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1104 for (i = 0; i < nWndNodes; i++) { 1105 RF_ASSERT(wndNode[i].numAntecedents == 1); 1106 commitNode->succedents[i] = &wndNode[i]; 1107 wndNode[i].antecedents[0] = commitNode; 1108 wndNode[i].antType[0] = rf_control; 1109 } 1110 for (i = 0; i < nWmirNodes; i++) { 1111 RF_ASSERT(wmirNode[i].numAntecedents == 1); 1112 commitNode->succedents[i + nWndNodes] = &wmirNode[i]; 1113 wmirNode[i].antecedents[0] = commitNode; 1114 wmirNode[i].antType[0] = rf_control; 1115 } 1116 1117 /* link the write nodes to the unblock node */ 1118 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1119 for (i = 0; i < nWndNodes; i++) { 1120 RF_ASSERT(wndNode[i].numSuccedents == 1); 1121 wndNode[i].succedents[0] = unblockNode; 1122 unblockNode->antecedents[i] = &wndNode[i]; 1123 unblockNode->antType[i] = rf_control; 1124 } 1125 for (i = 0; i < nWmirNodes; i++) { 1126 RF_ASSERT(wmirNode[i].numSuccedents == 1); 1127 wmirNode[i].succedents[0] = unblockNode; 1128 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i]; 1129 unblockNode->antType[i + nWndNodes] = rf_control; 1130 } 1131 1132 /* link the unblock node to the term node */ 1133 RF_ASSERT(unblockNode->numSuccedents == 1); 1134 RF_ASSERT(termNode->numAntecedents == 1); 1135 RF_ASSERT(termNode->numSuccedents == 0); 1136 unblockNode->succedents[0] = termNode; 1137 termNode->antecedents[0] = unblockNode; 1138 termNode->antType[0] = rf_control; 1139} 1140