1/* 2 * This file is part of FFmpeg. 3 * 4 * FFmpeg is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU Lesser General Public 6 * License as published by the Free Software Foundation; either 7 * version 2.1 of the License, or (at your option) any later version. 8 * 9 * FFmpeg is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * Lesser General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public 15 * License along with FFmpeg; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17 */ 18 19/** 20 * @file 21 * Slice multithreading support functions 22 * @see doc/multithreading.txt 23 */ 24 25#include "config.h" 26 27#if HAVE_PTHREADS 28#include <pthread.h> 29#elif HAVE_W32THREADS 30#include "compat/w32pthreads.h" 31#elif HAVE_OS2THREADS 32#include "compat/os2threads.h" 33#endif 34 35#include "avcodec.h" 36#include "internal.h" 37#include "pthread_internal.h" 38#include "thread.h" 39 40#include "libavutil/common.h" 41#include "libavutil/cpu.h" 42#include "libavutil/mem.h" 43 44typedef int (action_func)(AVCodecContext *c, void *arg); 45typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr); 46 47typedef struct SliceThreadContext { 48 pthread_t *workers; 49 action_func *func; 50 action_func2 *func2; 51 void *args; 52 int *rets; 53 int rets_count; 54 int job_count; 55 int job_size; 56 57 pthread_cond_t last_job_cond; 58 pthread_cond_t current_job_cond; 59 pthread_mutex_t current_job_lock; 60 unsigned current_execute; 61 int current_job; 62 int done; 63 64 int *entries; 65 int entries_count; 66 int thread_count; 67 pthread_cond_t *progress_cond; 68 pthread_mutex_t *progress_mutex; 69} SliceThreadContext; 70 71static void* attribute_align_arg worker(void *v) 72{ 73 AVCodecContext *avctx = v; 74 SliceThreadContext *c = avctx->internal->thread_ctx; 75 unsigned last_execute = 0; 76 int our_job = c->job_count; 77 int thread_count = avctx->thread_count; 78 int self_id; 79 80 pthread_mutex_lock(&c->current_job_lock); 81 self_id = c->current_job++; 82 for (;;){ 83 while (our_job >= c->job_count) { 84 if (c->current_job == thread_count + c->job_count) 85 pthread_cond_signal(&c->last_job_cond); 86 87 while (last_execute == c->current_execute && !c->done) 88 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); 89 last_execute = c->current_execute; 90 our_job = self_id; 91 92 if (c->done) { 93 pthread_mutex_unlock(&c->current_job_lock); 94 return NULL; 95 } 96 } 97 pthread_mutex_unlock(&c->current_job_lock); 98 99 c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size): 100 c->func2(avctx, c->args, our_job, self_id); 101 102 pthread_mutex_lock(&c->current_job_lock); 103 our_job = c->current_job++; 104 } 105} 106 107void ff_slice_thread_free(AVCodecContext *avctx) 108{ 109 SliceThreadContext *c = avctx->internal->thread_ctx; 110 int i; 111 112 pthread_mutex_lock(&c->current_job_lock); 113 c->done = 1; 114 pthread_cond_broadcast(&c->current_job_cond); 115 pthread_mutex_unlock(&c->current_job_lock); 116 117 for (i=0; i<avctx->thread_count; i++) 118 pthread_join(c->workers[i], NULL); 119 120 pthread_mutex_destroy(&c->current_job_lock); 121 pthread_cond_destroy(&c->current_job_cond); 122 pthread_cond_destroy(&c->last_job_cond); 123 av_free(c->workers); 124 av_freep(&avctx->internal->thread_ctx); 125} 126 127static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count) 128{ 129 while (c->current_job != thread_count + c->job_count) 130 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); 131 pthread_mutex_unlock(&c->current_job_lock); 132} 133 134static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size) 135{ 136 SliceThreadContext *c = avctx->internal->thread_ctx; 137 int dummy_ret; 138 139 if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1) 140 return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size); 141 142 if (job_count <= 0) 143 return 0; 144 145 pthread_mutex_lock(&c->current_job_lock); 146 147 c->current_job = avctx->thread_count; 148 c->job_count = job_count; 149 c->job_size = job_size; 150 c->args = arg; 151 c->func = func; 152 if (ret) { 153 c->rets = ret; 154 c->rets_count = job_count; 155 } else { 156 c->rets = &dummy_ret; 157 c->rets_count = 1; 158 } 159 c->current_execute++; 160 pthread_cond_broadcast(&c->current_job_cond); 161 162 thread_park_workers(c, avctx->thread_count); 163 164 return 0; 165} 166 167static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count) 168{ 169 SliceThreadContext *c = avctx->internal->thread_ctx; 170 c->func2 = func2; 171 return thread_execute(avctx, NULL, arg, ret, job_count, 0); 172} 173 174int ff_slice_thread_init(AVCodecContext *avctx) 175{ 176 int i; 177 SliceThreadContext *c; 178 int thread_count = avctx->thread_count; 179 180#if HAVE_W32THREADS 181 w32thread_init(); 182#endif 183 184 if (!thread_count) { 185 int nb_cpus = av_cpu_count(); 186 if (avctx->height) 187 nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16); 188 // use number of cores + 1 as thread count if there is more than one 189 if (nb_cpus > 1) 190 thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS); 191 else 192 thread_count = avctx->thread_count = 1; 193 } 194 195 if (thread_count <= 1) { 196 avctx->active_thread_type = 0; 197 return 0; 198 } 199 200 c = av_mallocz(sizeof(SliceThreadContext)); 201 if (!c) 202 return -1; 203 204 c->workers = av_mallocz_array(thread_count, sizeof(pthread_t)); 205 if (!c->workers) { 206 av_free(c); 207 return -1; 208 } 209 210 avctx->internal->thread_ctx = c; 211 c->current_job = 0; 212 c->job_count = 0; 213 c->job_size = 0; 214 c->done = 0; 215 pthread_cond_init(&c->current_job_cond, NULL); 216 pthread_cond_init(&c->last_job_cond, NULL); 217 pthread_mutex_init(&c->current_job_lock, NULL); 218 pthread_mutex_lock(&c->current_job_lock); 219 for (i=0; i<thread_count; i++) { 220 if(pthread_create(&c->workers[i], NULL, worker, avctx)) { 221 avctx->thread_count = i; 222 pthread_mutex_unlock(&c->current_job_lock); 223 ff_thread_free(avctx); 224 return -1; 225 } 226 } 227 228 thread_park_workers(c, thread_count); 229 230 avctx->execute = thread_execute; 231 avctx->execute2 = thread_execute2; 232 return 0; 233} 234 235void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n) 236{ 237 SliceThreadContext *p = avctx->internal->thread_ctx; 238 int *entries = p->entries; 239 240 pthread_mutex_lock(&p->progress_mutex[thread]); 241 entries[field] +=n; 242 pthread_cond_signal(&p->progress_cond[thread]); 243 pthread_mutex_unlock(&p->progress_mutex[thread]); 244} 245 246void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift) 247{ 248 SliceThreadContext *p = avctx->internal->thread_ctx; 249 int *entries = p->entries; 250 251 if (!entries || !field) return; 252 253 thread = thread ? thread - 1 : p->thread_count - 1; 254 255 pthread_mutex_lock(&p->progress_mutex[thread]); 256 while ((entries[field - 1] - entries[field]) < shift){ 257 pthread_cond_wait(&p->progress_cond[thread], &p->progress_mutex[thread]); 258 } 259 pthread_mutex_unlock(&p->progress_mutex[thread]); 260} 261 262int ff_alloc_entries(AVCodecContext *avctx, int count) 263{ 264 int i; 265 266 if (avctx->active_thread_type & FF_THREAD_SLICE) { 267 SliceThreadContext *p = avctx->internal->thread_ctx; 268 p->thread_count = avctx->thread_count; 269 p->entries = av_mallocz_array(count, sizeof(int)); 270 271 if (!p->entries) { 272 return AVERROR(ENOMEM); 273 } 274 275 p->entries_count = count; 276 p->progress_mutex = av_malloc_array(p->thread_count, sizeof(pthread_mutex_t)); 277 p->progress_cond = av_malloc_array(p->thread_count, sizeof(pthread_cond_t)); 278 279 for (i = 0; i < p->thread_count; i++) { 280 pthread_mutex_init(&p->progress_mutex[i], NULL); 281 pthread_cond_init(&p->progress_cond[i], NULL); 282 } 283 } 284 285 return 0; 286} 287 288void ff_reset_entries(AVCodecContext *avctx) 289{ 290 SliceThreadContext *p = avctx->internal->thread_ctx; 291 memset(p->entries, 0, p->entries_count * sizeof(int)); 292} 293