1/* Copyright (C) 2018-2020 Free Software Foundation, Inc.
2   Contributed by Nicolas Koenig
3
4   This file is part of the GNU Fortran runtime library (libgfortran).
5
6   Libgfortran is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 3, or (at your option)
9   any later version.
10
11   Libgfortran is distributed in the hope that it will be useful,
12   but WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   GNU General Public License for more details.
15
16   Under Section 7 of GPL version 3, you are granted additional
17   permissions described in the GCC Runtime Library Exception, version
18   3.1, as published by the Free Software Foundation.
19
20   You should have received a copy of the GNU General Public License and
21   a copy of the GCC Runtime Library Exception along with this program;
22   see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
23   <http://www.gnu.org/licenses/>.  */
24
25#include "libgfortran.h"
26
27#define _GTHREAD_USE_COND_INIT_FUNC
28#include "../../libgcc/gthr.h"
29#include "io.h"
30#include "fbuf.h"
31#include "format.h"
32#include "unix.h"
33#include <string.h>
34#include <assert.h>
35
36#include <sys/types.h>
37
38#include "async.h"
39#if ASYNC_IO
40
41DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
42
43DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
44DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
45
46/* Current unit for asynchronous I/O.  Needed for error reporting.  */
47
48__thread gfc_unit *thread_unit = NULL;
49
/* Queue entry for the asynchronous I/O entry.  Entries form a singly
   linked list running from au->head to au->tail and are consumed by
   async_io.  */
typedef struct transfer_queue
{
  /* Kind of request; async_io dispatches on this value.  */
  enum aio_do type;
  /* Next entry in the queue, or NULL for the last one.  */
  struct transfer_queue *next;
  /* PDT copy installed by an AIO_DATA_TRANSFER_INIT request.  */
  struct st_parameter_dt *new_pdt;
  /* Arguments of an AIO_TRANSFER_SCALAR / AIO_TRANSFER_ARRAY request.  */
  transfer_args arg;
  /* Nonzero if the request was enqueued with an ID (enqueue_done_id);
     async_io then advances au->id.low after handling it.  */
  _Bool has_id;
  /* Passed to data_transfer_init_worker for AIO_DATA_TRANSFER_INIT.  */
  int read_flag;
} transfer_queue;
60
/* Pairs a data transfer parameter block with an ID.
   NOTE(review): this struct is not referenced by any code visible in
   this file -- confirm whether it is still needed.  */
struct error {
  st_parameter_dt *dtp;
  int id;
};
65
66/* Helper function to exchange the old vs. a new PDT.  */
67
68static void
69update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
70  st_parameter_dt *temp;
71  NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
72  temp = *old;
73  *old = new;
74  if (temp)
75    free (temp);
76}
77
/* Destroy an adv_cond structure.  Only the condition variable AC->signal
   is destroyed, through T_ERROR; AC itself is not freed here.  */

