rf_dagffwr.c revision 1.12
1/* $NetBSD: rf_dagffwr.c,v 1.12 2003/12/29 03:33:47 oster Exp $ */ 2/* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29/* 30 * rf_dagff.c 31 * 32 * code for creating fault-free DAGs 33 * 34 */ 35 36#include <sys/cdefs.h> 37__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.12 2003/12/29 03:33:47 oster Exp $"); 38 39#include <dev/raidframe/raidframevar.h> 40 41#include "rf_raid.h" 42#include "rf_dag.h" 43#include "rf_dagutils.h" 44#include "rf_dagfuncs.h" 45#include "rf_debugMem.h" 46#include "rf_dagffrd.h" 47#include "rf_general.h" 48#include "rf_dagffwr.h" 49 50/****************************************************************************** 51 * 52 * General comments on DAG creation: 53 * 54 * All DAGs in this file use roll-away error recovery. Each DAG has a single 55 * commit node, usually called "Cmt." 
If an error occurs before the Cmt node 56 * is reached, the execution engine will halt forward execution and work 57 * backward through the graph, executing the undo functions. Assuming that 58 * each node in the graph prior to the Cmt node are undoable and atomic - or - 59 * does not make changes to permanent state, the graph will fail atomically. 60 * If an error occurs after the Cmt node executes, the engine will roll-forward 61 * through the graph, blindly executing nodes until it reaches the end. 62 * If a graph reaches the end, it is assumed to have completed successfully. 63 * 64 * A graph has only 1 Cmt node. 65 * 66 */ 67 68 69/****************************************************************************** 70 * 71 * The following wrappers map the standard DAG creation interface to the 72 * DAG creation routines. Additionally, these wrappers enable experimentation 73 * with new DAG structures by providing an extra level of indirection, allowing 74 * the DAG creation routines to be replaced at this single point. 
75 */ 76 77 78void 79rf_CreateNonRedundantWriteDAG( 80 RF_Raid_t * raidPtr, 81 RF_AccessStripeMap_t * asmap, 82 RF_DagHeader_t * dag_h, 83 void *bp, 84 RF_RaidAccessFlags_t flags, 85 RF_AllocListElem_t * allocList, 86 RF_IoType_t type) 87{ 88 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 89 RF_IO_TYPE_WRITE); 90} 91 92void 93rf_CreateRAID0WriteDAG( 94 RF_Raid_t * raidPtr, 95 RF_AccessStripeMap_t * asmap, 96 RF_DagHeader_t * dag_h, 97 void *bp, 98 RF_RaidAccessFlags_t flags, 99 RF_AllocListElem_t * allocList, 100 RF_IoType_t type) 101{ 102 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 103 RF_IO_TYPE_WRITE); 104} 105 106void 107rf_CreateSmallWriteDAG( 108 RF_Raid_t * raidPtr, 109 RF_AccessStripeMap_t * asmap, 110 RF_DagHeader_t * dag_h, 111 void *bp, 112 RF_RaidAccessFlags_t flags, 113 RF_AllocListElem_t * allocList) 114{ 115 /* "normal" rollaway */ 116 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 117 &rf_xorFuncs, NULL); 118} 119 120void 121rf_CreateLargeWriteDAG( 122 RF_Raid_t * raidPtr, 123 RF_AccessStripeMap_t * asmap, 124 RF_DagHeader_t * dag_h, 125 void *bp, 126 RF_RaidAccessFlags_t flags, 127 RF_AllocListElem_t * allocList) 128{ 129 /* "normal" rollaway */ 130 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 131 1, rf_RegularXorFunc, RF_TRUE); 132} 133 134 135/****************************************************************************** 136 * 137 * DAG creation code begins here 138 */ 139 140 141/****************************************************************************** 142 * 143 * creates a DAG to perform a large-write operation: 144 * 145 * / Rod \ / Wnd \ 146 * H -- block- Rod - Xor - Cmt - Wnd --- T 147 * \ Rod / \ Wnp / 148 * \[Wnq]/ 149 * 150 * The XOR node also does the Q calculation in the P+Q architecture. 151 * All nodes are before the commit node (Cmt) are assumed to be atomic and 152 * undoable - or - they make no changes to permanent state. 
153 * 154 * Rod = read old data 155 * Cmt = commit node 156 * Wnp = write new parity 157 * Wnd = write new data 158 * Wnq = write new "q" 159 * [] denotes optional segments in the graph 160 * 161 * Parameters: raidPtr - description of the physical array 162 * asmap - logical & physical addresses for this access 163 * bp - buffer ptr (holds write data) 164 * flags - general flags (e.g. disk locking) 165 * allocList - list of memory allocated in DAG creation 166 * nfaults - number of faults array can tolerate 167 * (equal to # redundancy units in stripe) 168 * redfuncs - list of redundancy generating functions 169 * 170 *****************************************************************************/ 171 172void 173rf_CommonCreateLargeWriteDAG( 174 RF_Raid_t * raidPtr, 175 RF_AccessStripeMap_t * asmap, 176 RF_DagHeader_t * dag_h, 177 void *bp, 178 RF_RaidAccessFlags_t flags, 179 RF_AllocListElem_t * allocList, 180 int nfaults, 181 int (*redFunc) (RF_DagNode_t *), 182 int allowBufferRecycle) 183{ 184 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 185 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 186 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 187 RF_AccessStripeMapHeader_t *new_asm_h[2]; 188 RF_StripeNum_t parityStripeID; 189 char *sosBuffer, *eosBuffer; 190 RF_ReconUnitNum_t which_ru; 191 RF_RaidLayout_t *layoutPtr; 192 RF_PhysDiskAddr_t *pda; 193 194 layoutPtr = &(raidPtr->Layout); 195 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress, 196 &which_ru); 197 198 if (rf_dagDebug) { 199 printf("[Creating large-write DAG]\n"); 200 } 201 dag_h->creator = "LargeWriteDAG"; 202 203 dag_h->numCommitNodes = 1; 204 dag_h->numCommits = 0; 205 dag_h->numSuccedents = 1; 206 207 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 208 nWndNodes = asmap->numStripeUnitsAccessed; 209 RF_MallocAndAdd(nodes, 210 (nWndNodes + 4 + nfaults) * sizeof(RF_DagNode_t), 211 (RF_DagNode_t *), allocList); 212 i = 0; 213 wndNodes = 
&nodes[i]; 214 i += nWndNodes; 215 xorNode = &nodes[i]; 216 i += 1; 217 wnpNode = &nodes[i]; 218 i += 1; 219 blockNode = &nodes[i]; 220 i += 1; 221 commitNode = &nodes[i]; 222 i += 1; 223 termNode = &nodes[i]; 224 i += 1; 225 if (nfaults == 2) { 226 wnqNode = &nodes[i]; 227 i += 1; 228 } else { 229 wnqNode = NULL; 230 } 231 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, 232 &nRodNodes, &sosBuffer, &eosBuffer, allocList); 233 if (nRodNodes > 0) { 234 RF_MallocAndAdd(rodNodes, nRodNodes * sizeof(RF_DagNode_t), 235 (RF_DagNode_t *), allocList); 236 } else { 237 rodNodes = NULL; 238 } 239 240 /* begin node initialization */ 241 if (nRodNodes > 0) { 242 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 243 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList); 244 } else { 245 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 246 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList); 247 } 248 249 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 250 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList); 251 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 252 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); 253 254 /* initialize the Rod nodes */ 255 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 256 if (new_asm_h[asmNum]) { 257 pda = new_asm_h[asmNum]->stripeMap->physInfo; 258 while (pda) { 259 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, 260 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 261 "Rod", allocList); 262 rodNodes[nodeNum].params[0].p = pda; 263 rodNodes[nodeNum].params[1].p = pda->bufPtr; 264 rodNodes[nodeNum].params[2].v = parityStripeID; 265 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 266 0, 0, which_ru); 267 nodeNum++; 268 pda = pda->next; 269 } 270 } 271 } 272 RF_ASSERT(nodeNum == nRodNodes); 273 274 /* initialize the wnd nodes */ 
275 pda = asmap->physInfo; 276 for (i = 0; i < nWndNodes; i++) { 277 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 278 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 279 RF_ASSERT(pda != NULL); 280 wndNodes[i].params[0].p = pda; 281 wndNodes[i].params[1].p = pda->bufPtr; 282 wndNodes[i].params[2].v = parityStripeID; 283 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 284 pda = pda->next; 285 } 286 287 /* initialize the redundancy node */ 288 if (nRodNodes > 0) { 289 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 290 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, 291 "Xr ", allocList); 292 } else { 293 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 294 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList); 295 } 296 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 297 for (i = 0; i < nWndNodes; i++) { 298 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */ 299 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */ 300 } 301 for (i = 0; i < nRodNodes; i++) { 302 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */ 303 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */ 304 } 305 /* xor node needs to get at RAID information */ 306 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 307 308 /* 309 * Look for an Rod node that reads a complete SU. If none, alloc a buffer 310 * to receive the parity info. Note that we can't use a new data buffer 311 * because it will not have gotten written when the xor occurs. 
312 */ 313 if (allowBufferRecycle) { 314 for (i = 0; i < nRodNodes; i++) { 315 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 316 break; 317 } 318 } 319 if ((!allowBufferRecycle) || (i == nRodNodes)) { 320 RF_MallocAndAdd(xorNode->results[0], 321 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 322 (void *), allocList); 323 } else { 324 xorNode->results[0] = rodNodes[i].params[1].p; 325 } 326 327 /* initialize the Wnp node */ 328 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 329 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList); 330 wnpNode->params[0].p = asmap->parityInfo; 331 wnpNode->params[1].p = xorNode->results[0]; 332 wnpNode->params[2].v = parityStripeID; 333 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 334 /* parityInfo must describe entire parity unit */ 335 RF_ASSERT(asmap->parityInfo->next == NULL); 336 337 if (nfaults == 2) { 338 /* 339 * We never try to recycle a buffer for the Q calcuation 340 * in addition to the parity. This would cause two buffers 341 * to get smashed during the P and Q calculation, guaranteeing 342 * one would be wrong. 343 */ 344 RF_MallocAndAdd(xorNode->results[1], 345 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 346 (void *), allocList); 347 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 348 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList); 349 wnqNode->params[0].p = asmap->qInfo; 350 wnqNode->params[1].p = xorNode->results[1]; 351 wnqNode->params[2].v = parityStripeID; 352 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 353 /* parityInfo must describe entire parity unit */ 354 RF_ASSERT(asmap->parityInfo->next == NULL); 355 } 356 /* 357 * Connect nodes to form graph. 
358 */ 359 360 /* connect dag header to block node */ 361 RF_ASSERT(blockNode->numAntecedents == 0); 362 dag_h->succedents[0] = blockNode; 363 364 if (nRodNodes > 0) { 365 /* connect the block node to the Rod nodes */ 366 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 367 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 368 for (i = 0; i < nRodNodes; i++) { 369 RF_ASSERT(rodNodes[i].numAntecedents == 1); 370 blockNode->succedents[i] = &rodNodes[i]; 371 rodNodes[i].antecedents[0] = blockNode; 372 rodNodes[i].antType[0] = rf_control; 373 374 /* connect the Rod nodes to the Xor node */ 375 RF_ASSERT(rodNodes[i].numSuccedents == 1); 376 rodNodes[i].succedents[0] = xorNode; 377 xorNode->antecedents[i] = &rodNodes[i]; 378 xorNode->antType[i] = rf_trueData; 379 } 380 } else { 381 /* connect the block node to the Xor node */ 382 RF_ASSERT(blockNode->numSuccedents == 1); 383 RF_ASSERT(xorNode->numAntecedents == 1); 384 blockNode->succedents[0] = xorNode; 385 xorNode->antecedents[0] = blockNode; 386 xorNode->antType[0] = rf_control; 387 } 388 389 /* connect the xor node to the commit node */ 390 RF_ASSERT(xorNode->numSuccedents == 1); 391 RF_ASSERT(commitNode->numAntecedents == 1); 392 xorNode->succedents[0] = commitNode; 393 commitNode->antecedents[0] = xorNode; 394 commitNode->antType[0] = rf_control; 395 396 /* connect the commit node to the write nodes */ 397 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 398 for (i = 0; i < nWndNodes; i++) { 399 RF_ASSERT(wndNodes->numAntecedents == 1); 400 commitNode->succedents[i] = &wndNodes[i]; 401 wndNodes[i].antecedents[0] = commitNode; 402 wndNodes[i].antType[0] = rf_control; 403 } 404 RF_ASSERT(wnpNode->numAntecedents == 1); 405 commitNode->succedents[nWndNodes] = wnpNode; 406 wnpNode->antecedents[0] = commitNode; 407 wnpNode->antType[0] = rf_trueData; 408 if (nfaults == 2) { 409 RF_ASSERT(wnqNode->numAntecedents == 1); 410 commitNode->succedents[nWndNodes + 1] = wnqNode; 411 wnqNode->antecedents[0] = 
commitNode; 412 wnqNode->antType[0] = rf_trueData; 413 } 414 /* connect the write nodes to the term node */ 415 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 416 RF_ASSERT(termNode->numSuccedents == 0); 417 for (i = 0; i < nWndNodes; i++) { 418 RF_ASSERT(wndNodes->numSuccedents == 1); 419 wndNodes[i].succedents[0] = termNode; 420 termNode->antecedents[i] = &wndNodes[i]; 421 termNode->antType[i] = rf_control; 422 } 423 RF_ASSERT(wnpNode->numSuccedents == 1); 424 wnpNode->succedents[0] = termNode; 425 termNode->antecedents[nWndNodes] = wnpNode; 426 termNode->antType[nWndNodes] = rf_control; 427 if (nfaults == 2) { 428 RF_ASSERT(wnqNode->numSuccedents == 1); 429 wnqNode->succedents[0] = termNode; 430 termNode->antecedents[nWndNodes + 1] = wnqNode; 431 termNode->antType[nWndNodes + 1] = rf_control; 432 } 433} 434/****************************************************************************** 435 * 436 * creates a DAG to perform a small-write operation (either raid 5 or pq), 437 * which is as follows: 438 * 439 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 440 * \- Rod X / \----> Wnd [Und]-/ 441 * [\- Rod X / \---> Wnd [Und]-/] 442 * [\- Roq -> Q / \--> Wnq [Unq]-/] 443 * 444 * Rop = read old parity 445 * Rod = read old data 446 * Roq = read old "q" 447 * Cmt = commit node 448 * Und = unlock data disk 449 * Unp = unlock parity disk 450 * Unq = unlock q disk 451 * Wnp = write new parity 452 * Wnd = write new data 453 * Wnq = write new "q" 454 * [ ] denotes optional segments in the graph 455 * 456 * Parameters: raidPtr - description of the physical array 457 * asmap - logical & physical addresses for this access 458 * bp - buffer ptr (holds write data) 459 * flags - general flags (e.g. 
disk locking) 460 * allocList - list of memory allocated in DAG creation 461 * pfuncs - list of parity generating functions 462 * qfuncs - list of q generating functions 463 * 464 * A null qfuncs indicates single fault tolerant 465 *****************************************************************************/ 466 467void 468rf_CommonCreateSmallWriteDAG( 469 RF_Raid_t * raidPtr, 470 RF_AccessStripeMap_t * asmap, 471 RF_DagHeader_t * dag_h, 472 void *bp, 473 RF_RaidAccessFlags_t flags, 474 RF_AllocListElem_t * allocList, 475 const RF_RedFuncs_t * pfuncs, 476 const RF_RedFuncs_t * qfuncs) 477{ 478 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 479 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes; 480 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes; 481 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 482 int i, j, nNodes, totalNumNodes, lu_flag; 483 RF_ReconUnitNum_t which_ru; 484 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 485 int (*qfunc) (RF_DagNode_t *); 486 int numDataNodes, numParityNodes; 487 RF_StripeNum_t parityStripeID; 488 RF_PhysDiskAddr_t *pda; 489 char *name, *qname; 490 long nfaults; 491 492 nfaults = qfuncs ? 2 : 1; 493 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */ 494 495 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 496 asmap->raidAddress, &which_ru); 497 pda = asmap->physInfo; 498 numDataNodes = asmap->numStripeUnitsAccessed; 499 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 500 501 if (rf_dagDebug) { 502 printf("[Creating small-write DAG]\n"); 503 } 504 RF_ASSERT(numDataNodes > 0); 505 dag_h->creator = "SmallWriteDAG"; 506 507 dag_h->numCommitNodes = 1; 508 dag_h->numCommits = 0; 509 dag_h->numSuccedents = 1; 510 511 /* 512 * DAG creation occurs in four steps: 513 * 1. count the number of nodes in the DAG 514 * 2. create the nodes 515 * 3. initialize the nodes 516 * 4. connect the nodes 517 */ 518 519 /* 520 * Step 1. 
compute number of nodes in the graph 521 */ 522 523 /* number of nodes: a read and write for each data unit a redundancy 524 * computation node for each parity node (nfaults * nparity) a read 525 * and write for each parity unit a block and commit node (2) a 526 * terminate node if atomic RMW an unlock node for each data unit, 527 * redundancy unit */ 528 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) 529 + (nfaults * 2 * numParityNodes) + 3; 530 if (lu_flag) { 531 totalNumNodes += (numDataNodes + (nfaults * numParityNodes)); 532 } 533 /* 534 * Step 2. create the nodes 535 */ 536 RF_MallocAndAdd(nodes, totalNumNodes * sizeof(RF_DagNode_t), 537 (RF_DagNode_t *), allocList); 538 i = 0; 539 blockNode = &nodes[i]; 540 i += 1; 541 commitNode = &nodes[i]; 542 i += 1; 543 readDataNodes = &nodes[i]; 544 i += numDataNodes; 545 readParityNodes = &nodes[i]; 546 i += numParityNodes; 547 writeDataNodes = &nodes[i]; 548 i += numDataNodes; 549 writeParityNodes = &nodes[i]; 550 i += numParityNodes; 551 xorNodes = &nodes[i]; 552 i += numParityNodes; 553 termNode = &nodes[i]; 554 i += 1; 555 if (lu_flag) { 556 unlockDataNodes = &nodes[i]; 557 i += numDataNodes; 558 unlockParityNodes = &nodes[i]; 559 i += numParityNodes; 560 } else { 561 unlockDataNodes = unlockParityNodes = NULL; 562 } 563 if (nfaults == 2) { 564 readQNodes = &nodes[i]; 565 i += numParityNodes; 566 writeQNodes = &nodes[i]; 567 i += numParityNodes; 568 qNodes = &nodes[i]; 569 i += numParityNodes; 570 if (lu_flag) { 571 unlockQNodes = &nodes[i]; 572 i += numParityNodes; 573 } else { 574 unlockQNodes = NULL; 575 } 576 } else { 577 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 578 } 579 RF_ASSERT(i == totalNumNodes); 580 581 /* 582 * Step 3. 
initialize the nodes 583 */ 584 /* initialize block node (Nil) */ 585 nNodes = numDataNodes + (nfaults * numParityNodes); 586 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 587 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList); 588 589 /* initialize commit node (Cmt) */ 590 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 591 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 592 593 /* initialize terminate node (Trm) */ 594 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, 595 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList); 596 597 /* initialize nodes which read old data (Rod) */ 598 for (i = 0; i < numDataNodes; i++) { 599 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 600 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h, 601 "Rod", allocList); 602 RF_ASSERT(pda != NULL); 603 /* physical disk addr desc */ 604 readDataNodes[i].params[0].p = pda; 605 /* buffer to hold old data */ 606 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 607 dag_h, pda, allocList); 608 readDataNodes[i].params[2].v = parityStripeID; 609 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 610 lu_flag, 0, which_ru); 611 pda = pda->next; 612 for (j = 0; j < readDataNodes[i].numSuccedents; j++) { 613 readDataNodes[i].propList[j] = NULL; 614 } 615 } 616 617 /* initialize nodes which read old parity (Rop) */ 618 pda = asmap->parityInfo; 619 i = 0; 620 for (i = 0; i < numParityNodes; i++) { 621 RF_ASSERT(pda != NULL); 622 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, 623 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 624 0, dag_h, "Rop", allocList); 625 readParityNodes[i].params[0].p = pda; 626 /* buffer to hold old parity */ 627 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 628 dag_h, pda, allocList); 629 readParityNodes[i].params[2].v = 
parityStripeID; 630 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 631 lu_flag, 0, which_ru); 632 pda = pda->next; 633 for (j = 0; j < readParityNodes[i].numSuccedents; j++) { 634 readParityNodes[i].propList[0] = NULL; 635 } 636 } 637 638 /* initialize nodes which read old Q (Roq) */ 639 if (nfaults == 2) { 640 pda = asmap->qInfo; 641 for (i = 0; i < numParityNodes; i++) { 642 RF_ASSERT(pda != NULL); 643 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 644 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList); 645 readQNodes[i].params[0].p = pda; 646 /* buffer to hold old Q */ 647 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, 648 allocList); 649 readQNodes[i].params[2].v = parityStripeID; 650 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 651 lu_flag, 0, which_ru); 652 pda = pda->next; 653 for (j = 0; j < readQNodes[i].numSuccedents; j++) { 654 readQNodes[i].propList[0] = NULL; 655 } 656 } 657 } 658 /* initialize nodes which write new data (Wnd) */ 659 pda = asmap->physInfo; 660 for (i = 0; i < numDataNodes; i++) { 661 RF_ASSERT(pda != NULL); 662 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 663 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 664 "Wnd", allocList); 665 /* physical disk addr desc */ 666 writeDataNodes[i].params[0].p = pda; 667 /* buffer holding new data to be written */ 668 writeDataNodes[i].params[1].p = pda->bufPtr; 669 writeDataNodes[i].params[2].v = parityStripeID; 670 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 671 0, 0, which_ru); 672 if (lu_flag) { 673 /* initialize node to unlock the disk queue */ 674 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 675 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 676 "Und", allocList); 677 /* physical disk addr desc */ 678 unlockDataNodes[i].params[0].p = pda; 679 
unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 680 0, lu_flag, which_ru); 681 } 682 pda = pda->next; 683 } 684 685 /* 686 * Initialize nodes which compute new parity and Q. 687 */ 688 /* 689 * We use the simple XOR func in the double-XOR case, and when 690 * we're accessing only a portion of one stripe unit. The distinction 691 * between the two is that the regular XOR func assumes that the targbuf 692 * is a full SU in size, and examines the pda associated with the buffer 693 * to decide where within the buffer to XOR the data, whereas 694 * the simple XOR func just XORs the data into the start of the buffer. 695 */ 696 if ((numParityNodes == 2) || ((numDataNodes == 1) 697 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) { 698 func = pfuncs->simple; 699 undoFunc = rf_NullNodeUndoFunc; 700 name = pfuncs->SimpleName; 701 if (qfuncs) { 702 qfunc = qfuncs->simple; 703 qname = qfuncs->SimpleName; 704 } else { 705 qfunc = NULL; 706 qname = NULL; 707 } 708 } else { 709 func = pfuncs->regular; 710 undoFunc = rf_NullNodeUndoFunc; 711 name = pfuncs->RegularName; 712 if (qfuncs) { 713 qfunc = qfuncs->regular; 714 qname = qfuncs->RegularName; 715 } else { 716 qfunc = NULL; 717 qname = NULL; 718 } 719 } 720 /* 721 * Initialize the xor nodes: params are {pda,buf} 722 * from {Rod,Wnd,Rop} nodes, and raidPtr 723 */ 724 if (numParityNodes == 2) { 725 /* double-xor case */ 726 for (i = 0; i < numParityNodes; i++) { 727 /* note: no wakeup func for xor */ 728 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, 729 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList); 730 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 731 xorNodes[i].params[0] = readDataNodes[i].params[0]; 732 xorNodes[i].params[1] = readDataNodes[i].params[1]; 733 xorNodes[i].params[2] = readParityNodes[i].params[0]; 734 xorNodes[i].params[3] = readParityNodes[i].params[1]; 735 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 736 
xorNodes[i].params[5] = writeDataNodes[i].params[1]; 737 xorNodes[i].params[6].p = raidPtr; 738 /* use old parity buf as target buf */ 739 xorNodes[i].results[0] = readParityNodes[i].params[1].p; 740 if (nfaults == 2) { 741 /* note: no wakeup func for qor */ 742 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1, 743 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList); 744 qNodes[i].params[0] = readDataNodes[i].params[0]; 745 qNodes[i].params[1] = readDataNodes[i].params[1]; 746 qNodes[i].params[2] = readQNodes[i].params[0]; 747 qNodes[i].params[3] = readQNodes[i].params[1]; 748 qNodes[i].params[4] = writeDataNodes[i].params[0]; 749 qNodes[i].params[5] = writeDataNodes[i].params[1]; 750 qNodes[i].params[6].p = raidPtr; 751 /* use old Q buf as target buf */ 752 qNodes[i].results[0] = readQNodes[i].params[1].p; 753 } 754 } 755 } else { 756 /* there is only one xor node in this case */ 757 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1, 758 (numDataNodes + numParityNodes), 759 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList); 760 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 761 for (i = 0; i < numDataNodes + 1; i++) { 762 /* set up params related to Rod and Rop nodes */ 763 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 764 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 765 } 766 for (i = 0; i < numDataNodes; i++) { 767 /* set up params related to Wnd and Wnp nodes */ 768 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 769 writeDataNodes[i].params[0]; 770 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 771 writeDataNodes[i].params[1]; 772 } 773 /* xor node needs to get at RAID information */ 774 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 775 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 776 if (nfaults == 2) { 777 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, 
undoFunc, NULL, 1, 778 (numDataNodes + numParityNodes), 779 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, 780 qname, allocList); 781 for (i = 0; i < numDataNodes; i++) { 782 /* set up params related to Rod */ 783 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 784 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 785 } 786 /* and read old q */ 787 qNodes[0].params[2 * numDataNodes + 0] = /* pda */ 788 readQNodes[0].params[0]; 789 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */ 790 readQNodes[0].params[1]; 791 for (i = 0; i < numDataNodes; i++) { 792 /* set up params related to Wnd nodes */ 793 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 794 writeDataNodes[i].params[0]; 795 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 796 writeDataNodes[i].params[1]; 797 } 798 /* xor node needs to get at RAID information */ 799 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 800 qNodes[0].results[0] = readQNodes[0].params[1].p; 801 } 802 } 803 804 /* initialize nodes which write new parity (Wnp) */ 805 pda = asmap->parityInfo; 806 for (i = 0; i < numParityNodes; i++) { 807 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 808 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 809 "Wnp", allocList); 810 RF_ASSERT(pda != NULL); 811 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 812 * filled in by xor node */ 813 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 814 * parity write 815 * operation */ 816 writeParityNodes[i].params[2].v = parityStripeID; 817 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 818 0, 0, which_ru); 819 if (lu_flag) { 820 /* initialize node to unlock the disk queue */ 821 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 822 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 823 "Unp", allocList); 824 
unlockParityNodes[i].params[0].p = pda; /* physical disk addr 825 * desc */ 826 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 827 0, lu_flag, which_ru); 828 } 829 pda = pda->next; 830 } 831 832 /* initialize nodes which write new Q (Wnq) */ 833 if (nfaults == 2) { 834 pda = asmap->qInfo; 835 for (i = 0; i < numParityNodes; i++) { 836 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 837 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 838 "Wnq", allocList); 839 RF_ASSERT(pda != NULL); 840 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 841 * filled in by xor node */ 842 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 843 * parity write 844 * operation */ 845 writeQNodes[i].params[2].v = parityStripeID; 846 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 847 0, 0, which_ru); 848 if (lu_flag) { 849 /* initialize node to unlock the disk queue */ 850 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 851 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 852 "Unq", allocList); 853 unlockQNodes[i].params[0].p = pda; /* physical disk addr 854 * desc */ 855 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 856 0, lu_flag, which_ru); 857 } 858 pda = pda->next; 859 } 860 } 861 /* 862 * Step 4. connect the nodes. 
863 */ 864 865 /* connect header to block node */ 866 dag_h->succedents[0] = blockNode; 867 868 /* connect block node to read old data nodes */ 869 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 870 for (i = 0; i < numDataNodes; i++) { 871 blockNode->succedents[i] = &readDataNodes[i]; 872 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 873 readDataNodes[i].antecedents[0] = blockNode; 874 readDataNodes[i].antType[0] = rf_control; 875 } 876 877 /* connect block node to read old parity nodes */ 878 for (i = 0; i < numParityNodes; i++) { 879 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 880 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 881 readParityNodes[i].antecedents[0] = blockNode; 882 readParityNodes[i].antType[0] = rf_control; 883 } 884 885 /* connect block node to read old Q nodes */ 886 if (nfaults == 2) { 887 for (i = 0; i < numParityNodes; i++) { 888 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 889 RF_ASSERT(readQNodes[i].numAntecedents == 1); 890 readQNodes[i].antecedents[0] = blockNode; 891 readQNodes[i].antType[0] = rf_control; 892 } 893 } 894 /* connect read old data nodes to xor nodes */ 895 for (i = 0; i < numDataNodes; i++) { 896 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes)); 897 for (j = 0; j < numParityNodes; j++) { 898 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 899 readDataNodes[i].succedents[j] = &xorNodes[j]; 900 xorNodes[j].antecedents[i] = &readDataNodes[i]; 901 xorNodes[j].antType[i] = rf_trueData; 902 } 903 } 904 905 /* connect read old data nodes to q nodes */ 906 if (nfaults == 2) { 907 for (i = 0; i < numDataNodes; i++) { 908 for (j = 0; j < numParityNodes; j++) { 909 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 910 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j]; 911 qNodes[j].antecedents[i] = &readDataNodes[i]; 912 qNodes[j].antType[i] = rf_trueData; 913 } 
914 } 915 } 916 /* connect read old parity nodes to xor nodes */ 917 for (i = 0; i < numParityNodes; i++) { 918 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 919 for (j = 0; j < numParityNodes; j++) { 920 readParityNodes[i].succedents[j] = &xorNodes[j]; 921 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 922 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 923 } 924 } 925 926 /* connect read old q nodes to q nodes */ 927 if (nfaults == 2) { 928 for (i = 0; i < numParityNodes; i++) { 929 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 930 for (j = 0; j < numParityNodes; j++) { 931 readQNodes[i].succedents[j] = &qNodes[j]; 932 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 933 qNodes[j].antType[numDataNodes + i] = rf_trueData; 934 } 935 } 936 } 937 /* connect xor nodes to commit node */ 938 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 939 for (i = 0; i < numParityNodes; i++) { 940 RF_ASSERT(xorNodes[i].numSuccedents == 1); 941 xorNodes[i].succedents[0] = commitNode; 942 commitNode->antecedents[i] = &xorNodes[i]; 943 commitNode->antType[i] = rf_control; 944 } 945 946 /* connect q nodes to commit node */ 947 if (nfaults == 2) { 948 for (i = 0; i < numParityNodes; i++) { 949 RF_ASSERT(qNodes[i].numSuccedents == 1); 950 qNodes[i].succedents[0] = commitNode; 951 commitNode->antecedents[i + numParityNodes] = &qNodes[i]; 952 commitNode->antType[i + numParityNodes] = rf_control; 953 } 954 } 955 /* connect commit node to write nodes */ 956 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 957 for (i = 0; i < numDataNodes; i++) { 958 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 959 commitNode->succedents[i] = &writeDataNodes[i]; 960 writeDataNodes[i].antecedents[0] = commitNode; 961 writeDataNodes[i].antType[0] = rf_trueData; 962 } 963 for (i = 0; i < numParityNodes; i++) { 964 RF_ASSERT(writeParityNodes[i].numAntecedents == 1); 965 
commitNode->succedents[i + numDataNodes] = &writeParityNodes[i]; 966 writeParityNodes[i].antecedents[0] = commitNode; 967 writeParityNodes[i].antType[0] = rf_trueData; 968 } 969 if (nfaults == 2) { 970 for (i = 0; i < numParityNodes; i++) { 971 RF_ASSERT(writeQNodes[i].numAntecedents == 1); 972 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i]; 973 writeQNodes[i].antecedents[0] = commitNode; 974 writeQNodes[i].antType[0] = rf_trueData; 975 } 976 } 977 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 978 RF_ASSERT(termNode->numSuccedents == 0); 979 for (i = 0; i < numDataNodes; i++) { 980 if (lu_flag) { 981 /* connect write new data nodes to unlock nodes */ 982 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 983 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); 984 writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; 985 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; 986 unlockDataNodes[i].antType[0] = rf_control; 987 988 /* connect unlock nodes to term node */ 989 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); 990 unlockDataNodes[i].succedents[0] = termNode; 991 termNode->antecedents[i] = &unlockDataNodes[i]; 992 termNode->antType[i] = rf_control; 993 } else { 994 /* connect write new data nodes to term node */ 995 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 996 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 997 writeDataNodes[i].succedents[0] = termNode; 998 termNode->antecedents[i] = &writeDataNodes[i]; 999 termNode->antType[i] = rf_control; 1000 } 1001 } 1002 1003 for (i = 0; i < numParityNodes; i++) { 1004 if (lu_flag) { 1005 /* connect write new parity nodes to unlock nodes */ 1006 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1007 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); 1008 writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; 1009 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; 1010 
unlockParityNodes[i].antType[0] = rf_control; 1011 1012 /* connect unlock nodes to term node */ 1013 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); 1014 unlockParityNodes[i].succedents[0] = termNode; 1015 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; 1016 termNode->antType[numDataNodes + i] = rf_control; 1017 } else { 1018 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1019 writeParityNodes[i].succedents[0] = termNode; 1020 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 1021 termNode->antType[numDataNodes + i] = rf_control; 1022 } 1023 } 1024 1025 if (nfaults == 2) { 1026 for (i = 0; i < numParityNodes; i++) { 1027 if (lu_flag) { 1028 /* connect write new Q nodes to unlock nodes */ 1029 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1030 RF_ASSERT(unlockQNodes[i].numAntecedents == 1); 1031 writeQNodes[i].succedents[0] = &unlockQNodes[i]; 1032 unlockQNodes[i].antecedents[0] = &writeQNodes[i]; 1033 unlockQNodes[i].antType[0] = rf_control; 1034 1035 /* connect unlock nodes to unblock node */ 1036 RF_ASSERT(unlockQNodes[i].numSuccedents == 1); 1037 unlockQNodes[i].succedents[0] = termNode; 1038 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i]; 1039 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1040 } else { 1041 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1042 writeQNodes[i].succedents[0] = termNode; 1043 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 1044 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1045 } 1046 } 1047 } 1048} 1049 1050 1051/****************************************************************************** 1052 * create a write graph (fault-free or degraded) for RAID level 1 1053 * 1054 * Hdr -> Commit -> Wpd -> Nil -> Trm 1055 * -> Wsd -> 1056 * 1057 * The "Wpd" node writes data to the primary copy in the mirror pair 1058 * The "Wsd" node writes data to the secondary copy in the mirror pair 1059 * 1060 * 
Parameters: raidPtr   - description of the physical array
 *             asmap     - logical & physical addresses for this access
 *             bp        - buffer ptr (holds write data)
 *             flags     - general flags (e.g. disk locking)
 *             allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	RF_DagNode_t *unblockNode, *termNode, *commitNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int     nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	/*
	 * Build: Hdr -> Cmt -> Wpd (primary writes)   -> Nil -> Trm
	 *                   -> Wsd (secondary writes) ->
	 *
	 * Note that bp is not used directly here; the write buffers are
	 * taken from the physical disk address (pda) list in asmap.
	 */
	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	dag_h->creator = "RaidOneWriteDAG";

	/* 2 implies access not SU aligned */
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;

	/*
	 * Degraded case: drop the write node for whichever copy has
	 * failed (data = primary, "parity" = mirror copy in RAID 1).
	 */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock
	 * + terminator) */
	RF_MallocAndAdd(nodes,
	    (nWndNodes + nWmirNodes + 3) * sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	commitNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the commit, unblock, and term nodes */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc,
	    NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc,
	    NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the wnd nodes (writes to the primary copy) */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;
			wndNode[i].params[1].p = pda->bufPtr;
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
		}
		/* the pda list length must equal the node count */
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes (writes to the secondary copy) */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
			RF_ASSERT(pda != NULL);
			/* pdaP is dereferenced below (pdaP->next); assert it
			 * like pda rather than risk a NULL dereference if
			 * the parityInfo list is unexpectedly short */
			RF_ASSERT(pdaP != NULL);
			/* write target is the mirror pda; data buffer comes
			 * from the corresponding primary pda */
			wmirNode[i].params[0].p = pdaP;
			wmirNode[i].params[1].p = pda->bufPtr;
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the commit node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 0);
	dag_h->succedents[0] = commitNode;

	/* link the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = commitNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = commitNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}