/* rf_dagffwr.c, revision 1.21 */
/*	$NetBSD: rf_dagffwr.c,v 1.21 2004/03/06 23:52:20 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagff.c
 *
 * code for creating fault-free DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.21 2004/03/06 23:52:20 oster Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node are undoable and atomic - or -
 * does not make changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


/*
 * Non-redundant (e.g. RAID 0) write: delegate to the generic non-redundant
 * DAG builder, forcing the I/O type to WRITE.  The `type' argument is part
 * of the standard DAG-creation signature but is deliberately ignored here.
 */
void
rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			      RF_DagHeader_t *dag_h, void *bp,
			      RF_RaidAccessFlags_t flags,
			      RF_AllocListElem_t *allocList,
			      RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
				 RF_IO_TYPE_WRITE);
}

/*
 * RAID 0 write: identical to the non-redundant case above; `type' is
 * likewise ignored and the I/O type is forced to WRITE.
 */
void
rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList,
		       RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
				 RF_IO_TYPE_WRITE);
}

/*
 * Small (read-modify-write) parity write: use the common small-write
 * builder with the standard XOR function table and no Q functions
 * (single fault tolerant).
 */
void
rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
				     allocList, &rf_xorFuncs, NULL);
}

/*
 * Large (reconstruct-write) parity write: use the common large-write
 * builder with one fault tolerated (nfaults == 1), the regular XOR
 * redundancy function, and buffer recycling enabled (RF_TRUE).
 */
void
rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
		       RF_DagHeader_t *dag_h, void *bp,
		       RF_RaidAccessFlags_t flags,
		       RF_AllocListElem_t *allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
				     allocList, 1, rf_RegularXorFunc, RF_TRUE);
}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /           \ Wnp /
 *                             \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes are before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
