1/*
2 * Copyright (c) 2004 Roman Shaposhnik
3 *
4 * Many thanks to Steven M. Schultz for providing clever ideas and
5 * to Michael Niedermayer <michaelni@gmx.at> for writing initial
6 * implementation.
7 *
8 * This file is part of FFmpeg.
9 *
10 * FFmpeg is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
14 *
15 * FFmpeg is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with FFmpeg; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24#include <pthread.h>
25
26#include "avcodec.h"
27
28typedef int (action_func)(AVCodecContext *c, void *arg);
29
30typedef struct ThreadContext {
31    pthread_t *workers;
32    action_func *func;
33    void *args;
34    int *rets;
35    int rets_count;
36    int job_count;
37    int job_size;
38
39    pthread_cond_t last_job_cond;
40    pthread_cond_t current_job_cond;
41    pthread_mutex_t current_job_lock;
42    int current_job;
43    int done;
44} ThreadContext;
45
46static void* attribute_align_arg worker(void *v)
47{
48    AVCodecContext *avctx = v;
49    ThreadContext *c = avctx->thread_opaque;
50    int our_job = c->job_count;
51    int thread_count = avctx->thread_count;
52    int self_id;
53
54    pthread_mutex_lock(&c->current_job_lock);
55    self_id = c->current_job++;
56    for (;;){
57        while (our_job >= c->job_count) {
58            if (c->current_job == thread_count + c->job_count)
59                pthread_cond_signal(&c->last_job_cond);
60
61            pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
62            our_job = self_id;
63
64            if (c->done) {
65                pthread_mutex_unlock(&c->current_job_lock);
66                return NULL;
67            }
68        }
69        pthread_mutex_unlock(&c->current_job_lock);
70
71        c->rets[our_job%c->rets_count] = c->func(avctx, (char*)c->args + our_job*c->job_size);
72
73        pthread_mutex_lock(&c->current_job_lock);
74        our_job = c->current_job++;
75    }
76}
77
78static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
79{
80    pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
81    pthread_mutex_unlock(&c->current_job_lock);
82}
83
84void avcodec_thread_free(AVCodecContext *avctx)
85{
86    ThreadContext *c = avctx->thread_opaque;
87    int i;
88
89    pthread_mutex_lock(&c->current_job_lock);
90    c->done = 1;
91    pthread_cond_broadcast(&c->current_job_cond);
92    pthread_mutex_unlock(&c->current_job_lock);
93
94    for (i=0; i<avctx->thread_count; i++)
95         pthread_join(c->workers[i], NULL);
96
97    pthread_mutex_destroy(&c->current_job_lock);
98    pthread_cond_destroy(&c->current_job_cond);
99    pthread_cond_destroy(&c->last_job_cond);
100    av_free(c->workers);
101    av_freep(&avctx->thread_opaque);
102}
103
104int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
105{
106    ThreadContext *c= avctx->thread_opaque;
107    int dummy_ret;
108
109    if (job_count <= 0)
110        return 0;
111
112    pthread_mutex_lock(&c->current_job_lock);
113
114    c->current_job = avctx->thread_count;
115    c->job_count = job_count;
116    c->job_size = job_size;
117    c->args = arg;
118    c->func = func;
119    if (ret) {
120        c->rets = ret;
121        c->rets_count = job_count;
122    } else {
123        c->rets = &dummy_ret;
124        c->rets_count = 1;
125    }
126    pthread_cond_broadcast(&c->current_job_cond);
127
128    avcodec_thread_park_workers(c, avctx->thread_count);
129
130    return 0;
131}
132
133int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
134{
135    int i;
136    ThreadContext *c;
137
138    c = av_mallocz(sizeof(ThreadContext));
139    if (!c)
140        return -1;
141
142    c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
143    if (!c->workers) {
144        av_free(c);
145        return -1;
146    }
147
148    avctx->thread_opaque = c;
149    avctx->thread_count = thread_count;
150    c->current_job = 0;
151    c->job_count = 0;
152    c->job_size = 0;
153    c->done = 0;
154    pthread_cond_init(&c->current_job_cond, NULL);
155    pthread_cond_init(&c->last_job_cond, NULL);
156    pthread_mutex_init(&c->current_job_lock, NULL);
157    pthread_mutex_lock(&c->current_job_lock);
158    for (i=0; i<thread_count; i++) {
159        if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
160           avctx->thread_count = i;
161           pthread_mutex_unlock(&c->current_job_lock);
162           avcodec_thread_free(avctx);
163           return -1;
164        }
165    }
166
167    avcodec_thread_park_workers(c, thread_count);
168
169    avctx->execute = avcodec_thread_execute;
170    return 0;
171}
172