1/*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19/**
20 * @file
21 * Slice multithreading support functions
22 * @see doc/multithreading.txt
23 */
24
25#include "config.h"
26
27#if HAVE_PTHREADS
28#include <pthread.h>
29#elif HAVE_W32THREADS
30#include "compat/w32pthreads.h"
31#elif HAVE_OS2THREADS
32#include "compat/os2threads.h"
33#endif
34
35#include "avcodec.h"
36#include "internal.h"
37#include "pthread_internal.h"
38#include "thread.h"
39
40#include "libavutil/common.h"
41#include "libavutil/cpu.h"
42#include "libavutil/mem.h"
43
44typedef int (action_func)(AVCodecContext *c, void *arg);
45typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
46
47typedef struct SliceThreadContext {
48    pthread_t *workers;
49    action_func *func;
50    action_func2 *func2;
51    void *args;
52    int *rets;
53    int rets_count;
54    int job_count;
55    int job_size;
56
57    pthread_cond_t last_job_cond;
58    pthread_cond_t current_job_cond;
59    pthread_mutex_t current_job_lock;
60    unsigned current_execute;
61    int current_job;
62    int done;
63
64    int *entries;
65    int entries_count;
66    int thread_count;
67    pthread_cond_t *progress_cond;
68    pthread_mutex_t *progress_mutex;
69} SliceThreadContext;
70
/**
 * Entry point of every slice worker thread; v is the AVCodecContext.
 *
 * On start-up the worker takes a unique id (self_id) by post-incrementing
 * current_job under current_job_lock. It then loops forever:
 *  - while it has no job (our_job >= job_count) it signals last_job_cond if
 *    current_job equals thread_count + job_count, then waits on
 *    current_job_cond until current_execute changes (a new batch) or done
 *    is set;
 *  - the first job of a new batch is self_id, and further jobs are claimed by
 *    post-incrementing current_job;
 *  - each job runs with the lock released, and its result is stored in
 *    rets[our_job % rets_count].
 * The thread returns NULL (with the lock released) once done is observed.
 */