141 * 142 * Rod = read old data 143 * Cmt = commit node 144 * Wnp = write new parity 145 * Wnd = write new data 146 * Wnq = write new "q" 147 * [] denotes optional segments in the graph 148 * 149 * Parameters: raidPtr - description of the physical array 150 * asmap - logical & physical addresses for this access 151 * bp - buffer ptr (holds write data) 152 * flags - general flags (e.g. disk locking) 153 * allocList - list of memory allocated in DAG creation 154 * nfaults - number of faults array can tolerate 155 * (equal to # redundancy units in stripe) 156 * redfuncs - list of redundancy generating functions 157 * 158 *****************************************************************************/ 159 160void 161rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 162 RF_DagHeader_t *dag_h, void *bp, 163 RF_RaidAccessFlags_t flags, 164 RF_AllocListElem_t *allocList, 165 int nfaults, int (*redFunc) (RF_DagNode_t *), 166 int allowBufferRecycle) 167{ 168 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 169 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 170 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 171 RF_AccessStripeMapHeader_t *new_asm_h[2]; 172 RF_StripeNum_t parityStripeID; 173 char *sosBuffer, *eosBuffer; 174 RF_ReconUnitNum_t which_ru; 175 RF_RaidLayout_t *layoutPtr; 176 RF_PhysDiskAddr_t *pda; 177 178 layoutPtr = &(raidPtr->Layout); 179 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, 180 asmap->raidAddress, 181 &which_ru); 182 183#if RF_DEBUG_DAG 184 if (rf_dagDebug) { 185 printf("[Creating large-write DAG]\n"); 186 } 187#endif 188 dag_h->creator = "LargeWriteDAG"; 189 190 dag_h->numCommitNodes = 1; 191 dag_h->numCommits = 0; 192 dag_h->numSuccedents = 1; 193 194 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 195 nWndNodes = asmap->numStripeUnitsAccessed; 196 RF_MallocAndAdd(nodes, 197 (nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t), 198 (RF_DagNode_t *), allocList); 199 i = 0; 200 
wndNodes = &nodes[i]; 201 i += nWndNodes; 202 xorNode = &nodes[i]; 203 i += 1; 204 wnpNode = &nodes[i]; 205 i += 1; 206 blockNode = &nodes[i]; 207 i += 1; 208 commitNode = &nodes[i]; 209 i += 1; 210 termNode = &nodes[i]; 211 i += 1; 212#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 213 if (nfaults == 2) { 214 wnqNode = &nodes[i]; 215 i += 1; 216 } else { 217#endif 218 wnqNode = NULL; 219#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 220 } 221#endif 222 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, 223 new_asm_h, &nRodNodes, &sosBuffer, 224 &eosBuffer, allocList); 225 if (nRodNodes > 0) { 226 RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t), 227 (RF_DagNode_t *), allocList); 228 } else { 229 rodNodes = NULL; 230 } 231 232 /* begin node initialization */ 233 if (nRodNodes > 0) { 234 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 235 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, 236 dag_h, "Nil", allocList); 237 } else { 238 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 239 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, 240 dag_h, "Nil", allocList); 241 } 242 243 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 244 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0, 245 dag_h, "Cmt", allocList); 246 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 247 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, 248 dag_h, "Trm", allocList); 249 250 /* initialize the Rod nodes */ 251 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 252 if (new_asm_h[asmNum]) { 253 pda = new_asm_h[asmNum]->stripeMap->physInfo; 254 while (pda) { 255 rf_InitNode(&rodNodes[nodeNum], rf_wait, 256 RF_FALSE, rf_DiskReadFunc, 257 rf_DiskReadUndoFunc, 258 rf_GenericWakeupFunc, 259 1, 1, 4, 0, dag_h, 260 "Rod", allocList); 261 rodNodes[nodeNum].params[0].p = pda; 262 rodNodes[nodeNum].params[1].p = pda->bufPtr; 263 rodNodes[nodeNum].params[2].v = parityStripeID; 264 rodNodes[nodeNum].params[3].v = 
RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 265 which_ru); 266 nodeNum++; 267 pda = pda->next; 268 } 269 } 270 } 271 RF_ASSERT(nodeNum == nRodNodes); 272 273 /* initialize the wnd nodes */ 274 pda = asmap->physInfo; 275 for (i = 0; i < nWndNodes; i++) { 276 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, 277 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 278 rf_GenericWakeupFunc, 1, 1, 4, 0, 279 dag_h, "Wnd", allocList); 280 RF_ASSERT(pda != NULL); 281 wndNodes[i].params[0].p = pda; 282 wndNodes[i].params[1].p = pda->bufPtr; 283 wndNodes[i].params[2].v = parityStripeID; 284 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 285 pda = pda->next; 286 } 287 288 /* initialize the redundancy node */ 289 if (nRodNodes > 0) { 290 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 291 rf_NullNodeUndoFunc, NULL, 1, 292 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, 293 nfaults, dag_h, "Xr ", allocList); 294 } else { 295 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 296 rf_NullNodeUndoFunc, NULL, 1, 297 1, 2 * (nWndNodes + nRodNodes) + 1, 298 nfaults, dag_h, "Xr ", allocList); 299 } 300 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 301 for (i = 0; i < nWndNodes; i++) { 302 /* pda */ 303 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; 304 /* buf ptr */ 305 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; 306 } 307 for (i = 0; i < nRodNodes; i++) { 308 /* pda */ 309 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; 310 /* buf ptr */ 311 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; 312 } 313 /* xor node needs to get at RAID information */ 314 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 315 316 /* 317 * Look for an Rod node that reads a complete SU. If none, 318 * alloc a buffer to receive the parity info. Note that we 319 * can't use a new data buffer because it will not have gotten 320 * written when the xor occurs. 
*/ 321 if (allowBufferRecycle) { 322 for (i = 0; i < nRodNodes; i++) { 323 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 324 break; 325 } 326 } 327 if ((!allowBufferRecycle) || (i == nRodNodes)) { 328 RF_MallocAndAdd(xorNode->results[0], 329 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 330 (void *), allocList); 331 } else { 332 xorNode->results[0] = rodNodes[i].params[1].p; 333 } 334 335 /* initialize the Wnp node */ 336 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 337 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, 338 dag_h, "Wnp", allocList); 339 wnpNode->params[0].p = asmap->parityInfo; 340 wnpNode->params[1].p = xorNode->results[0]; 341 wnpNode->params[2].v = parityStripeID; 342 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 343 /* parityInfo must describe entire parity unit */ 344 RF_ASSERT(asmap->parityInfo->next == NULL); 345 346#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 347 if (nfaults == 2) { 348 /* 349 * We never try to recycle a buffer for the Q calcuation 350 * in addition to the parity. This would cause two buffers 351 * to get smashed during the P and Q calculation, guaranteeing 352 * one would be wrong. 353 */ 354 RF_MallocAndAdd(xorNode->results[1], 355 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 356 (void *), allocList); 357 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 358 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 359 1, 1, 4, 0, dag_h, "Wnq", allocList); 360 wnqNode->params[0].p = asmap->qInfo; 361 wnqNode->params[1].p = xorNode->results[1]; 362 wnqNode->params[2].v = parityStripeID; 363 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 364 /* parityInfo must describe entire parity unit */ 365 RF_ASSERT(asmap->parityInfo->next == NULL); 366 } 367#endif 368 /* 369 * Connect nodes to form graph. 
370 */ 371 372 /* connect dag header to block node */ 373 RF_ASSERT(blockNode->numAntecedents == 0); 374 dag_h->succedents[0] = blockNode; 375 376 if (nRodNodes > 0) { 377 /* connect the block node to the Rod nodes */ 378 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 379 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 380 for (i = 0; i < nRodNodes; i++) { 381 RF_ASSERT(rodNodes[i].numAntecedents == 1); 382 blockNode->succedents[i] = &rodNodes[i]; 383 rodNodes[i].antecedents[0] = blockNode; 384 rodNodes[i].antType[0] = rf_control; 385 386 /* connect the Rod nodes to the Xor node */ 387 RF_ASSERT(rodNodes[i].numSuccedents == 1); 388 rodNodes[i].succedents[0] = xorNode; 389 xorNode->antecedents[i] = &rodNodes[i]; 390 xorNode->antType[i] = rf_trueData; 391 } 392 } else { 393 /* connect the block node to the Xor node */ 394 RF_ASSERT(blockNode->numSuccedents == 1); 395 RF_ASSERT(xorNode->numAntecedents == 1); 396 blockNode->succedents[0] = xorNode; 397 xorNode->antecedents[0] = blockNode; 398 xorNode->antType[0] = rf_control; 399 } 400 401 /* connect the xor node to the commit node */ 402 RF_ASSERT(xorNode->numSuccedents == 1); 403 RF_ASSERT(commitNode->numAntecedents == 1); 404 xorNode->succedents[0] = commitNode; 405 commitNode->antecedents[0] = xorNode; 406 commitNode->antType[0] = rf_control; 407 408 /* connect the commit node to the write nodes */ 409 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 410 for (i = 0; i < nWndNodes; i++) { 411 RF_ASSERT(wndNodes->numAntecedents == 1); 412 commitNode->succedents[i] = &wndNodes[i]; 413 wndNodes[i].antecedents[0] = commitNode; 414 wndNodes[i].antType[0] = rf_control; 415 } 416 RF_ASSERT(wnpNode->numAntecedents == 1); 417 commitNode->succedents[nWndNodes] = wnpNode; 418 wnpNode->antecedents[0] = commitNode; 419 wnpNode->antType[0] = rf_trueData; 420#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 421 if (nfaults == 2) { 422 RF_ASSERT(wnqNode->numAntecedents == 1); 423 
commitNode->succedents[nWndNodes + 1] = wnqNode; 424 wnqNode->antecedents[0] = commitNode; 425 wnqNode->antType[0] = rf_trueData; 426 } 427#endif 428 /* connect the write nodes to the term node */ 429 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 430 RF_ASSERT(termNode->numSuccedents == 0); 431 for (i = 0; i < nWndNodes; i++) { 432 RF_ASSERT(wndNodes->numSuccedents == 1); 433 wndNodes[i].succedents[0] = termNode; 434 termNode->antecedents[i] = &wndNodes[i]; 435 termNode->antType[i] = rf_control; 436 } 437 RF_ASSERT(wnpNode->numSuccedents == 1); 438 wnpNode->succedents[0] = termNode; 439 termNode->antecedents[nWndNodes] = wnpNode; 440 termNode->antType[nWndNodes] = rf_control; 441#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 442 if (nfaults == 2) { 443 RF_ASSERT(wnqNode->numSuccedents == 1); 444 wnqNode->succedents[0] = termNode; 445 termNode->antecedents[nWndNodes + 1] = wnqNode; 446 termNode->antType[nWndNodes + 1] = rf_control; 447 } 448#endif 449} 450/****************************************************************************** 451 * 452 * creates a DAG to perform a small-write operation (either raid 5 or pq), 453 * which is as follows: 454 * 455 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 456 * \- Rod X / \----> Wnd [Und]-/ 457 * [\- Rod X / \---> Wnd [Und]-/] 458 * [\- Roq -> Q / \--> Wnq [Unq]-/] 459 * 460 * Rop = read old parity 461 * Rod = read old data 462 * Roq = read old "q" 463 * Cmt = commit node 464 * Und = unlock data disk 465 * Unp = unlock parity disk 466 * Unq = unlock q disk 467 * Wnp = write new parity 468 * Wnd = write new data 469 * Wnq = write new "q" 470 * [ ] denotes optional segments in the graph 471 * 472 * Parameters: raidPtr - description of the physical array 473 * asmap - logical & physical addresses for this access 474 * bp - buffer ptr (holds write data) 475 * flags - general flags (e.g. 
 disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates single fault tolerant
 *****************************************************************************/

