1/* @(#) implementation of packet-io functions for udpxy
2 *
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
4 *
5 *  This file is part of udpxy.
6 *
7 *  udpxy is free software: you can redistribute it and/or modify
8 *  it under the terms of the GNU General Public License as published by
9 *  the Free Software Foundation, either version 3 of the License, or
10 *  (at your option) any later version.
11 *
12 *  udpxy is distributed in the hope that it will be useful,
13 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 *  GNU General Public License for more details.
16 *
17 *  You should have received a copy of the GNU General Public License
18 *  along with udpxy.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include <sys/types.h>
22#include <unistd.h>
23#include <stdio.h>
24#include <assert.h>
25#include <errno.h>
26#include <stdlib.h>
27#include <sys/uio.h>
28#include <string.h>
29#include <strings.h>
30#include <time.h>
31
32#include "udpxy.h"
33#include "dpkt.h"
34#include "rtp.h"
35#include "util.h"
36#include "mtrace.h"
37
/* log stream used by this module (defined in another translation unit) */
extern FILE* g_flog;

/* length (in bytes) of a single MPEG-TS segment */
static const size_t TS_SEG_LEN = 188;

/* data-stream format type */
enum {
    UPXDT_UNKNOWN = 0,  /* no assumptions */
    UPXDT_TS,           /* MPEG-TS */
    UPXDT_RTP_TS,       /* MPEG-TS carried over RTP */
    UPXDT_UDS,          /* UDS file format */
    UPXDT_RAW           /* read AS-IS */
};

/* printable names of the stream formats, indexed by format type */
static const char* upxfmt_NAME[] = {
    [UPXDT_UNKNOWN] = "UNKNOWN",
    [UPXDT_TS]      = "MPEG-TS",
    [UPXDT_RTP_TS]  = "RTP-TS",
    [UPXDT_UDS]     = "UDPXY-UDS",
    [UPXDT_RAW]     = "RAW"
};