static void
destroy_adv_cond (struct adv_cond *ac)
{
  T_ERROR (__gthread_cond_destroy, &ac->signal);
}
85
86/* Function invoked as start routine for a new asynchronous I/O unit.
87   Contains the main loop for accepting requests and handling them.  */
88
89static void *
90async_io (void *arg)
91{
92  DEBUG_LINE (aio_prefix = TPREFIX);
93  transfer_queue *ctq = NULL, *prev = NULL;
94  gfc_unit *u = (gfc_unit *) arg;
95  async_unit *au = u->au;
96  LOCK (&au->lock);
97  thread_unit = u;
98  au->thread = __gthread_self ();
99  while (true)
100    {
101      /* Main loop.  At this point, au->lock is always held. */
102      WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
103      LOCK (&au->lock);
104      ctq = au->head;
105      prev = NULL;
106      /* Loop over the queue entries until they are finished.  */
107      while (ctq)
108	{
109	  if (prev)
110	    free (prev);
111	  prev = ctq;
112	  if (!au->error.has_error)
113	    {
114	      UNLOCK (&au->lock);
115
116	      switch (ctq->type)
117		{
118		case AIO_WRITE_DONE:
119		  NOTE ("Finalizing write");
120		  st_write_done_worker (au->pdt);
121		  UNLOCK (&au->io_lock);
122		  break;
123
124		case AIO_READ_DONE:
125		  NOTE ("Finalizing read");
126		  st_read_done_worker (au->pdt);
127		  UNLOCK (&au->io_lock);
128		  break;
129
130		case AIO_DATA_TRANSFER_INIT:
131		  NOTE ("Data transfer init");
132		  LOCK (&au->io_lock);
133		  update_pdt (&au->pdt, ctq->new_pdt);
134		  data_transfer_init_worker (au->pdt, ctq->read_flag);
135		  break;
136
137		case AIO_TRANSFER_SCALAR:
138		  NOTE ("Starting scalar transfer");
139		  ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
140					    ctq->arg.scalar.data,
141					    ctq->arg.scalar.i,
142					    ctq->arg.scalar.s1,
143					    ctq->arg.scalar.s2);
144		  break;
145
146		case AIO_TRANSFER_ARRAY:
147		  NOTE ("Starting array transfer");
148		  NOTE ("ctq->arg.array.desc = %p",
149			(void *) (ctq->arg.array.desc));
150		  transfer_array_inner (au->pdt, ctq->arg.array.desc,
151					ctq->arg.array.kind,
152					ctq->arg.array.charlen);
153		  free (ctq->arg.array.desc);
154		  break;
155
156		case AIO_CLOSE:
157		  NOTE ("Received AIO_CLOSE");
158		  LOCK (&au->lock);
159		  goto finish_thread;
160
161		default:
162		  internal_error (NULL, "Invalid queue type");
163		  break;
164		}
165	      LOCK (&au->lock);
166	      if (unlikely (au->error.has_error))
167		au->error.last_good_id = au->id.low - 1;
168	    }
169	  else
170	    {
171	      if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
172		{
173		  UNLOCK (&au->io_lock);
174		}
175	      else if (ctq->type == AIO_CLOSE)
176		{
177		  NOTE ("Received AIO_CLOSE during error condition");
178		  goto finish_thread;
179		}
180	    }
181
182  	  NOTE ("Next ctq, current id: %d", au->id.low);
183  	  if (ctq->has_id && au->id.waiting == au->id.low++)
184	    SIGNAL (&au->id.done);
185
186	  ctq = ctq->next;
187	}
188      au->tail = NULL;
189      au->head = NULL;
190      au->empty = 1;
191      SIGNAL (&au->emptysignal);
192    }
193 finish_thread:
194  au->tail = NULL;
195  au->head = NULL;
196  au->empty = 1;
197  SIGNAL (&au->emptysignal);
198  free (ctq);
199  UNLOCK (&au->lock);
200  return NULL;
201}
202
203/* Free an asynchronous unit.  */
204
205static void
206free_async_unit (async_unit *au)
207{
208  if (au->tail)
209    internal_error (NULL, "Trying to free nonempty asynchronous unit");
210
211  destroy_adv_cond (&au->work);
212  destroy_adv_cond (&au->emptysignal);
213  destroy_adv_cond (&au->id.done);
214  T_ERROR (__gthread_mutex_destroy, &au->lock);
215  free (au);
216}
217
218/* Initialize an adv_cond structure.  */
219
220static void
221init_adv_cond (struct adv_cond *ac)
222{
223  ac->pending = 0;
224  __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
225}
226
227/* Initialize an asyncronous unit, returning zero on success,
228 nonzero on failure.  It also sets u->au.  */
229
230void
231init_async_unit (gfc_unit *u)
232{
233  async_unit *au;
234  if (!__gthread_active_p ())
235    {
236      u->au = NULL;
237      return;
238    }
239
240  au = (async_unit *) xmalloc (sizeof (async_unit));
241  u->au = au;
242  init_adv_cond (&au->work);
243  init_adv_cond (&au->emptysignal);
244  __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
245  __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
246  LOCK (&au->lock);
247  T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
248  au->pdt = NULL;
249  au->head = NULL;
250  au->tail = NULL;
251  au->empty = true;
252  au->id.waiting = -1;
253  au->id.low = 0;
254  au->id.high = 0;
255  au->error.fatal_error = 0;
256  au->error.has_error = 0;
257  au->error.last_good_id = 0;
258  init_adv_cond (&au->id.done);
259  UNLOCK (&au->lock);
260}
261
262/* Enqueue a transfer statement.  */
263
264void
265enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
266{
267  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
268  tq->arg = *arg;
269  tq->type = type;
270  tq->has_id = 0;
271  LOCK (&au->lock);
272  if (!au->tail)
273    au->head = tq;
274  else
275    au->tail->next = tq;
276  au->tail = tq;
277  REVOKE_SIGNAL (&(au->emptysignal));
278  au->empty = false;
279  SIGNAL (&au->work);
280  UNLOCK (&au->lock);
281}
282
283/* Enqueue an st_write_done or st_read_done which contains an ID.  */
284
285int
286enqueue_done_id (async_unit *au, enum aio_do type)
287{
288  int ret;
289  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
290
291  tq->type = type;
292  tq->has_id = 1;
293  LOCK (&au->lock);
294  if (!au->tail)
295    au->head = tq;
296  else
297    au->tail->next = tq;
298  au->tail = tq;
299  REVOKE_SIGNAL (&(au->emptysignal));
300  au->empty = false;
301  ret = au->id.high++;
302  NOTE ("Enqueue id: %d", ret);
303  SIGNAL (&au->work);
304  UNLOCK (&au->lock);
305  return ret;
306}
307
308/* Enqueue an st_write_done or st_read_done without an ID.  */
309
310void
311enqueue_done (async_unit *au, enum aio_do type)
312{
313  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
314  tq->type = type;
315  tq->has_id = 0;
316  LOCK (&au->lock);
317  if (!au->tail)
318    au->head = tq;
319  else
320    au->tail->next = tq;
321  au->tail = tq;
322  REVOKE_SIGNAL (&(au->emptysignal));
323  au->empty = false;
324  SIGNAL (&au->work);
325  UNLOCK (&au->lock);
326}
327
328/* Enqueue a CLOSE statement.  */
329
330void
331enqueue_close (async_unit *au)
332{
333  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
334
335  tq->type = AIO_CLOSE;
336  LOCK (&au->lock);
337  if (!au->tail)
338    au->head = tq;
339  else
340    au->tail->next = tq;
341  au->tail = tq;
342  REVOKE_SIGNAL (&(au->emptysignal));
343  au->empty = false;
344  SIGNAL (&au->work);
345  UNLOCK (&au->lock);
346}
347
348/* The asynchronous unit keeps the currently active PDT around.
349   This function changes that to the current one.  */
350
351void
352enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
353{
354  st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
355  transfer_queue *tq = xmalloc (sizeof (transfer_queue));
356
357  memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
358
359  NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
360  NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
361  tq->next = NULL;
362  tq->type = AIO_DATA_TRANSFER_INIT;
363  tq->read_flag = read_flag;
364  tq->has_id = 0;
365  tq->new_pdt = new;
366  LOCK (&au->lock);
367
368  if (!au->tail)
369    au->head = tq;
370  else
371    au->tail->next = tq;
372  au->tail = tq;
373  REVOKE_SIGNAL (&(au->emptysignal));
374  au->empty = false;
375  SIGNAL (&au->work);
376  UNLOCK (&au->lock);
377}
378
379/* Collect the errors that may have happened asynchronously.  Return true if
380   an error has been encountered.  */
381
382bool
383collect_async_errors (st_parameter_common *cmp, async_unit *au)
384{
385  bool has_error = au->error.has_error;
386
387  if (has_error)
388    {
389      if (generate_error_common (cmp, au->error.family, au->error.message))
390	{
391	  au->error.has_error = 0;
392	  au->error.cmp = NULL;
393	}
394      else
395	{
396	  /* The program will exit later.  */
397	  au->error.fatal_error = true;
398	}
399    }
400  return has_error;
401}
402
403/* Perform a wait operation on an asynchronous unit with an ID specified,
404   which means collecting the errors that may have happened asynchronously.
405   Return true if an error has been encountered.  */
406
407bool
408async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
409{
410  bool ret;
411
412  if (au == NULL)
413    return false;
414
415  if (cmp == NULL)
416    cmp = au->error.cmp;
417
418  if (au->error.has_error)
419    {
420      if (i <= au->error.last_good_id)
421	return false;
422
423      return collect_async_errors (cmp, au);
424    }
425
426  LOCK (&au->lock);
427  if (i > au->id.high)
428    {
429      generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
430      UNLOCK (&au->lock);
431      return true;
432    }
433
434  NOTE ("Waiting for id %d", i);
435  if (au->id.waiting < i)
436    au->id.waiting = i;
437  SIGNAL (&(au->work));
438  WAIT_SIGNAL_MUTEX (&(au->id.done),
439		     (au->id.low >= au->id.waiting || au->empty), &au->lock);
440  LOCK (&au->lock);
441  ret = collect_async_errors (cmp, au);
442  UNLOCK (&au->lock);
443  return ret;
444}
445
446/* Perform a wait operation an an asynchronous unit without an ID.  */
447
448bool
449async_wait (st_parameter_common *cmp, async_unit *au)
450{
451  bool ret;
452
453  if (au == NULL)
454    return false;
455
456  if (cmp == NULL)
457    cmp = au->error.cmp;
458
459  LOCK (&(au->lock));
460  SIGNAL (&(au->work));
461
462  if (au->empty)
463    {
464      ret = collect_async_errors (cmp, au);
465      UNLOCK (&au->lock);
466      return ret;
467    }
468
469  WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
470  ret = collect_async_errors (cmp, au);
471  return ret;
472}
473
474/* Close an asynchronous unit.  */
475
476void
477async_close (async_unit *au)
478{
479  if (au == NULL)
480    return;
481
482  NOTE ("Closing async unit");
483  enqueue_close (au);
484  T_ERROR (__gthread_join, au->thread, NULL);
485  free_async_unit (au);
486}
487
488#else
489
490/* Only set u->au to NULL so no async I/O will happen.  */
491
492void
493init_async_unit (gfc_unit *u)
494{
495  u->au = NULL;
496  return;
497}
498
499/* Do-nothing function, which will not be called.  */
500
501void
502enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
503{
504  return;
505}
506
507/* Do-nothing function, which will not be called.  */
508
509int
510enqueue_done_id (async_unit *au, enum aio_do type)
511{
512  return 0;
513}
514
515/* Do-nothing function, which will not be called.  */
516
517void
518enqueue_done (async_unit *au, enum aio_do type)
519{
520  return;
521}
522
523/* Do-nothing function, which will not be called.  */
524
525void
526enqueue_close (async_unit *au)
527{
528  return;
529}
530
531/* Do-nothing function, which will not be called.  */
532
533void
534enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
535{
536  return;
537}
538
539/* Do-nothing function, which will not be called.  */
540
541bool
542collect_async_errors (st_parameter_common *cmp, async_unit *au)
543{
544  return false;
545}
546
547/* Do-nothing function, which will not be called.  */
548
549bool
550async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
551{
552  return false;
553}
554
555/* Do-nothing function, which will not be called.  */
556
557bool
558async_wait (st_parameter_common *cmp, async_unit *au)
559{
560  return false;
561}
562
563/* Do-nothing function, which will not be called.  */
564
565void
566async_close (async_unit *au)
567{
568  return;
569}
570
571#endif
572