void
rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
			     RF_DagHeader_t *dag_h, void *bp,
			     RF_RaidAccessFlags_t flags,
			     RF_AllocListElem_t *allocList,
			     const RF_RedFuncs_t *pfuncs,
			     const RF_RedFuncs_t *qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int i, j, nNodes, totalNumNodes;
	RF_ReconUnitNum_t which_ru;
	int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int (*qfunc) (RF_DagNode_t *);
	int numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char *name, *qname;
	long nfaults;

	/* a null qfuncs means single fault tolerant (no Q column) */
	nfaults = qfuncs ? 2 : 1;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	/* two parity PDAs means the access wraps within the parity unit */
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

#if RF_DEBUG_DAG
	if (rf_dagDebug) {
		printf("[Creating small-write DAG]\n");
	}
#endif
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * DAG creation occurs in four steps:
	 * 1. count the number of nodes in the DAG
	 * 2. create the nodes
	 * 3. initialize the nodes
	 * 4. connect the nodes
	 */

	/*
	 * Step 1. compute number of nodes in the graph
	 */

	/* number of nodes: a read and write for each data unit a
	 * redundancy computation node for each parity node (nfaults *
	 * nparity) a read and write for each parity unit a block and
	 * commit node (2) a terminate node if atomic RMW an unlock
	 * node for each data unit, redundancy unit */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 3;
	/*
	 * Step 2. create the nodes
	 */
	RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t),
			(RF_DagNode_t *), allocList);
	/* carve the single allocation into the node groups; note that
	 * readParityNodes immediately follows readDataNodes -- the
	 * single-xor param setup below relies on this adjacency */
	i = 0;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	readDataNodes = &nodes[i];
	i += numDataNodes;
	readParityNodes = &nodes[i];
	i += numParityNodes;
	writeDataNodes = &nodes[i];
	i += numDataNodes;
	writeParityNodes = &nodes[i];
	i += numParityNodes;
	xorNodes = &nodes[i];
	i += numParityNodes;
	termNode = &nodes[i];
	i += 1;

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	if (nfaults == 2) {
		readQNodes = &nodes[i];
		i += numParityNodes;
		writeQNodes = &nodes[i];
		i += numParityNodes;
		qNodes = &nodes[i];
		i += numParityNodes;
	} else {
#endif
		readQNodes = writeQNodes = qNodes = NULL;
#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	}
#endif
	RF_ASSERT(i == totalNumNodes);

	/*
	 * Step 3. initialize the nodes
	 */
	/* initialize block node (Nil): fans out to all reads */
	nNodes = numDataNodes + (nfaults * numParityNodes);
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0,
		    dag_h, "Nil", allocList);

	/* initialize commit node (Cmt): waits for all xor/q nodes, fans
	 * out to all writes */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nNodes,
		    (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList);

	/* initialize terminate node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
		    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0,
		    dag_h, "Trm", allocList);

	/* initialize nodes which read old data (Rod); each feeds every
	 * xor and q node, so it has nfaults*numParityNodes successors */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, (nfaults * numParityNodes),
			    1, 4, 0, dag_h, "Rod", allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		readDataNodes[i].params[0].p = pda;
		/* buffer to hold old data */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, pda, allocList);
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
			readDataNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	i = 0;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
			    dag_h, "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		/* buffer to hold old parity */
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, pda, allocList);
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
			/* NOTE(review): clears propList[0] repeatedly instead
			 * of propList[j]; the Rod loop above uses [j].  Looks
			 * like a copy-paste slip -- verify whether propList
			 * entries beyond [0] are ever read before changing. */
			readParityNodes[i].propList[0] = NULL;
		}
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* initialize nodes which read old Q (Roq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(pda != NULL);
			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
				    rf_DiskReadFunc, rf_DiskReadUndoFunc,
				    rf_GenericWakeupFunc, numParityNodes,
				    1, 4, 0, dag_h, "Roq", allocList);
			readQNodes[i].params[0].p = pda;
			/* buffer to hold old Q */
			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, pda, allocList);
			readQNodes[i].params[2].v = parityStripeID;
			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    which_ru);
			pda = pda->next;
			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
				/* NOTE(review): same [0]-vs-[j] slip as the
				 * Rop loop above */
				readQNodes[i].propList[0] = NULL;
			}
		}
	}
