1/*
2    Title:      Task farm for Multi-Threaded Garbage Collector
3
4    Copyright (c) 2010 David C. J. Matthews
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2.1 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19
20*/
21
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#elif defined(_WIN32)
25#include "winconfig.h"
26#else
27#error "No configuration file"
28#endif
29
30#ifdef HAVE_STDLIB_H
31#include <stdlib.h>
32#endif
33#ifdef HAVE_MALLOC_H
34#include <malloc.h>
35#endif
36
37#ifdef HAVE_SYS_TIME_H
38#include <sys/time.h>
39#endif
40
41#ifdef HAVE_ASSERT_H
42#include <assert.h>
43#define ASSERT(x)   assert(x)
44#else
45#define ASSERT(x)
46#endif
47
48#include "gctaskfarm.h"
49#include "diagnostics.h"
50#include "timing.h"
51
52static GCTaskId gTask;
53
54GCTaskId *globalTask = &gTask;
55
56GCTaskFarm::GCTaskFarm(): workLock("GC task farm work")
57{
58    queueSize = queueIn = queuedItems = 0;
59    workQueue = 0;
60    terminate = false;
61    threadCount = activeThreadCount = 0;
62#if (defined(HAVE_PTHREAD_H) || defined(HAVE_WINDOWS_H))
63    threadHandles = 0;
64#endif
65}
66
67GCTaskFarm::~GCTaskFarm()
68{
69    Terminate();
70    free(workQueue);
71#if (defined(HAVE_PTHREAD_H) || defined(HAVE_WINDOWS_H))
72    free(threadHandles);
73#endif
74}
75
76
77bool GCTaskFarm::Initialise(unsigned thrdCount, unsigned qSize)
78{
79    terminate = false;
80    if (!waitForWork.Init(0, thrdCount)) return false;
81    workQueue = (queue_entry*)calloc(qSize, sizeof(queue_entry));
82    if (workQueue == 0) return false;
83#if ((!defined(_WIN32) || defined(__CYGWIN__)) && defined(HAVE_PTHREAD_H))
84    queueSize = qSize;
85    threadHandles = (pthread_t*)calloc(thrdCount, sizeof(pthread_t));
86    if (threadHandles == 0) return false;
87#elif defined(HAVE_WINDOWS_H)
88    queueSize = qSize;
89    threadHandles = (HANDLE*)calloc(thrdCount, sizeof(HANDLE));
90    if (threadHandles == 0) return false;
91#else
92    queueSize = 0;
93#endif
94    // Create the worker threads.
95    for (unsigned i = 0; i < thrdCount; i++) {
96        // Fork a thread
97#if ((!defined(_WIN32) || defined(__CYGWIN__)) && defined(HAVE_PTHREAD_H))
98        // Create a thread that isn't joinable since we don't want to wait
99        // for it to finish.
100        pthread_t pthreadId;
101        bool isError = pthread_create(&pthreadId, NULL, WorkerThreadFunction, this) != 0;
102        if (isError) break;
103        threadHandles[threadCount++] = pthreadId;
104#elif defined(HAVE_WINDOWS_H)
105        DWORD dwThrdId; // Have to provide this although we don't use it.
106        HANDLE threadHandle =
107            CreateThread(NULL, 0, WorkerThreadFunction, this, 0, &dwThrdId);
108        if (threadHandle == NULL) break;
109        threadHandles[threadCount++] = threadHandle;
110#endif
111    }
112
113    return true;
114}
115
116void GCTaskFarm::Terminate()
117{
118    terminate = true;
119    // Increment the semaphore by the number of threads to release them all.
120    for (unsigned i = 0; i < threadCount; i++) waitForWork.Signal();
121    // Wait for the threads to terminate.
122#if ((!defined(_WIN32) || defined(__CYGWIN__)) && defined(HAVE_PTHREAD_H))
123    for (unsigned j = 0; j < threadCount; j++)
124        pthread_join(threadHandles[j], NULL);
125#elif defined(HAVE_WINDOWS_H)
126    if (threadCount != 0)
127        WaitForMultipleObjects(threadCount, threadHandles, TRUE, 10000);
128#endif
129}
130
131// Add work to the queue.  Returns true if it succeeds.
132bool GCTaskFarm::AddWork(gctask work, void *arg1, void *arg2)
133{
134    bool wantSignal = false;
135    {
136        PLocker l(&workLock);
137        if (queuedItems == queueSize) return false; // Queue is full
138        workQueue[queueIn].task = work;
139        workQueue[queueIn].arg1 = arg1;
140        workQueue[queueIn].arg2 = arg2;
141        queueIn++;
142        if (queueIn == queueSize) queueIn = 0;
143        queuedItems++;
144        wantSignal = queuedItems <= threadCount;
145    }
146    if (wantSignal) waitForWork.Signal();
147    return true;
148}
149
150// Schedule this as a task or run it immediately if the queue is full.
151void GCTaskFarm::AddWorkOrRunNow(gctask work, void *arg1, void *arg2)
152{
153    if (! AddWork(work, arg1, arg2))
154        (*work)(globalTask, arg1, arg2);
155}
156
157void GCTaskFarm::ThreadFunction()
158{
159    GCTaskId myTaskId;
160#if (defined(_WIN32) && ! defined(__CYGWIN__))
161    DWORD startActive = GetTickCount();
162#else
163    struct timeval startTime;
164    gettimeofday(&startTime, NULL);
165#endif
166    workLock.Lock();
167    activeThreadCount++;
168    while (! terminate) {
169        // Invariant: We have the lock and the activeThreadCount includes this thread.
170        // Find some work.
171
172        if (queuedItems > 0) { // There is work
173            unsigned outPos;
174            if (queuedItems > queueIn)
175                outPos = queueIn+queueSize-queuedItems;
176            else outPos = queueIn-queuedItems;
177            gctask work = workQueue[outPos].task;
178            void *arg1 = workQueue[outPos].arg1;
179            void *arg2 = workQueue[outPos].arg2;
180            workQueue[outPos].task = 0;
181            queuedItems--;
182            ASSERT(work != 0);
183            workLock.Unlock();
184            (*work)(&myTaskId, arg1, arg2);
185            workLock.Lock();
186        }
187        else {
188            activeThreadCount--; // We're no longer active
189            // If there is no work and we're the last active thread signal the
190            // main thread that the queue is empty
191            bool wantSignal = activeThreadCount == 0;
192            if (wantSignal)
193                waitForCompletion.Signal();
194            // Now release the lock.  In our Windows partial implementation of
195            // condition vars we assume that signalling is done with the lock
196            // still held.
197            workLock.Unlock();
198
199            if (debugOptions & DEBUG_GCTASKS)
200            {
201#if (defined(_WIN32) && ! defined(__CYGWIN__))
202                Log("GCTask: Thread %p blocking after %u milliseconds\n", &myTaskId,
203                     GetTickCount() - startActive);
204#else
205                struct timeval endTime;
206                gettimeofday(&endTime, NULL);
207                subTimevals(&endTime, &startTime);
208                Log("GCTask: Thread %p blocking after %0.4f seconds\n", &myTaskId,
209                    (float)endTime.tv_sec + (float)endTime.tv_usec / 1.0E6);
210#endif
211            }
212
213            if (terminate) return;
214            // Block until there's work.
215            waitForWork.Wait();
216            // We've been woken up
217            if (debugOptions & DEBUG_GCTASKS)
218            {
219#if (defined(_WIN32) && ! defined(__CYGWIN__))
220                startActive = GetTickCount();
221#else
222                gettimeofday(&startTime, NULL);
223#endif
224                Log("GCTask: Thread %p resuming\n", &myTaskId);
225            }
226            workLock.Lock();
227            activeThreadCount++;
228        }
229    }
230    activeThreadCount--;
231    workLock.Unlock();
232}
233
234#if ((!defined(_WIN32) || defined(__CYGWIN__)) && defined(HAVE_PTHREAD_H))
235void *GCTaskFarm::WorkerThreadFunction(void *parameter)
236{
237    GCTaskFarm *t = (GCTaskFarm *)parameter;
238    t->ThreadFunction();
239    return 0;
240}
241#elif defined(HAVE_WINDOWS_H)
242DWORD WINAPI GCTaskFarm::WorkerThreadFunction(void *parameter)
243{
244    GCTaskFarm *t = (GCTaskFarm *)parameter;
245    t->ThreadFunction();
246    return 0;
247}
248#endif
249
250// Wait until the queue is empty.
251void GCTaskFarm::WaitForCompletion(void)
252{
253#if (defined(_WIN32) && ! defined(__CYGWIN__))
254    DWORD startWait;
255    if (debugOptions & DEBUG_GCTASKS)
256        startWait = GetTickCount();
257#else
258    struct timeval startWait;
259    if (debugOptions & DEBUG_GCTASKS)
260        gettimeofday(&startWait, NULL);
261#endif
262    workLock.Lock();
263    while (activeThreadCount > 0 || queuedItems > 0)
264        waitForCompletion.Wait(&workLock);
265    workLock.Unlock();
266
267    if (debugOptions & DEBUG_GCTASKS)
268    {
269#if (defined(_WIN32) && ! defined(__CYGWIN__))
270        Log("GCTask: Threads completed after %u milliseconds\n", GetTickCount()-startWait);
271#else
272        struct timeval endWait;
273        gettimeofday(&endWait, NULL);
274        subTimevals(&endWait, &startWait);
275        Log("GCTask: Threads completed after %0.4f seconds\n",
276            (float)endWait.tv_sec + (float)endWait.tv_usec / 1.0E6);
277#endif
278    }
279}
280