1/* 2 * Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>. 3 * All rights reserved. Distributed under the terms of the MIT License. 4 */ 5 6 7#include "ProcessCoordinator.h" 8 9#include <AutoLocker.h> 10#include <Catalog.h> 11#include <StringFormat.h> 12#include <Uuid.h> 13 14#include "Logger.h" 15 16 17#undef B_TRANSLATION_CONTEXT 18#define B_TRANSLATION_CONTEXT "ProcessCoordinator" 19 20#define LOCK_TIMEOUT_MICROS (1000 * 1000) 21 22// These are keys that are used to store the ProcessCoordinatorState data into 23// a BMessage instance. 24 25#define KEY_PROCESS_COORDINATOR_IDENTIFIER "processCoordinatorIdentifier" 26#define KEY_PROGRESS "progress" 27#define KEY_MESSAGE "message" 28#define KEY_IS_RUNNING "isRunning" 29#define KEY_ERROR_STATUS "errorStatus" 30 31 32// #pragma mark - ProcessCoordinatorState implementation 33 34 35ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from) 36{ 37 if (from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER, 38 &fProcessCoordinatorIdentifier) != B_OK) { 39 HDFATAL("unable to find the key [%s]", 40 KEY_PROCESS_COORDINATOR_IDENTIFIER); 41 } 42 43 if (from->FindFloat(KEY_PROGRESS, &fProgress) != B_OK) { 44 HDFATAL("unable to find the key [%s]", KEY_PROGRESS); 45 } 46 47 if (from->FindString(KEY_MESSAGE, &fMessage) != B_OK) { 48 HDFATAL("unable to find the key [%s]", KEY_MESSAGE); 49 } 50 51 if (from->FindBool(KEY_IS_RUNNING, &fIsRunning) != B_OK) { 52 HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING); 53 } 54 55 int64 errorStatusNumeric; 56 if (from->FindInt64(KEY_ERROR_STATUS, &errorStatusNumeric) != B_OK) { 57 HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS); 58 } 59 fErrorStatus = static_cast<status_t>(errorStatusNumeric); 60} 61 62 63ProcessCoordinatorState::ProcessCoordinatorState( 64 const ProcessCoordinator* processCoordinator, float progress, 65 const BString& message, bool isRunning, status_t errorStatus) 66 : 67 fProcessCoordinatorIdentifier(processCoordinator->Identifier()), 68 fProgress(progress), 69 fMessage(message), 70 fIsRunning(isRunning), 71 fErrorStatus(errorStatus) 72{ 73} 74 75 76ProcessCoordinatorState::~ProcessCoordinatorState() 77{ 78} 79 80 81const BString 82ProcessCoordinatorState::ProcessCoordinatorIdentifier() const 83{ 84 return fProcessCoordinatorIdentifier; 85} 86 87 88float 89ProcessCoordinatorState::Progress() const 90{ 91 return fProgress; 92} 93 94 95BString 96ProcessCoordinatorState::Message() const 97{ 98 return fMessage; 99} 100 101 102bool 103ProcessCoordinatorState::IsRunning() const 104{ 105 return fIsRunning; 106} 107 108 109status_t 110ProcessCoordinatorState::ErrorStatus() const 111{ 112 return fErrorStatus; 113} 114 115 116status_t 117ProcessCoordinatorState::Archive(BMessage* into, bool deep) const 118{ 119 status_t result = B_OK; 120 if (result == B_OK) { 121 result = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER, 122 fProcessCoordinatorIdentifier); 123 } 124 if (result == B_OK) 125 result = into->AddFloat(KEY_PROGRESS, fProgress); 126 if (result == B_OK) 127 result = into->AddString(KEY_MESSAGE, fMessage); 128 if (result == B_OK) 129 result = into->AddBool(KEY_IS_RUNNING, fIsRunning); 130 if (result == B_OK) 131 result = into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus)); 132 return result; 133} 134 135 136// #pragma mark - ProcessCoordinator implementation 137 138 139ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message) 140 : 141 fName(name), 142 fLock(), 143 fCoordinateAndCallListenerRerun(false), 144 fCoordinateAndCallListenerRerunLock(), 145 fListener(NULL), 146 fMessage(message), 147 fWasStopped(false), 148 fIdentifier(BUuid().ToString()) 149{ 150} 151 152 153ProcessCoordinator::~ProcessCoordinator() 154{ 155 AutoLocker<BLocker> locker(&fLock); 156 for (int32 i = 0; i < fNodes.CountItems(); i++) { 157 AbstractProcessNode* node = fNodes.ItemAt(i); 158 node->Process()->SetListener(NULL); 159 delete node; 160 } 161 delete fMessage; 162} 163 164const BString& 165ProcessCoordinator::Identifier() const 166{ 167 return fIdentifier; 168} 169 170 171void 172ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener) 173{ 174 fListener = listener; 175} 176 177 178void 179ProcessCoordinator::AddNode(AbstractProcessNode* node) 180{ 181 AutoLocker<BLocker> locker(&fLock); 182 fNodes.AddItem(node); 183 node->SetListener(this); 184 node->Process()->SetListener(this); 185} 186 187 188void 189ProcessCoordinator::ProcessChanged() 190{ 191 _CoordinateAndCallListener(); 192} 193 194 195bool 196ProcessCoordinator::IsRunning() 197{ 198 AutoLocker<BLocker> locker(&fLock); 199 for (int32 i = 0; i < fNodes.CountItems(); i++) { 200 AbstractProcessNode* node = fNodes.ItemAt(i); 201 if (node->IsRunning()) 202 return true; 203 } 204 205 return false; 206} 207 208 209void 210ProcessCoordinator::Start() 211{ 212 _CoordinateAndCallListener(); 213} 214 215 216void 217ProcessCoordinator::RequestStop() 218{ 219 AutoLocker<BLocker> locker(&fLock); 220 if (!fWasStopped) { 221 fWasStopped = true; 222 HDINFO("[Coordinator] will stop process coordinator"); 223 for (int32 i = 0; i < fNodes.CountItems(); i++) { 224 AbstractProcessNode* node = fNodes.ItemAt(i); 225 HDINFO("[Coordinator] stopping process [%s]", 226 node->Process()->Name()); 227 node->RequestStop(); 228 } 229 } 230} 231 232 233status_t 234ProcessCoordinator::ErrorStatus() 235{ 236 AutoLocker<BLocker> locker(&fLock); 237 for (int32 i = 0; i < fNodes.CountItems(); i++) { 238 status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus(); 239 240 if (result != B_OK) 241 return result; 242 } 243 244 return B_OK; 245} 246 247 248float 249ProcessCoordinator::Progress() 250{ 251 AutoLocker<BLocker> locker(&fLock); 252 float result = 0.0f; 253 254 if (!fWasStopped) { 255 int32 count = fNodes.CountItems(); 256 257 // if there is only one then return it's value directly because this 258 // allows for the indeterminate state of -1. 259 260 if (count == 1) 261 result = fNodes.ItemAt(0)->Process()->Progress(); 262 else { 263 float progressPerNode = 1.0f / ((float) count); 264 265 for (int32 i = count - 1; i >= 0; i--) { 266 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 267 268 switch(process->ProcessState()) { 269 case PROCESS_INITIAL: 270 break; 271 case PROCESS_RUNNING: 272 result += (progressPerNode * fmaxf( 273 0.0f, fminf(1.0, process->Progress()))); 274 break; 275 case PROCESS_COMPLETE: 276 result += progressPerNode; 277 break; 278 } 279 } 280 } 281 } 282 return result; 283} 284 285 286const BString& 287ProcessCoordinator::Name() const 288{ 289 return fName; 290} 291 292 293BMessage* 294ProcessCoordinator::Message() const 295{ 296 return fMessage; 297} 298 299 300BString 301ProcessCoordinator::_CreateStatusMessage() 302{ 303 // work through the nodes and take a description from the first one. If 304 // there are others present then use a 'plus X others' suffix. Go backwards 305 // through the processes so that the most recent activity is shown first. 306 307 BString firstProcessDescription; 308 uint32 additionalRunningProcesses = 0; 309 310 for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) { 311 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 312 if (process->ProcessState() == PROCESS_RUNNING) { 313 if (firstProcessDescription.IsEmpty()) { 314 if (strlen(process->Description()) != 0) 315 firstProcessDescription = process->Description(); 316 else 317 additionalRunningProcesses++; 318 } 319 else 320 additionalRunningProcesses++; 321 } 322 } 323 324 if (firstProcessDescription.IsEmpty()) 325 return "???"; 326 327 if (additionalRunningProcesses == 0) 328 return firstProcessDescription; 329 330 static BStringFormat format(B_TRANSLATE( 331 "%FIRST_PROCESS_DESCRIPTION% +" 332 "{0, plural, one{# process} other{# processes}}")); 333 BString result; 334 format.Format(result, additionalRunningProcesses); 335 result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription); 336 337 return result; 338} 339 340 341/*! This method assumes that a lock is held on the coordinator. */ 342 343ProcessCoordinatorState 344ProcessCoordinator::_CreateStatus() 345{ 346 return ProcessCoordinatorState( 347 this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus()); 348} 349 350 351/*! This will try to obtain the lock and if it cannot obtain the lock then 352 it will flag that when the coordinator has finished its current 353 coordination, it should initiate another coordination. 354 */ 355void 356ProcessCoordinator::_CoordinateAndCallListener() 357{ 358 if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) { 359 HDDEBUG("[Coordinator] would coordinate nodes, but coordination is " 360 "in progress - will defer"); 361 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock); 362 fCoordinateAndCallListenerRerun = true; 363 return; 364 } 365 366 ProcessCoordinatorState state = _Coordinate(); 367 368 if (fListener != NULL) 369 fListener->CoordinatorChanged(state); 370 371 fLock.Unlock(); 372 373 bool coordinateAndCallListenerRerun = false; 374 375 { 376 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock); 377 coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun; 378 fCoordinateAndCallListenerRerun = false; 379 } 380 381 if (coordinateAndCallListenerRerun) { 382 HDDEBUG("[Coordinator] will run deferred coordination"); 383 _CoordinateAndCallListener(); 384 } 385} 386 387 388ProcessCoordinatorState 389ProcessCoordinator::_Coordinate() 390{ 391 HDTRACE("[Coordinator] will coordinate nodes"); 392 AutoLocker<BLocker> locker(&fLock); 393 _StopSuccessorNodesToErroredOrStoppedNodes(); 394 395 // go through the nodes and find those that are still to be run and 396 // for which the preconditions are met to start. 397 for (int32 i = 0; i < fNodes.CountItems(); i++) { 398 AbstractProcessNode* node = fNodes.ItemAt(i); 399 400 if (node->Process()->ProcessState() == PROCESS_INITIAL) { 401 if (node->AllPredecessorsComplete()) 402 node->Start(); 403 else { 404 HDTRACE("[Coordinator] all predecessors not complete -> " 405 "[%s] not started", node->Process()->Name()); 406 } 407 } else { 408 HDTRACE("[Coordinator] process [%s] running or complete", 409 node->Process()->Name()); 410 } 411 } 412 413 return _CreateStatus(); 414} 415 416 417/*! This method assumes that a lock is held on the coordinator. */ 418 419void 420ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes() 421{ 422 for (int32 i = 0; i < fNodes.CountItems(); i++) { 423 AbstractProcessNode* node = fNodes.ItemAt(i); 424 AbstractProcess* process = node->Process(); 425 426 if (process->WasStopped() || process->ErrorStatus() != B_OK) 427 _StopSuccessorNodes(node); 428 } 429} 430 431 432/*! This method assumes that a lock is held on the coordinator. */ 433 434void 435ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode) 436{ 437 for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) { 438 AbstractProcessNode* node = predecessorNode->SuccessorAt(i); 439 AbstractProcess* process = node->Process(); 440 441 if (process->ProcessState() == PROCESS_INITIAL) { 442 HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)", 443 predecessorNode->Process()->Name(), process->Name()); 444 node->RequestStop(); 445 _StopSuccessorNodes(node); 446 } 447 } 448} 449 450 451int32 452ProcessCoordinator::_CountNodesCompleted() 453{ 454 int32 nodesCompleted = 0; 455 for (int32 i = 0; i < fNodes.CountItems(); i++) { 456 AbstractProcess *process = fNodes.ItemAt(i)->Process(); 457 if (process->ProcessState() == PROCESS_COMPLETE) 458 nodesCompleted++; 459 } 460 return nodesCompleted; 461} 462 463 464BString 465ProcessCoordinator::LogReport() 466{ 467 BString result; 468 AutoLocker<BLocker> locker(&fLock); 469 470 for (int32 i = 0; i < fNodes.CountItems(); i++) { 471 if (0 != result.Length()) 472 result.Append("\n"); 473 AbstractProcessNode* node = fNodes.ItemAt(i); 474 result.Append(node->LogReport()); 475 } 476 477 return result; 478} 479