#endif
	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
			    "Wnd", allocList);
		/* physical disk addr desc */
		writeDataNodes[i].params[0].p = pda;
		/* buffer holding new data to be written */
		writeDataNodes[i].params[1].p = pda->bufPtr;
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
	}

	/*
	 * Initialize nodes which compute new parity and Q.
	 */
	/*
	 * We use the simple XOR func in the double-XOR case, and when
	 * we're accessing only a portion of one stripe unit.  The
	 * distinction between the two is that the regular XOR func
	 * assumes that the targbuf is a full SU in size, and examines
	 * the pda associated with the buffer to decide where within
	 * the buffer to XOR the data, whereas the simple XOR func
	 * just XORs the data into the start of the buffer. */
	if ((numParityNodes == 2) || ((numDataNodes == 1)
	    && (asmap->totalSectorsAccessed <
		raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	}
	/*
	 * Initialize the xor nodes: params are {pda,buf}
	 * from {Rod,Wnd,Rop} nodes, and raidPtr
	 */
	if (numParityNodes == 2) {
		/* double-xor case: each xor node pairs with one Rod/Rop/Wnd
		 * triple; 7 params = 3 {pda,buf} pairs + raidPtr */
		for (i = 0; i < numParityNodes; i++) {
			/* note: no wakeup func for xor */
			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
				    undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes),
				    7, 1, dag_h, name, allocList);
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			/* use old parity buf as target buf */
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
			if (nfaults == 2) {
				/* note: no wakeup func for qor */
				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
					    qfunc, undoFunc, NULL, 1,
					    (numDataNodes + numParityNodes),
					    7, 1, dag_h, qname, allocList);
				qNodes[i].params[0] = readDataNodes[i].params[0];
				qNodes[i].params[1] = readDataNodes[i].params[1];
				qNodes[i].params[2] = readQNodes[i].params[0];
				qNodes[i].params[3] = readQNodes[i].params[1];
				qNodes[i].params[4] = writeDataNodes[i].params[0];
				qNodes[i].params[5] = writeDataNodes[i].params[1];
				qNodes[i].params[6].p = raidPtr;
				/* use old Q buf as target buf */
				qNodes[i].results[0] = readQNodes[i].params[1].p;
			}
#endif
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func,
			    undoFunc, NULL, 1, (numDataNodes + numParityNodes),
			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
			    dag_h, name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
		/* the i == numDataNodes iteration reads one element past
		 * readDataNodes[], which is readParityNodes[0] because the
		 * node groups were carved from one contiguous allocation;
		 * this deliberately picks up the single Rop {pda,buf} pair */
		for (i = 0; i < numDataNodes + 1; i++) {
			/* set up params related to Rod and Rop nodes */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd and Wnp nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
			    writeDataNodes[i].params[0];
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
			    writeDataNodes[i].params[1];
		}
		/* xor node needs to get at RAID information */
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
		if (nfaults == 2) {
			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
				    undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes),
				    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
				    dag_h, qname, allocList);
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Rod */
				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
			}
			/* and read old q */
			qNodes[0].params[2 * numDataNodes + 0] =	/* pda */
			    readQNodes[0].params[0];
			qNodes[0].params[2 * numDataNodes + 1] =	/* buffer ptr */
			    readQNodes[0].params[1];
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Wnd nodes */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
				    writeDataNodes[i].params[0];
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
				    writeDataNodes[i].params[1];
			}
			/* xor node needs to get at RAID information */
			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
			qNodes[0].results[0] = readQNodes[0].params[1].p;
		}