/* number of entries in upxfmt_NAME */
static const int UPXDT_LEN = sizeof(upxfmt_NAME) / sizeof(upxfmt_NAME[0]);
58
59
60const char*
61fmt2str( upxfmt_t fmt )
62{
63    int ifmt = fmt;
64
65    (void) UPXDT_LEN;
66    assert( (ifmt >= 0 ) && (ifmt < UPXDT_LEN) );
67    return upxfmt_NAME[ ifmt ];
68}
69
70
71/* check for MPEG TS signature, complain if not found
72 */
73static int
74ts_sigcheck( const int c, off_t offset, ssize_t len,
75             FILE* log, const char* func )
76
77{
78    assert( len );
79    (void)(len); /* NOP to avoid warnings */
80
81    if( c == MPEG_TS_SIG ) return 0;
82
83    if( log && func ) {
84        (void) offset; /* get rid of a warning if TRACE is disabled */
85        TRACE( (void)tmfprintf( log, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
86            "offset [0x%X(%u)] of packet len=[%lu]\n",
87            func, MPEG_TS_SIG, c, offset, offset, (u_long)len ) );
88    }
89
90    return -1;
91}
92
93
94/* determine type of stream in memory
95 */
96upxfmt_t
97get_mstream_type( const char* data, size_t len, FILE* log )
98{
99    int sig = 0;
100    size_t hdrlen = 0;
101    ssize_t n = -1;
102
103    assert( data && len );
104
105    if( len < (RTP_HDR_SIZE + 1) ) {
106        (void) tmfprintf( log, "%s: read [%ld] bytes,"
107                " not enough for RTP header\n", __func__,
108                (long)n );
109        return UPXDT_UNKNOWN;
110    }
111
112    /* if the 1st byte has MPEG-TS signature - skip further checks */
113    sig = data[0] & 0xFF;
114    if( 0 == ts_sigcheck( sig, 0, 1, NULL /*log*/, __func__ ) )
115        return UPXDT_TS;
116
117    /* if not RTP - quit */
118    if( 0 == RTP_verify( data, RTP_HDR_SIZE, log ) )
119        return UPXDT_UNKNOWN;
120
121    /* check the first byte after RTP header - should be
122     * TS signature to be RTP over TS */
123
124    if( (0 != RTP_hdrlen( data, len, &hdrlen, log )) ||
125        (len < hdrlen) ) {
126        return UPXDT_UNKNOWN;
127    }
128
129    sig = data[ hdrlen ];
130    if( 0 != ts_sigcheck( sig, 0, 1, log, __func__ ) )
131        return UPXDT_UNKNOWN;
132
133    return UPXDT_RTP_TS;
134}
135
136
137
138/* determine type of stream saved in file
139 */
140upxfmt_t
141get_fstream_type( int fd, FILE* log )
142{
143    ssize_t n = 0;
144    off_t offset = 0, where = 0;
145    upxfmt_t dtype = UPXDT_UNKNOWN;
146    char* data = NULL;
147
148    /* read in enough data to contain extended header
149     * and beginning of payload segment */
150    size_t len = TS_SEG_LEN + RTP_XTHDRLEN;
151
152    assert( (fd > 0) && log );
153
154    if( NULL == (data = malloc( len )) ) {
155        mperror( log, errno, "%s: malloc", __func__ );
156        return UPXDT_UNKNOWN;
157    }
158
159    do {
160        /* check if it is a MPEG TS stream
161         */
162        n = read( fd, data, len );
163        if( 0 != sizecheck( "Not enough space for stream data",
164                    len, n, log, __func__ ) ) break;
165        offset += n;
166
167        dtype = get_mstream_type( data, len, log );
168        if( UPXDT_UNKNOWN == dtype ) {
169            TRACE( (void)tmfprintf( log, "%s: file type is not recognized\n",
170                        __func__ ) );
171            dtype = UPXDT_UNKNOWN;
172            break;
173        }
174    } while(0);
175
176    if( NULL != data ) free( data );
177
178    if( n <= 0 ) {
179        mperror( log, errno, "%s", __func__ );
180        return UPXDT_UNKNOWN;
181    }
182
183    where = lseek( fd, (-1) * offset, SEEK_CUR );
184    if( -1 == where ) {
185        mperror( log, errno, "%s: lseek", __func__ );
186        return UPXDT_UNKNOWN;
187    }
188
189    /*
190    TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
191                __func__, (int)dtype, fmt2str(dtype) ) );
192    */
193
194    return dtype;
195}
196
197
198/* read a sequence of MPEG TS packets (to fit into the given buffer)
199 */
200static ssize_t
201read_ts_file( int fd, char* data, const size_t len, FILE* log )
202{
203    const size_t pkt_len = ((len - 1) / TS_SEG_LEN) * TS_SEG_LEN;
204    off_t k = 0;
205    u_int bad_frg = 0;
206    ssize_t n = -1;
207
208    assert( (fd > 0) && data && len && log);
209
210    assert( !buf_overrun( data, len, 0, pkt_len, log ) );
211    n = read_buf( fd, data, pkt_len, log );
212    if( n <= 0 ) return n;
213
214    if( 0 != sizecheck( "Bad TS packet stream",
215                pkt_len, n, log, __func__ ) )
216        return -1;
217
218    /* make sure we've read TS records, not random data
219     */
220    for( k = 0; k < (off_t)pkt_len; k += TS_SEG_LEN ) {
221        if( -1 == ts_sigcheck( data[k], k, (u_long)pkt_len,
222                    log, __func__ ) ) {
223            ++bad_frg;
224        }
225    }
226
227    return (bad_frg ? -1 : n);
228}
229
230
231/* read an RTP packet
232 */
233static ssize_t
234read_rtp_file( int fd, char* data, const size_t len, FILE* log )
235{
236    ssize_t nrd = -1, offset = 0;
237    size_t hdrlen = 0, rdlen = 0;
238    u_int frg = 0;
239    int rtp_end = 0, rc = 0;
240    off_t where = 0;
241
242    assert( (fd > 0) && data && len && log);
243
244    assert( !buf_overrun( data, len, 0, RTP_HDR_SIZE, log ) );
245    nrd = read_buf( fd, data, RTP_HDR_SIZE, log );
246    if( nrd <= 0 ) return nrd;
247    offset += nrd;
248
249    if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE, nrd,
250                log, __func__ ) )
251        return -1;
252
253    if( 0 == RTP_verify( data, nrd, log ) )
254        return -1;
255
256    if( -1 == (rc = RTP_hdrlen( data, nrd, &hdrlen, log )) )
257        return -1;
258
259    /* if there is an extended header, read it in */
260    if( ENOMEM == rc ) {
261        assert( !buf_overrun( data, len, offset,
262                    RTP_XTHDRLEN - RTP_HDR_SIZE, log ) );
263        nrd = read_buf( fd, data + offset,
264                RTP_XTHDRLEN - RTP_HDR_SIZE, log );
265        if( (nrd <= 0) ||
266            (-1 == sizecheck("Bad RTP x-header",
267                             RTP_XTHDRLEN - RTP_HDR_SIZE, nrd,
268                             log, __func__ )) ) {
269            return -1;
270        }
271        if( 0 == nrd ) return nrd;
272
273        offset += nrd;
274        rc = RTP_hdrlen( data, offset, &hdrlen, log );
275        if( 0 != rc ) {
276            TRACE( (void)tmfprintf( log, "%s: bad RTP header - quitting\n",
277                        __func__ ) );
278            return -1;
279        }
280
281        /*
282        TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
283                    __func__, (u_long)hdrlen ) );
284        */
285
286        if( (size_t)offset > hdrlen ) {
287            /* read more than needed: step back */
288
289            where = lseek( fd, (-1)*((size_t)offset - hdrlen), SEEK_CUR );
290            if( -1 == where ) {
291                mperror( log, errno, "%s: lseek", __func__ );
292                return -1;
293            }
294
295            offset -= ((size_t)offset - hdrlen);
296            assert( (size_t)offset == hdrlen );
297
298            /*
299            TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
300                        "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
301            */
302        }
303        else if( hdrlen > (size_t)offset ) {
304            /* read remainder of the header in */
305
306            assert( !buf_overrun( data, len, offset,
307                        (hdrlen - (size_t)offset), log ) );
308            nrd = read_buf( fd, data + offset,
309                    (hdrlen - (size_t)offset), log );
310            if( nrd <= 0 ||
311               (-1 == sizecheck("Bad RTP x-header tail",
312                             (hdrlen - (size_t)offset), nrd,
313                             log, __func__ )) ) {
314                    return -1;
315            }
316            if( 0 == nrd ) return nrd;
317
318            offset += nrd;
319            assert( (size_t)offset == hdrlen );
320        }
321    } /* read extended header */
322
323
324    /* read TS records until there is another RTP header or EOF */
325    for( frg = 0; (ssize_t)len > offset; ++frg ) {
326
327        rdlen = ( (len - offset) < TS_SEG_LEN
328                  ? (len - offset)
329                  : TS_SEG_LEN );
330
331       /*
332       TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
333               __func__, (u_long)rdlen ) );
334       */
335
336        assert( !buf_overrun( data, len, offset, rdlen, log ) );
337        nrd = read_buf( fd, data + offset, rdlen, log );
338        if( nrd <= 0 ) break;
339
340        /* if it's an RTP header, roll back and return
341         */
342        rtp_end = RTP_verify( data + offset, nrd, log );
343        if( 1 == rtp_end ) {
344            if( -1 == lseek( fd, (-1) * nrd, SEEK_CUR ) ) {
345                mperror( log, errno, "%s: lseek", __func__ );
346                return -1;
347            }
348
349            break;
350        }
351
352        /* check if it is a TS packet and it's of the right size
353         */
354        if( (-1 == ts_sigcheck( data[offset], offset, (u_long)TS_SEG_LEN,
355                    log, __func__ )) ||
356            (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN, nrd,
357                    log, __func__ )) ) {
358            TRACE( hex_dump( "Data in question", data, offset + nrd, log ) );
359            return -1;
360        }
361
362        offset += nrd;
363    } /* for */
364
365
366    /* If it is not EOF and no RTP header for the next message is found,
367     * it is either our buffer is too small (to fit the whole message)
368     * or the stream is invalid
369     */
370    if( !rtp_end && (0 != nrd) ) {
371        (void)tmfprintf( log, "%s: no RTP end after reading [%ld] bytes\n",
372                __func__, (long)offset );
373        return -1;
374    }
375
376    return (nrd < 0) ? nrd : offset;
377}
378
379
380/* read record of one of the supported types from file
381 */
382ssize_t
383read_frecord( int fd, char* data, const size_t len,
384             upxfmt_t* stream_type, FILE* log )
385{
386    upxfmt_t stype;
387    /* off_t where = -1, endmark = -1; */
388    ssize_t nrd = -1;
389
390    assert( fd > 0 );
391    assert( data && len );
392    assert( stream_type && log );
393
394    stype = *stream_type;
395
396    /*
397    where = lseek( fd, 0, SEEK_CUR );
398    TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
399                __func__, (u_int)where, (u_int)where ) );
400    */
401
402    if( UPXDT_UNKNOWN == *stream_type ) {
403        stype = get_fstream_type( fd, log );
404
405        if( UPXDT_UNKNOWN == stype ) {
406            (void)tmfprintf( log, "%s: Unsupported type\n", __func__ );
407            return -1;
408        }
409
410        *stream_type = stype;
411    } /* UPXDT_UNKNOWN */
412
413    if( UPXDT_TS == stype ) {
414        nrd = read_ts_file( fd, data, len, log );
415    }
416    else if( UPXDT_RTP_TS == stype ) {
417        nrd = read_rtp_file( fd, data, len, log );
418    }
419    else {
420        (void)tmfprintf( log, "%s: unknown stream type [%d]\n",
421                __func__, stype );
422        return -1;
423    }
424
425    /*
426    if( nrd >= 0 ) {
427        endmark = lseek( fd, 0, SEEK_CUR );
428
429        TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
430                __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
431
432        TRACE( sizecheck( "WARNING: Read file discrepancy",
433                    where + nrd, endmark,
434                    log, __func__ ) );
435    }
436    */
437
438    return nrd;
439}
440
441
/* write data as a UDS record (placeholder)
 *
 * The conversion is not implemented: the call logs that fact
 * and always fails.
 *
 * @return -1 (always)
 */
