rf_dagffwr.c revision 1.24
1/* $NetBSD: rf_dagffwr.c,v 1.24 2004/03/20 05:21:53 oster Exp $ */ 2/* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29/* 30 * rf_dagff.c 31 * 32 * code for creating fault-free DAGs 33 * 34 */ 35 36#include <sys/cdefs.h> 37__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.24 2004/03/20 05:21:53 oster Exp $"); 38 39#include <dev/raidframe/raidframevar.h> 40 41#include "rf_raid.h" 42#include "rf_dag.h" 43#include "rf_dagutils.h" 44#include "rf_dagfuncs.h" 45#include "rf_debugMem.h" 46#include "rf_dagffrd.h" 47#include "rf_general.h" 48#include "rf_dagffwr.h" 49#include "rf_map.h" 50 51/****************************************************************************** 52 * 53 * General comments on DAG creation: 54 * 55 * All DAGs in this file use roll-away error recovery. Each DAG has a single 56 * commit node, usually called "Cmt." 
If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * does not make changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll-forward
 * through the graph, blindly executing nodes until it reaches the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */


/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
76 */ 77 78 79void 80rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 81 RF_DagHeader_t *dag_h, void *bp, 82 RF_RaidAccessFlags_t flags, 83 RF_AllocListElem_t *allocList, 84 RF_IoType_t type) 85{ 86 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 87 RF_IO_TYPE_WRITE); 88} 89 90void 91rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 92 RF_DagHeader_t *dag_h, void *bp, 93 RF_RaidAccessFlags_t flags, 94 RF_AllocListElem_t *allocList, 95 RF_IoType_t type) 96{ 97 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 98 RF_IO_TYPE_WRITE); 99} 100 101void 102rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 103 RF_DagHeader_t *dag_h, void *bp, 104 RF_RaidAccessFlags_t flags, 105 RF_AllocListElem_t *allocList) 106{ 107 /* "normal" rollaway */ 108 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, 109 allocList, &rf_xorFuncs, NULL); 110} 111 112void 113rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 114 RF_DagHeader_t *dag_h, void *bp, 115 RF_RaidAccessFlags_t flags, 116 RF_AllocListElem_t *allocList) 117{ 118 /* "normal" rollaway */ 119 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, 120 allocList, 1, rf_RegularXorFunc, RF_TRUE); 121} 122 123 124/****************************************************************************** 125 * 126 * DAG creation code begins here 127 */ 128 129 130/****************************************************************************** 131 * 132 * creates a DAG to perform a large-write operation: 133 * 134 * / Rod \ / Wnd \ 135 * H -- block- Rod - Xor - Cmt - Wnd --- T 136 * \ Rod / \ Wnp / 137 * \[Wnq]/ 138 * 139 * The XOR node also does the Q calculation in the P+Q architecture. 140 * All nodes are before the commit node (Cmt) are assumed to be atomic and 141 * undoable - or - they make no changes to permanent state. 
142 * 143 * Rod = read old data 144 * Cmt = commit node 145 * Wnp = write new parity 146 * Wnd = write new data 147 * Wnq = write new "q" 148 * [] denotes optional segments in the graph 149 * 150 * Parameters: raidPtr - description of the physical array 151 * asmap - logical & physical addresses for this access 152 * bp - buffer ptr (holds write data) 153 * flags - general flags (e.g. disk locking) 154 * allocList - list of memory allocated in DAG creation 155 * nfaults - number of faults array can tolerate 156 * (equal to # redundancy units in stripe) 157 * redfuncs - list of redundancy generating functions 158 * 159 *****************************************************************************/ 160 161void 162rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 163 RF_DagHeader_t *dag_h, void *bp, 164 RF_RaidAccessFlags_t flags, 165 RF_AllocListElem_t *allocList, 166 int nfaults, int (*redFunc) (RF_DagNode_t *), 167 int allowBufferRecycle) 168{ 169 RF_DagNode_t *wndNodes, *rodNodes, *xorNode, *wnpNode, *tmpNode; 170 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 171 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 172 RF_AccessStripeMapHeader_t *new_asm_h[2]; 173 RF_StripeNum_t parityStripeID; 174 char *sosBuffer, *eosBuffer; 175 RF_ReconUnitNum_t which_ru; 176 RF_RaidLayout_t *layoutPtr; 177 RF_PhysDiskAddr_t *pda; 178 179 layoutPtr = &(raidPtr->Layout); 180 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, 181 asmap->raidAddress, 182 &which_ru); 183 184#if RF_DEBUG_DAG 185 if (rf_dagDebug) { 186 printf("[Creating large-write DAG]\n"); 187 } 188#endif 189 dag_h->creator = "LargeWriteDAG"; 190 191 dag_h->numCommitNodes = 1; 192 dag_h->numCommits = 0; 193 dag_h->numSuccedents = 1; 194 195 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 196 nWndNodes = asmap->numStripeUnitsAccessed; 197 198 for (i = 0; i < nWndNodes; i++) { 199 tmpNode = rf_AllocDAGNode(); 200 tmpNode->list_next = dag_h->nodes; 201 
dag_h->nodes = tmpNode; 202 } 203 wndNodes = dag_h->nodes; 204 205 xorNode = rf_AllocDAGNode(); 206 xorNode->list_next = dag_h->nodes; 207 dag_h->nodes = xorNode; 208 209 wnpNode = rf_AllocDAGNode(); 210 wnpNode->list_next = dag_h->nodes; 211 dag_h->nodes = wnpNode; 212 213 blockNode = rf_AllocDAGNode(); 214 blockNode->list_next = dag_h->nodes; 215 dag_h->nodes = blockNode; 216 217 commitNode = rf_AllocDAGNode(); 218 commitNode->list_next = dag_h->nodes; 219 dag_h->nodes = commitNode; 220 221 termNode = rf_AllocDAGNode(); 222 termNode->list_next = dag_h->nodes; 223 dag_h->nodes = termNode; 224 225#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 226 if (nfaults == 2) { 227 wnqNode = rf_AllocDAGNode(); 228 } else { 229#endif 230 wnqNode = NULL; 231#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 232 } 233#endif 234 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, 235 new_asm_h, &nRodNodes, &sosBuffer, 236 &eosBuffer, allocList); 237 if (nRodNodes > 0) { 238 for (i = 0; i < nRodNodes; i++) { 239 tmpNode = rf_AllocDAGNode(); 240 tmpNode->list_next = dag_h->nodes; 241 dag_h->nodes = tmpNode; 242 } 243 rodNodes = dag_h->nodes; 244 } else { 245 rodNodes = NULL; 246 } 247 248 /* begin node initialization */ 249 if (nRodNodes > 0) { 250 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 251 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, 252 dag_h, "Nil", allocList); 253 } else { 254 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 255 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, 256 dag_h, "Nil", allocList); 257 } 258 259 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 260 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0, 261 dag_h, "Cmt", allocList); 262 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 263 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, 264 dag_h, "Trm", allocList); 265 266 /* initialize the Rod nodes */ 267 tmpNode = rodNodes; 268 for (nodeNum = asmNum = 0; asmNum < 2; 
asmNum++) { 269 if (new_asm_h[asmNum]) { 270 pda = new_asm_h[asmNum]->stripeMap->physInfo; 271 while (pda) { 272 rf_InitNode(tmpNode, rf_wait, 273 RF_FALSE, rf_DiskReadFunc, 274 rf_DiskReadUndoFunc, 275 rf_GenericWakeupFunc, 276 1, 1, 4, 0, dag_h, 277 "Rod", allocList); 278 tmpNode->params[0].p = pda; 279 tmpNode->params[1].p = pda->bufPtr; 280 tmpNode->params[2].v = parityStripeID; 281 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 282 which_ru); 283 nodeNum++; 284 pda = pda->next; 285 tmpNode = tmpNode->list_next; 286 } 287 } 288 } 289 RF_ASSERT(nodeNum == nRodNodes); 290 291 /* initialize the wnd nodes */ 292 pda = asmap->physInfo; 293 tmpNode = wndNodes; 294 for (i = 0; i < nWndNodes; i++) { 295 rf_InitNode(tmpNode, rf_wait, RF_FALSE, 296 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 297 rf_GenericWakeupFunc, 1, 1, 4, 0, 298 dag_h, "Wnd", allocList); 299 RF_ASSERT(pda != NULL); 300 tmpNode->params[0].p = pda; 301 tmpNode->params[1].p = pda->bufPtr; 302 tmpNode->params[2].v = parityStripeID; 303 tmpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 304 pda = pda->next; 305 tmpNode = tmpNode->list_next; 306 } 307 308 /* initialize the redundancy node */ 309 if (nRodNodes > 0) { 310 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 311 rf_NullNodeUndoFunc, NULL, 1, 312 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, 313 nfaults, dag_h, "Xr ", allocList); 314 } else { 315 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, 316 rf_NullNodeUndoFunc, NULL, 1, 317 1, 2 * (nWndNodes + nRodNodes) + 1, 318 nfaults, dag_h, "Xr ", allocList); 319 } 320 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 321 tmpNode = wndNodes; 322 for (i = 0; i < nWndNodes; i++) { 323 /* pda */ 324 xorNode->params[2 * i + 0] = tmpNode->params[0]; 325 /* buf ptr */ 326 xorNode->params[2 * i + 1] = tmpNode->params[1]; 327 tmpNode = tmpNode->list_next; 328 } 329 tmpNode = rodNodes; 330 for (i = 0; i < nRodNodes; i++) { 331 /* pda */ 332 xorNode->params[2 * (nWndNodes + i) + 0] = 
tmpNode->params[0]; 333 /* buf ptr */ 334 xorNode->params[2 * (nWndNodes + i) + 1] = tmpNode->params[1]; 335 tmpNode = tmpNode->list_next; 336 } 337 /* xor node needs to get at RAID information */ 338 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 339 340 /* 341 * Look for an Rod node that reads a complete SU. If none, 342 * alloc a buffer to receive the parity info. Note that we 343 * can't use a new data buffer because it will not have gotten 344 * written when the xor occurs. */ 345 if (allowBufferRecycle) { 346 tmpNode = rodNodes; 347 for (i = 0; i < nRodNodes; i++) { 348 if (((RF_PhysDiskAddr_t *) tmpNode->params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 349 break; 350 tmpNode = tmpNode->list_next; 351 } 352 } 353 if ((!allowBufferRecycle) || (i == nRodNodes)) { 354 xorNode->results[0] = rf_AllocBuffer(raidPtr, dag_h, 355 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit)); 356 } else { 357 /* this works because the only way we get here is if 358 allowBufferRecycle is true and we went through the 359 above for loop, and exited via the break before 360 i==nRodNodes was true. That means tmpNode will 361 still point to a valid node -- the one we want for 362 here! */ 363 xorNode->results[0] = tmpNode->params[1].p; 364 } 365 366 /* initialize the Wnp node */ 367 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 368 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, 369 dag_h, "Wnp", allocList); 370 wnpNode->params[0].p = asmap->parityInfo; 371 wnpNode->params[1].p = xorNode->results[0]; 372 wnpNode->params[2].v = parityStripeID; 373 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 374 /* parityInfo must describe entire parity unit */ 375 RF_ASSERT(asmap->parityInfo->next == NULL); 376 377#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 378 if (nfaults == 2) { 379 /* 380 * We never try to recycle a buffer for the Q calcuation 381 * in addition to the parity. 
This would cause two buffers 382 * to get smashed during the P and Q calculation, guaranteeing 383 * one would be wrong. 384 */ 385 RF_MallocAndAdd(xorNode->results[1], 386 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 387 (void *), allocList); 388 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, 389 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 390 1, 1, 4, 0, dag_h, "Wnq", allocList); 391 wnqNode->params[0].p = asmap->qInfo; 392 wnqNode->params[1].p = xorNode->results[1]; 393 wnqNode->params[2].v = parityStripeID; 394 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 395 /* parityInfo must describe entire parity unit */ 396 RF_ASSERT(asmap->parityInfo->next == NULL); 397 } 398#endif 399 /* 400 * Connect nodes to form graph. 401 */ 402 403 /* connect dag header to block node */ 404 RF_ASSERT(blockNode->numAntecedents == 0); 405 dag_h->succedents[0] = blockNode; 406 407 if (nRodNodes > 0) { 408 /* connect the block node to the Rod nodes */ 409 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 410 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 411 tmpNode = rodNodes; 412 for (i = 0; i < nRodNodes; i++) { 413 RF_ASSERT(tmpNode.numAntecedents == 1); 414 blockNode->succedents[i] = tmpNode; 415 tmpNode->antecedents[0] = blockNode; 416 tmpNode->antType[0] = rf_control; 417 418 /* connect the Rod nodes to the Xor node */ 419 RF_ASSERT(tmpNode.numSuccedents == 1); 420 tmpNode->succedents[0] = xorNode; 421 xorNode->antecedents[i] = tmpNode; 422 xorNode->antType[i] = rf_trueData; 423 tmpNode = tmpNode->list_next; 424 } 425 } else { 426 /* connect the block node to the Xor node */ 427 RF_ASSERT(blockNode->numSuccedents == 1); 428 RF_ASSERT(xorNode->numAntecedents == 1); 429 blockNode->succedents[0] = xorNode; 430 xorNode->antecedents[0] = blockNode; 431 xorNode->antType[0] = rf_control; 432 } 433 434 /* connect the xor node to the commit node */ 435 RF_ASSERT(xorNode->numSuccedents == 1); 436 
RF_ASSERT(commitNode->numAntecedents == 1); 437 xorNode->succedents[0] = commitNode; 438 commitNode->antecedents[0] = xorNode; 439 commitNode->antType[0] = rf_control; 440 441 /* connect the commit node to the write nodes */ 442 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 443 tmpNode = wndNodes; 444 for (i = 0; i < nWndNodes; i++) { 445 RF_ASSERT(wndNodes->numAntecedents == 1); 446 commitNode->succedents[i] = tmpNode; 447 tmpNode->antecedents[0] = commitNode; 448 tmpNode->antType[0] = rf_control; 449 tmpNode = tmpNode->list_next; 450 } 451 RF_ASSERT(wnpNode->numAntecedents == 1); 452 commitNode->succedents[nWndNodes] = wnpNode; 453 wnpNode->antecedents[0] = commitNode; 454 wnpNode->antType[0] = rf_trueData; 455#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 456 if (nfaults == 2) { 457 RF_ASSERT(wnqNode->numAntecedents == 1); 458 commitNode->succedents[nWndNodes + 1] = wnqNode; 459 wnqNode->antecedents[0] = commitNode; 460 wnqNode->antType[0] = rf_trueData; 461 } 462#endif 463 /* connect the write nodes to the term node */ 464 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 465 RF_ASSERT(termNode->numSuccedents == 0); 466 tmpNode = wndNodes; 467 for (i = 0; i < nWndNodes; i++) { 468 RF_ASSERT(wndNodes->numSuccedents == 1); 469 tmpNode->succedents[0] = termNode; 470 termNode->antecedents[i] = tmpNode; 471 termNode->antType[i] = rf_control; 472 tmpNode = tmpNode->list_next; 473 } 474 RF_ASSERT(wnpNode->numSuccedents == 1); 475 wnpNode->succedents[0] = termNode; 476 termNode->antecedents[nWndNodes] = wnpNode; 477 termNode->antType[nWndNodes] = rf_control; 478#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 479 if (nfaults == 2) { 480 RF_ASSERT(wnqNode->numSuccedents == 1); 481 wnqNode->succedents[0] = termNode; 482 termNode->antecedents[nWndNodes + 1] = wnqNode; 483 termNode->antType[nWndNodes + 1] = rf_control; 484 } 485#endif 486} 487/****************************************************************************** 488 * 489 * 
creates a DAG to perform a small-write operation (either raid 5 or pq), 490 * which is as follows: 491 * 492 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 493 * \- Rod X / \----> Wnd [Und]-/ 494 * [\- Rod X / \---> Wnd [Und]-/] 495 * [\- Roq -> Q / \--> Wnq [Unq]-/] 496 * 497 * Rop = read old parity 498 * Rod = read old data 499 * Roq = read old "q" 500 * Cmt = commit node 501 * Und = unlock data disk 502 * Unp = unlock parity disk 503 * Unq = unlock q disk 504 * Wnp = write new parity 505 * Wnd = write new data 506 * Wnq = write new "q" 507 * [ ] denotes optional segments in the graph 508 * 509 * Parameters: raidPtr - description of the physical array 510 * asmap - logical & physical addresses for this access 511 * bp - buffer ptr (holds write data) 512 * flags - general flags (e.g. disk locking) 513 * allocList - list of memory allocated in DAG creation 514 * pfuncs - list of parity generating functions 515 * qfuncs - list of q generating functions 516 * 517 * A null qfuncs indicates single fault tolerant 518 *****************************************************************************/ 519 520void 521rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 522 RF_DagHeader_t *dag_h, void *bp, 523 RF_RaidAccessFlags_t flags, 524 RF_AllocListElem_t *allocList, 525 const RF_RedFuncs_t *pfuncs, 526 const RF_RedFuncs_t *qfuncs) 527{ 528 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 529 RF_DagNode_t *tmpNode, *tmpreadDataNode, *tmpreadParityNode; 530 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode; 531 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 532 RF_DagNode_t *tmpxorNode, *tmpqNode, *tmpwriteDataNode, *tmpreadQNode; 533 RF_DagNode_t *tmpwriteParityNode; 534#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 535 RF_DagNode_t *tmpwriteQNode; 536#endif 537 int i, j, nNodes, totalNumNodes; 538 RF_ReconUnitNum_t which_ru; 539 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t 
*); 540 int (*qfunc) (RF_DagNode_t *); 541 int numDataNodes, numParityNodes; 542 RF_StripeNum_t parityStripeID; 543 RF_PhysDiskAddr_t *pda; 544 char *name, *qname; 545 long nfaults; 546 547 nfaults = qfuncs ? 2 : 1; 548 549 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 550 asmap->raidAddress, &which_ru); 551 pda = asmap->physInfo; 552 numDataNodes = asmap->numStripeUnitsAccessed; 553 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 554 555#if RF_DEBUG_DAG 556 if (rf_dagDebug) { 557 printf("[Creating small-write DAG]\n"); 558 } 559#endif 560 RF_ASSERT(numDataNodes > 0); 561 dag_h->creator = "SmallWriteDAG"; 562 563 dag_h->numCommitNodes = 1; 564 dag_h->numCommits = 0; 565 dag_h->numSuccedents = 1; 566 567 /* 568 * DAG creation occurs in four steps: 569 * 1. count the number of nodes in the DAG 570 * 2. create the nodes 571 * 3. initialize the nodes 572 * 4. connect the nodes 573 */ 574 575 /* 576 * Step 1. compute number of nodes in the graph 577 */ 578 579 /* number of nodes: a read and write for each data unit a 580 * redundancy computation node for each parity node (nfaults * 581 * nparity) a read and write for each parity unit a block and 582 * commit node (2) a terminate node if atomic RMW an unlock 583 * node for each data unit, redundancy unit */ 584 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) 585 + (nfaults * 2 * numParityNodes) + 3; 586 /* 587 * Step 2. 
create the nodes 588 */ 589 590 blockNode = rf_AllocDAGNode(); 591 blockNode->list_next = dag_h->nodes; 592 dag_h->nodes = blockNode; 593 594 commitNode = rf_AllocDAGNode(); 595 commitNode->list_next = dag_h->nodes; 596 dag_h->nodes = commitNode; 597 598 for (i = 0; i < numDataNodes; i++) { 599 tmpNode = rf_AllocDAGNode(); 600 tmpNode->list_next = dag_h->nodes; 601 dag_h->nodes = tmpNode; 602 } 603 readDataNodes = dag_h->nodes; 604 605 for (i = 0; i < numParityNodes; i++) { 606 tmpNode = rf_AllocDAGNode(); 607 tmpNode->list_next = dag_h->nodes; 608 dag_h->nodes = tmpNode; 609 } 610 readParityNodes = dag_h->nodes; 611 612 for (i = 0; i < numDataNodes; i++) { 613 tmpNode = rf_AllocDAGNode(); 614 tmpNode->list_next = dag_h->nodes; 615 dag_h->nodes = tmpNode; 616 } 617 writeDataNodes = dag_h->nodes; 618 619 for (i = 0; i < numParityNodes; i++) { 620 tmpNode = rf_AllocDAGNode(); 621 tmpNode->list_next = dag_h->nodes; 622 dag_h->nodes = tmpNode; 623 } 624 writeParityNodes = dag_h->nodes; 625 626 for (i = 0; i < numParityNodes; i++) { 627 tmpNode = rf_AllocDAGNode(); 628 tmpNode->list_next = dag_h->nodes; 629 dag_h->nodes = tmpNode; 630 } 631 xorNodes = dag_h->nodes; 632 633 termNode = rf_AllocDAGNode(); 634 termNode->list_next = dag_h->nodes; 635 dag_h->nodes = termNode; 636 637#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 638 if (nfaults == 2) { 639 for (i = 0; i < numParityNodes; i++) { 640 tmpNode = rf_AllocDAGNode(); 641 tmpNode->list_next = dag_h->nodes; 642 dag_h->nodes = tmpNode; 643 } 644 readQNodes = dag_h->nodes; 645 646 for (i = 0; i < numParityNodes; i++) { 647 tmpNode = rf_AllocDAGNode(); 648 tmpNode->list_next = dag_h->nodes; 649 dag_h->nodes = tmpNode; 650 } 651 writeQNodes = dag_h->nodes; 652 653 for (i = 0; i < numParityNodes; i++) { 654 tmpNode = rf_AllocDAGNode(); 655 tmpNode->list_next = dag_h->nodes; 656 dag_h->nodes = tmpNode; 657 } 658 qNodes = dag_h->nodes; 659 } else { 660#endif 661 readQNodes = writeQNodes = qNodes = NULL; 662#if 
(RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 663 } 664#endif 665 RF_ASSERT(i == totalNumNodes); 666 667 /* 668 * Step 3. initialize the nodes 669 */ 670 /* initialize block node (Nil) */ 671 nNodes = numDataNodes + (nfaults * numParityNodes); 672 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 673 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, 674 dag_h, "Nil", allocList); 675 676 /* initialize commit node (Cmt) */ 677 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 678 rf_NullNodeUndoFunc, NULL, nNodes, 679 (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 680 681 /* initialize terminate node (Trm) */ 682 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 683 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, 684 dag_h, "Trm", allocList); 685 686 /* initialize nodes which read old data (Rod) */ 687 tmpreadDataNode = readDataNodes; 688 for (i = 0; i < numDataNodes; i++) { 689 rf_InitNode(tmpreadDataNode, rf_wait, RF_FALSE, 690 rf_DiskReadFunc, rf_DiskReadUndoFunc, 691 rf_GenericWakeupFunc, (nfaults * numParityNodes), 692 1, 4, 0, dag_h, "Rod", allocList); 693 RF_ASSERT(pda != NULL); 694 /* physical disk addr desc */ 695 tmpreadDataNode->params[0].p = pda; 696 /* buffer to hold old data */ 697 tmpreadDataNode->params[1].p = rf_AllocBuffer(raidPtr, dag_h, 698 pda->numSector << raidPtr->logBytesPerSector); 699 tmpreadDataNode->params[2].v = parityStripeID; 700 tmpreadDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 701 which_ru); 702 pda = pda->next; 703 for (j = 0; j < tmpreadDataNode->numSuccedents; j++) { 704 tmpreadDataNode->propList[j] = NULL; 705 } 706 tmpreadDataNode = tmpreadDataNode->list_next; 707 } 708 709 /* initialize nodes which read old parity (Rop) */ 710 pda = asmap->parityInfo; 711 i = 0; 712 tmpreadParityNode = readParityNodes; 713 for (i = 0; i < numParityNodes; i++) { 714 RF_ASSERT(pda != NULL); 715 rf_InitNode(tmpreadParityNode, rf_wait, RF_FALSE, 716 rf_DiskReadFunc, rf_DiskReadUndoFunc, 717 
rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, 718 dag_h, "Rop", allocList); 719 tmpreadParityNode->params[0].p = pda; 720 /* buffer to hold old parity */ 721 tmpreadParityNode->params[1].p = rf_AllocBuffer(raidPtr, dag_h, 722 pda->numSector << raidPtr->logBytesPerSector); 723 tmpreadParityNode->params[2].v = parityStripeID; 724 tmpreadParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 725 which_ru); 726 pda = pda->next; 727 for (j = 0; j < tmpreadParityNode->numSuccedents; j++) { 728 tmpreadParityNode->propList[0] = NULL; 729 } 730 tmpreadParityNode = tmpreadParityNode->list_next; 731 } 732 733#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 734 /* initialize nodes which read old Q (Roq) */ 735 if (nfaults == 2) { 736 pda = asmap->qInfo; 737 tmpreadQNode = readQNodes; 738 for (i = 0; i < numParityNodes; i++) { 739 RF_ASSERT(pda != NULL); 740 rf_InitNode(tmpreadQNode, rf_wait, RF_FALSE, 741 rf_DiskReadFunc, rf_DiskReadUndoFunc, 742 rf_GenericWakeupFunc, numParityNodes, 743 1, 4, 0, dag_h, "Roq", allocList); 744 tmpreadQNode->params[0].p = pda; 745 /* buffer to hold old Q */ 746 tmpreadQNode->params[1].p = rf_AllocBuffer(raidPtr, dag_h, 747 pda->numSector << raidPtr->logBytesPerSector); 748 tmpreadQNode->params[2].v = parityStripeID; 749 tmpreadQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 750 which_ru); 751 pda = pda->next; 752 for (j = 0; j < tmpreadQNode->numSuccedents; j++) { 753 tmpreadQNode->propList[0] = NULL; 754 } 755 tmpreadQNode = tmpreadQNode->list_next; 756 } 757 } 758#endif 759 /* initialize nodes which write new data (Wnd) */ 760 pda = asmap->physInfo; 761 tmpwriteDataNode = writeDataNodes; 762 for (i = 0; i < numDataNodes; i++) { 763 RF_ASSERT(pda != NULL); 764 rf_InitNode(tmpwriteDataNode, rf_wait, RF_FALSE, 765 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 766 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 767 "Wnd", allocList); 768 /* physical disk addr desc */ 769 tmpwriteDataNode->params[0].p = pda; 770 /* buffer holding 
new data to be written */ 771 tmpwriteDataNode->params[1].p = pda->bufPtr; 772 tmpwriteDataNode->params[2].v = parityStripeID; 773 tmpwriteDataNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 774 which_ru); 775 pda = pda->next; 776 tmpwriteDataNode = tmpwriteDataNode->list_next; 777 } 778 779 /* 780 * Initialize nodes which compute new parity and Q. 781 */ 782 /* 783 * We use the simple XOR func in the double-XOR case, and when 784 * we're accessing only a portion of one stripe unit. The 785 * distinction between the two is that the regular XOR func 786 * assumes that the targbuf is a full SU in size, and examines 787 * the pda associated with the buffer to decide where within 788 * the buffer to XOR the data, whereas the simple XOR func 789 * just XORs the data into the start of the buffer. */ 790 if ((numParityNodes == 2) || ((numDataNodes == 1) 791 && (asmap->totalSectorsAccessed < 792 raidPtr->Layout.sectorsPerStripeUnit))) { 793 func = pfuncs->simple; 794 undoFunc = rf_NullNodeUndoFunc; 795 name = pfuncs->SimpleName; 796 if (qfuncs) { 797 qfunc = qfuncs->simple; 798 qname = qfuncs->SimpleName; 799 } else { 800 qfunc = NULL; 801 qname = NULL; 802 } 803 } else { 804 func = pfuncs->regular; 805 undoFunc = rf_NullNodeUndoFunc; 806 name = pfuncs->RegularName; 807 if (qfuncs) { 808 qfunc = qfuncs->regular; 809 qname = qfuncs->RegularName; 810 } else { 811 qfunc = NULL; 812 qname = NULL; 813 } 814 } 815 /* 816 * Initialize the xor nodes: params are {pda,buf} 817 * from {Rod,Wnd,Rop} nodes, and raidPtr 818 */ 819 if (numParityNodes == 2) { 820 /* double-xor case */ 821 tmpxorNode = xorNodes; 822 tmpreadDataNode = readDataNodes; 823 tmpreadParityNode = readParityNodes; 824 tmpwriteDataNode = writeDataNodes; 825 tmpqNode = qNodes; 826 tmpreadQNode = readQNodes; 827 for (i = 0; i < numParityNodes; i++) { 828 /* note: no wakeup func for xor */ 829 rf_InitNode(tmpxorNode, rf_wait, RF_FALSE, func, 830 undoFunc, NULL, 1, 831 (numDataNodes + numParityNodes), 832 7, 
1, dag_h, name, allocList); 833 tmpxorNode->flags |= RF_DAGNODE_FLAG_YIELD; 834 tmpxorNode->params[0] = tmpreadDataNode->params[0]; 835 tmpxorNode->params[1] = tmpreadDataNode->params[1]; 836 tmpxorNode->params[2] = tmpreadParityNode->params[0]; 837 tmpxorNode->params[3] = tmpreadParityNode->params[1]; 838 tmpxorNode->params[4] = tmpwriteDataNode->params[0]; 839 tmpxorNode->params[5] = tmpwriteDataNode->params[1]; 840 tmpxorNode->params[6].p = raidPtr; 841 /* use old parity buf as target buf */ 842 tmpxorNode->results[0] = tmpreadParityNode->params[1].p; 843#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 844 if (nfaults == 2) { 845 /* note: no wakeup func for qor */ 846 rf_InitNode(tmpqNode, rf_wait, RF_FALSE, 847 qfunc, undoFunc, NULL, 1, 848 (numDataNodes + numParityNodes), 849 7, 1, dag_h, qname, allocList); 850 tmpqNode->params[0] = tmpreadDataNode->params[0]; 851 tmpqNode->params[1] = tmpreadDataNode->params[1]; 852 tmpqNode->params[2] = tmpreadQNode->.params[0]; 853 tmpqNode->params[3] = tmpreadQNode->params[1]; 854 tmpqNode->params[4] = tmpwriteDataNode->params[0]; 855 tmpqNode->params[5] = tmpwriteDataNode->params[1]; 856 tmpqNode->params[6].p = raidPtr; 857 /* use old Q buf as target buf */ 858 tmpqNode->results[0] = tmpreadQNode->params[1].p; 859 tmpqNode = tmpqNode->list_next; 860 tmpreadQNodes = tmpreadQNodes->list_next; 861 } 862#endif 863 tmpxorNode = tmpxorNode->list_next; 864 tmpreadDataNode = tmpreadDataNode->list_next; 865 tmpreadParityNode = tmpreadParityNode->list_next; 866 tmpwriteDataNode = tmpwriteDataNode->list_next; 867 } 868 } else { 869 /* there is only one xor node in this case */ 870 rf_InitNode(xorNodes, rf_wait, RF_FALSE, func, 871 undoFunc, NULL, 1, (numDataNodes + numParityNodes), 872 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 873 dag_h, name, allocList); 874 xorNodes->flags |= RF_DAGNODE_FLAG_YIELD; 875 tmpreadDataNode = readDataNodes; 876 for (i = 0; i < numDataNodes; i++) { /* used to be"numDataNodes + 1" until we 
factored 877 out the "+1" into the "deal with Rop separately below */ 878 /* set up params related to Rod nodes */ 879 xorNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */ 880 xorNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */ 881 tmpreadDataNode = tmpreadDataNode->list_next; 882 } 883 /* deal with Rop separately */ 884 xorNodes->params[2 * numDataNodes + 0] = readParityNodes->params[0]; /* pda */ 885 xorNodes->params[2 * numDataNodes + 1] = readParityNodes->params[1]; /* buffer ptr */ 886 887 tmpwriteDataNode = writeDataNodes; 888 for (i = 0; i < numDataNodes; i++) { 889 /* set up params related to Wnd and Wnp nodes */ 890 xorNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 891 tmpwriteDataNode->params[0]; 892 xorNodes->params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 893 tmpwriteDataNode->params[1]; 894 tmpwriteDataNode = tmpwriteDataNode->list_next; 895 } 896 /* xor node needs to get at RAID information */ 897 xorNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 898 xorNodes->results[0] = readParityNodes->params[1].p; 899#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 900 if (nfaults == 2) { 901 rf_InitNode(qNodes, rf_wait, RF_FALSE, qfunc, 902 undoFunc, NULL, 1, 903 (numDataNodes + numParityNodes), 904 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, 905 dag_h, qname, allocList); 906 tmpreadDataNode = readDataNodes; 907 for (i = 0; i < numDataNodes; i++) { 908 /* set up params related to Rod */ 909 qNodes->params[2 * i + 0] = tmpreadDataNode->params[0]; /* pda */ 910 qNodes->params[2 * i + 1] = tmpreadDataNode->params[1]; /* buffer ptr */ 911 tmpreadDataNode = tmpreadDataNode->list_next; 912 } 913 /* and read old q */ 914 qNodes->params[2 * numDataNodes + 0] = /* pda */ 915 readQNodes->params[0]; 916 qNodes->params[2 * numDataNodes + 1] = /* buffer ptr */ 917 readQNodes->params[1]; 918 tmpwriteDataNode = writeDataNodes; 919 for (i = 0; i < numDataNodes; i++) { 920 /* set up params 
related to Wnd nodes */ 921 qNodes->params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 922 tmpwriteDataNode->params[0]; 923 qNodes->params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 924 tmpwriteDataNode->params[1]; 925 tmpwriteDataNode = tmpwriteDataNode->list_next; 926 } 927 /* xor node needs to get at RAID information */ 928 qNodes->params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 929 qNodes->results[0] = readQNodes->params[1].p; 930 } 931#endif 932 } 933 934 /* initialize nodes which write new parity (Wnp) */ 935 pda = asmap->parityInfo; 936 tmpwriteParityNode = writeParityNodes; 937 tmpxorNode = xorNodes; 938 for (i = 0; i < numParityNodes; i++) { 939 rf_InitNode(tmpwriteParityNode, rf_wait, RF_FALSE, 940 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 941 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 942 "Wnp", allocList); 943 RF_ASSERT(pda != NULL); 944 tmpwriteParityNode->params[0].p = pda; /* param 1 (bufPtr) 945 * filled in by xor node */ 946 tmpwriteParityNode->params[1].p = tmpxorNode->results[0]; /* buffer pointer for 947 * parity write 948 * operation */ 949 tmpwriteParityNode->params[2].v = parityStripeID; 950 tmpwriteParityNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 951 which_ru); 952 pda = pda->next; 953 tmpwriteParityNode = tmpwriteParityNode->list_next; 954 tmpxorNode = tmpxorNode->list_next; 955 } 956 957#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 958 /* initialize nodes which write new Q (Wnq) */ 959 if (nfaults == 2) { 960 pda = asmap->qInfo; 961 tmpwriteQNode = writeQNodes; 962 tmpqNode = qNodes; 963 for (i = 0; i < numParityNodes; i++) { 964 rf_InitNode(tmpwriteQNode, rf_wait, RF_FALSE, 965 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 966 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 967 "Wnq", allocList); 968 RF_ASSERT(pda != NULL); 969 tmpwriteQNode->params[0].p = pda; /* param 1 (bufPtr) 970 * filled in by xor node */ 971 tmpwriteQNode->params[1].p = tmpqNode->results[0]; /* buffer pointer for 972 * parity write 973 * 
operation */ 974 tmpwriteQNode->params[2].v = parityStripeID; 975 tmpwriteQNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 976 which_ru); 977 pda = pda->next; 978 tmpwriteQNode = tmpwriteQNode->list_next; 979 tmpqNode = tmpqNode->list_next; 980 } 981 } 982#endif 983 /* 984 * Step 4. connect the nodes. 985 */ 986 987 /* connect header to block node */ 988 dag_h->succedents[0] = blockNode; 989 990 /* connect block node to read old data nodes */ 991 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 992 tmpreadDataNode = readDataNodes; 993 for (i = 0; i < numDataNodes; i++) { 994 blockNode->succedents[i] = tmpreadDataNode; 995 RF_ASSERT(tmpreadDataNode->numAntecedents == 1); 996 tmpreadDataNode->antecedents[0] = blockNode; 997 tmpreadDataNode->antType[0] = rf_control; 998 tmpreadDataNode = tmpreadDataNode->list_next; 999 } 1000 1001 /* connect block node to read old parity nodes */ 1002 tmpreadParityNode = readParityNodes; 1003 for (i = 0; i < numParityNodes; i++) { 1004 blockNode->succedents[numDataNodes + i] = tmpreadParityNode; 1005 RF_ASSERT(tmpreadParityNode->numAntecedents == 1); 1006 tmpreadParityNode->antecedents[0] = blockNode; 1007 tmpreadParityNode->antType[0] = rf_control; 1008 tmpreadParityNode = tmpreadParityNode->list_next; 1009 } 1010 1011#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1012 /* connect block node to read old Q nodes */ 1013 if (nfaults == 2) { 1014 tmpreadQNode = readQNodes; 1015 for (i = 0; i < numParityNodes; i++) { 1016 blockNode->succedents[numDataNodes + numParityNodes + i] = tmpreadQNode; 1017 RF_ASSERT(tmpreadQNode->numAntecedents == 1); 1018 tmpreadQNode->antecedents[0] = blockNode; 1019 tmpreadQNode->antType[0] = rf_control; 1020 tmpreadQNode = tmpreadQNode->list_next; 1021 } 1022 } 1023#endif 1024 /* connect read old data nodes to xor nodes */ 1025 tmpreadDataNode = readDataNodes; 1026 for (i = 0; i < numDataNodes; i++) { 1027 RF_ASSERT(tmpreadDataNode->numSuccedents == 
(nfaults * numParityNodes)); 1028 tmpxorNode = xorNodes; 1029 for (j = 0; j < numParityNodes; j++) { 1030 RF_ASSERT(tmpxorNode->numAntecedents == numDataNodes + numParityNodes); 1031 tmpreadDataNode->succedents[j] = tmpxorNode; 1032 tmpxorNode->antecedents[i] = tmpreadDataNode; 1033 tmpxorNode->antType[i] = rf_trueData; 1034 tmpxorNode = tmpxorNode->list_next; 1035 } 1036 tmpreadDataNode = tmpreadDataNode->list_next; 1037 } 1038 1039#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1040 /* connect read old data nodes to q nodes */ 1041 if (nfaults == 2) { 1042 tmpreadDataNode = readDataNodes; 1043 for (i = 0; i < numDataNodes; i++) { 1044 tmpqNode = qNodes; 1045 for (j = 0; j < numParityNodes; j++) { 1046 RF_ASSERT(tmpqNode->numAntecedents == numDataNodes + numParityNodes); 1047 tmpreadDataNode->succedents[numParityNodes + j] = tmpqNode; 1048 tmpqNode->antecedents[i] = tmpreadDataNode; 1049 tmpqNode->antType[i] = rf_trueData; 1050 tmpqNode = tmpqNode->list_next; 1051 } 1052 tmpreadDataNode = tmpreadDataNode->list_next; 1053 } 1054 } 1055#endif 1056 /* connect read old parity nodes to xor nodes */ 1057 tmpreadParityNode = readParityNodes; 1058 for (i = 0; i < numParityNodes; i++) { 1059 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes); 1060 tmpxorNode = xorNodes; 1061 for (j = 0; j < numParityNodes; j++) { 1062 tmpreadParityNode->succedents[j] = tmpxorNode; 1063 tmpxorNode->antecedents[numDataNodes + i] = tmpreadParityNode; 1064 tmpxorNode->antType[numDataNodes + i] = rf_trueData; 1065 tmpxorNode = tmpxorNode->list_next; 1066 } 1067 tmpreadParityNode = tmpreadParityNode->list_next; 1068 } 1069 1070#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1071 /* connect read old q nodes to q nodes */ 1072 if (nfaults == 2) { 1073 tmpreadParityNode = readParityNodes; 1074 tmpreadQNode = readQNodes; 1075 for (i = 0; i < numParityNodes; i++) { 1076 RF_ASSERT(tmpreadParityNode->numSuccedents == numParityNodes); 1077 tmpqNode = qNodes; 1078 for (j = 0; j < 
numParityNodes; j++) { 1079 tmpreadQNode->succedents[j] = tmpqNode; 1080 tmpqNode->antecedents[numDataNodes + i] = tmpreadQNodes; 1081 tmpqNode->antType[numDataNodes + i] = rf_trueData; 1082 tmpqNode = tmpqNode->list_next; 1083 } 1084 tmpreadParityNode = tmpreadParityNode->list_next; 1085 tmpreadQNode = tmpreadQNode->list_next; 1086 } 1087 } 1088#endif 1089 /* connect xor nodes to commit node */ 1090 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 1091 tmpxorNode = xorNodes; 1092 for (i = 0; i < numParityNodes; i++) { 1093 RF_ASSERT(tmpxorNode->numSuccedents == 1); 1094 tmpxorNode->succedents[0] = commitNode; 1095 commitNode->antecedents[i] = tmpxorNode; 1096 commitNode->antType[i] = rf_control; 1097 tmpxorNode = tmpxorNode->list_next; 1098 } 1099 1100#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1101 /* connect q nodes to commit node */ 1102 if (nfaults == 2) { 1103 tmpqNode = qNodes; 1104 for (i = 0; i < numParityNodes; i++) { 1105 RF_ASSERT(tmpqNode->numSuccedents == 1); 1106 tmpqNode->succedents[0] = commitNode; 1107 commitNode->antecedents[i + numParityNodes] = tmpqNode; 1108 commitNode->antType[i + numParityNodes] = rf_control; 1109 tmpqNode = tmpqNode->list_next; 1110 } 1111 } 1112#endif 1113 /* connect commit node to write nodes */ 1114 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 1115 tmpwriteDataNode = writeDataNodes; 1116 for (i = 0; i < numDataNodes; i++) { 1117 RF_ASSERT(tmpwriteDataNodes->numAntecedents == 1); 1118 commitNode->succedents[i] = tmpwriteDataNode; 1119 tmpwriteDataNode->antecedents[0] = commitNode; 1120 tmpwriteDataNode->antType[0] = rf_trueData; 1121 tmpwriteDataNode = tmpwriteDataNode->list_next; 1122 } 1123 tmpwriteParityNode = writeParityNodes; 1124 for (i = 0; i < numParityNodes; i++) { 1125 RF_ASSERT(tmpwriteParityNode->numAntecedents == 1); 1126 commitNode->succedents[i + numDataNodes] = tmpwriteParityNode; 1127 tmpwriteParityNode->antecedents[0] = commitNode; 
1128 tmpwriteParityNode->antType[0] = rf_trueData; 1129 tmpwriteParityNode = tmpwriteParityNode->list_next; 1130 } 1131#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1132 if (nfaults == 2) { 1133 tmpwriteQNode = writeQNodes; 1134 for (i = 0; i < numParityNodes; i++) { 1135 RF_ASSERT(tmpwriteQNode->numAntecedents == 1); 1136 commitNode->succedents[i + numDataNodes + numParityNodes] = tmpwriteQNode; 1137 tmpwriteQNode->antecedents[0] = commitNode; 1138 tmpwriteQNode->antType[0] = rf_trueData; 1139 tmpwriteQNode = tmpwriteQNode->list_next; 1140 } 1141 } 1142#endif 1143 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1144 RF_ASSERT(termNode->numSuccedents == 0); 1145 tmpwriteDataNode = writeDataNodes; 1146 for (i = 0; i < numDataNodes; i++) { 1147 /* connect write new data nodes to term node */ 1148 RF_ASSERT(tmpwriteDataNode->numSuccedents == 1); 1149 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1150 tmpwriteDataNode->succedents[0] = termNode; 1151 termNode->antecedents[i] = tmpwriteDataNode; 1152 termNode->antType[i] = rf_control; 1153 tmpwriteDataNode = tmpwriteDataNode->list_next; 1154 } 1155 1156 tmpwriteParityNode = writeParityNodes; 1157 for (i = 0; i < numParityNodes; i++) { 1158 RF_ASSERT(tmpwriteParityNode->numSuccedents == 1); 1159 tmpwriteParityNode->succedents[0] = termNode; 1160 termNode->antecedents[numDataNodes + i] = tmpwriteParityNode; 1161 termNode->antType[numDataNodes + i] = rf_control; 1162 tmpwriteParityNode = tmpwriteParityNode->list_next; 1163 } 1164 1165#if (RF_INCLUDE_DECL_PQ > 0) || (RF_INCLUDE_RAID6 > 0) 1166 if (nfaults == 2) { 1167 tmpwriteQNode = writeQNodes; 1168 for (i = 0; i < numParityNodes; i++) { 1169 RF_ASSERT(tmpwriteQNode->numSuccedents == 1); 1170 tmpwriteQNode->succedents[0] = termNode; 1171 termNode->antecedents[numDataNodes + numParityNodes + i] = tmpwriteQNode; 1172 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1173 
tmpwriteQNode = tmpwriteQNode->list_next; 1174 } 1175 } 1176#endif 1177} 1178 1179 1180/****************************************************************************** 1181 * create a write graph (fault-free or degraded) for RAID level 1 1182 * 1183 * Hdr -> Commit -> Wpd -> Nil -> Trm 1184 * -> Wsd -> 1185 * 1186 * The "Wpd" node writes data to the primary copy in the mirror pair 1187 * The "Wsd" node writes data to the secondary copy in the mirror pair 1188 * 1189 * Parameters: raidPtr - description of the physical array 1190 * asmap - logical & physical addresses for this access 1191 * bp - buffer ptr (holds write data) 1192 * flags - general flags (e.g. disk locking) 1193 * allocList - list of memory allocated in DAG creation 1194 *****************************************************************************/ 1195 1196void 1197rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap, 1198 RF_DagHeader_t *dag_h, void *bp, 1199 RF_RaidAccessFlags_t flags, 1200 RF_AllocListElem_t *allocList) 1201{ 1202 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1203 RF_DagNode_t *wndNode, *wmirNode; 1204 RF_DagNode_t *tmpNode, *tmpwndNode, *tmpwmirNode; 1205 int nWndNodes, nWmirNodes, i; 1206 RF_ReconUnitNum_t which_ru; 1207 RF_PhysDiskAddr_t *pda, *pdaP; 1208 RF_StripeNum_t parityStripeID; 1209 1210 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1211 asmap->raidAddress, &which_ru); 1212#if RF_DEBUG_DAG 1213 if (rf_dagDebug) { 1214 printf("[Creating RAID level 1 write DAG]\n"); 1215 } 1216#endif 1217 dag_h->creator = "RaidOneWriteDAG"; 1218 1219 /* 2 implies access not SU aligned */ 1220 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1221 nWndNodes = (asmap->physInfo->next) ? 
2 : 1; 1222 1223 /* alloc the Wnd nodes and the Wmir node */ 1224 if (asmap->numDataFailed == 1) 1225 nWndNodes--; 1226 if (asmap->numParityFailed == 1) 1227 nWmirNodes--; 1228 1229 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1230 * + terminator) */ 1231 for (i = 0; i < nWndNodes; i++) { 1232 tmpNode = rf_AllocDAGNode(); 1233 tmpNode->list_next = dag_h->nodes; 1234 dag_h->nodes = tmpNode; 1235 } 1236 wndNode = dag_h->nodes; 1237 1238 for (i = 0; i < nWmirNodes; i++) { 1239 tmpNode = rf_AllocDAGNode(); 1240 tmpNode->list_next = dag_h->nodes; 1241 dag_h->nodes = tmpNode; 1242 } 1243 wmirNode = dag_h->nodes; 1244 1245 commitNode = rf_AllocDAGNode(); 1246 commitNode->list_next = dag_h->nodes; 1247 dag_h->nodes = commitNode; 1248 1249 unblockNode = rf_AllocDAGNode(); 1250 unblockNode->list_next = dag_h->nodes; 1251 dag_h->nodes = unblockNode; 1252 1253 termNode = rf_AllocDAGNode(); 1254 termNode->list_next = dag_h->nodes; 1255 dag_h->nodes = termNode; 1256 1257 /* this dag can commit immediately */ 1258 dag_h->numCommitNodes = 1; 1259 dag_h->numCommits = 0; 1260 dag_h->numSuccedents = 1; 1261 1262 /* initialize the commit, unblock, and term nodes */ 1263 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, 1264 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 1265 0, 0, 0, dag_h, "Cmt", allocList); 1266 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, 1267 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 1268 0, 0, dag_h, "Nil", allocList); 1269 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, 1270 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, 1271 dag_h, "Trm", allocList); 1272 1273 /* initialize the wnd nodes */ 1274 if (nWndNodes > 0) { 1275 pda = asmap->physInfo; 1276 tmpwndNode = wndNode; 1277 for (i = 0; i < nWndNodes; i++) { 1278 rf_InitNode(tmpwndNode, rf_wait, RF_FALSE, 1279 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1280 rf_GenericWakeupFunc, 1, 1, 4, 0, 1281 dag_h, "Wpd", allocList); 1282 RF_ASSERT(pda 
!= NULL); 1283 tmpwndNode->params[0].p = pda; 1284 tmpwndNode->params[1].p = pda->bufPtr; 1285 tmpwndNode->params[2].v = parityStripeID; 1286 tmpwndNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1287 pda = pda->next; 1288 tmpwndNode = tmpwndNode->list_next; 1289 } 1290 RF_ASSERT(pda == NULL); 1291 } 1292 /* initialize the mirror nodes */ 1293 if (nWmirNodes > 0) { 1294 pda = asmap->physInfo; 1295 pdaP = asmap->parityInfo; 1296 tmpwmirNode = wmirNode; 1297 for (i = 0; i < nWmirNodes; i++) { 1298 rf_InitNode(tmpwmirNode, rf_wait, RF_FALSE, 1299 rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1300 rf_GenericWakeupFunc, 1, 1, 4, 0, 1301 dag_h, "Wsd", allocList); 1302 RF_ASSERT(pda != NULL); 1303 tmpwmirNode->params[0].p = pdaP; 1304 tmpwmirNode->params[1].p = pda->bufPtr; 1305 tmpwmirNode->params[2].v = parityStripeID; 1306 tmpwmirNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, which_ru); 1307 pda = pda->next; 1308 pdaP = pdaP->next; 1309 tmpwmirNode = tmpwmirNode->list_next; 1310 } 1311 RF_ASSERT(pda == NULL); 1312 RF_ASSERT(pdaP == NULL); 1313 } 1314 /* link the header node to the commit node */ 1315 RF_ASSERT(dag_h->numSuccedents == 1); 1316 RF_ASSERT(commitNode->numAntecedents == 0); 1317 dag_h->succedents[0] = commitNode; 1318 1319 /* link the commit node to the write nodes */ 1320 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1321 tmpwndNode = wndNode; 1322 for (i = 0; i < nWndNodes; i++) { 1323 RF_ASSERT(tmpwndNode->numAntecedents == 1); 1324 commitNode->succedents[i] = tmpwndNode; 1325 tmpwndNode->antecedents[0] = commitNode; 1326 tmpwndNode->antType[0] = rf_control; 1327 tmpwndNode = tmpwndNode->list_next; 1328 } 1329 tmpwmirNode = wmirNode; 1330 for (i = 0; i < nWmirNodes; i++) { 1331 RF_ASSERT(tmpwmirNode->numAntecedents == 1); 1332 commitNode->succedents[i + nWndNodes] = tmpwmirNode; 1333 tmpwmirNode->antecedents[0] = commitNode; 1334 tmpwmirNode->antType[0] = rf_control; 1335 tmpwmirNode = 
tmpwmirNode->list_next; 1336 } 1337 1338 /* link the write nodes to the unblock node */ 1339 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1340 tmpwndNode = wndNode; 1341 for (i = 0; i < nWndNodes; i++) { 1342 RF_ASSERT(tmpwndNode->numSuccedents == 1); 1343 tmpwndNode->succedents[0] = unblockNode; 1344 unblockNode->antecedents[i] = tmpwndNode; 1345 unblockNode->antType[i] = rf_control; 1346 tmpwndNode = tmpwndNode->list_next; 1347 } 1348 tmpwmirNode = wmirNode; 1349 for (i = 0; i < nWmirNodes; i++) { 1350 RF_ASSERT(tmpwmirNode->numSuccedents == 1); 1351 tmpwmirNode->succedents[0] = unblockNode; 1352 unblockNode->antecedents[i + nWndNodes] = tmpwmirNode; 1353 unblockNode->antType[i + nWndNodes] = rf_control; 1354 tmpwmirNode = tmpwmirNode->list_next; 1355 } 1356 1357 /* link the unblock node to the term node */ 1358 RF_ASSERT(unblockNode->numSuccedents == 1); 1359 RF_ASSERT(termNode->numAntecedents == 1); 1360 RF_ASSERT(termNode->numSuccedents == 0); 1361 unblockNode->succedents[0] = termNode; 1362 termNode->antecedents[0] = unblockNode; 1363 termNode->antType[0] = rf_control; 1364} 1365