#endif
	}

	/* initialize nodes which write new parity (Wnp) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
			    "Wnp", allocList);
		RF_ASSERT(pda != NULL);
		writeParityNodes[i].params[0].p = pda;	/* param 1 (bufPtr)
							 * filled in by xor node */
		writeParityNodes[i].params[1].p = xorNodes[i].results[0];	/* buffer pointer for
										 * parity write
										 * operation */
		writeParityNodes[i].params[2].v = parityStripeID;
		writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
		    which_ru);
		pda = pda->next;
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* initialize nodes which write new Q (Wnq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
				    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
				    "Wnq", allocList);
			RF_ASSERT(pda != NULL);
			writeQNodes[i].params[0].p = pda;	/* param 1 (bufPtr)
								 * filled in by xor node */
			writeQNodes[i].params[1].p = qNodes[i].results[0];	/* buffer pointer for
										 * parity write
										 * operation */
			writeQNodes[i].params[2].v = parityStripeID;
			writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
			    which_ru);
			pda = pda->next;
		}
	}
#endif
	/*
	 * Step 4. connect the nodes.
	 */

	/* connect header to block node */
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* connect block node to read old Q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
			RF_ASSERT(readQNodes[i].numAntecedents == 1);
			readQNodes[i].antecedents[0] = blockNode;
			readQNodes[i].antType[0] = rf_control;
		}
	}
