1package Test2::IPC::Driver::Files;
2use strict;
3use warnings;
4
5our $VERSION = '1.302194';
6
7BEGIN { require Test2::IPC::Driver; our @ISA = qw(Test2::IPC::Driver) }
8
9use Test2::Util::HashBase qw{tempdir event_ids read_ids timeouts tid pid globals};
10
11use Scalar::Util qw/blessed/;
12use File::Temp();
13use Storable();
14use File::Spec();
15use POSIX();
16
17use Test2::Util qw/try get_tid pkg_to_file IS_WIN32 ipc_separator do_rename do_unlink try_sig_mask/;
18use Test2::API qw/test2_ipc_set_pending/;
19
# Viability check for this driver; it unconditionally reports true.
sub is_viable { return 1 }
21
# Initialise a new driver instance: create the IPC temp directory, store its
# canonical path on the object, record the creating process/thread and reset
# the per-instance bookkeeping hashes. Returns the driver object.
sub init {
    my ($self) = @_;

    # A template from the environment wins; otherwise the name is built from
    # "test2", the current PID and an "XXXXXX" placeholder for File::Temp.
    my $template = $ENV{T2_TEMPDIR_TEMPLATE}
        || "test2" . ipc_separator . $$ . ipc_separator . "XXXXXX";

    my $dir = File::Temp::tempdir($template, CLEANUP => 0, TMPDIR => 1);
    $self->abort_trace("Could not get a temp dir") unless $dir;

    $self->{+TEMPDIR} = File::Spec->canonpath($dir);

    if ($ENV{T2_KEEP_TEMPDIR}) {
        print STDERR "\nIPC Temp Dir: $dir\n\n";
    }

    # Identity of the creator; DESTROY compares against these before cleaning.
    $self->{+PID} = $$;
    $self->{+TID} = get_tid();

    # Start every bookkeeping table out empty.
    $self->{+EVENT_IDS} = {};
    $self->{+READ_IDS}  = {};
    $self->{+TIMEOUTS}  = {};
    $self->{+GLOBALS}   = {};

    return $self;
}
49
# Return the path of the marker file for hub $hid inside the IPC temp dir.
# The file name is "HUB", the IPC separator, then the hub id.
sub hub_file {
    my ($self, $hid) = @_;

    my $name = "HUB" . ipc_separator . $hid;

    return File::Spec->catfile($self->{+TEMPDIR}, $name);
}
56
# Compute the path of the file that will carry event $e to destination $hid.
#
# The file name is the destination id, the current PID, the current thread id,
# a per-(destination, PID, thread) sequence number and the components of the
# event's class name, all joined with the IPC separator. Calling this bumps
# the sequence number stored on the driver.
sub event_file {
    my ($self, $hid, $e) = @_;

    my $class = blessed($e);
    $self->abort("'$e' is not a blessed object!") unless $class;

    $self->abort("'$e' is not an event object!")
        unless $class->isa('Test2::Event');

    my $tid = get_tid();
    my $eid = $self->{+EVENT_IDS}->{$hid}->{$$}->{$tid} += 1;

    my $name = join(ipc_separator, $hid, $$, $tid, $eid, split(/::/, $class));

    return File::Spec->catfile($self->{+TEMPDIR}, $name);
}
75
# Register hub $hid by creating its marker file in the IPC temp dir.
#
# The file records the creating process id and thread id, one per line;
# drop_hub() reads them back to verify ownership. Aborts (with a trace) when
# the marker already exists or when it cannot be created, written or closed.
sub add_hub {
    my $self = shift;
    my ($hid) = @_;

    my $hfile = $self->hub_file($hid);

    $self->abort_trace("File for hub '$hid' already exists")
        if -e $hfile;

    open(my $fh, '>', $hfile) or $self->abort_trace("Could not create hub file '$hid': $!");

    # Buffered write errors may only surface at close, so both steps are
    # checked; a truncated marker would make the ownership check meaningless.
    print $fh "$$\n" . get_tid() . "\n"
        or $self->abort_trace("Could not write hub file '$hid': $!");
    close($fh) or $self->abort_trace("Could not close hub file '$hid': $!");
}
89
# Retire hub $hid.
#
# Verifies that the hub's marker file exists and was written by the current
# process and thread, then removes it (or renames it to "*.complete" when
# T2_KEEP_TEMPDIR is set). Afterwards the temp dir is scanned for event files
# still addressed to this hub; if any are found their contents (or the error
# from reading them) are dumped and the driver aborts with a trace.
sub drop_hub {
    my $self = shift;
    my ($hid) = @_;

    my $tdir  = $self->{+TEMPDIR};
    my $hfile = $self->hub_file($hid);

    $self->abort_trace("File for hub '$hid' does not exist")
        unless -e $hfile;

    open(my $fh, '<', $hfile) or $self->abort_trace("Could not open hub file '$hid': $!");
    my ($pid, $tid) = <$fh>;
    close($fh);

    # The marker holds one value per line; strip the line endings so they do
    # not end up in the middle of the messages below.
    for my $value ($pid, $tid) {
        chomp($value) if defined $value;
    }

    $self->abort_trace("A hub file can only be closed by the process that started it\nExpected $pid, got $$")
        unless $pid == $$;

    $self->abort_trace("A hub file can only be closed by the thread that started it\nExpected $tid, got " . get_tid())
        unless get_tid() == $tid;

    if ($ENV{T2_KEEP_TEMPDIR}) {
        my ($ok, $err) = do_rename($hfile, File::Spec->canonpath("$hfile.complete"));
        $self->abort_trace("Could not rename file '$hfile' -> '$hfile.complete': $err") unless $ok
    }
    else {
        my ($ok, $err) = do_unlink($hfile);
        $self->abort_trace("Could not remove file for hub '$hid': $err") unless $ok
    }

    opendir(my $dh, $tdir) or $self->abort_trace("Could not open temp dir!");

    # Event files are named "<hid><separator>...". Match the id literally and
    # require the separator right after it, so that files belonging to another
    # hub whose id merely starts with this id are not mistaken for leftovers.
    my $sep      = ipc_separator;
    my $is_event = qr/^\Q$hid\E\Q$sep\E/;

    my %bad;
    for my $file (readdir($dh)) {
        next if $file =~ m{\.complete$};
        next unless $file =~ $is_event;

        eval { $bad{$file} = $self->read_event_file(File::Spec->catfile($tdir, $file)); 1 } or $bad{$file} = $@ || "Unknown error reading file";
    }
    closedir($dh);

    return unless keys %bad;

    # Render the leftovers for the abort message: JSON::PP first, Data::Dumper
    # as a fallback, and a fixed apology if neither works.
    my $data;
    my $ok = eval {
        require JSON::PP;
        local *UNIVERSAL::TO_JSON = sub { +{ %{$_[0]} } };
        my $json = JSON::PP->new->ascii->pretty->canonical->allow_unknown->allow_blessed->convert_blessed;
        $data = $json->encode(\%bad);
        1;
    };
    $ok ||= eval {
        require Data::Dumper;
        local $Data::Dumper::Sortkeys = 1;
        $data = Data::Dumper::Dumper(\%bad);
        1;
    };

    $data = "Could not dump data... sorry." unless defined $data;

    $self->abort_trace("Not all files from hub '$hid' have been collected!\nHere is the leftover data:\n========================\n$data\n===================\n");
}
151
# Deliver event $e to hub $hid, or to every hub when $global is true.
#
# The event is serialised with Storable into a file in the IPC temp dir and
# then renamed to "<file>.ready"; both steps run inside try_sig_mask(). For
# non-global sends the hub's marker file must exist. Global sends are also
# noted in this driver's GLOBALS table under the ready-file name. Any failure
# is reported through abort(). Returns 1.
sub send {
    my ($self, $hid, $e, $global) = @_;

    my $hfile = $self->hub_file($hid);
    my $dest  = $global ? 'GLOBAL' : $hid;

    unless ($global || -f $hfile) {
        $self->abort(<<"    EOT");
hub '$hid' is not available, failed to send event!

There was an attempt to send an event to a hub in a parent process or thread,
but that hub appears to be gone. This can happen if you fork, or start a new
thread from inside subtest, and the parent finishes the subtest before the
child returns.

This can also happen if the parent process is done testing before the child
finishes. Test2 normally waits automatically in the root process, but will not
do so if Test::Builder is loaded for legacy reasons.
    EOT
    }

    my $file  = $self->event_file($dest, $e);
    my $ready = File::Spec->canonpath("$file.ready");

    if ($global) {
        # Key the bookkeeping on the bare file name (everything from "GLOBAL").
        (my $name = $ready) =~ s{^.*(GLOBAL)}{GLOBAL};
        $self->{+GLOBALS}->{$hid}->{$name}++;
    }

    # Write and rename the file.
    my ($ren_ok, $ren_err);
    my ($ok, $err) = try_sig_mask(sub {
        Storable::store($e, $file);
        ($ren_ok, $ren_err) = do_rename("$file", $ready);
    });

    unless ($ok) {
        # Trim the "at <this file> line N" suffix from the captured error.
        my $src_file = __FILE__;
        $err =~ s{ at \Q$src_file\E.*$}{};
        chomp($err);
        my $tid = get_tid();
        my $trace = $e->trace->debug;
        my $type = blessed($e);

        $self->abort(<<"        EOT");

*******************************************************************************
There was an error writing an event:
Destination: $dest
Origin PID:  $$
Origin TID:  $tid
Event Type:  $type
Event Trace: $trace
File Name:   $file
Ready Name:  $ready
Error: $err
*******************************************************************************

        EOT

        return 1;
    }

    $self->abort("Could not rename file '$file' -> '$ready': $ren_err") unless $ren_ok;
    test2_ipc_set_pending($file);

    return 1;
}
220
# Append $msg (plus a newline) to the "ABORT" file in the IPC temp dir.
#
# The global status variables are localised so this leaves them untouched for
# the caller. Failure to write the file is downgraded to a warning.
sub driver_abort {
    my ($self, $msg) = @_;

    local ($@, $!, $?, $^E);

    eval {
        my $path = File::Spec->catfile($self->{+TEMPDIR}, "ABORT");

        open(my $out, '>>', $path) or die "Could not open abort file: $!";
        print $out $msg, "\n";
        close($out) or die "Could not close abort file: $!";

        1;
    } or warn $@;
}
234
# Collect the events waiting for hub $hid and return them as a list.
#
# Every entry of the IPC temp dir is filtered through should_read_event() and
# the survivors are ordered with cmp_events(). Non-global events must arrive in
# sequence per (hub, pid, tid): an out-of-sequence file is skipped for now, and
# once it has been seen waiting for more than 5 seconds the driver aborts.
# Files of consumed non-global events are unlinked, or renamed to "*.complete"
# when T2_KEEP_TEMPDIR is set; global event files are left in place.
sub cull {
    my ($self, $hid) = @_;

    my $tempdir = $self->{+TEMPDIR};

    opendir(my $dh, $tempdir) or $self->abort("could not open IPC temp dir ($tempdir)!");

    my $timeouts = $self->{+TIMEOUTS};

    my @candidates = sort cmp_events map { $self->should_read_event($hid, $_) } readdir($dh);

    my @events;
    for my $info (@candidates) {
        if (!$info->{global}) {
            # Next expected event id for this (hub, pid, tid); starts at 1.
            my $expected = \$self->{+READ_IDS}->{$info->{hid}}->{$info->{pid}}->{$info->{tid}};
            $$expected ||= 1;

            # Remember when this file was first seen.
            $timeouts->{$info->{file}} ||= time;

            if ($$expected != $info->{eid}) {
                # Wait up to N seconds for missing events
                next unless 5 < time - $timeouts->{$info->{file}};
                $self->abort("Missing event HID: $info->{hid}, PID: $info->{pid}, TID: $info->{tid}, EID: $info->{eid}.");
            }

            $$expected = $info->{eid} + 1;
        }

        my $full  = $info->{full_path};
        my $event = $self->read_event_file($full);
        push @events, $event;

        # Do not remove global events
        next if $info->{global};

        if ($ENV{T2_KEEP_TEMPDIR}) {
            my $complete = File::Spec->canonpath("$full.complete");
            my ($ok, $err) = do_rename($full, $complete);
            $self->abort("Could not rename IPC file '$full', '$complete': $err") unless $ok;
        }
        else {
            my ($ok, $err) = do_unlink("$full");
            $self->abort("Could not unlink IPC file '$full': $err") unless $ok;
        }
    }

    closedir($dh);

    return @events;
}
283
# Split an event file name into its components.
#
# A trailing ".complete" and then a trailing ".ready" are stripped and reported
# as flags. The remainder is split on the IPC separator: a leading "GLOBAL"
# marks a global event (and is used as the hub id), otherwise the first four
# fields form the hub id. The next three fields are pid, tid and eid, and
# whatever is left is joined with "::" to give the event type.
#
# Returns a hashref with the keys file (the name without the stripped
# suffixes), ready, complete, global, type, hid, pid, tid and eid.
sub parse_event_filename {
    my ($self, $file) = @_;

    # Order matters: ".complete" is appended after ".ready".
    my $complete = ($file =~ s/\.complete\z//) ? 1 : 0;
    my $ready    = ($file =~ s/\.ready\z//)    ? 1 : 0;

    my @parts = split ipc_separator, $file;

    my ($global, $hid);
    if ($parts[0] eq 'GLOBAL') {
        $global = 1;
        $hid    = shift @parts;
    }
    else {
        $global = 0;
        $hid    = join ipc_separator, splice(@parts, 0, 4);
    }

    my ($pid, $tid, $eid) = splice(@parts, 0, 3);
    my $type = join '::' => @parts;

    return {
        file     => $file,
        ready    => !!$ready,
        complete => !!$complete,
        global   => $global,
        type     => $type,
        hid      => $hid,
        pid      => $pid,
        tid      => $tid,
        eid      => $eid,
    };
}
309
# Decide whether directory entry $file is an event that hub $hid should read.
#
# Returns nothing for dot-files, hub marker files, files that are not ".ready"
# or are already ".complete", events addressed to another hub, and global
# events this driver has already counted for $hid. Otherwise returns the
# parsed file-name info with an added full_path entry. Seeing a file named
# "ABORT" terminates the process with exit status 255.
sub should_read_event {
    my ($self, $hid, $file) = @_;

    return if $file =~ m/\A(?:\.|HUB)/;
    CORE::exit(255) if $file eq 'ABORT';

    my $info = $self->parse_event_filename($file);

    return if $info->{complete} || !$info->{ready};
    return unless $info->{global} || $info->{hid} eq $hid;

    # Each global event is handed to a given hub once; the counter is bumped
    # on every sighting.
    if ($info->{global}) {
        return if $self->{+GLOBALS}->{$hid}->{$file}++;
    }

    my $full = File::Spec->catfile($self->{+TEMPDIR}, $file);

    # Untaint the path.
    if (${^TAINT}) {
        ($full) = ($full =~ m/^(.*)$/gs);
    }

    $info->{full_path} = $full;

    return $info;
}
334
# sort() comparator for parsed event info hashes (reads the package $a / $b).
# Global events sort ahead of non-global ones; ties are broken numerically by
# pid, then tid, then eid.
sub cmp_events {
    my $a_global = $a->{global} ? 1 : 0;
    my $b_global = $b->{global} ? 1 : 0;

    # Globals first
    return $b_global <=> $a_global if $a_global != $b_global;

    return $a->{pid} <=> $b->{pid}
        || $a->{tid} <=> $b->{tid}
        || $a->{eid} <=> $b->{eid};
}
344
# Load the Storable-serialised object in $file and return it.
#
# The object must be blessed. If it does not already look like a Test2::Event,
# the module for its class is loaded and the check is repeated; the driver
# aborts when the object is unblessed, its module cannot be loaded, or it is
# still not a Test2::Event afterwards.
sub read_event_file {
    my ($self, $file) = @_;

    my $obj = Storable::retrieve($file);
    my $pkg = blessed($obj);

    $self->abort("Got an unblessed object: '$obj'")
        unless $pkg;

    return $obj if $obj->isa('Test2::Event');

    # The event's class may simply not be loaded in this process yet.
    my $mod_file = pkg_to_file($pkg);
    my ($ok, $err) = try { require $mod_file };

    $self->abort("Event has unknown type ($pkg), tried to load '$mod_file' but failed: $err")
        unless $ok;

    $self->abort("'$obj' is not a 'Test2::Event' object")
        unless $obj->isa('Test2::Event');

    return $obj;
}
367
# Send a global Test2::Event::Waiting event whose trace frame is the caller of
# this method. Returns nothing.
sub waiting {
    my ($self) = @_;

    require Test2::Event::Waiting;

    my $trace = Test2::EventFacet::Trace->new(frame => [caller()]);
    my $event = Test2::Event::Waiting->new(trace => $trace);

    $self->send(GLOBAL => $event, 'GLOBAL');

    return;
}
379
# Clean up the IPC temp dir when the driver is destroyed.
#
# Only the process and thread that created the driver do any work. An "ABORT"
# file, if present, is removed and switches the sweep into "remove everything"
# mode; otherwise only GLOBAL event files and HUB marker files are removed and
# any other file that is not "*.complete" is reported as a leftover through
# abort(). With T2_KEEP_TEMPDIR set nothing in the directory is deleted by the
# sweep and the directory itself is kept.
sub DESTROY {
    my $self = shift;

    return unless defined $self->pid;
    return unless defined $self->tid;

    return unless $$        == $self->pid;
    return unless get_tid() == $self->tid;

    my $tempdir = $self->{+TEMPDIR};

    my $aborted = 0;
    my $abort_file = File::Spec->catfile($self->{+TEMPDIR}, "ABORT");
    if (-e $abort_file) {
        $aborted = 1;
        my ($ok, $err) = do_unlink($abort_file);
        warn $err unless $ok;
    }

    my $sep = ipc_separator;

    opendir(my $dh, $tempdir) or $self->abort("Could not open temp dir! ($tempdir)");
    while(my $file = readdir($dh)) {
        next if $file =~ m/^\.+$/;
        next if $file =~ m/\.complete$/;
        my $full = File::Spec->catfile($tempdir, $file);

        if ($aborted || $file =~ m/^(?:GLOBAL|HUB\Q$sep\E)/) {
            # Untaint it. /s lets the pattern span the whole path even if it
            # contains a newline, and the capture is taken from this match's
            # own return value rather than from a possibly stale $1.
            ($full) = $full =~ m/\A(.*)\z/s;
            next if $ENV{T2_KEEP_TEMPDIR};
            my ($ok, $err) = do_unlink($full);
            $self->abort("Could not unlink IPC file '$full': $err") unless $ok;
            next;
        }

        $self->abort("Leftover files in the directory ($full)!\n");
    }
    closedir($dh);

    if ($ENV{T2_KEEP_TEMPDIR}) {
        print STDERR "# Not removing temp dir: $tempdir\n";
        return;
    }

    # Second look for an ABORT file, in case one is present at this point.
    my $abort = File::Spec->catfile($self->{+TEMPDIR}, "ABORT");
    unlink($abort) if -e $abort;
    rmdir($tempdir) or warn "Could not remove IPC temp dir ($tempdir)";
}
428
4291;
430
431__END__
432
433=pod
434
435=encoding UTF-8
436
437=head1 NAME
438
439Test2::IPC::Driver::Files - Temp dir + Files concurrency model.
440
441=head1 DESCRIPTION
442
This is the default and fallback concurrency model for L<Test2>. It sends
events between processes and threads using serialized files in a temporary
directory. It is not particularly fast, but it works everywhere.
446
447=head1 SYNOPSIS
448
449    use Test2::IPC::Driver::Files;
450
451    # IPC is now enabled
452
453=head1 ENVIRONMENT VARIABLES
454
455=over 4
456
457=item T2_KEEP_TEMPDIR=0
458
459When true, the tempdir used by the IPC driver will not be deleted when the test
460is done.
461
462=item T2_TEMPDIR_TEMPLATE='test2-XXXXXX'
463
464This can be used to set the template for the IPC temp dir. The template should
465follow template specifications from L<File::Temp>.
466
467=back
468
469=head1 SEE ALSO
470
471See L<Test2::IPC::Driver> for methods.
472
473=head1 SOURCE
474
475The source code repository for Test2 can be found at
476F<http://github.com/Test-More/test-more/>.
477
478=head1 MAINTAINERS
479
480=over 4
481
482=item Chad Granum E<lt>exodist@cpan.orgE<gt>
483
484=back
485
486=head1 AUTHORS
487
488=over 4
489
490=item Chad Granum E<lt>exodist@cpan.orgE<gt>
491
492=back
493
494=head1 COPYRIGHT
495
496Copyright 2020 Chad Granum E<lt>exodist@cpan.orgE<gt>.
497
498This program is free software; you can redistribute it and/or
499modify it under the same terms as Perl itself.
500
501See F<http://dev.perl.org/licenses/>
502
503=cut
504