1package TAP::Parser::Multiplexer;
2
3use strict;
4use warnings;
5
6use IO::Select;
7use Errno;
8
9use base 'TAP::Object';
10
11use constant IS_WIN32 => $^O =~ /^(MS)?Win32$/;
12use constant IS_VMS => $^O eq 'VMS';
13use constant SELECT_OK => !( IS_VMS || IS_WIN32 );
14
15=head1 NAME
16
17TAP::Parser::Multiplexer - Multiplex multiple TAP::Parsers
18
19=head1 VERSION
20
21Version 3.44
22
23=cut
24
25our $VERSION = '3.44';
26
27=head1 SYNOPSIS
28
29    use TAP::Parser::Multiplexer;
30
31    my $mux = TAP::Parser::Multiplexer->new;
32    $mux->add( $parser1, $stash1 );
33    $mux->add( $parser2, $stash2 );
34    while ( my ( $parser, $stash, $result ) = $mux->next ) {
35        # do stuff
36    }
37
38=head1 DESCRIPTION
39
40C<TAP::Parser::Multiplexer> gathers input from multiple TAP::Parsers.
41Internally it calls select on the input file handles for those parsers
42to wait for one or more of them to have input available.
43
44See L<TAP::Harness> for an example of its use.
45
46=head1 METHODS
47
48=head2 Class Methods
49
50=head3 C<new>
51
52    my $mux = TAP::Parser::Multiplexer->new;
53
54Returns a new C<TAP::Parser::Multiplexer> object.
55
56=cut
57
58# new() implementation supplied by TAP::Object
59
# Set up the state of a freshly constructed multiplexer: an empty
# IO::Select set, an empty queue of parsers that cannot be selected on,
# and a zero count of selectable parsers. Returns the object itself.
sub _initialize {
    my ($self) = @_;

    @{$self}{qw( select avid count )} = (
        IO::Select->new,    # handles we are able to wait on
        [],                 # parsers that can't select
        0,                  # number of parsers registered with select
    );

    return $self;
}
67
68##############################################################################
69
70=head2 Instance Methods
71
72=head3 C<add>
73
74  $mux->add( $parser, $stash );
75
76Add a TAP::Parser to the multiplexer. C<$stash> is an optional opaque
77reference that will be returned from C<next> along with the parser and
78the next result.
79
80=cut
81
sub add {
    my ( $self, $parser, $stash ) = @_;

    # Only ask the parser for handles on platforms where select is
    # usable; everywhere else every parser is treated as unselectable.
    my @handles;
    @handles = $parser->get_select_handles if SELECT_OK;

    if ( !@handles ) {

        # Nothing to select on: queue the parser so that it is read
        # directly, ahead of any selectable parsers.
        push @{ $self->{avid} }, [ $parser, $stash ];
    }
    else {

        # Record file numbers rather than handles: by the time the
        # entries are removed from the IO::Select set the iterator
        # will already have closed the handles themselves.
        my @filenos = map { fileno($_) } @handles;

        # One entry per handle, each carrying the parser, its stash
        # and the complete list of file numbers owned by the parser.
        for my $handle (@handles) {
            $self->{select}->add( [ $handle, $parser, $stash, @filenos ] );
        }

        $self->{count}++;
    }
}
102
103=head3 C<parsers>
104
105  my $count   = $mux->parsers;
106
107Returns the number of parsers. Parsers are removed from the multiplexer
108when their input is exhausted.
109
110=cut
111
sub parsers {
    my ($self) = @_;

    # Selectable parsers are tracked by a counter, unselectable ones by
    # the length of the avid queue; the total is the sum of both.
    my $selectable   = $self->{count};
    my $unselectable = @{ $self->{avid} };

    return $selectable + $unselectable;
}
116
# Build the closure that backs next(). Each call of the closure returns
# a list ( $parser, $stash, $result ), or an empty list once there is
# nothing left to read. An undefined $result marks the end of input for
# that parser, which is dropped from the multiplexer at the same time.
#
# Dies if the underlying select fails for any reason other than being
# interrupted by a signal.
sub _iter {
    my $self = shift;

    my $sel   = $self->{select};
    my $avid  = $self->{avid};
    my @ready = ();    # select entries reported readable, not yet served

    return sub {

        # Drain all the non-selectable parsers first
        if (@$avid) {
            my ( $parser, $stash ) = @{ $avid->[0] };
            my $result = $parser->next;
            shift @$avid unless defined $result;
            return ( $parser, $stash, $result );
        }

        until (@ready) {
            return unless $sel->count;
            @ready = $sel->can_read;
            last if @ready;

            # An empty list from a blocking can_read means select
            # failed. Retry when a signal interrupted it; otherwise
            # report the real error here rather than letting the
            # dereference below die on an undefined value.
            die "TAP::Parser::Multiplexer: select failed: $!"
              unless $! == Errno::EINTR;
        }

        # Entries are [ $handle, $parser, $stash, @filenos ]; the handle
        # itself is not needed here.
        my ( undef, $parser, $stash, @handles ) = @{ shift @ready };
        my $result = $parser->next;

        unless ( defined $result ) {
            $sel->remove(@handles);
            $self->{count}--;

            # Force another can_read - we may now have removed a handle
            # thought to have been ready.
            @ready = ();
        }

        return ( $parser, $stash, $result );
    };
}
155
156=head3 C<next>
157
158Return a result from the next available parser. Returns a list
159containing the parser from which the result came, the stash that
160corresponds with that parser and the result.
161
162    my ( $parser, $stash, $result ) = $mux->next;
163
164If C<$result> is undefined the corresponding parser has reached the end
165of its input (and will automatically be removed from the multiplexer).
166
167When all parsers are exhausted an empty list will be returned.
168
169    if ( my ( $parser, $stash, $result ) = $mux->next ) {
170        if ( ! defined $result ) {
171            # End of this parser
172        }
173        else {
174            # Process result
175        }
176    }
177    else {
178        # All parsers finished
179    }
180
181=cut
182
sub next {
    my ($self) = @_;

    # The iterator closure is built on first use and cached on the
    # object so that its internal state survives between calls.
    $self->{_iter} ||= $self->_iter;

    # Returning the call directly hands the caller's context on to the
    # iterator.
    return $self->{_iter}->();
}
187
188=head1 See Also
189
190L<TAP::Parser>
191
192L<TAP::Harness>
193
194=cut
195
1961;
197