/* rf_dagffwr.c revision 1.23 */
1/* $NetBSD: rf_dagffwr.c,v 1.23 2004/03/20 04:22:05 oster Exp $ */ 2/* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29/* 30 * rf_dagff.c 31 * 32 * code for creating fault-free DAGs 33 * 34 */ 35 36#include <sys/cdefs.h> 37__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.23 2004/03/20 04:22:05 oster Exp $"); 38 39#include <dev/raidframe/raidframevar.h> 40 41#include "rf_raid.h" 42#include "rf_dag.h" 43#include "rf_dagutils.h" 44#include "rf_dagfuncs.h" 45#include "rf_debugMem.h" 46#include "rf_dagffrd.h" 47#include "rf_general.h" 48#include "rf_dagffwr.h" 49#include "rf_map.h" 50 51/****************************************************************************** 52 * 53 * General comments on DAG creation: 54 * 55 * All DAGs in this file use roll-away error recovery. Each DAG has a single 56 * commit node, usually called "Cmt." 
If an error occurs before the Cmt node 57 * is reached, the execution engine will halt forward execution and work 58 * backward through the graph, executing the undo functions. Assuming that 59 * each node in the graph prior to the Cmt node are undoable and atomic - or - 60 * does not make changes to permanent state, the graph will fail atomically. 61 * If an error occurs after the Cmt node executes, the engine will roll-forward 62 * through the graph, blindly executing nodes until it reaches the end. 63 * If a graph reaches the end, it is assumed to have completed successfully. 64 * 65 * A graph has only 1 Cmt node. 66 * 67 */ 68 69 70/****************************************************************************** 71 * 72 * The following wrappers map the standard DAG creation interface to the 73 * DAG creation routines. Additionally, these wrappers enable experimentation 74 * with new DAG structures by providing an extra level of indirection, allowing 75 * the DAG creation routines to be replaced at this single point. 
76 */ 77 78 79void 80rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 81 RF_DagHeader_t *dag_h, void *bp, 82 RF_RaidAccessFlags_t flags, 83 RF_AllocListElem_t *allocList, 84 RF_IoType_t type) 85{ 86 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 87 RF_IO_TYPE_WRITE); 88} 89 90void 91rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 92 RF_DagHeader_t *dag_h, void *bp, 93 RF_RaidAccessFlags_t flags, 94 RF_AllocListElem_t *allocList, 95 RF_IoType_t type) 96{ 97 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 98 RF_IO_TYPE_WRITE); 99} 100 101void 102rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 103 RF_DagHeader_t *dag_h, void *bp, 104 RF_RaidAccessFlags_t flags, 105 RF_AllocListElem_t *allocList) 106{ 107 /* "normal" rollaway */ 108 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, 109 allocList, &rf_xorFuncs, NULL); 110} 111 112void 113rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 114 RF_DagHeader_t *dag_h, void *bp, 115 RF_RaidAccessFlags_t flags, 116 RF_AllocListElem_t *allocList) 117{ 118 /* "normal" rollaway */ 119 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, 120 allocList, 1, rf_RegularXorFunc, RF_TRUE); 121} 122 123 124/****************************************************************************** 125 * 126 * DAG creation code begins here 127 */ 128 129 130/****************************************************************************** 131 * 132 * creates a DAG to perform a large-write operation: 133 * 134 * / Rod \ / Wnd \ 135 * H -- block- Rod - Xor - Cmt - Wnd --- T 136 * \ Rod / \ Wnp / 137 * \[Wnq]/ 138 * 139 * The XOR node also does the Q calculation in the P+Q architecture. 140 * All nodes are before the commit node (Cmt) are assumed to be atomic and 141 * undoable - or - they make no changes to permanent state. 
142 * 143 * Rod = read old data 144 * Cmt = commit node 145 * Wnp = write new parity 146 * Wnd = write new data 147 * Wnq = write new "q" 148 * [] denotes optional segments in the graph 149 * 150 * Parameters: raidPtr - description of the physical array 151 * asmap - logical & physical addresses for this access 152 * bp - buffer ptr (holds write data) 153 * flags - general flags (e.g. disk locking) 154 * allocList - list of memory allocated in DAG creation 155 * nfaults - number of faults array can tolerate 156 * (equal to # redundancy units in stripe) 157 * redfuncs - list of redundancy generating functions 158 * 159 *****************************************************************************/ 160 161void 162rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 163 RF_DagHeader_t *dag_h, void *bp, 164 RF_RaidAccessFlags_t flags, 165 RF_AllocListElem_t *allocList, 166 int nfaults, int (*redFunc) (RF_DagNode_t *), 167 int allowBufferRecycle) 168{ 169 RF_DagNode_t *wndNodes, *rodNodes, *xorNode, *wnpNode, *tmpNode; 170 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 171 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 172 RF_AccessStripeMapHeader_t *new_asm_h[2]; 173 RF_StripeNum_t parityStripeID; 174 char *sosBuffer, *eosBuffer; 175 RF_ReconUnitNum_t which_ru; 176 RF_RaidLayout_t *layoutPtr; 177 RF_PhysDiskAddr_t *pda; 178 RF_VoidPointerListElem_t *vple; 179 180 layoutPtr = &(raidPtr->Layout); 181 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, 182 asmap->raidAddress, 183 &which_ru); 184 185#if RF_DEBUG_DAG 186 if (rf_dagDebug) { 187 printf("[Creating large-write DAG]\n"); 188 } 189#endif 190 dag_h->creator = "LargeWriteDAG"; 191 192 dag_h->numCommitNodes = 1; 193 dag_h->numCommits = 0; 194 dag_h->numSuccedents = 1; 195 196 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 197 nWndNodes = asmap->numStripeUnitsAccessed; 198 199 for (i = 0; i < nWndNodes; i++) { 200 tmpNode = rf_AllocDAGNode(); 201 
tmpNode->list_next = dag_h->nodes; 202 dag_h->nodes = tmpNode; 203 } 204 wndNodes = dag_h->nodes; 205 206 xorNode = rf_AllocDAGNode(); 207 xorNode->list_next = dag_h->nodes; 208 dag_h->nodes = xorNode; 209 210 wnpNode = rf_AllocDAGNode(); 211 wnpNode->list_next = dag_h->nodes; 212 dag_h->nodes = wnpNode; 213 214 blockNode = rf_AllocDAGNode(); 215 blockNode->list_next = dag_h->nodes; 216 dag_h->nodes = blockNode; 217 218 commitNode = rf_AllocDAGNode(); 219 commitNode->list_next = dag_h->nodes; 220 dag_h->nodes = commitNode; 221 222 termNode = rf_AllocDAGNode(); 223 termNode->list_next = dag_h->nodes; 224 dag_h->nodes = termNode; 225 226#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 227 if (nfaults == 2) { 228 wnqNode = rf_AllocDAGNode(); 229 } else { 230#endif 231 wnqNode = NULL; 232#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 233 } 234#endif 235 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, 236 new_asm_h, &nRodNodes, &sosBuffer, 237 &eosBuffer, allocList); 238 if (nRodNodes > 0) { 239 for (i = 0; i < nRodNodes; i++) { 240 tmpNode = rf_AllocDAGNode(); 241 tmpNode->list_next = dag_h->nodes; 242 dag_h->nodes = tmpNode; 243 } 244 rodNodes = dag_h->nodes; 245 } else { 246 rodNodes = NULL; 247 } 248 249 /* begin node initialization */ 250 if (nRodNodes > 0) { 251 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 252 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, 253 dag_h, "Nil", allocList); 254 } else { 255 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 256 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, 257 dag_h, "Nil", allocList); 258 } 259 260 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 261 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0, 262 dag_h, "Cmt", allocList); 263 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 264 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, 265 dag_h, "Trm", allocList); 266 267 /* initialize the Rod nodes */ 268 tmpNode = rodNodes; 269 for 
(nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 270 if (new_asm_h[asmNum]) { 271 pda = new_asm_h[asmNum]->stripeMap->physInfo; 272 while (pda) { 273 rf_InitNode(tmpNode, rf_wait, 274 RF_FALSE, rf_DiskReadFunc, 275 rf_DiskReadUndoFunc, 276 rf_GenericWakeupFunc, 277 1, 1, 4, 0, dag_h, 278 "Rod", allocList); 279 tmpNode->params[0].p = pda; 280 tmpNode->params[1].p = pda->bufPtr; 281 tmpNode->params[2].v = parityStripeID; 282 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 283 which_ru); 284 nodeNum++; 285 pda = pda->next; 286 tmpNode = tmpNode->list_next; 287 } 288 } 289 } 290 RF_ASSERT(nodeNum == nRodNodes); 291 292 /* initialize the wnd nodes */ 293 pda = asmap->physInfo; 294 tmpNode = wndNodes; 295 for (i = 0; i < nWndNodes; i++) { 296 rf_InitNode(tmpNode, rf_wait, RF_FALSE, 297 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 298 rf_GenericWakeupFunc, 1, 1, 4, 0, 299 dag_h, "Wnd", allocList); 300 RF_ASSERT(pda != NULL); 301 tmpNode->params[0].p = pda; 302 tmpNode->params[1].p = pda->bufPtr; 303 tmpNode->params[2].v = parityStripeID; 304 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 305 pda = pda->next; 306 tmpNode = tmpNode->list_next; 307 } 308 309 /* initialize the redundancy node */ 310 if (nRodNodes > 0) { 311 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 312 rf_NullNodeUndoFunc, NULL, 1, 313 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, 314 nfaults, dag_h, "Xr ", allocList); 315 } else { 316 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 317 rf_NullNodeUndoFunc, NULL, 1, 318 1, 2 * (nWndNodes + nRodNodes) + 1, 319 nfaults, dag_h, "Xr ", allocList); 320 } 321 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 322 tmpNode = wndNodes; 323 for (i = 0; i < nWndNodes; i++) { 324 /* pda */ 325 xorNode->params[2 * i + 0] = tmpNode->params[0]; 326 /* buf ptr */ 327 xorNode->params[2 * i + 1] = tmpNode->params[1]; 328 tmpNode = tmpNode->list_next; 329 } 330 tmpNode = rodNodes; 331 for (i = 0; i < nRodNodes; i++) { 332 /* pda */ 333 
xorNode->params[2 * (nWndNodes + i) + 0] = tmpNode->params[0]; 334 /* buf ptr */ 335 xorNode->params[2 * (nWndNodes + i) + 1] = tmpNode->params[1]; 336 tmpNode = tmpNode->list_next; 337 } 338 /* xor node needs to get at RAID information */ 339 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 340 341 /* 342 * Look for an Rod node that reads a complete SU. If none, 343 * alloc a buffer to receive the parity info. Note that we 344 * can't use a new data buffer because it will not have gotten 345 * written when the xor occurs. */ 346 if (allowBufferRecycle) { 347 tmpNode = rodNodes; 348 for (i = 0; i < nRodNodes; i++) { 349 if (((RF_PhysDiskAddr_t *) tmpNode->params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 350 break; 351 tmpNode = tmpNode->list_next; 352 } 353 } 354 if ((!allowBufferRecycle) || (i == nRodNodes)) { 355 xorNode->results[0] = rf_AllocIOBuffer(raidPtr, 356 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit)); 357 vple = rf_AllocVPListElem(); 358 vple->p = xorNode->results[0]; 359 vple->next = dag_h->iobufs; 360 dag_h->iobufs = vple; 361 } else { 362 /* this works because the only way we get here is if 363 allowBufferRecycle is true and we went through the 364 above for loop, and exited via the break before 365 i==nRodNodes was true. That means tmpNode will 366 still point to a valid node -- the one we want for 367 here! 
*/ 368 xorNode->results[0] = tmpNode->params[1].p; 369 } 370 371 /* initialize the Wnp node */ 372 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 373 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, 374 dag_h, "Wnp", allocList); 375 wnpNode->params[0].p = asmap->parityInfo; 376 wnpNode->params[1].p = xorNode->results[0]; 377 wnpNode->params[2].v = parityStripeID; 378 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 379 /* parityInfo must describe entire parity unit */ 380 RF_ASSERT(asmap->parityInfo->next == NULL); 381 382#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 383 if (nfaults == 2) { 384 /* 385 * We never try to recycle a buffer for the Q calcuation 386 * in addition to the parity. This would cause two buffers 387 * to get smashed during the P and Q calculation, guaranteeing 388 * one would be wrong. 389 */ 390 RF_MallocAndAdd(xorNode->results[1], 391 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 392 (void *), allocList); 393 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 394 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 395 1, 1, 4, 0, dag_h, "Wnq", allocList); 396 wnqNode->params[0].p = asmap->qInfo; 397 wnqNode->params[1].p = xorNode->results[1]; 398 wnqNode->params[2].v = parityStripeID; 399 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 400 /* parityInfo must describe entire parity unit */ 401 RF_ASSERT(asmap->parityInfo->next == NULL); 402 } 403#endif 404 /* 405 * Connect nodes to form graph. 
406 */ 407 408 /* connect dag header to block node */ 409 RF_ASSERT(blockNode->numAntecedents == 0); 410 dag_h->succedents[0] = blockNode; 411 412 if (nRodNodes > 0) { 413 /* connect the block node to the Rod nodes */ 414 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 415 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 416 tmpNode = rodNodes; 417 for (i = 0; i < nRodNodes; i++) { 418 RF_ASSERT(tmpNode.numAntecedents == 1); 419 blockNode->succedents[i] = tmpNode; 420 tmpNode->antecedents[0] = blockNode; 421 tmpNode->antType[0] = rf_control; 422 423 /* connect the Rod nodes to the Xor node */ 424 RF_ASSERT(tmpNode.numSuccedents == 1); 425 tmpNode->succedents[0] = xorNode; 426 xorNode->antecedents[i] = tmpNode; 427 xorNode->antType[i] = rf_trueData; 428 tmpNode = tmpNode->list_next; 429 } 430 } else { 431 /* connect the block node to the Xor node */ 432 RF_ASSERT(blockNode->numSuccedents == 1); 433 RF_ASSERT(xorNode->numAntecedents == 1); 434 blockNode->succedents[0] = xorNode; 435 xorNode->antecedents[0] = blockNode; 436 xorNode->antType[0] = rf_control; 437 } 438 439 /* connect the xor node to the commit node */ 440 RF_ASSERT(xorNode->numSuccedents == 1); 441 RF_ASSERT(commitNode->numAntecedents == 1); 442 xorNode->succedents[0] = commitNode; 443 commitNode->antecedents[0] = xorNode; 444 commitNode->antType[0] = rf_control; 445 446 /* connect the commit node to the write nodes */ 447 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 448 tmpNode = wndNodes; 449 for (i = 0; i < nWndNodes; i++) { 450 RF_ASSERT(wndNodes->numAntecedents == 1); 451 commitNode->succedents[i] = tmpNode; 452 tmpNode->antecedents[0] = commitNode; 453 tmpNode->antType[0] = rf_control; 454 tmpNode = tmpNode->list_next; 455 } 456 RF_ASSERT(wnpNode->numAntecedents == 1); 457 commitNode->succedents[nWndNodes] = wnpNode; 458 wnpNode->antecedents[0] = commitNode; 459 wnpNode->antType[0] = rf_trueData; 460#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 461 if (nfaults == 2) { 
462 RF_ASSERT(wnqNode->numAntecedents == 1); 463 commitNode->succedents[nWndNodes + 1] = wnqNode; 464 wnqNode->antecedents[0] = commitNode; 465 wnqNode->antType[0] = rf_trueData; 466 } 467#endif 468 /* connect the write nodes to the term node */ 469 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 470 RF_ASSERT(termNode->numSuccedents == 0); 471 tmpNode = wndNodes; 472 for (i = 0; i < nWndNodes; i++) { 473 RF_ASSERT(wndNodes->numSuccedents == 1); 474 tmpNode->succedents[0] = termNode; 475 termNode->antecedents[i] = tmpNode; 476 termNode->antType[i] = rf_control; 477 tmpNode = tmpNode->list_next; 478 } 479 RF_ASSERT(wnpNode->numSuccedents == 1); 480 wnpNode->succedents[0] = termNode; 481 termNode->antecedents[nWndNodes] = wnpNode; 482 termNode->antType[nWndNodes] = rf_control; 483#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 484 if (nfaults == 2) { 485 RF_ASSERT(wnqNode->numSuccedents == 1); 486 wnqNode->succedents[0] = termNode; 487 termNode->antecedents[nWndNodes + 1] = wnqNode; 488 termNode->antType[nWndNodes + 1] = rf_control; 489 } 490#endif 491} 492/****************************************************************************** 493 * 494 * creates a DAG to perform a small-write operation (either raid 5 or pq), 495 * which is as follows: 496 * 497 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 498 * \- Rod X / \----> Wnd [Und]-/ 499 * [\- Rod X / \---> Wnd [Und]-/] 500 * [\- Roq -> Q / \--> Wnq [Unq]-/] 501 * 502 * Rop = read old parity 503 * Rod = read old data 504 * Roq = read old "q" 505 * Cmt = commit node 506 * Und = unlock data disk 507 * Unp = unlock parity disk 508 * Unq = unlock q disk 509 * Wnp = write new parity 510 * Wnd = write new data 511 * Wnq = write new "q" 512 * [ ] denotes optional segments in the graph 513 * 514 * Parameters: raidPtr - description of the physical array 515 * asmap - logical & physical addresses for this access 516 * bp - buffer ptr (holds write data) 517 * flags - general flags (e.g. 
disk locking) 518 * allocList - list of memory allocated in DAG creation 519 * pfuncs - list of parity generating functions 520 * qfuncs - list of q generating functions 521 * 522 * A null qfuncs indicates single fault tolerant 523 *****************************************************************************/ 524 525void 526rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 527 RF_DagHeader_t *dag_h, void *bp, 528 RF_RaidAccessFlags_t flags, 529 RF_AllocListElem_t *allocList, 530 const RF_RedFuncs_t *pfuncs, 531 const RF_RedFuncs_t *qfuncs) 532{ 533 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 534 RF_DagNode_t *tmpNode, *tmpreadDataNode, *tmpreadParityNode; 535 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode; 536 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 537 RF_DagNode_t *tmpxorNode, *tmpqNode, *tmpwriteDataNode, *tmpreadQNode; 538 RF_DagNode_t *tmpwriteParityNode; 539#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 540 RF_DagNode_t *tmpwriteQNode; 541#endif 542 int i, j, nNodes, totalNumNodes; 543 RF_ReconUnitNum_t which_ru; 544 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 545 int (*qfunc) (RF_DagNode_t *); 546 int numDataNodes, numParityNodes; 547 RF_StripeNum_t parityStripeID; 548 RF_PhysDiskAddr_t *pda; 549 char *name, *qname; 550 long nfaults; 551 552 nfaults = qfuncs ? 2 : 1; 553 554 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 555 asmap->raidAddress, &which_ru); 556 pda = asmap->physInfo; 557 numDataNodes = asmap->numStripeUnitsAccessed; 558 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 559 560#if RF_DEBUG_DAG 561 if (rf_dagDebug) { 562 printf("[Creating small-write DAG]\n"); 563 } 564#endif 565 RF_ASSERT(numDataNodes > 0); 566 dag_h->creator = "SmallWriteDAG"; 567 568 dag_h->numCommitNodes = 1; 569 dag_h->numCommits = 0; 570 dag_h->numSuccedents = 1; 571 572 /* 573 * DAG creation occurs in four steps: 574 * 1. 
count the number of nodes in the DAG 575 * 2. create the nodes 576 * 3. initialize the nodes 577 * 4. connect the nodes 578 */ 579 580 /* 581 * Step 1. compute number of nodes in the graph 582 */ 583 584 /* number of nodes: a read and write for each data unit a 585 * redundancy computation node for each parity node (nfaults * 586 * nparity) a read and write for each parity unit a block and 587 * commit node (2) a terminate node if atomic RMW an unlock 588 * node for each data unit, redundancy unit */ 589 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) 590 + (nfaults * 2 * numParityNodes) + 3; 591 /* 592 * Step 2. create the nodes 593 */ 594 595 blockNode = rf_AllocDAGNode(); 596 blockNode->list_next = dag_h->nodes; 597 dag_h->nodes = blockNode; 598 599 commitNode = rf_AllocDAGNode(); 600 commitNode->list_next = dag_h->nodes; 601 dag_h->nodes = commitNode; 602 603 for (i = 0; i < numDataNodes; i++) { 604 tmpNode = rf_AllocDAGNode(); 605 tmpNode->list_next = dag_h->nodes; 606 dag_h->nodes = tmpNode; 607 } 608 readDataNodes = dag_h->nodes; 609 610 for (i = 0; i < numParityNodes; i++) { 611 tmpNode = rf_AllocDAGNode(); 612 tmpNode->list_next = dag_h->nodes; 613 dag_h->nodes = tmpNode; 614 } 615 readParityNodes = dag_h->nodes; 616 617 for (i = 0; i < numDataNodes; i++) { 618 tmpNode = rf_AllocDAGNode(); 619 tmpNode->list_next = dag_h->nodes; 620 dag_h->nodes = tmpNode; 621 } 622 writeDataNodes = dag_h->nodes; 623 624 for (i = 0; i < numParityNodes; i++) { 625 tmpNode = rf_AllocDAGNode(); 626 tmpNode->list_next = dag_h->nodes; 627 dag_h->nodes = tmpNode; 628 } 629 writeParityNodes = dag_h->nodes; 630 631 for (i = 0; i < numParityNodes; i++) { 632 tmpNode = rf_AllocDAGNode(); 633 tmpNode->list_next = dag_h->nodes; 634 dag_h->nodes = tmpNode; 635 } 636 xorNodes = dag_h->nodes; 637 638 termNode = rf_AllocDAGNode(); 639 termNode->list_next = dag_h->nodes; 640 dag_h->nodes = termNode; 641 642#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 643 if 
(nfaults == 2) { 644 for (i = 0; i < numParityNodes; i++) { 645 tmpNode = rf_AllocDAGNode(); 646 tmpNode->list_next = dag_h->nodes; 647 dag_h->nodes = tmpNode; 648 } 649 readQNodes = dag_h->nodes; 650 651 for (i = 0; i < numParityNodes; i++) { 652 tmpNode = rf_AllocDAGNode(); 653 tmpNode->list_next = dag_h->nodes; 654 dag_h->nodes = tmpNode; 655 } 656 writeQNodes = dag_h->nodes; 657 658 for (i = 0; i < numParityNodes; i++) { 659 tmpNode = rf_AllocDAGNode(); 660 tmpNode->list_next = dag_h->nodes; 661 dag_h->nodes = tmpNode; 662 } 663 qNodes = dag_h->nodes; 664 } else { 665#endif 666 readQNodes = writeQNodes = qNodes = NULL; 667#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 668 } 669#endif 670 RF_ASSERT(i == totalNumNodes); 671 672 /* 673 * Step 3. initialize the nodes 674 */ 675 /* initialize block node (Nil) */ 676 nNodes = numDataNodes + (nfaults * numParityNodes); 677 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 678 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, 679 dag_h, "Nil", allocList); 680 681 /* initialize commit node (Cmt) */ 682 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 683 rf_NullNodeUndoFunc, NULL, nNodes, 684 (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 685 686 /* initialize terminate node (Trm) */ 687 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 688 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, 689 dag_h, "Trm", allocList); 690 691 /* initialize nodes which read old data (Rod) */ 692 tmpreadDataNode = readDataNodes; 693 for (i = 0; i < numDataNodes; i++) { 694 rf_InitNode(tmpreadDataNode, rf_wait, RF_FALSE, 695 rf_DiskReadFunc, rf_DiskReadUndoFunc, 696 rf_GenericWakeupFunc, (nfaults * numParityNodes), 697 1, 4, 0, dag_h, "Rod", allocList); 698 RF_ASSERT(pda != NULL); 699 /* physical disk addr desc */ 700 tmpreadDataNode->params[0].p = pda; 701 /* buffer to hold old data */ 702 tmpreadDataNode->params[1].p = rf_AllocBuffer(raidPtr, pda, allocList); 703 tmpreadDataNode->params[2].v = 
parityStripeID; 704 tmpreadDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 705 which_ru); 706 pda = pda->next; 707 for (j = 0; j < tmpreadDataNode->numSuccedents; j++) { 708 tmpreadDataNode->propList[j] = NULL; 709 } 710 tmpreadDataNode = tmpreadDataNode->list_next; 711 } 712 713 /* initialize nodes which read old parity (Rop) */ 714 pda = asmap->parityInfo; 715 i = 0; 716 tmpreadParityNode = readParityNodes; 717 for (i = 0; i < numParityNodes; i++) { 718 RF_ASSERT(pda != NULL); 719 rf_InitNode(tmpreadParityNode, rf_wait, RF_FALSE, 720 rf_DiskReadFunc, rf_DiskReadUndoFunc, 721 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, 722 dag_h, "Rop", allocList); 723 tmpreadParityNode->params[0].p = pda; 724 /* buffer to hold old parity */ 725 tmpreadParityNode->params[1].p = rf_AllocBuffer(raidPtr, pda, allocList); 726 tmpreadParityNode->params[2].v = parityStripeID; 727 tmpreadParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 728 which_ru); 729 pda = pda->next; 730 for (j = 0; j < tmpreadParityNode->numSuccedents; j++) { 731 tmpreadParityNode->propList[0] = NULL; 732 } 733 tmpreadParityNode = tmpreadParityNode->list_next; 734 } 735 736#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 737 /* initialize nodes which read old Q (Roq) */ 738 if (nfaults == 2) { 739 pda = asmap->qInfo; 740 tmpreadQNode = readQNodes; 741 for (i = 0; i < numParityNodes; i++) { 742 RF_ASSERT(pda != NULL); 743 rf_InitNode(tmpreadQNode, rf_wait, RF_FALSE, 744 rf_DiskReadFunc, rf_DiskReadUndoFunc, 745 rf_GenericWakeupFunc, numParityNodes, 746 1, 4, 0, dag_h, "Roq", allocList); 747 tmpreadQNode->params[0].p = pda; 748 /* buffer to hold old Q */ 749 tmpreadQNode->params[1].p = rf_AllocBuffer(raidPtr, pda, allocList); 750 tmpreadQNode->params[2].v = parityStripeID; 751 tmpreadQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 752 which_ru); 753 pda = pda->next; 754 for (j = 0; j < tmpreadQNode->numSuccedents; j++) { 755 tmpreadQNode->propList[0] = NULL; 756 } 
757 tmpreadQNode = tmpreadQNode->list_next; 758 } 759 } 760#endif 761 /* initialize nodes which write new data (Wnd) */ 762 pda = asmap->physInfo; 763 tmpwriteDataNode = writeDataNodes; 764 for (i = 0; i < numDataNodes; i++) { 765 RF_ASSERT(pda != NULL); 766 rf_InitNode(tmpwriteDataNode, rf_wait, RF_FALSE, 767 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 768 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 769 "Wnd", allocList); 770 /* physical disk addr desc */ 771 tmpwriteDataNode->params[0].p = pda; 772 /* buffer holding new data to be written */ 773 tmpwriteDataNode->params[1].p = pda->bufPtr; 774 tmpwriteDataNode->params[2].v = parityStripeID; 775 tmpwriteDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 776 which_ru); 777 pda = pda->next; 778 tmpwriteDataNode = tmpwriteDataNode->list_next; 779 } 780 781 /* 782 * Initialize nodes which compute new parity and Q. 783 */ 784 /* 785 * We use the simple XOR func in the double-XOR case, and when 786 * we're accessing only a portion of one stripe unit. The 787 * distinction between the two is that the regular XOR func 788 * assumes that the targbuf is a full SU in size, and examines 789 * the pda associated with the buffer to decide where within 790 * the buffer to XOR the data, whereas the simple XOR func 791 * just XORs the data into the start of the buffer. 
*/ 792 if ((numParityNodes == 2) || ((numDataNodes == 1) 793 && (asmap->totalSectorsAccessed < 794 raidPtr->Layout.sectorsPerStripeUnit))) { 795 func = pfuncs->simple; 796 undoFunc = rf_NullNodeUndoFunc; 797 name = pfuncs->SimpleName; 798 if (qfuncs) { 799 qfunc = qfuncs->simple; 800 qname = qfuncs->SimpleName; 801 } else { 802 qfunc = NULL; 803 qname = NULL; 804 } 805 } else { 806 func = pfuncs->regular; 807 undoFunc = rf_NullNodeUndoFunc; 808 name = pfuncs->RegularName; 809 if (qfuncs) { 810 qfunc = qfuncs->regular; 811 qname = qfuncs->RegularName; 812 } else { 813 qfunc = NULL; 814 qname = NULL; 815 } 816 } 817 /* 818 * Initialize the xor nodes: params are {pda,buf} 819 * from {Rod,Wnd,Rop} nodes, and raidPtr 820 */ 821 if (numParityNodes == 2) { 822 /* double-xor case */ 823 tmpxorNode = xorNodes; 824 tmpreadDataNode = readDataNodes; 825 tmpreadParityNode = readParityNodes; 826 tmpwriteDataNode = writeDataNodes; 827 tmpqNode = qNodes; 828 tmpreadQNode = readQNodes; 829 for (i = 0; i < numParityNodes; i++) { 830 /* note: no wakeup func for xor */ 831 rf_InitNode(tmpxorNode, rf_wait, RF_FALSE, func, 832 undoFunc, NULL, 1, 833 (numDataNodes + numParityNodes), 834 7, 1, dag_h, name, allocList); 835 tmpxorNode->flags |= RF_DAGNODE_FLAG_YIELD; 836 tmpxorNode->params[0] = tmpreadDataNode->params[0]; 837 tmpxorNode->params[1] = tmpreadDataNode->params[1]; 838 tmpxorNode->params[2] = tmpreadParityNode->params[0]; 839 tmpxorNode->params[3] = tmpreadParityNode->params[1]; 840 tmpxorNode->params[4] = tmpwriteDataNode->params[0]; 841 tmpxorNode->params[5] = tmpwriteDataNode->params[1]; 842 tmpxorNode->params[6].p = raidPtr; 843 /* use old parity buf as target buf */ 844 tmpxorNode->results[0] = tmpreadParityNode->params[1].p; 845#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 846 if (nfaults == 2) { 847 /* note: no wakeup func for qor */ 848 rf_InitNode(tmpqNode, rf_wait, RF_FALSE, 849 qfunc, undoFunc, NULL, 1, 850 (numDataNodes + numParityNodes), 851 7, 1, dag_h, 
qname, allocList); 852 tmpqNode->params[0] = tmpreadDataNode->params[0]; 853 tmpqNode->params[1] = tmpreadDataNode->params[1]; 854 tmpqNode->params[2] = tmpreadQNode->.params[0]; 855 tmpqNode->params[3] = tmpreadQNode->params[1]; 856 tmpqNode->params[4] = tmpwriteDataNode->params[0]; 857 tmpqNode->params[5] = tmpwriteDataNode->params[1]; 858 tmpqNode->params[6].p = raidPtr; 859 /* use old Q buf as target buf */ 860 tmpqNode->results[0] = tmpreadQNode->params[1].p; 861 tmpqNode = tmpqNode->list_next; 862 tmpreadQNodes = tmpreadQNodes->list_next; 863 } 864#endif 865 tmpxorNode = tmpxorNode->list_next; 866 tmpreadDataNode = tmpreadDataNode->list_next; 867 tmpreadParityNode = tmpreadParityNode->list_next; 868 tmpwriteDataNode = tmpwriteDataNode->list_next; 869 } 870 } else { 871 /* there is only one xor node in this case */ 872 rf_InitNode(xorNodes, rf_wait, RF_FALSE, func, 873 undoFunc, NULL, 1, (numDataNodes + numParityNodes), 874 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 875 dag_h, name, allocList); 876 xorNodes->flags |= RF_DAGNODE_FLAG_YIELD; 877 tmpreadDataNode = readDataNodes; 878 for (i = 0; i < numDataNodes; i++) { /* used to be"numDataNodes + 1" until we factored 879 out the "+1" into the "deal with Rop separately below */ 880 /* set up params related to Rod nodes */ 881 xorNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */ 882 xorNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */ 883 tmpreadDataNode = tmpreadDataNode->list_next; 884 } 885 /* deal with Rop separately */ 886 xorNodes->params[2 * numDataNodes + 0] = readParityNodes->params[0]; /* pda */ 887 xorNodes->params[2 * numDataNodes + 1] = readParityNodes->params[1]; /* buffer ptr */ 888 889 tmpwriteDataNode = writeDataNodes; 890 for (i = 0; i < numDataNodes; i++) { 891 /* set up params related to Wnd and Wnp nodes */ 892 xorNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 893 tmpwriteDataNode->params[0]; 894 xorNodes->params[2 * (numDataNodes + 1 + i) + 
1] = /* buffer ptr */ 895 tmpwriteDataNode->params[1]; 896 tmpwriteDataNode = tmpwriteDataNode->list_next; 897 } 898 /* xor node needs to get at RAID information */ 899 xorNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 900 xorNodes->results[0] = readParityNodes->params[1].p; 901#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 902 if (nfaults == 2) { 903 rf_InitNode(qNodes, rf_wait, RF_FALSE, qfunc, 904 undoFunc, NULL, 1, 905 (numDataNodes + numParityNodes), 906 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 907 dag_h, qname, allocList); 908 tmpreadDataNode = readDataNodes; 909 for (i = 0; i < numDataNodes; i++) { 910 /* set up params related to Rod */ 911 qNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */ 912 qNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */ 913 tmpreadDataNode = tmpreadDataNode->list_next; 914 } 915 /* and read old q */ 916 qNodes->params[2 * numDataNodes + 0] = /* pda */ 917 readQNodes->params[0]; 918 qNodes->params[2 * numDataNodes + 1] = /* buffer ptr */ 919 readQNodes->params[1]; 920 tmpwriteDataNode = writeDataNodes; 921 for (i = 0; i < numDataNodes; i++) { 922 /* set up params related to Wnd nodes */ 923 qNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 924 tmpwriteDataNode->params[0]; 925 qNodes->params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 926 tmpwriteDataNode->params[1]; 927 tmpwriteDataNode = tmpwriteDataNode->list_next; 928 } 929 /* xor node needs to get at RAID information */ 930 qNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 931 qNodes->results[0] = readQNodes->params[1].p; 932 } 933#endif 934 } 935 936 /* initialize nodes which write new parity (Wnp) */ 937 pda = asmap->parityInfo; 938 tmpwriteParityNode = writeParityNodes; 939 tmpxorNode = xorNodes; 940 for (i = 0; i < numParityNodes; i++) { 941 rf_InitNode(tmpwriteParityNode, rf_wait, RF_FALSE, 942 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 943 rf_GenericWakeupFunc, 1, 1, 4, 0, 
dag_h, 944 "Wnp", allocList); 945 RF_ASSERT(pda != NULL); 946 tmpwriteParityNode->params[0].p = pda; /* param 1 (bufPtr) 947 * filled in by xor node */ 948 tmpwriteParityNode->params[1].p = tmpxorNode->results[0]; /* buffer pointer for 949 * parity write 950 * operation */ 951 tmpwriteParityNode->params[2].v = parityStripeID; 952 tmpwriteParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 953 which_ru); 954 pda = pda->next; 955 tmpwriteParityNode = tmpwriteParityNode->list_next; 956 tmpxorNode = tmpxorNode->list_next; 957 } 958 959#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 960 /* initialize nodes which write new Q (Wnq) */ 961 if (nfaults == 2) { 962 pda = asmap->qInfo; 963 tmpwriteQNode = writeQNodes; 964 tmpqNode = qNodes; 965 for (i = 0; i < numParityNodes; i++) { 966 rf_InitNode(tmpwriteQNode, rf_wait, RF_FALSE, 967 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 968 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 969 "Wnq", allocList); 970 RF_ASSERT(pda != NULL); 971 tmpwriteQNode->params[0].p = pda; /* param 1 (bufPtr) 972 * filled in by xor node */ 973 tmpwriteQNode->params[1].p = tmpqNode->results[0]; /* buffer pointer for 974 * parity write 975 * operation */ 976 tmpwriteQNode->params[2].v = parityStripeID; 977 tmpwriteQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 978 which_ru); 979 pda = pda->next; 980 tmpwriteQNode = tmpwriteQNode->list_next; 981 tmpqNode = tmpqNode->list_next; 982 } 983 } 984#endif 985 /* 986 * Step 4. connect the nodes. 
987 */ 988 989 /* connect header to block node */ 990 dag_h->succedents[0] = blockNode; 991 992 /* connect block node to read old data nodes */ 993 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 994 tmpreadDataNode = readDataNodes; 995 for (i = 0; i < numDataNodes; i++) { 996 blockNode->succedents[i] = tmpreadDataNode; 997 RF_ASSERT(tmpreadDataNode->numAntecedents == 1); 998 tmpreadDataNode->antecedents[0] = blockNode; 999 tmpreadDataNode->antType[0] = rf_control; 1000 tmpreadDataNode = tmpreadDataNode->list_next; 1001 } 1002 1003 /* connect block node to read old parity nodes */ 1004 tmpreadParityNode = readParityNodes; 1005 for (i = 0; i < numParityNodes; i++) { 1006 blockNode->succedents[numDataNodes + i] = tmpreadParityNode; 1007 RF_ASSERT(tmpreadParityNode->numAntecedents == 1); 1008 tmpreadParityNode->antecedents[0] = blockNode; 1009 tmpreadParityNode->antType[0] = rf_control; 1010 tmpreadParityNode = tmpreadParityNode->list_next; 1011 } 1012 1013#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1014 /* connect block node to read old Q nodes */ 1015 if (nfaults == 2) { 1016 tmpreadQNode = readQNodes; 1017 for (i = 0; i < numParityNodes; i++) { 1018 blockNode->succedents[numDataNodes + numParityNodes + i] = tmpreadQNode; 1019 RF_ASSERT(tmpreadQNode->numAntecedents == 1); 1020 tmpreadQNode->antecedents[0] = blockNode; 1021 tmpreadQNode->antType[0] = rf_control; 1022 tmpreadQNode = tmpreadQNode->list_next; 1023 } 1024 } 1025#endif 1026 /* connect read old data nodes to xor nodes */ 1027 tmpreadDataNode = readDataNodes; 1028 for (i = 0; i < numDataNodes; i++) { 1029 RF_ASSERT(tmpreadDataNode->numSuccedents == (nfaults * numParityNodes)); 1030 tmpxorNode = xorNodes; 1031 for (j = 0; j < numParityNodes; j++) { 1032 RF_ASSERT(tmpxorNode->numAntecedents == numDataNodes + numParityNodes); 1033 tmpreadDataNode->succedents[j] = tmpxorNode; 1034 tmpxorNode->antecedents[i] = tmpreadDataNode; 1035 tmpxorNode->antType[i] = 
rf_trueData; 1036 tmpxorNode = tmpxorNode->list_next; 1037 } 1038 tmpreadDataNode = tmpreadDataNode->list_next; 1039 } 1040 1041#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1042 /* connect read old data nodes to q nodes */ 1043 if (nfaults == 2) { 1044 tmpreadDataNode = readDataNodes; 1045 for (i = 0; i < numDataNodes; i++) { 1046 tmpqNode = qNodes; 1047 for (j = 0; j < numParityNodes; j++) { 1048 RF_ASSERT(tmpqNode->numAntecedents == numDataNodes + numParityNodes); 1049 tmpreadDataNode->succedents[numParityNodes + j] = tmpqNode; 1050 tmpqNode->antecedents[i] = tmpreadDataNode; 1051 tmpqNode->antType[i] = rf_trueData; 1052 tmpqNode = tmpqNode->list_next; 1053 } 1054 tmpreadDataNode = tmpreadDataNode->list_next; 1055 } 1056 } 1057#endif 1058 /* connect read old parity nodes to xor nodes */ 1059 tmpreadParityNode = readParityNodes; 1060 for (i = 0; i < numParityNodes; i++) { 1061 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes); 1062 tmpxorNode = xorNodes; 1063 for (j = 0; j < numParityNodes; j++) { 1064 tmpreadParityNode->succedents[j] = tmpxorNode; 1065 tmpxorNode->antecedents[numDataNodes + i] = tmpreadParityNode; 1066 tmpxorNode->antType[numDataNodes + i] = rf_trueData; 1067 tmpxorNode = tmpxorNode->list_next; 1068 } 1069 tmpreadParityNode = tmpreadParityNode->list_next; 1070 } 1071 1072#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1073 /* connect read old q nodes to q nodes */ 1074 if (nfaults == 2) { 1075 tmpreadParityNode = readParityNodes; 1076 tmpreadQNode = readQNodes; 1077 for (i = 0; i < numParityNodes; i++) { 1078 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes); 1079 tmpqNode = qNodes; 1080 for (j = 0; j < numParityNodes; j++) { 1081 tmpreadQNode->succedents[j] = tmpqNode; 1082 tmpqNode->antecedents[numDataNodes + i] = tmpreadQNodes; 1083 tmpqNode->antType[numDataNodes + i] = rf_trueData; 1084 tmpqNode = tmpqNode->list_next; 1085 } 1086 tmpreadParityNode = tmpreadParityNode->list_next; 1087 tmpreadQNode = 
tmpreadQNode->list_next; 1088 } 1089 } 1090#endif 1091 /* connect xor nodes to commit node */ 1092 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 1093 tmpxorNode = xorNodes; 1094 for (i = 0; i < numParityNodes; i++) { 1095 RF_ASSERT(tmpxorNode->numSuccedents == 1); 1096 tmpxorNode->succedents[0] = commitNode; 1097 commitNode->antecedents[i] = tmpxorNode; 1098 commitNode->antType[i] = rf_control; 1099 tmpxorNode = tmpxorNode->list_next; 1100 } 1101 1102#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1103 /* connect q nodes to commit node */ 1104 if (nfaults == 2) { 1105 tmpqNode = qNodes; 1106 for (i = 0; i < numParityNodes; i++) { 1107 RF_ASSERT(tmpqNode->numSuccedents == 1); 1108 tmpqNode->succedents[0] = commitNode; 1109 commitNode->antecedents[i + numParityNodes] = tmpqNode; 1110 commitNode->antType[i + numParityNodes] = rf_control; 1111 tmpqNode = tmpqNode->list_next; 1112 } 1113 } 1114#endif 1115 /* connect commit node to write nodes */ 1116 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 1117 tmpwriteDataNode = writeDataNodes; 1118 for (i = 0; i < numDataNodes; i++) { 1119 RF_ASSERT(tmpwriteDataNodes->numAntecedents == 1); 1120 commitNode->succedents[i] = tmpwriteDataNode; 1121 tmpwriteDataNode->antecedents[0] = commitNode; 1122 tmpwriteDataNode->antType[0] = rf_trueData; 1123 tmpwriteDataNode = tmpwriteDataNode->list_next; 1124 } 1125 tmpwriteParityNode = writeParityNodes; 1126 for (i = 0; i < numParityNodes; i++) { 1127 RF_ASSERT(tmpwriteParityNode->numAntecedents == 1); 1128 commitNode->succedents[i + numDataNodes] = tmpwriteParityNode; 1129 tmpwriteParityNode->antecedents[0] = commitNode; 1130 tmpwriteParityNode->antType[0] = rf_trueData; 1131 tmpwriteParityNode = tmpwriteParityNode->list_next; 1132 } 1133#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1134 if (nfaults == 2) { 1135 tmpwriteQNode = writeQNodes; 1136 for (i = 0; i < numParityNodes; i++) { 1137 
RF_ASSERT(tmpwriteQNode->numAntecedents == 1); 1138 commitNode->succedents[i + numDataNodes + numParityNodes] = tmpwriteQNode; 1139 tmpwriteQNode->antecedents[0] = commitNode; 1140 tmpwriteQNode->antType[0] = rf_trueData; 1141 tmpwriteQNode = tmpwriteQNode->list_next; 1142 } 1143 } 1144#endif 1145 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1146 RF_ASSERT(termNode->numSuccedents == 0); 1147 tmpwriteDataNode = writeDataNodes; 1148 for (i = 0; i < numDataNodes; i++) { 1149 /* connect write new data nodes to term node */ 1150 RF_ASSERT(tmpwriteDataNode->numSuccedents == 1); 1151 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1152 tmpwriteDataNode->succedents[0] = termNode; 1153 termNode->antecedents[i] = tmpwriteDataNode; 1154 termNode->antType[i] = rf_control; 1155 tmpwriteDataNode = tmpwriteDataNode->list_next; 1156 } 1157 1158 tmpwriteParityNode = writeParityNodes; 1159 for (i = 0; i < numParityNodes; i++) { 1160 RF_ASSERT(tmpwriteParityNode->numSuccedents == 1); 1161 tmpwriteParityNode->succedents[0] = termNode; 1162 termNode->antecedents[numDataNodes + i] = tmpwriteParityNode; 1163 termNode->antType[numDataNodes + i] = rf_control; 1164 tmpwriteParityNode = tmpwriteParityNode->list_next; 1165 } 1166 1167#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1168 if (nfaults == 2) { 1169 tmpwriteQNode = writeQNodes; 1170 for (i = 0; i < numParityNodes; i++) { 1171 RF_ASSERT(tmpwriteQNode->numSuccedents == 1); 1172 tmpwriteQNode->succedents[0] = termNode; 1173 termNode->antecedents[numDataNodes + numParityNodes + i] = tmpwriteQNode; 1174 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1175 tmpwriteQNode = tmpwriteQNode->list_next; 1176 } 1177 } 1178#endif 1179} 1180 1181 1182/****************************************************************************** 1183 * create a write graph (fault-free or degraded) for RAID level 1 1184 * 1185 * Hdr -> Commit -> Wpd -> Nil -> Trm 
1186 * -> Wsd -> 1187 * 1188 * The "Wpd" node writes data to the primary copy in the mirror pair 1189 * The "Wsd" node writes data to the secondary copy in the mirror pair 1190 * 1191 * Parameters: raidPtr - description of the physical array 1192 * asmap - logical & physical addresses for this access 1193 * bp - buffer ptr (holds write data) 1194 * flags - general flags (e.g. disk locking) 1195 * allocList - list of memory allocated in DAG creation 1196 *****************************************************************************/ 1197 1198void 1199rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 1200 RF_DagHeader_t *dag_h, void *bp, 1201 RF_RaidAccessFlags_t flags, 1202 RF_AllocListElem_t *allocList) 1203{ 1204 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1205 RF_DagNode_t *wndNode, *wmirNode; 1206 RF_DagNode_t *tmpNode, *tmpwndNode, *tmpwmirNode; 1207 int nWndNodes, nWmirNodes, i; 1208 RF_ReconUnitNum_t which_ru; 1209 RF_PhysDiskAddr_t *pda, *pdaP; 1210 RF_StripeNum_t parityStripeID; 1211 1212 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1213 asmap->raidAddress, &which_ru); 1214#if RF_DEBUG_DAG 1215 if (rf_dagDebug) { 1216 printf("[Creating RAID level 1 write DAG]\n"); 1217 } 1218#endif 1219 dag_h->creator = "RaidOneWriteDAG"; 1220 1221 /* 2 implies access not SU aligned */ 1222 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1223 nWndNodes = (asmap->physInfo->next) ? 
2 : 1; 1224 1225 /* alloc the Wnd nodes and the Wmir node */ 1226 if (asmap->numDataFailed == 1) 1227 nWndNodes--; 1228 if (asmap->numParityFailed == 1) 1229 nWmirNodes--; 1230 1231 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1232 * + terminator) */ 1233 for (i = 0; i < nWndNodes; i++) { 1234 tmpNode = rf_AllocDAGNode(); 1235 tmpNode->list_next = dag_h->nodes; 1236 dag_h->nodes = tmpNode; 1237 } 1238 wndNode = dag_h->nodes; 1239 1240 for (i = 0; i < nWmirNodes; i++) { 1241 tmpNode = rf_AllocDAGNode(); 1242 tmpNode->list_next = dag_h->nodes; 1243 dag_h->nodes = tmpNode; 1244 } 1245 wmirNode = dag_h->nodes; 1246 1247 commitNode = rf_AllocDAGNode(); 1248 commitNode->list_next = dag_h->nodes; 1249 dag_h->nodes = commitNode; 1250 1251 unblockNode = rf_AllocDAGNode(); 1252 unblockNode->list_next = dag_h->nodes; 1253 dag_h->nodes = unblockNode; 1254 1255 termNode = rf_AllocDAGNode(); 1256 termNode->list_next = dag_h->nodes; 1257 dag_h->nodes = termNode; 1258 1259 /* this dag can commit immediately */ 1260 dag_h->numCommitNodes = 1; 1261 dag_h->numCommits = 0; 1262 dag_h->numSuccedents = 1; 1263 1264 /* initialize the commit, unblock, and term nodes */ 1265 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 1266 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 1267 0, 0, 0, dag_h, "Cmt", allocList); 1268 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 1269 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 1270 0, 0, dag_h, "Nil", allocList); 1271 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 1272 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, 1273 dag_h, "Trm", allocList); 1274 1275 /* initialize the wnd nodes */ 1276 if (nWndNodes > 0) { 1277 pda = asmap->physInfo; 1278 tmpwndNode = wndNode; 1279 for (i = 0; i < nWndNodes; i++) { 1280 rf_InitNode(tmpwndNode, rf_wait, RF_FALSE, 1281 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1282 rf_GenericWakeupFunc, 1, 1, 4, 0, 1283 dag_h, "Wpd", allocList); 1284 RF_ASSERT(pda 
!= NULL); 1285 tmpwndNode->params[0].p = pda; 1286 tmpwndNode->params[1].p = pda->bufPtr; 1287 tmpwndNode->params[2].v = parityStripeID; 1288 tmpwndNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1289 pda = pda->next; 1290 tmpwndNode = tmpwndNode->list_next; 1291 } 1292 RF_ASSERT(pda == NULL); 1293 } 1294 /* initialize the mirror nodes */ 1295 if (nWmirNodes > 0) { 1296 pda = asmap->physInfo; 1297 pdaP = asmap->parityInfo; 1298 tmpwmirNode = wmirNode; 1299 for (i = 0; i < nWmirNodes; i++) { 1300 rf_InitNode(tmpwmirNode, rf_wait, RF_FALSE, 1301 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1302 rf_GenericWakeupFunc, 1, 1, 4, 0, 1303 dag_h, "Wsd", allocList); 1304 RF_ASSERT(pda != NULL); 1305 tmpwmirNode->params[0].p = pdaP; 1306 tmpwmirNode->params[1].p = pda->bufPtr; 1307 tmpwmirNode->params[2].v = parityStripeID; 1308 tmpwmirNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1309 pda = pda->next; 1310 pdaP = pdaP->next; 1311 tmpwmirNode = tmpwmirNode->list_next; 1312 } 1313 RF_ASSERT(pda == NULL); 1314 RF_ASSERT(pdaP == NULL); 1315 } 1316 /* link the header node to the commit node */ 1317 RF_ASSERT(dag_h->numSuccedents == 1); 1318 RF_ASSERT(commitNode->numAntecedents == 0); 1319 dag_h->succedents[0] = commitNode; 1320 1321 /* link the commit node to the write nodes */ 1322 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1323 tmpwndNode = wndNode; 1324 for (i = 0; i < nWndNodes; i++) { 1325 RF_ASSERT(tmpwndNode->numAntecedents == 1); 1326 commitNode->succedents[i] = tmpwndNode; 1327 tmpwndNode->antecedents[0] = commitNode; 1328 tmpwndNode->antType[0] = rf_control; 1329 tmpwndNode = tmpwndNode->list_next; 1330 } 1331 tmpwmirNode = wmirNode; 1332 for (i = 0; i < nWmirNodes; i++) { 1333 RF_ASSERT(tmpwmirNode->numAntecedents == 1); 1334 commitNode->succedents[i + nWndNodes] = tmpwmirNode; 1335 tmpwmirNode->antecedents[0] = commitNode; 1336 tmpwmirNode->antType[0] = rf_control; 1337 tmpwmirNode = 
tmpwmirNode->list_next; 1338 } 1339 1340 /* link the write nodes to the unblock node */ 1341 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1342 tmpwndNode = wndNode; 1343 for (i = 0; i < nWndNodes; i++) { 1344 RF_ASSERT(tmpwndNode->numSuccedents == 1); 1345 tmpwndNode->succedents[0] = unblockNode; 1346 unblockNode->antecedents[i] = tmpwndNode; 1347 unblockNode->antType[i] = rf_control; 1348 tmpwndNode = tmpwndNode->list_next; 1349 } 1350 tmpwmirNode = wmirNode; 1351 for (i = 0; i < nWmirNodes; i++) { 1352 RF_ASSERT(tmpwmirNode->numSuccedents == 1); 1353 tmpwmirNode->succedents[0] = unblockNode; 1354 unblockNode->antecedents[i + nWndNodes] = tmpwmirNode; 1355 unblockNode->antType[i + nWndNodes] = rf_control; 1356 tmpwmirNode = tmpwmirNode->list_next; 1357 } 1358 1359 /* link the unblock node to the term node */ 1360 RF_ASSERT(unblockNode->numSuccedents == 1); 1361 RF_ASSERT(termNode->numAntecedents == 1); 1362 RF_ASSERT(termNode->numSuccedents == 0); 1363 unblockNode->succedents[0] = termNode; 1364 termNode->antecedents[0] = unblockNode; 1365 termNode->antType[0] = rf_control; 1366} 1367