static void* attribute_align_arg worker(void *v)
{
    AVCodecContext *avctx = v;
    SliceThreadContext *c = avctx->internal->thread_ctx;
    unsigned last_execute = 0;
    int our_job = c->job_count;
    int thread_count = avctx->thread_count;
    int self_id;

    pthread_mutex_lock(&c->current_job_lock);
    self_id = c->current_job++;
    for (;;){
        while (our_job >= c->job_count) {
            /* All jobs of the batch have been claimed and finished: wake the
             * thread waiting in thread_park_workers(). */
            if (c->current_job == thread_count + c->job_count)
                pthread_cond_signal(&c->last_job_cond);

            /* Sleep until thread_execute() publishes a new batch (it bumps
             * current_execute) or ff_slice_thread_free() sets done. */
            while (last_execute == c->current_execute && !c->done)
                pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
            last_execute = c->current_execute;
            /* First job of the batch is our own id; thread_execute() starts
             * current_job at thread_count so later jobs follow the ids. */
            our_job = self_id;

            if (c->done) {
                pthread_mutex_unlock(&c->current_job_lock);
                return NULL;
            }
        }
        pthread_mutex_unlock(&c->current_job_lock);

        /* Run the job unlocked. execute() jobs get their own slice of args
         * (job_size bytes apart); execute2() jobs get the shared args plus the
         * job number and this worker's id. */
        c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
                                                   c->func2(avctx, c->args, our_job, self_id);

        /* Claim the next job; an out-of-range index sends us back to waiting. */
        pthread_mutex_lock(&c->current_job_lock);
        our_job = c->current_job++;
    }
}
106
107void ff_slice_thread_free(AVCodecContext *avctx)
108{
109    SliceThreadContext *c = avctx->internal->thread_ctx;
110    int i;
111
112    pthread_mutex_lock(&c->current_job_lock);
113    c->done = 1;
114    pthread_cond_broadcast(&c->current_job_cond);
115    pthread_mutex_unlock(&c->current_job_lock);
116
117    for (i=0; i<avctx->thread_count; i++)
118         pthread_join(c->workers[i], NULL);
119
120    pthread_mutex_destroy(&c->current_job_lock);
121    pthread_cond_destroy(&c->current_job_cond);
122    pthread_cond_destroy(&c->last_job_cond);
123    av_free(c->workers);
124    av_freep(&avctx->internal->thread_ctx);
125}
126
127static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
128{
129    while (c->current_job != thread_count + c->job_count)
130        pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
131    pthread_mutex_unlock(&c->current_job_lock);
132}
133
134static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
135{
136    SliceThreadContext *c = avctx->internal->thread_ctx;
137    int dummy_ret;
138
139    if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
140        return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
141
142    if (job_count <= 0)
143        return 0;
144
145    pthread_mutex_lock(&c->current_job_lock);
146
147    c->current_job = avctx->thread_count;
148    c->job_count = job_count;
149    c->job_size = job_size;
150    c->args = arg;
151    c->func = func;
152    if (ret) {
153        c->rets = ret;
154        c->rets_count = job_count;
155    } else {
156        c->rets = &dummy_ret;
157        c->rets_count = 1;
158    }
159    c->current_execute++;
160    pthread_cond_broadcast(&c->current_job_cond);
161
162    thread_park_workers(c, avctx->thread_count);
163
164    return 0;
165}
166
167static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
168{
169    SliceThreadContext *c = avctx->internal->thread_ctx;
170    c->func2 = func2;
171    return thread_execute(avctx, NULL, arg, ret, job_count, 0);
172}
173
174int ff_slice_thread_init(AVCodecContext *avctx)
175{
176    int i;
177    SliceThreadContext *c;
178    int thread_count = avctx->thread_count;
179
180#if HAVE_W32THREADS
181    w32thread_init();
182#endif
183
184    if (!thread_count) {
185        int nb_cpus = av_cpu_count();
186        if  (avctx->height)
187            nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16);
188        // use number of cores + 1 as thread count if there is more than one
189        if (nb_cpus > 1)
190            thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
191        else
192            thread_count = avctx->thread_count = 1;
193    }
194
195    if (thread_count <= 1) {
196        avctx->active_thread_type = 0;
197        return 0;
198    }
199
200    c = av_mallocz(sizeof(SliceThreadContext));
201    if (!c)
202        return -1;
203
204    c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
205    if (!c->workers) {
206        av_free(c);
207        return -1;
208    }
209
210    avctx->internal->thread_ctx = c;
211    c->current_job = 0;
212    c->job_count = 0;
213    c->job_size = 0;
214    c->done = 0;
215    pthread_cond_init(&c->current_job_cond, NULL);
216    pthread_cond_init(&c->last_job_cond, NULL);
217    pthread_mutex_init(&c->current_job_lock, NULL);
218    pthread_mutex_lock(&c->current_job_lock);
219    for (i=0; i<thread_count; i++) {
220        if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
221           avctx->thread_count = i;
222           pthread_mutex_unlock(&c->current_job_lock);
223           ff_thread_free(avctx);
224           return -1;
225        }
226    }
227
228    thread_park_workers(c, thread_count);
229
230    avctx->execute = thread_execute;
231    avctx->execute2 = thread_execute2;
232    return 0;
233}
234
235void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
236{
237    SliceThreadContext *p = avctx->internal->thread_ctx;
238    int *entries = p->entries;
239
240    pthread_mutex_lock(&p->progress_mutex[thread]);
241    entries[field] +=n;
242    pthread_cond_signal(&p->progress_cond[thread]);
243    pthread_mutex_unlock(&p->progress_mutex[thread]);
244}
245
246void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
247{
248    SliceThreadContext *p  = avctx->internal->thread_ctx;
249    int *entries      = p->entries;
250
251    if (!entries || !field) return;
252
253    thread = thread ? thread - 1 : p->thread_count - 1;
254
255    pthread_mutex_lock(&p->progress_mutex[thread]);
256    while ((entries[field - 1] - entries[field]) < shift){
257        pthread_cond_wait(&p->progress_cond[thread], &p->progress_mutex[thread]);
258    }
259    pthread_mutex_unlock(&p->progress_mutex[thread]);
260}
261
262int ff_alloc_entries(AVCodecContext *avctx, int count)
263{
264    int i;
265
266    if (avctx->active_thread_type & FF_THREAD_SLICE)  {
267        SliceThreadContext *p = avctx->internal->thread_ctx;
268        p->thread_count  = avctx->thread_count;
269        p->entries       = av_mallocz_array(count, sizeof(int));
270
271        if (!p->entries) {
272            return AVERROR(ENOMEM);
273        }
274
275        p->entries_count  = count;
276        p->progress_mutex = av_malloc_array(p->thread_count, sizeof(pthread_mutex_t));
277        p->progress_cond  = av_malloc_array(p->thread_count, sizeof(pthread_cond_t));
278
279        for (i = 0; i < p->thread_count; i++) {
280            pthread_mutex_init(&p->progress_mutex[i], NULL);
281            pthread_cond_init(&p->progress_cond[i], NULL);
282        }
283    }
284
285    return 0;
286}
287
288void ff_reset_entries(AVCodecContext *avctx)
289{
290    SliceThreadContext *p = avctx->internal->thread_ctx;
291    memset(p->entries, 0, p->entries_count * sizeof(int));
292}
293