#endif
	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* connect read old data nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numDataNodes; i++) {
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
				readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
				qNodes[j].antecedents[i] = &readDataNodes[i];
				qNodes[j].antType[i] = rf_trueData;
			}
		}
	}
#endif
	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
		for (j = 0; j < numParityNodes; j++) {
			readParityNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* connect read old q nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
			for (j = 0; j < numParityNodes; j++) {
				readQNodes[i].succedents[j] = &qNodes[j];
				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
				qNodes[j].antType[numDataNodes + i] = rf_trueData;
			}
		}
	}
#endif
	/* connect xor nodes to commit node */
	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(xorNodes[i].numSuccedents == 1);
		xorNodes[i].succedents[0] = commitNode;
		commitNode->antecedents[i] = &xorNodes[i];
		commitNode->antType[i] = rf_control;
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	/* connect q nodes to commit node */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(qNodes[i].numSuccedents == 1);
			qNodes[i].succedents[0] = commitNode;
			commitNode->antecedents[i + numParityNodes] = &qNodes[i];
			commitNode->antType[i + numParityNodes] = rf_control;
		}
	}
#endif
	/* connect commit node to write nodes */
	RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &writeDataNodes[i];
		writeDataNodes[i].antecedents[0] = commitNode;
		writeDataNodes[i].antType[0] = rf_trueData;
	}
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
		writeParityNodes[i].antecedents[0] = commitNode;
		writeParityNodes[i].antType[0] = rf_trueData;
	}
