1/* 2 * Copyright 2015, Axel Dörfler, axeld@pinc-software.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7#include "Worker.h" 8 9 10static const bigtime_t kWorkerTimeout = 1000000; 11 // One second until a worker thread quits without a job 12 13static const int32 kWorkerCountPerCPU = 3; 14 15static int32 sWorkerCount; 16 17 18Worker::Worker(JobQueue& queue) 19 : 20 fThread(-1), 21 fJobQueue(queue) 22{ 23} 24 25 26Worker::~Worker() 27{ 28} 29 30 31status_t 32Worker::Init() 33{ 34 fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY, 35 this); 36 if (fThread < 0) 37 return fThread; 38 39 status_t status = resume_thread(fThread); 40 if (status == B_OK) 41 atomic_add(&sWorkerCount, 1); 42 43 return status; 44} 45 46 47status_t 48Worker::Process() 49{ 50 while (true) { 51 BJob* job; 52 status_t status = fJobQueue.Pop(Timeout(), false, &job); 53 if (status != B_OK) 54 return status; 55 56 status = Run(job); 57 if (status != B_OK) { 58 // TODO: proper error reporting on failed job! 59 debug_printf("Launching %s failed: %s\n", job->Title().String(), 60 strerror(status)); 61 } 62 } 63} 64 65 66bigtime_t 67Worker::Timeout() const 68{ 69 return kWorkerTimeout; 70} 71 72 73const char* 74Worker::Name() const 75{ 76 return "worker"; 77} 78 79 80status_t 81Worker::Run(BJob* job) 82{ 83 return job->Run(); 84} 85 86 87/*static*/ status_t 88Worker::_Process(void* _self) 89{ 90 Worker* self = (Worker*)_self; 91 status_t status = self->Process(); 92 delete self; 93 94 return status; 95} 96 97 98// #pragma mark - 99 100 101MainWorker::MainWorker(JobQueue& queue) 102 : 103 Worker(queue), 104 fMaxWorkerCount(kWorkerCountPerCPU) 105{ 106 // TODO: keep track of workers, and quit them on destruction 107 system_info info; 108 if (get_system_info(&info) == B_OK) 109 fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU; 110} 111 112 113bigtime_t 114MainWorker::Timeout() const 115{ 116 return B_INFINITE_TIMEOUT; 117} 118 119 120const char* 121MainWorker::Name() const 122{ 123 return "main worker"; 124} 125 126 127status_t 128MainWorker::Run(BJob* job) 129{ 130 int32 count = atomic_get(&sWorkerCount); 131 132 size_t jobCount = fJobQueue.CountJobs(); 133 if (jobCount > INT_MAX) 134 jobCount = INT_MAX; 135 136 if ((int32)jobCount > count && count < fMaxWorkerCount) { 137 Worker* worker = new Worker(fJobQueue); 138 worker->Init(); 139 } 140 141 return Worker::Run(job); 142} 143