static ssize_t
write_uds_record( int fd, const char* data, size_t len, FILE* log )
{
    assert( (fd > 0) && data && len );

    /* parameters are otherwise unused when assertions are disabled */
    (void) fd;
    (void) data;
    (void) len;

    (void)tmfprintf( log, "%s: UDS conversion not yet implemented\n",
            __func__ );

    return -1;
}
453
454
/* write RTP record into TS stream
 *
 * The record is handed to RTP_process() (verification off), which
 * may adjust the data pointer and length - presumably to skip the
 * RTP header (confirm against rtp.h); the result is then written out.
 *
 * @param fd    descriptor to write to
 * @param data  RTP record
 * @param len   length of the record, must be non-zero
 * @param log   log to report problems to
 *
 * @return result of write_buf(), or -1 if RTP_process() returns -1
 */
static ssize_t
write_rtp2ts( int fd, const char* data, size_t len, FILE* log )
{
    enum { SKIP_VERIFY = 0 };
    void* payload = (void*)data;
    size_t payload_len = len;

    assert( (fd > 0) && data && len && log );

    if( -1 == RTP_process( &payload, &payload_len, SKIP_VERIFY, log ) )
        return -1;

    assert( !buf_overrun( payload, len, 0, payload_len, log ) );

    return write_buf( fd, payload, payload_len, log );
}
473
474
475/* write record after converting it from source into destination
476 * format
477 */
478ssize_t
479write_frecord( int fd, const char* data, size_t len,
480              upxfmt_t sfmt, upxfmt_t dfmt, FILE* log )
481{
482    ssize_t nwr = -1;
483    int fmt_ok = 0;
484    const char *str_from = NULL, *str_to = NULL;
485
486    if( UPXDT_UDS == dfmt ) {
487        fmt_ok = 1;
488        nwr = write_uds_record( fd, data, len, log );
489    }
490    else if( UPXDT_TS == dfmt ) {
491        if( UPXDT_RTP_TS == sfmt ) {
492            fmt_ok = 1;
493            nwr = write_rtp2ts( fd, data, len, log );
494        }
495    }
496
497    if( !fmt_ok ) {
498        str_from = fmt2str(sfmt);
499        str_to   = fmt2str(dfmt);
500        (void)tmfprintf( log, "Conversion from [%s] into [%s] is not supported\n",
501                str_from, str_to );
502        return -1;
503    }
504
505    return nwr;
506}
507
508
509/* reset packet-buffer registry in stream spec
510 */
511void
512reset_pkt_registry( struct dstream_ctx* ds )
513{
514    assert( ds );
515
516    ds->flags &= ~F_DROP_PACKET;
517    ds->pkt_count = 0;
518}
519
520
521/* release resources allocated for stream spec
522 */
523void
524free_dstream_ctx( struct dstream_ctx* ds )
525{
526    assert( ds );
527
528    if( NULL != ds->pkt ) {
529        free( ds->pkt );
530        ds->pkt = NULL;
531
532        ds->pkt_count = ds->max_pkt = 0;
533    }
534}
535
536
537
538/* register received packet into registry (for scattered output)
539 */
540static int
541register_packet( struct dstream_ctx* spc, char* buf, size_t len )
542{
543    struct iovec* new_pkt = NULL;
544    static const int DO_VERIFY = 1;
545
546    void* new_buf = buf;
547    size_t new_len = len;
548
549    assert( spc->max_pkt > 0 );
550
551    /* enlarge packet registry if needed */
552    if( spc->pkt_count >= spc->max_pkt ) {
553        spc->max_pkt <<= 1;
554        spc->pkt = realloc( spc->pkt, spc->max_pkt * sizeof(spc->pkt[0]) );
555        if( NULL == spc->pkt ) {
556            mperror( g_flog, errno, "%s: realloc", __func__ );
557            return -1;
558        }
559
560        TRACE( (void)tmfprintf( g_flog, "RTP packet registry "
561                "expanded to [%lu] records\n", (u_long)spc->max_pkt ) );
562    }
563
564    /* put packet info into registry */
565
566    new_pkt = &(spc->pkt[ spc->pkt_count ]);
567
568    /*
569    TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
570                (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
571    */
572
573    if( 0 != RTP_process( &new_buf, &new_len, DO_VERIFY, g_flog ) ) {
574        TRACE( (void)tmfputs("register packet: dropping\n", g_flog) );
575        spc->flags |= F_DROP_PACKET;
576
577        return 0;
578    }
579
580    new_pkt->iov_base = new_buf;
581    new_pkt->iov_len = new_len;
582
583    /*
584    TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
585                (u_long)spc->pkt_count, new_pkt->iov_base,
586                (u_long)new_pkt->iov_len ) );
587    */
588
589    spc->pkt_count++;
590    return 0;
591}
592
593
594/* read data from source, determine underlying protocol
595 * (if not already known); and process the packet
596 * if needed (for RTP - register packet)
597 *
598 * return the number of octets read from the source
599 * into the buffer
600 */
601static ssize_t
602read_packet( struct dstream_ctx* spc, int fd, char* buf, size_t len )
603{
604    ssize_t n = -1;
605    size_t chunk_len = len;
606
607    assert( spc && buf && len );
608    assert( fd > 0 );
609
610    /* if *RAW* data specified - read AS IS
611     * and exit */
612    if( UPXDT_RAW == spc->stype ) {
613        return read_buf( fd, buf, len, g_flog );
614    }
615
616    /* if it is (or *could* be) RTP, read only MTU bytes
617     */
618    if( (spc->stype == UPXDT_RTP_TS) || (spc->flags & F_CHECK_FMT) )
619        chunk_len = (len > spc->mtu) ? spc->mtu : len;
620
621    if( spc->flags & F_FILE_INPUT ) {
622        assert( !buf_overrun( buf, len, 0, chunk_len, g_flog ) );
623        n = read_frecord( fd, buf, chunk_len, &(spc->stype), g_flog );
624        if( n <= 0 ) return n;
625    }
626    else {
627        assert( !buf_overrun(buf, len, 0, chunk_len, g_flog) );
628        n = read_buf( fd, buf, chunk_len, g_flog );
629        if( n <= 0 ) return n;
630    }
631
632    if( spc->flags & F_CHECK_FMT ) {
633        spc->stype = get_mstream_type( buf, n, g_flog );
634        switch (spc->stype) {
635            case UPXDT_RTP_TS:
636                /* scattered: exclude RTP headers */
637                spc->flags |= F_SCATTERED; break;
638            case UPXDT_TS:
639                spc->flags &= ~F_SCATTERED; break;
640            default:
641                spc->stype = UPXDT_RAW;
642                TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog ) );
643                break;
644        } /* switch */
645
646        TRACE( (void)tmfprintf( g_flog, "Established stream as [%s]\n",
647               fmt2str( spc->stype ) ) );
648
649        spc->flags &= ~F_CHECK_FMT;
650    }
651
652    if( spc->flags & F_SCATTERED )
653        if( -1 == register_packet( spc, buf, n ) ) return -1;
654
655    return n;
656}
657
658
659/* read data from source of specified type (UDP socket or otherwise);
660 * read as many fragments as specified (max_frgs) into the buffer
661 */
662ssize_t
663read_data( struct dstream_ctx* spc, int fd, char* data,
664           const ssize_t data_len, const struct rdata_opt* opt )
665{
666    int m = 0;
667    ssize_t n = 0, nrcv = -1;
668    time_t start_tm = time(NULL), cur_tm = 0;
669    time_t buftm_sec = 0;
670
671    assert( spc && (data_len > 0) && opt );
672
673    /* if max_frgs < 0, read as many packets as can fit in the buffer,
674     * otherwise read no more than max_frgs packets
675     */
676
677    for( m = 0, n = 0; ((opt->max_frgs < 0) ? 1 : (m < opt->max_frgs)); ++m ) {
678        nrcv = read_packet( spc, fd, data + n, data_len - n );
679        if( nrcv <= 0 ) {
680            if( EAGAIN == errno ) {
681                (void)tmfprintf( g_flog,
682                        "Receive on socket/file [%d] timed out\n",
683                        fd);
684            }
685
686            if( 0 == nrcv ) (void)tmfprintf(g_flog, "%s - EOF\n",__func__);
687            else {
688                mperror(g_flog, errno, "%s: read/recv", __func__);
689            }
690
691            break;
692        }
693
694        if( spc->flags & F_DROP_PACKET ) {
695            spc->flags &= ~F_DROP_PACKET;
696            continue;
697        }
698
699        n += nrcv;
700        if( n >= (data_len - nrcv) ) break;
701
702        if( -1 != opt->buf_tmout ) {
703            cur_tm = time(NULL);
704            buftm_sec = cur_tm - start_tm;
705            if( buftm_sec >= opt->buf_tmout ) {
706                TRACE( (void)tmfprintf( g_flog, "%s: Buffer timed out "
707                            "after [%ld] seconds\n", __func__,
708                            (long)buftm_sec ) );
709                break;
710            }
711            /*
712            else {
713                (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
714            }
715            */
716        }
717    } /* for */
718
719    if( (nrcv > 0) && !n ) {
720        TRACE( (void)tmfprintf( g_flog, "%s: no data to send "
721                    "out of [%d] packets\n", __func__, m ) );
722        return -1;
723    }
724
725    return (nrcv > 0) ? n : -1;
726}
727
728
729/* write data to destination(s)
730 */
731ssize_t
732write_data( const struct dstream_ctx* spc,
733            const char* data,
734            const ssize_t len,
735            int fd )
736{
737    ssize_t n = 0, error = IO_ERR;
738    int32_t n_count = -1;
739
740    assert( spc && data && len );
741    if( fd <= 0 ) return 0;
742
743    if( spc->flags & F_SCATTERED ) {
744        n_count = spc->pkt_count;
745        n = writev( fd, spc->pkt, n_count );
746        if( n <= 0 ) {
747            if( EAGAIN == errno ) {
748                (void)tmfprintf( g_flog, "Write on fd=[%d] timed out\n", fd);
749                error = IO_BLK;
750            }
751            mperror( g_flog, errno, "%s: writev", __func__ );
752            return error;
753        }
754    }
755    else {
756        n = write_buf( fd, data, len, g_flog );
757        if( n < 0 )
758            error = n;
759    }
760
761    return (n > 0) ? n : error;
762}
763
764
765/* initialize incoming-stream context:
766 *      set data type (if possible) and flags
767 */
768int
769init_dstream_ctx( struct dstream_ctx* ds, const char* cmd, const char* fname,
770                  ssize_t nmsgs )
771{
772    extern const char       CMD_UDP[];
773    extern const size_t     CMD_UDP_LEN;
774    extern const char       CMD_RTP[];
775    extern const size_t     CMD_RTP_LEN;
776
777    assert( ds && cmd && (nmsgs > 0) );
778
779    ds->flags = 0;
780    ds->pkt = NULL;
781    ds->max_pkt = ds->pkt_count = 0;
782    ds->mtu = ETHERNET_MTU;
783
784    if( NULL != fname ) {
785        ds->stype = UPXDT_UNKNOWN;
786        ds->flags |= (F_CHECK_FMT | F_FILE_INPUT);
787        TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog ) );
788    }
789    else if( 0 == strncmp( cmd, CMD_UDP, CMD_UDP_LEN ) ) {
790        ds->stype = UPXDT_UNKNOWN;
791        ds->flags |= F_CHECK_FMT;
792        TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog ) );
793    }
794    else if( 0 == strncmp( cmd, CMD_RTP, CMD_RTP_LEN ) ) {
795        ds->stype = UPXDT_RTP_TS;
796        ds->flags |= F_SCATTERED;
797        TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
798                    " no checks\n", g_flog ) );
799    }
800    else {
801        TRACE( (void)tmfprintf( g_flog, "%s: "
802                    "Irrelevant command [%s]\n", __func__, cmd) );
803        return -1;
804    }
805
806    ds->pkt = calloc( nmsgs, sizeof(ds->pkt[0]) );
807    if( NULL == ds->pkt ) {
808        mperror( g_flog, errno, "%s: calloc", __func__ );
809        return -1;
810    }
811
812    ds->max_pkt = nmsgs;
813    return 0;
814}
815
816
817
818/* __EOF__ */
819