workgroup.cpp revision 8413:92457dfb91bd
1/*
2 * Copyright (c) 2001, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 *
23 */
24
25#include "precompiled.hpp"
26#include "gc/shared/workgroup.hpp"
27#include "memory/allocation.hpp"
28#include "memory/allocation.inline.hpp"
29#include "runtime/atomic.inline.hpp"
30#include "runtime/os.hpp"
31
32// Definitions of WorkGang methods.
33
34AbstractWorkGang::AbstractWorkGang(const char* name,
35                                   bool  are_GC_task_threads,
36                                   bool  are_ConcurrentGC_threads) :
37  _name(name),
38  _are_GC_task_threads(are_GC_task_threads),
39  _are_ConcurrentGC_threads(are_ConcurrentGC_threads) {
40
41  assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
42         "They cannot both be STW GC and Concurrent threads" );
43
44  // Other initialization.
45  _monitor = new Monitor(/* priority */       Mutex::leaf,
46                         /* name */           "WorkGroup monitor",
47                         /* allow_vm_block */ are_GC_task_threads,
48                                              Monitor::_safepoint_check_sometimes);
49  assert(monitor() != NULL, "Failed to allocate monitor");
50  _terminate = false;
51  _task = NULL;
52  _sequence_number = 0;
53  _started_workers = 0;
54  _finished_workers = 0;
55}
56
57WorkGang::WorkGang(const char* name,
58                   uint        workers,
59                   bool        are_GC_task_threads,
60                   bool        are_ConcurrentGC_threads) :
61  AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) {
62  _total_workers = workers;
63}
64
65GangWorker* WorkGang::allocate_worker(uint which) {
66  GangWorker* new_worker = new GangWorker(this, which);
67  return new_worker;
68}
69
70// The current implementation will exit if the allocation
71// of any worker fails.  Still, return a boolean so that
72// a future implementation can possibly do a partial
73// initialization of the workers and report such to the
74// caller.
75bool WorkGang::initialize_workers() {
76
77  if (TraceWorkGang) {
78    tty->print_cr("Constructing work gang %s with %d threads",
79                  name(),
80                  total_workers());
81  }
82  _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers(), mtInternal);
83  if (gang_workers() == NULL) {
84    vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
85    return false;
86  }
87  os::ThreadType worker_type;
88  if (are_ConcurrentGC_threads()) {
89    worker_type = os::cgc_thread;
90  } else {
91    worker_type = os::pgc_thread;
92  }
93  for (uint worker = 0; worker < total_workers(); worker += 1) {
94    GangWorker* new_worker = allocate_worker(worker);
95    assert(new_worker != NULL, "Failed to allocate GangWorker");
96    _gang_workers[worker] = new_worker;
97    if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) {
98      vm_exit_out_of_memory(0, OOM_MALLOC_ERROR,
99              "Cannot create worker GC thread. Out of system resources.");
100      return false;
101    }
102    if (!DisableStartThread) {
103      os::start_thread(new_worker);
104    }
105  }
106  return true;
107}
108
109AbstractWorkGang::~AbstractWorkGang() {
110  if (TraceWorkGang) {
111    tty->print_cr("Destructing work gang %s", name());
112  }
113  stop();   // stop all the workers
114  for (uint worker = 0; worker < total_workers(); worker += 1) {
115    delete gang_worker(worker);
116  }
117  delete gang_workers();
118  delete monitor();
119}
120
121GangWorker* AbstractWorkGang::gang_worker(uint i) const {
122  // Array index bounds checking.
123  GangWorker* result = NULL;
124  assert(gang_workers() != NULL, "No workers for indexing");
125  assert(i < total_workers(), "Worker index out of bounds");
126  result = _gang_workers[i];
127  assert(result != NULL, "Indexing to null worker");
128  return result;
129}
130
131void WorkGang::run_task(AbstractGangTask* task) {
132  run_task(task, total_workers());
133}
134
135void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
136  task->set_for_termination(no_of_parallel_workers);
137
138  // This thread is executed by the VM thread which does not block
139  // on ordinary MutexLocker's.
140  MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
141  if (TraceWorkGang) {
142    tty->print_cr("Running work gang %s task %s", name(), task->name());
143  }
144  // Tell all the workers to run a task.
145  assert(task != NULL, "Running a null task");
146  // Initialize.
147  _task = task;
148  _sequence_number += 1;
149  _started_workers = 0;
150  _finished_workers = 0;
151  // Tell the workers to get to work.
152  monitor()->notify_all();
153  // Wait for them to be finished
154  while (finished_workers() < no_of_parallel_workers) {
155    if (TraceWorkGang) {
156      tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d",
157                    name(), finished_workers(), no_of_parallel_workers,
158                    _sequence_number);
159    }
160    monitor()->wait(/* no_safepoint_check */ true);
161  }
162  _task = NULL;
163  if (TraceWorkGang) {
164    tty->print_cr("\nFinished work gang %s: %u/%u sequence %d",
165                  name(), finished_workers(), no_of_parallel_workers,
166                  _sequence_number);
167    Thread* me = Thread::current();
168    tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());
169  }
170}
171
172void FlexibleWorkGang::run_task(AbstractGangTask* task) {
173  // If active_workers() is passed, _finished_workers
174  // must only be incremented for workers that find non_null
175  // work (as opposed to all those that just check that the
176  // task is not null).
177  WorkGang::run_task(task, (uint) active_workers());
178}
179
180void AbstractWorkGang::stop() {
181  // Tell all workers to terminate, then wait for them to become inactive.
182  MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
183  if (TraceWorkGang) {
184    tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
185  }
186  _task = NULL;
187  _terminate = true;
188  monitor()->notify_all();
189  while (finished_workers() < active_workers()) {
190    if (TraceWorkGang) {
191      tty->print_cr("Waiting in work gang %s: %u/%u finished",
192                    name(), finished_workers(), active_workers());
193    }
194    monitor()->wait(/* no_safepoint_check */ true);
195  }
196}
197
198void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
199  assert(monitor()->owned_by_self(), "worker_poll is an internal method");
200  assert(data != NULL, "worker data is null");
201  data->set_terminate(terminate());
202  data->set_task(task());
203  data->set_sequence_number(sequence_number());
204}
205
206void AbstractWorkGang::internal_note_start() {
207  assert(monitor()->owned_by_self(), "note_finish is an internal method");
208  _started_workers += 1;
209}
210
211void AbstractWorkGang::internal_note_finish() {
212  assert(monitor()->owned_by_self(), "note_finish is an internal method");
213  _finished_workers += 1;
214}
215
216void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
217  uint    num_thr = total_workers();
218  for (uint i = 0; i < num_thr; i++) {
219    gang_worker(i)->print_on(st);
220    st->cr();
221  }
222}
223
224void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
225  assert(tc != NULL, "Null ThreadClosure");
226  uint num_thr = total_workers();
227  for (uint i = 0; i < num_thr; i++) {
228    tc->do_thread(gang_worker(i));
229  }
230}
231
232// GangWorker methods.
233
234GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
235  _gang = gang;
236  set_id(id);
237  set_name("%s#%d", gang->name(), id);
238}
239
240void GangWorker::run() {
241  initialize();
242  loop();
243}
244
245void GangWorker::initialize() {
246  this->initialize_thread_local_storage();
247  this->record_stack_base_and_size();
248  this->initialize_named_thread();
249  assert(_gang != NULL, "No gang to run in");
250  os::set_priority(this, NearMaxPriority);
251  if (TraceWorkGang) {
252    tty->print_cr("Running gang worker for gang %s id %u",
253                  gang()->name(), id());
254  }
255  // The VM thread should not execute here because MutexLocker's are used
256  // as (opposed to MutexLockerEx's).
257  assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
258         " of a work gang");
259}
260
261void GangWorker::loop() {
262  int previous_sequence_number = 0;
263  Monitor* gang_monitor = gang()->monitor();
264  for ( ; /* !terminate() */; ) {
265    WorkData data;
266    int part;  // Initialized below.
267    {
268      // Grab the gang mutex.
269      MutexLocker ml(gang_monitor);
270      // Wait for something to do.
271      // Polling outside the while { wait } avoids missed notifies
272      // in the outer loop.
273      gang()->internal_worker_poll(&data);
274      if (TraceWorkGang) {
275        tty->print("Polled outside for work in gang %s worker %u",
276                   gang()->name(), id());
277        tty->print("  terminate: %s",
278                   data.terminate() ? "true" : "false");
279        tty->print("  sequence: %d (prev: %d)",
280                   data.sequence_number(), previous_sequence_number);
281        if (data.task() != NULL) {
282          tty->print("  task: %s", data.task()->name());
283        } else {
284          tty->print("  task: NULL");
285        }
286        tty->cr();
287      }
288      for ( ; /* break or return */; ) {
289        // Terminate if requested.
290        if (data.terminate()) {
291          gang()->internal_note_finish();
292          gang_monitor->notify_all();
293          return;
294        }
295        // Check for new work.
296        if ((data.task() != NULL) &&
297            (data.sequence_number() != previous_sequence_number)) {
298          if (gang()->needs_more_workers()) {
299            gang()->internal_note_start();
300            gang_monitor->notify_all();
301            part = gang()->started_workers() - 1;
302            break;
303          }
304        }
305        // Nothing to do.
306        gang_monitor->wait(/* no_safepoint_check */ true);
307        gang()->internal_worker_poll(&data);
308        if (TraceWorkGang) {
309          tty->print("Polled inside for work in gang %s worker %u",
310                     gang()->name(), id());
311          tty->print("  terminate: %s",
312                     data.terminate() ? "true" : "false");
313          tty->print("  sequence: %d (prev: %d)",
314                     data.sequence_number(), previous_sequence_number);
315          if (data.task() != NULL) {
316            tty->print("  task: %s", data.task()->name());
317          } else {
318            tty->print("  task: NULL");
319          }
320          tty->cr();
321        }
322      }
323      // Drop gang mutex.
324    }
325    if (TraceWorkGang) {
326      tty->print("Work for work gang %s id %u task %s part %d",
327                 gang()->name(), id(), data.task()->name(), part);
328    }
329    assert(data.task() != NULL, "Got null task");
330    data.task()->work(part);
331    {
332      if (TraceWorkGang) {
333        tty->print("Finish for work gang %s id %u task %s part %d",
334                   gang()->name(), id(), data.task()->name(), part);
335      }
336      // Grab the gang mutex.
337      MutexLocker ml(gang_monitor);
338      gang()->internal_note_finish();
339      // Tell the gang you are done.
340      gang_monitor->notify_all();
341      // Drop the gang mutex.
342    }
343    previous_sequence_number = data.sequence_number();
344  }
345}
346
347bool GangWorker::is_GC_task_thread() const {
348  return gang()->are_GC_task_threads();
349}
350
351bool GangWorker::is_ConcurrentGC_thread() const {
352  return gang()->are_ConcurrentGC_threads();
353}
354
355void GangWorker::print_on(outputStream* st) const {
356  st->print("\"%s\" ", name());
357  Thread::print_on(st);
358  st->cr();
359}
360
361// Printing methods
362
363const char* AbstractWorkGang::name() const {
364  return _name;
365}
366
367#ifndef PRODUCT
368
369const char* AbstractGangTask::name() const {
370  return _name;
371}
372
373#endif /* PRODUCT */
374
375// FlexibleWorkGang
376
377
378// *** WorkGangBarrierSync
379
380WorkGangBarrierSync::WorkGangBarrierSync()
381  : _monitor(Mutex::safepoint, "work gang barrier sync", true,
382             Monitor::_safepoint_check_never),
383    _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
384}
385
386WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
387  : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never),
388    _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
389}
390
391void WorkGangBarrierSync::set_n_workers(uint n_workers) {
392  _n_workers    = n_workers;
393  _n_completed  = 0;
394  _should_reset = false;
395  _aborted      = false;
396}
397
398bool WorkGangBarrierSync::enter() {
399  MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
400  if (should_reset()) {
401    // The should_reset() was set and we are the first worker to enter
402    // the sync barrier. We will zero the n_completed() count which
403    // effectively resets the barrier.
404    zero_completed();
405    set_should_reset(false);
406  }
407  inc_completed();
408  if (n_completed() == n_workers()) {
409    // At this point we would like to reset the barrier to be ready in
410    // case it is used again. However, we cannot set n_completed() to
411    // 0, even after the notify_all(), given that some other workers
412    // might still be waiting for n_completed() to become ==
413    // n_workers(). So, if we set n_completed() to 0, those workers
414    // will get stuck (as they will wake up, see that n_completed() !=
415    // n_workers() and go back to sleep). Instead, we raise the
416    // should_reset() flag and the barrier will be reset the first
417    // time a worker enters it again.
418    set_should_reset(true);
419    monitor()->notify_all();
420  } else {
421    while (n_completed() != n_workers() && !aborted()) {
422      monitor()->wait(/* no_safepoint_check */ true);
423    }
424  }
425  return !aborted();
426}
427
428void WorkGangBarrierSync::abort() {
429  MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
430  set_aborted();
431  monitor()->notify_all();
432}
433
434// SubTasksDone functions.
435
436SubTasksDone::SubTasksDone(uint n) :
437  _n_tasks(n), _n_threads(1), _tasks(NULL) {
438  _tasks = NEW_C_HEAP_ARRAY(uint, n, mtInternal);
439  guarantee(_tasks != NULL, "alloc failure");
440  clear();
441}
442
443bool SubTasksDone::valid() {
444  return _tasks != NULL;
445}
446
447void SubTasksDone::set_n_threads(uint t) {
448  assert(_claimed == 0 || _threads_completed == _n_threads,
449         "should not be called while tasks are being processed!");
450  _n_threads = (t == 0 ? 1 : t);
451}
452
453void SubTasksDone::clear() {
454  for (uint i = 0; i < _n_tasks; i++) {
455    _tasks[i] = 0;
456  }
457  _threads_completed = 0;
458#ifdef ASSERT
459  _claimed = 0;
460#endif
461}
462
463bool SubTasksDone::is_task_claimed(uint t) {
464  assert(t < _n_tasks, "bad task id.");
465  uint old = _tasks[t];
466  if (old == 0) {
467    old = Atomic::cmpxchg(1, &_tasks[t], 0);
468  }
469  assert(_tasks[t] == 1, "What else?");
470  bool res = old != 0;
471#ifdef ASSERT
472  if (!res) {
473    assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
474    Atomic::inc((volatile jint*) &_claimed);
475  }
476#endif
477  return res;
478}
479
480void SubTasksDone::all_tasks_completed() {
481  jint observed = _threads_completed;
482  jint old;
483  do {
484    old = observed;
485    observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
486  } while (observed != old);
487  // If this was the last thread checking in, clear the tasks.
488  if (observed+1 == (jint)_n_threads) clear();
489}
490
491
492SubTasksDone::~SubTasksDone() {
493  if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
494}
495
496// *** SequentialSubTasksDone
497
498void SequentialSubTasksDone::clear() {
499  _n_tasks   = _n_claimed   = 0;
500  _n_threads = _n_completed = 0;
501}
502
503bool SequentialSubTasksDone::valid() {
504  return _n_threads > 0;
505}
506
507bool SequentialSubTasksDone::is_task_claimed(uint& t) {
508  uint* n_claimed_ptr = &_n_claimed;
509  t = *n_claimed_ptr;
510  while (t < _n_tasks) {
511    jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
512    if (res == (jint)t) {
513      return false;
514    }
515    t = *n_claimed_ptr;
516  }
517  return true;
518}
519
520bool SequentialSubTasksDone::all_tasks_completed() {
521  uint* n_completed_ptr = &_n_completed;
522  uint  complete        = *n_completed_ptr;
523  while (true) {
524    uint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
525    if (res == complete) {
526      break;
527    }
528    complete = res;
529  }
530  if (complete+1 == _n_threads) {
531    clear();
532    return true;
533  }
534  return false;
535}
536
537bool FreeIdSet::_stat_init = false;
538FreeIdSet* FreeIdSet::_sets[NSets];
539bool FreeIdSet::_safepoint;
540
541FreeIdSet::FreeIdSet(int sz, Monitor* mon) :
542  _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0)
543{
544  _ids = NEW_C_HEAP_ARRAY(int, sz, mtInternal);
545  for (int i = 0; i < sz; i++) _ids[i] = i+1;
546  _ids[sz-1] = end_of_list; // end of list.
547  if (_stat_init) {
548    for (int j = 0; j < NSets; j++) _sets[j] = NULL;
549    _stat_init = true;
550  }
551  // Add to sets.  (This should happen while the system is still single-threaded.)
552  for (int j = 0; j < NSets; j++) {
553    if (_sets[j] == NULL) {
554      _sets[j] = this;
555      _index = j;
556      break;
557    }
558  }
559  guarantee(_index != -1, "Too many FreeIdSets in use!");
560}
561
562FreeIdSet::~FreeIdSet() {
563  _sets[_index] = NULL;
564  FREE_C_HEAP_ARRAY(int, _ids);
565}
566
567void FreeIdSet::set_safepoint(bool b) {
568  _safepoint = b;
569  if (b) {
570    for (int j = 0; j < NSets; j++) {
571      if (_sets[j] != NULL && _sets[j]->_waiters > 0) {
572        Monitor* mon = _sets[j]->_mon;
573        mon->lock_without_safepoint_check();
574        mon->notify_all();
575        mon->unlock();
576      }
577    }
578  }
579}
580
581#define FID_STATS 0
582
583int FreeIdSet::claim_par_id() {
584#if FID_STATS
585  thread_t tslf = thr_self();
586  tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed);
587#endif
588  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
589  while (!_safepoint && _hd == end_of_list) {
590    _waiters++;
591#if FID_STATS
592    if (_waiters > 5) {
593      tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n",
594                 tslf, _waiters, _claimed);
595    }
596#endif
597    _mon->wait(Mutex::_no_safepoint_check_flag);
598    _waiters--;
599  }
600  if (_hd == end_of_list) {
601#if FID_STATS
602    tty->print("claim_par_id[%d]: returning EOL.\n", tslf);
603#endif
604    return -1;
605  } else {
606    int res = _hd;
607    _hd = _ids[res];
608    _ids[res] = claimed;  // For debugging.
609    _claimed++;
610#if FID_STATS
611    tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n",
612               tslf, res, _claimed);
613#endif
614    return res;
615  }
616}
617
618bool FreeIdSet::claim_perm_id(int i) {
619  assert(0 <= i && i < _sz, "Out of range.");
620  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
621  int prev = end_of_list;
622  int cur = _hd;
623  while (cur != end_of_list) {
624    if (cur == i) {
625      if (prev == end_of_list) {
626        _hd = _ids[cur];
627      } else {
628        _ids[prev] = _ids[cur];
629      }
630      _ids[cur] = claimed;
631      _claimed++;
632      return true;
633    } else {
634      prev = cur;
635      cur = _ids[cur];
636    }
637  }
638  return false;
639
640}
641
642void FreeIdSet::release_par_id(int id) {
643  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
644  assert(_ids[id] == claimed, "Precondition.");
645  _ids[id] = _hd;
646  _hd = id;
647  _claimed--;
648#if FID_STATS
649  tty->print("[%d] release_par_id(%d), waiters =%d,  claimed = %d.\n",
650             thr_self(), id, _waiters, _claimed);
651#endif
652  if (_waiters > 0)
653    // Notify all would be safer, but this is OK, right?
654    _mon->notify_all();
655}
656