1/* 2 * Copyright (C) 2011 University of Szeged 3 * Copyright (C) 2011 Gabor Loki <loki@webkit.org> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY UNIVERSITY OF SZEGED ``AS IS'' AND ANY 16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UNIVERSITY OF SZEGED OR 19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 23 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 25 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#include "config.h" 29 30#if ENABLE(THREADING_GENERIC) 31 32#include "ParallelJobs.h" 33#include <wtf/NumberOfCores.h> 34 35namespace WTF { 36 37Vector< RefPtr<ParallelEnvironment::ThreadPrivate> >* ParallelEnvironment::s_threadPool = 0; 38 39ParallelEnvironment::ParallelEnvironment(ThreadFunction threadFunction, size_t sizeOfParameter, int requestedJobNumber) : 40 m_threadFunction(threadFunction), 41 m_sizeOfParameter(sizeOfParameter) 42{ 43 ASSERT_ARG(requestedJobNumber, requestedJobNumber >= 1); 44 45 int maxNumberOfCores = numberOfProcessorCores(); 46 47 if (!requestedJobNumber || requestedJobNumber > maxNumberOfCores) 48 requestedJobNumber = static_cast<unsigned>(maxNumberOfCores); 49 50 if (!s_threadPool) 51 s_threadPool = new Vector< RefPtr<ThreadPrivate> >(); 52 53 // The main thread should be also a worker. 54 int maxNumberOfNewThreads = requestedJobNumber - 1; 55 56 for (int i = 0; i < maxNumberOfCores && m_threads.size() < static_cast<unsigned>(maxNumberOfNewThreads); ++i) { 57 if (s_threadPool->size() < static_cast<unsigned>(i) + 1U) 58 s_threadPool->append(ThreadPrivate::create()); 59 60 if ((*s_threadPool)[i]->tryLockFor(this)) 61 m_threads.append((*s_threadPool)[i]); 62 } 63 64 m_numberOfJobs = m_threads.size() + 1; 65} 66 67void ParallelEnvironment::execute(void* parameters) 68{ 69 unsigned char* currentParameter = static_cast<unsigned char*>(parameters); 70 size_t i; 71 for (i = 0; i < m_threads.size(); ++i) { 72 m_threads[i]->execute(m_threadFunction, currentParameter); 73 currentParameter += m_sizeOfParameter; 74 } 75 76 // The work for the main thread. 77 (*m_threadFunction)(currentParameter); 78 79 // Wait until all jobs are done. 80 for (i = 0; i < m_threads.size(); ++i) 81 m_threads[i]->waitForFinish(); 82} 83 84bool ParallelEnvironment::ThreadPrivate::tryLockFor(ParallelEnvironment* parent) 85{ 86 bool locked = m_mutex.tryLock(); 87 88 if (!locked) 89 return false; 90 91 if (m_parent) { 92 m_mutex.unlock(); 93 return false; 94 } 95 96 if (!m_threadID) 97 m_threadID = createThread(&ParallelEnvironment::ThreadPrivate::workerThread, this, "Parallel worker"); 98 99 if (m_threadID) 100 m_parent = parent; 101 102 m_mutex.unlock(); 103 return m_threadID; 104} 105 106void ParallelEnvironment::ThreadPrivate::execute(ThreadFunction threadFunction, void* parameters) 107{ 108 MutexLocker lock(m_mutex); 109 110 m_threadFunction = threadFunction; 111 m_parameters = parameters; 112 m_running = true; 113 m_threadCondition.signal(); 114} 115 116void ParallelEnvironment::ThreadPrivate::waitForFinish() 117{ 118 MutexLocker lock(m_mutex); 119 120 while (m_running) 121 m_threadCondition.wait(m_mutex); 122} 123 124void ParallelEnvironment::ThreadPrivate::workerThread(void* threadData) 125{ 126 ThreadPrivate* sharedThread = reinterpret_cast<ThreadPrivate*>(threadData); 127 MutexLocker lock(sharedThread->m_mutex); 128 129 while (sharedThread->m_threadID) { 130 if (sharedThread->m_running) { 131 (*sharedThread->m_threadFunction)(sharedThread->m_parameters); 132 sharedThread->m_running = false; 133 sharedThread->m_parent = 0; 134 sharedThread->m_threadCondition.signal(); 135 } 136 137 sharedThread->m_threadCondition.wait(sharedThread->m_mutex); 138 } 139} 140 141} // namespace WTF 142#endif // ENABLE(THREADING_GENERIC) 143