#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
			commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
			writeQNodes[i].antecedents[0] = commitNode;
			writeQNodes[i].antType[0] = rf_trueData;
		}
	}
#endif
	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < numDataNodes; i++) {
		/* connect write new data nodes to term node */
		RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
		RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
		writeDataNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &writeDataNodes[i];
		termNode->antType[i] = rf_control;
	}

	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
		writeParityNodes[i].succedents[0] = termNode;
		termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
		termNode->antType[numDataNodes + i] = rf_control;
	}

#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0)
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numSuccedents == 1);
			writeQNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
			termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
		}
	}
#endif
}


/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 * Hdr -> Commit -> Wpd -> Nil -> Trm
 *               -> Wsd ->
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair
 * The "Wsd" node writes data to the secondary copy in the mirror pair
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g.
disk locking) 1024 * allocList - list of memory allocated in DAG creation 1025 *****************************************************************************/ 1026 1027void 1028rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 1029 RF_DagHeader_t *dag_h, void *bp, 1030 RF_RaidAccessFlags_t flags, 1031 RF_AllocListElem_t *allocList) 1032{ 1033 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1034 RF_DagNode_t *nodes, *wndNode, *wmirNode; 1035 int nWndNodes, nWmirNodes, i; 1036 RF_ReconUnitNum_t which_ru; 1037 RF_PhysDiskAddr_t *pda, *pdaP; 1038 RF_StripeNum_t parityStripeID; 1039 1040 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1041 asmap->raidAddress, &which_ru); 1042#if RF_DEBUG_DAG 1043 if (rf_dagDebug) { 1044 printf("[Creating RAID level 1 write DAG]\n"); 1045 } 1046#endif 1047 dag_h->creator = "RaidOneWriteDAG"; 1048 1049 /* 2 implies access not SU aligned */ 1050 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1051 nWndNodes = (asmap->physInfo->next) ? 
2 : 1; 1052 1053 /* alloc the Wnd nodes and the Wmir node */ 1054 if (asmap->numDataFailed == 1) 1055 nWndNodes--; 1056 if (asmap->numParityFailed == 1) 1057 nWmirNodes--; 1058 1059 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1060 * + terminator) */ 1061 RF_MallocAndAdd(nodes, 1062 (nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t), 1063 (RF_DagNode_t *), allocList); 1064 i = 0; 1065 wndNode = &nodes[i]; 1066 i += nWndNodes; 1067 wmirNode = &nodes[i]; 1068 i += nWmirNodes; 1069 commitNode = &nodes[i]; 1070 i += 1; 1071 unblockNode = &nodes[i]; 1072 i += 1; 1073 termNode = &nodes[i]; 1074 i += 1; 1075 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3)); 1076 1077 /* this dag can commit immediately */ 1078 dag_h->numCommitNodes = 1; 1079 dag_h->numCommits = 0; 1080 dag_h->numSuccedents = 1; 1081 1082 /* initialize the commit, unblock, and term nodes */ 1083 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 1084 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 1085 0, 0, 0, dag_h, "Cmt", allocList); 1086 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 1087 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 1088 0, 0, dag_h, "Nil", allocList); 1089 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 1090 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, 1091 dag_h, "Trm", allocList); 1092 1093 /* initialize the wnd nodes */ 1094 if (nWndNodes > 0) { 1095 pda = asmap->physInfo; 1096 for (i = 0; i < nWndNodes; i++) { 1097 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, 1098 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1099 rf_GenericWakeupFunc, 1, 1, 4, 0, 1100 dag_h, "Wpd", allocList); 1101 RF_ASSERT(pda != NULL); 1102 wndNode[i].params[0].p = pda; 1103 wndNode[i].params[1].p = pda->bufPtr; 1104 wndNode[i].params[2].v = parityStripeID; 1105 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1106 pda = pda->next; 1107 } 1108 RF_ASSERT(pda == NULL); 1109 } 1110 /* initialize the mirror nodes */ 1111 if 
(nWmirNodes > 0) { 1112 pda = asmap->physInfo; 1113 pdaP = asmap->parityInfo; 1114 for (i = 0; i < nWmirNodes; i++) { 1115 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, 1116 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1117 rf_GenericWakeupFunc, 1, 1, 4, 0, 1118 dag_h, "Wsd", allocList); 1119 RF_ASSERT(pda != NULL); 1120 wmirNode[i].params[0].p = pdaP; 1121 wmirNode[i].params[1].p = pda->bufPtr; 1122 wmirNode[i].params[2].v = parityStripeID; 1123 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1124 pda = pda->next; 1125 pdaP = pdaP->next; 1126 } 1127 RF_ASSERT(pda == NULL); 1128 RF_ASSERT(pdaP == NULL); 1129 } 1130 /* link the header node to the commit node */ 1131 RF_ASSERT(dag_h->numSuccedents == 1); 1132 RF_ASSERT(commitNode->numAntecedents == 0); 1133 dag_h->succedents[0] = commitNode; 1134 1135 /* link the commit node to the write nodes */ 1136 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1137 for (i = 0; i < nWndNodes; i++) { 1138 RF_ASSERT(wndNode[i].numAntecedents == 1); 1139 commitNode->succedents[i] = &wndNode[i]; 1140 wndNode[i].antecedents[0] = commitNode; 1141 wndNode[i].antType[0] = rf_control; 1142 } 1143 for (i = 0; i < nWmirNodes; i++) { 1144 RF_ASSERT(wmirNode[i].numAntecedents == 1); 1145 commitNode->succedents[i + nWndNodes] = &wmirNode[i]; 1146 wmirNode[i].antecedents[0] = commitNode; 1147 wmirNode[i].antType[0] = rf_control; 1148 } 1149 1150 /* link the write nodes to the unblock node */ 1151 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1152 for (i = 0; i < nWndNodes; i++) { 1153 RF_ASSERT(wndNode[i].numSuccedents == 1); 1154 wndNode[i].succedents[0] = unblockNode; 1155 unblockNode->antecedents[i] = &wndNode[i]; 1156 unblockNode->antType[i] = rf_control; 1157 } 1158 for (i = 0; i < nWmirNodes; i++) { 1159 RF_ASSERT(wmirNode[i].numSuccedents == 1); 1160 wmirNode[i].succedents[0] = unblockNode; 1161 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i]; 1162 
unblockNode->antType[i + nWndNodes] = rf_control; 1163 } 1164 1165 /* link the unblock node to the term node */ 1166 RF_ASSERT(unblockNode->numSuccedents == 1); 1167 RF_ASSERT(termNode->numAntecedents == 1); 1168 RF_ASSERT(termNode->numSuccedents == 0); 1169 unblockNode->succedents[0] = termNode; 1170 termNode->antecedents[0] = unblockNode; 1171 termNode->antType[0] = rf_control; 1172} 1173