1/*
2 * Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>.
3 * All rights reserved. Distributed under the terms of the MIT License.
4 */
5
6
7#include "ProcessCoordinator.h"
8
9#include <AutoLocker.h>
10#include <Catalog.h>
11#include <StringFormat.h>
12#include <Uuid.h>
13
14#include "Logger.h"
15
16
17#undef B_TRANSLATION_CONTEXT
18#define B_TRANSLATION_CONTEXT "ProcessCoordinator"
19
20#define LOCK_TIMEOUT_MICROS (1000 * 1000)
21
22// These are keys that are used to store the ProcessCoordinatorState data into
23// a BMessage instance.
24
25#define KEY_PROCESS_COORDINATOR_IDENTIFIER	"processCoordinatorIdentifier"
26#define KEY_PROGRESS						"progress"
27#define KEY_MESSAGE							"message"
28#define KEY_IS_RUNNING						"isRunning"
29#define KEY_ERROR_STATUS					"errorStatus"
30
31
32// #pragma mark - ProcessCoordinatorState implementation
33
34
35ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from)
36{
37	if (from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
38			&fProcessCoordinatorIdentifier) != B_OK) {
39		HDFATAL("unable to find the key [%s]",
40			KEY_PROCESS_COORDINATOR_IDENTIFIER);
41	}
42
43	if (from->FindFloat(KEY_PROGRESS, &fProgress) != B_OK) {
44		HDFATAL("unable to find the key [%s]", KEY_PROGRESS);
45	}
46
47	if (from->FindString(KEY_MESSAGE, &fMessage) != B_OK) {
48		HDFATAL("unable to find the key [%s]", KEY_MESSAGE);
49	}
50
51	if (from->FindBool(KEY_IS_RUNNING, &fIsRunning) != B_OK) {
52		HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING);
53	}
54
55	int64 errorStatusNumeric;
56	if (from->FindInt64(KEY_ERROR_STATUS, &errorStatusNumeric) != B_OK) {
57		HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS);
58	}
59	fErrorStatus = static_cast<status_t>(errorStatusNumeric);
60}
61
62
63ProcessCoordinatorState::ProcessCoordinatorState(
64	const ProcessCoordinator* processCoordinator, float progress,
65	const BString& message, bool isRunning, status_t errorStatus)
66	:
67	fProcessCoordinatorIdentifier(processCoordinator->Identifier()),
68	fProgress(progress),
69	fMessage(message),
70	fIsRunning(isRunning),
71	fErrorStatus(errorStatus)
72{
73}
74
75
76ProcessCoordinatorState::~ProcessCoordinatorState()
77{
78}
79
80
81const BString
82ProcessCoordinatorState::ProcessCoordinatorIdentifier() const
83{
84	return fProcessCoordinatorIdentifier;
85}
86
87
88float
89ProcessCoordinatorState::Progress() const
90{
91	return fProgress;
92}
93
94
95BString
96ProcessCoordinatorState::Message() const
97{
98	return fMessage;
99}
100
101
102bool
103ProcessCoordinatorState::IsRunning() const
104{
105	return fIsRunning;
106}
107
108
109status_t
110ProcessCoordinatorState::ErrorStatus() const
111{
112	return fErrorStatus;
113}
114
115
116status_t
117ProcessCoordinatorState::Archive(BMessage* into, bool deep) const
118{
119	status_t result = B_OK;
120	if (result == B_OK) {
121		result = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
122			fProcessCoordinatorIdentifier);
123	}
124	if (result == B_OK)
125		result = into->AddFloat(KEY_PROGRESS, fProgress);
126	if (result == B_OK)
127		result = into->AddString(KEY_MESSAGE, fMessage);
128	if (result == B_OK)
129		result = into->AddBool(KEY_IS_RUNNING, fIsRunning);
130	if (result == B_OK)
131		result = into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus));
132	return result;
133}
134
135
136// #pragma mark - ProcessCoordinator implementation
137
138
139ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message)
140	:
141	fName(name),
142	fLock(),
143	fCoordinateAndCallListenerRerun(false),
144	fCoordinateAndCallListenerRerunLock(),
145	fListener(NULL),
146	fMessage(message),
147	fWasStopped(false),
148	fIdentifier(BUuid().ToString())
149{
150}
151
152
153ProcessCoordinator::~ProcessCoordinator()
154{
155	AutoLocker<BLocker> locker(&fLock);
156	for (int32 i = 0; i < fNodes.CountItems(); i++) {
157		AbstractProcessNode* node = fNodes.ItemAt(i);
158		node->Process()->SetListener(NULL);
159		delete node;
160	}
161	delete fMessage;
162}
163
164const BString&
165ProcessCoordinator::Identifier() const
166{
167	return fIdentifier;
168}
169
170
171void
172ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener)
173{
174	fListener = listener;
175}
176
177
178void
179ProcessCoordinator::AddNode(AbstractProcessNode* node)
180{
181	AutoLocker<BLocker> locker(&fLock);
182	fNodes.AddItem(node);
183	node->SetListener(this);
184	node->Process()->SetListener(this);
185}
186
187
188void
189ProcessCoordinator::ProcessChanged()
190{
191	_CoordinateAndCallListener();
192}
193
194
195bool
196ProcessCoordinator::IsRunning()
197{
198	AutoLocker<BLocker> locker(&fLock);
199	for (int32 i = 0; i < fNodes.CountItems(); i++) {
200		AbstractProcessNode* node = fNodes.ItemAt(i);
201		if (node->IsRunning())
202			return true;
203	}
204
205	return false;
206}
207
208
209void
210ProcessCoordinator::Start()
211{
212	_CoordinateAndCallListener();
213}
214
215
216void
217ProcessCoordinator::RequestStop()
218{
219	AutoLocker<BLocker> locker(&fLock);
220	if (!fWasStopped) {
221		fWasStopped = true;
222		HDINFO("[Coordinator] will stop process coordinator");
223		for (int32 i = 0; i < fNodes.CountItems(); i++) {
224			AbstractProcessNode* node = fNodes.ItemAt(i);
225			HDINFO("[Coordinator] stopping process [%s]",
226				node->Process()->Name());
227			node->RequestStop();
228		}
229	}
230}
231
232
233status_t
234ProcessCoordinator::ErrorStatus()
235{
236	AutoLocker<BLocker> locker(&fLock);
237	for (int32 i = 0; i < fNodes.CountItems(); i++) {
238		status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus();
239
240		if (result != B_OK)
241			return result;
242	}
243
244	return B_OK;
245}
246
247
248float
249ProcessCoordinator::Progress()
250{
251	AutoLocker<BLocker> locker(&fLock);
252	float result = 0.0f;
253
254	if (!fWasStopped) {
255		int32 count = fNodes.CountItems();
256
257		// if there is only one then return it's value directly because this
258		// allows for the indeterminate state of -1.
259
260		if (count == 1)
261			result = fNodes.ItemAt(0)->Process()->Progress();
262		else {
263			float progressPerNode = 1.0f / ((float) count);
264
265			for (int32 i = count - 1; i >= 0; i--) {
266				AbstractProcess* process = fNodes.ItemAt(i)->Process();
267
268				switch(process->ProcessState()) {
269					case PROCESS_INITIAL:
270						break;
271					case PROCESS_RUNNING:
272						result += (progressPerNode * fmaxf(
273							0.0f, fminf(1.0, process->Progress())));
274						break;
275					case PROCESS_COMPLETE:
276						result += progressPerNode;
277						break;
278				}
279			}
280		}
281	}
282	return result;
283}
284
285
286const BString&
287ProcessCoordinator::Name() const
288{
289	return fName;
290}
291
292
293BMessage*
294ProcessCoordinator::Message() const
295{
296	return fMessage;
297}
298
299
300BString
301ProcessCoordinator::_CreateStatusMessage()
302{
303	// work through the nodes and take a description from the first one.  If
304	// there are others present then use a 'plus X others' suffix.  Go backwards
305	// through the processes so that the most recent activity is shown first.
306
307	BString firstProcessDescription;
308	uint32 additionalRunningProcesses = 0;
309
310	for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) {
311		AbstractProcess* process = fNodes.ItemAt(i)->Process();
312		if (process->ProcessState() == PROCESS_RUNNING) {
313			if (firstProcessDescription.IsEmpty()) {
314				if (strlen(process->Description()) != 0)
315					firstProcessDescription = process->Description();
316				else
317					additionalRunningProcesses++;
318			}
319			else
320				additionalRunningProcesses++;
321		}
322	}
323
324	if (firstProcessDescription.IsEmpty())
325		return "???";
326
327	if (additionalRunningProcesses == 0)
328		return firstProcessDescription;
329
330	static BStringFormat format(B_TRANSLATE(
331		"%FIRST_PROCESS_DESCRIPTION% +"
332		"{0, plural, one{# process} other{# processes}}"));
333	BString result;
334	format.Format(result, additionalRunningProcesses);
335	result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription);
336
337	return result;
338}
339
340
341/*! This method assumes that a lock is held on the coordinator. */
342
343ProcessCoordinatorState
344ProcessCoordinator::_CreateStatus()
345{
346	return ProcessCoordinatorState(
347		this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus());
348}
349
350
351/*! This will try to obtain the lock and if it cannot obtain the lock then
352    it will flag that when the coordinator has finished its current
353    coordination, it should initiate another coordination.
354 */
355void
356ProcessCoordinator::_CoordinateAndCallListener()
357{
358	if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) {
359		HDDEBUG("[Coordinator] would coordinate nodes, but coordination is "
360			"in progress - will defer");
361		AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
362		fCoordinateAndCallListenerRerun = true;
363		return;
364	}
365
366	ProcessCoordinatorState state = _Coordinate();
367
368	if (fListener != NULL)
369		fListener->CoordinatorChanged(state);
370
371	fLock.Unlock();
372
373	bool coordinateAndCallListenerRerun = false;
374
375	{
376		AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
377		coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun;
378		fCoordinateAndCallListenerRerun = false;
379	}
380
381	if (coordinateAndCallListenerRerun) {
382		HDDEBUG("[Coordinator] will run deferred coordination");
383		_CoordinateAndCallListener();
384	}
385}
386
387
388ProcessCoordinatorState
389ProcessCoordinator::_Coordinate()
390{
391	HDTRACE("[Coordinator] will coordinate nodes");
392	AutoLocker<BLocker> locker(&fLock);
393	_StopSuccessorNodesToErroredOrStoppedNodes();
394
395	// go through the nodes and find those that are still to be run and
396	// for which the preconditions are met to start.
397	for (int32 i = 0; i < fNodes.CountItems(); i++) {
398		AbstractProcessNode* node = fNodes.ItemAt(i);
399
400		if (node->Process()->ProcessState() == PROCESS_INITIAL) {
401			if (node->AllPredecessorsComplete())
402				node->Start();
403			else {
404				HDTRACE("[Coordinator] all predecessors not complete -> "
405					"[%s] not started", node->Process()->Name());
406			}
407		} else {
408			HDTRACE("[Coordinator] process [%s] running or complete",
409				node->Process()->Name());
410		}
411	}
412
413	return _CreateStatus();
414}
415
416
417/*! This method assumes that a lock is held on the coordinator. */
418
419void
420ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes()
421{
422	for (int32 i = 0; i < fNodes.CountItems(); i++) {
423		AbstractProcessNode* node = fNodes.ItemAt(i);
424		AbstractProcess* process = node->Process();
425
426		if (process->WasStopped() || process->ErrorStatus() != B_OK)
427			_StopSuccessorNodes(node);
428	}
429}
430
431
432/*! This method assumes that a lock is held on the coordinator. */
433
434void
435ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode)
436{
437	for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) {
438		AbstractProcessNode* node = predecessorNode->SuccessorAt(i);
439		AbstractProcess* process = node->Process();
440
441		if (process->ProcessState() == PROCESS_INITIAL) {
442			HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)",
443				predecessorNode->Process()->Name(), process->Name());
444			node->RequestStop();
445			_StopSuccessorNodes(node);
446		}
447	}
448}
449
450
451int32
452ProcessCoordinator::_CountNodesCompleted()
453{
454	int32 nodesCompleted = 0;
455	for (int32 i = 0; i < fNodes.CountItems(); i++) {
456		AbstractProcess *process = fNodes.ItemAt(i)->Process();
457		if (process->ProcessState() == PROCESS_COMPLETE)
458			nodesCompleted++;
459	}
460	return nodesCompleted;
461}
462
463
464BString
465ProcessCoordinator::LogReport()
466{
467	BString result;
468	AutoLocker<BLocker> locker(&fLock);
469
470	for (int32 i = 0; i < fNodes.CountItems(); i++) {
471		if (0 != result.Length())
472			result.Append("\n");
473		AbstractProcessNode* node = fNodes.ItemAt(i);
474		result.Append(node->LogReport());
475	}
476
477	return result;
478}
479