1/*
2 * Copyright (c) 2004 Roman Shaposhnik
3 *
4 * Many thanks to Steven M. Schultz for providing clever ideas and
5 * to Michael Niedermayer <michaelni@gmx.at> for writing initial
6 * implementation.
7 *
8 * This file is part of FFmpeg.
9 *
10 * FFmpeg is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
14 *
15 * FFmpeg is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with FFmpeg; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24#include <pthread.h>
25
26#include "avcodec.h"
27
28typedef int (action_func)(AVCodecContext *c, void *arg);
29typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
30
31typedef struct ThreadContext {
32    pthread_t *workers;
33    action_func *func;
34    action_func2 *func2;
35    void *args;
36    int *rets;
37    int rets_count;
38    int job_count;
39    int job_size;
40
41    pthread_cond_t last_job_cond;
42    pthread_cond_t current_job_cond;
43    pthread_mutex_t current_job_lock;
44    int current_job;
45    int done;
46} ThreadContext;
47
48static void* attribute_align_arg worker(void *v)
49{
50    AVCodecContext *avctx = v;
51    ThreadContext *c = avctx->thread_opaque;
52    int our_job = c->job_count;
53    int thread_count = avctx->thread_count;
54    int self_id;
55
56    pthread_mutex_lock(&c->current_job_lock);
57    self_id = c->current_job++;
58    for (;;){
59        while (our_job >= c->job_count) {
60            if (c->current_job == thread_count + c->job_count)
61                pthread_cond_signal(&c->last_job_cond);
62
63            pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
64            our_job = self_id;
65
66            if (c->done) {
67                pthread_mutex_unlock(&c->current_job_lock);
68                return NULL;
69            }
70        }
71        pthread_mutex_unlock(&c->current_job_lock);
72
73        c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
74                                                   c->func2(avctx, c->args, our_job, self_id);
75
76        pthread_mutex_lock(&c->current_job_lock);
77        our_job = c->current_job++;
78    }
79}
80
81static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
82{
83    pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
84    pthread_mutex_unlock(&c->current_job_lock);
85}
86
87void avcodec_thread_free(AVCodecContext *avctx)
88{
89    ThreadContext *c = avctx->thread_opaque;
90    int i;
91
92    pthread_mutex_lock(&c->current_job_lock);
93    c->done = 1;
94    pthread_cond_broadcast(&c->current_job_cond);
95    pthread_mutex_unlock(&c->current_job_lock);
96
97    for (i=0; i<avctx->thread_count; i++)
98         pthread_join(c->workers[i], NULL);
99
100    pthread_mutex_destroy(&c->current_job_lock);
101    pthread_cond_destroy(&c->current_job_cond);
102    pthread_cond_destroy(&c->last_job_cond);
103    av_free(c->workers);
104    av_freep(&avctx->thread_opaque);
105}
106
107static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
108{
109    ThreadContext *c= avctx->thread_opaque;
110    int dummy_ret;
111
112    if (job_count <= 0)
113        return 0;
114
115    pthread_mutex_lock(&c->current_job_lock);
116
117    c->current_job = avctx->thread_count;
118    c->job_count = job_count;
119    c->job_size = job_size;
120    c->args = arg;
121    c->func = func;
122    if (ret) {
123        c->rets = ret;
124        c->rets_count = job_count;
125    } else {
126        c->rets = &dummy_ret;
127        c->rets_count = 1;
128    }
129    pthread_cond_broadcast(&c->current_job_cond);
130
131    avcodec_thread_park_workers(c, avctx->thread_count);
132
133    return 0;
134}
135
136static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
137{
138    ThreadContext *c= avctx->thread_opaque;
139    c->func2 = func2;
140    return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0);
141}
142
143int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
144{
145    int i;
146    ThreadContext *c;
147
148    avctx->thread_count = thread_count;
149
150    if (thread_count <= 1)
151        return 0;
152
153    c = av_mallocz(sizeof(ThreadContext));
154    if (!c)
155        return -1;
156
157    c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
158    if (!c->workers) {
159        av_free(c);
160        return -1;
161    }
162
163    avctx->thread_opaque = c;
164    c->current_job = 0;
165    c->job_count = 0;
166    c->job_size = 0;
167    c->done = 0;
168    pthread_cond_init(&c->current_job_cond, NULL);
169    pthread_cond_init(&c->last_job_cond, NULL);
170    pthread_mutex_init(&c->current_job_lock, NULL);
171    pthread_mutex_lock(&c->current_job_lock);
172    for (i=0; i<thread_count; i++) {
173        if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
174           avctx->thread_count = i;
175           pthread_mutex_unlock(&c->current_job_lock);
176           avcodec_thread_free(avctx);
177           return -1;
178        }
179    }
180
181    avcodec_thread_park_workers(c, thread_count);
182
183    avctx->execute = avcodec_thread_execute;
184    avctx->execute2 = avcodec_thread_execute2;
185    return 0;
186}
187