1/* @(#) implementation of packet-io functions for udpxy 2 * 3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com) 4 * 5 * This file is part of udpxy. 6 * 7 * udpxy is free software: you can redistribute it and/or modify 8 * it under the terms of the GNU General Public License as published by 9 * the Free Software Foundation, either version 3 of the License, or 10 * (at your option) any later version. 11 * 12 * udpxy is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU General Public License for more details. 16 * 17 * You should have received a copy of the GNU General Public License 18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>. 19 */ 20 21#include <sys/types.h> 22#include <unistd.h> 23#include <stdio.h> 24#include <assert.h> 25#include <errno.h> 26#include <stdlib.h> 27#include <sys/uio.h> 28#include <string.h> 29#include <strings.h> 30#include <time.h> 31 32#include "udpxy.h" 33#include "dpkt.h" 34#include "rtp.h" 35#include "util.h" 36#include "mtrace.h" 37 38extern FILE* g_flog; 39 40static const size_t TS_SEG_LEN = 188; 41 42/* data-stream format type */ 43enum { 44 UPXDT_UNKNOWN = 0, /* no assumptions */ 45 UPXDT_TS, /* MPEG-TS */ 46 UPXDT_RTP_TS, /* RTP over MPEG-TS */ 47 UPXDT_UDS, /* UDS file format */ 48 UPXDT_RAW /* read AS-IS */ 49}; 50static const char* upxfmt_NAME[] = { 51 "UNKNOWN", 52 "MPEG-TS", 53 "RTP-TS", 54 "UDPXY-UDS", 55 "RAW" 56}; 57static const int UPXDT_LEN = sizeof(upxfmt_NAME) / sizeof(upxfmt_NAME[0]); 58 59 60const char* 61fmt2str( upxfmt_t fmt ) 62{ 63 int ifmt = fmt; 64 65 (void) UPXDT_LEN; 66 assert( (ifmt >= 0 ) && (ifmt < UPXDT_LEN) ); 67 return upxfmt_NAME[ ifmt ]; 68} 69 70 71/* check for MPEG TS signature, complain if not found 72 */ 73static int 74ts_sigcheck( const int c, off_t offset, ssize_t len, 75 FILE* log, const char* func ) 76 77{ 78 assert( len ); 79 (void)(len); /* NOP to avoid warnings */ 80 81 if( c == MPEG_TS_SIG ) return 0; 82 83 if( log && func ) { 84 (void) offset; /* get rid of a warning if TRACE is disabled */ 85 TRACE( (void)tmfprintf( log, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; " 86 "offset [0x%X(%u)] of packet len=[%lu]\n", 87 func, MPEG_TS_SIG, c, offset, offset, (u_long)len ) ); 88 } 89 90 return -1; 91} 92 93 94/* determine type of stream in memory 95 */ 96upxfmt_t 97get_mstream_type( const char* data, size_t len, FILE* log ) 98{ 99 int sig = 0; 100 size_t hdrlen = 0; 101 ssize_t n = -1; 102 103 assert( data && len ); 104 105 if( len < (RTP_HDR_SIZE + 1) ) { 106 (void) tmfprintf( log, "%s: read [%ld] bytes," 107 " not enough for RTP header\n", __func__, 108 (long)n ); 109 return UPXDT_UNKNOWN; 110 } 111 112 /* if the 1st byte has MPEG-TS signature - skip further checks */ 113 sig = data[0] & 0xFF; 114 if( 0 == ts_sigcheck( sig, 0, 1, NULL /*log*/, __func__ ) ) 115 return UPXDT_TS; 116 117 /* if not RTP - quit */ 118 if( 0 == RTP_verify( data, RTP_HDR_SIZE, log ) ) 119 return UPXDT_UNKNOWN; 120 121 /* check the first byte after RTP header - should be 122 * TS signature to be RTP over TS */ 123 124 if( (0 != RTP_hdrlen( data, len, &hdrlen, log )) || 125 (len < hdrlen) ) { 126 return UPXDT_UNKNOWN; 127 } 128 129 sig = data[ hdrlen ]; 130 if( 0 != ts_sigcheck( sig, 0, 1, log, __func__ ) ) 131 return UPXDT_UNKNOWN; 132 133 return UPXDT_RTP_TS; 134} 135 136 137 138/* determine type of stream saved in file 139 */ 140upxfmt_t 141get_fstream_type( int fd, FILE* log ) 142{ 143 ssize_t n = 0; 144 off_t offset = 0, where = 0; 145 upxfmt_t dtype = UPXDT_UNKNOWN; 146 char* data = NULL; 147 148 /* read in enough data to contain extended header 149 * and beginning of payload segment */ 150 size_t len = TS_SEG_LEN + RTP_XTHDRLEN; 151 152 assert( (fd > 0) && log ); 153 154 if( NULL == (data = malloc( len )) ) { 155 mperror( log, errno, "%s: malloc", __func__ ); 156 return UPXDT_UNKNOWN; 157 } 158 159 do { 160 /* check if it is a MPEG TS stream 161 */ 162 n = read( fd, data, len ); 163 if( 0 != sizecheck( "Not enough space for stream data", 164 len, n, log, __func__ ) ) break; 165 offset += n; 166 167 dtype = get_mstream_type( data, len, log ); 168 if( UPXDT_UNKNOWN == dtype ) { 169 TRACE( (void)tmfprintf( log, "%s: file type is not recognized\n", 170 __func__ ) ); 171 dtype = UPXDT_UNKNOWN; 172 break; 173 } 174 } while(0); 175 176 if( NULL != data ) free( data ); 177 178 if( n <= 0 ) { 179 mperror( log, errno, "%s", __func__ ); 180 return UPXDT_UNKNOWN; 181 } 182 183 where = lseek( fd, (-1) * offset, SEEK_CUR ); 184 if( -1 == where ) { 185 mperror( log, errno, "%s: lseek", __func__ ); 186 return UPXDT_UNKNOWN; 187 } 188 189 /* 190 TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n", 191 __func__, (int)dtype, fmt2str(dtype) ) ); 192 */ 193 194 return dtype; 195} 196 197 198/* read a sequence of MPEG TS packets (to fit into the given buffer) 199 */ 200static ssize_t 201read_ts_file( int fd, char* data, const size_t len, FILE* log ) 202{ 203 const size_t pkt_len = ((len - 1) / TS_SEG_LEN) * TS_SEG_LEN; 204 off_t k = 0; 205 u_int bad_frg = 0; 206 ssize_t n = -1; 207 208 assert( (fd > 0) && data && len && log); 209 210 assert( !buf_overrun( data, len, 0, pkt_len, log ) ); 211 n = read_buf( fd, data, pkt_len, log ); 212 if( n <= 0 ) return n; 213 214 if( 0 != sizecheck( "Bad TS packet stream", 215 pkt_len, n, log, __func__ ) ) 216 return -1; 217 218 /* make sure we've read TS records, not random data 219 */ 220 for( k = 0; k < (off_t)pkt_len; k += TS_SEG_LEN ) { 221 if( -1 == ts_sigcheck( data[k], k, (u_long)pkt_len, 222 log, __func__ ) ) { 223 ++bad_frg; 224 } 225 } 226 227 return (bad_frg ? -1 : n); 228} 229 230 231/* read an RTP packet 232 */ 233static ssize_t 234read_rtp_file( int fd, char* data, const size_t len, FILE* log ) 235{ 236 ssize_t nrd = -1, offset = 0; 237 size_t hdrlen = 0, rdlen = 0; 238 u_int frg = 0; 239 int rtp_end = 0, rc = 0; 240 off_t where = 0; 241 242 assert( (fd > 0) && data && len && log); 243 244 assert( !buf_overrun( data, len, 0, RTP_HDR_SIZE, log ) ); 245 nrd = read_buf( fd, data, RTP_HDR_SIZE, log ); 246 if( nrd <= 0 ) return nrd; 247 offset += nrd; 248 249 if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE, nrd, 250 log, __func__ ) ) 251 return -1; 252 253 if( 0 == RTP_verify( data, nrd, log ) ) 254 return -1; 255 256 if( -1 == (rc = RTP_hdrlen( data, nrd, &hdrlen, log )) ) 257 return -1; 258 259 /* if there is an extended header, read it in */ 260 if( ENOMEM == rc ) { 261 assert( !buf_overrun( data, len, offset, 262 RTP_XTHDRLEN - RTP_HDR_SIZE, log ) ); 263 nrd = read_buf( fd, data + offset, 264 RTP_XTHDRLEN - RTP_HDR_SIZE, log ); 265 if( (nrd <= 0) || 266 (-1 == sizecheck("Bad RTP x-header", 267 RTP_XTHDRLEN - RTP_HDR_SIZE, nrd, 268 log, __func__ )) ) { 269 return -1; 270 } 271 if( 0 == nrd ) return nrd; 272 273 offset += nrd; 274 rc = RTP_hdrlen( data, offset, &hdrlen, log ); 275 if( 0 != rc ) { 276 TRACE( (void)tmfprintf( log, "%s: bad RTP header - quitting\n", 277 __func__ ) ); 278 return -1; 279 } 280 281 /* 282 TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n", 283 __func__, (u_long)hdrlen ) ); 284 */ 285 286 if( (size_t)offset > hdrlen ) { 287 /* read more than needed: step back */ 288 289 where = lseek( fd, (-1)*((size_t)offset - hdrlen), SEEK_CUR ); 290 if( -1 == where ) { 291 mperror( log, errno, "%s: lseek", __func__ ); 292 return -1; 293 } 294 295 offset -= ((size_t)offset - hdrlen); 296 assert( (size_t)offset == hdrlen ); 297 298 /* 299 TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], " 300 "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) ); 301 */ 302 } 303 else if( hdrlen > (size_t)offset ) { 304 /* read remainder of the header in */ 305 306 assert( !buf_overrun( data, len, offset, 307 (hdrlen - (size_t)offset), log ) ); 308 nrd = read_buf( fd, data + offset, 309 (hdrlen - (size_t)offset), log ); 310 if( nrd <= 0 || 311 (-1 == sizecheck("Bad RTP x-header tail", 312 (hdrlen - (size_t)offset), nrd, 313 log, __func__ )) ) { 314 return -1; 315 } 316 if( 0 == nrd ) return nrd; 317 318 offset += nrd; 319 assert( (size_t)offset == hdrlen ); 320 } 321 } /* read extended header */ 322 323 324 /* read TS records until there is another RTP header or EOF */ 325 for( frg = 0; (ssize_t)len > offset; ++frg ) { 326 327 rdlen = ( (len - offset) < TS_SEG_LEN 328 ? (len - offset) 329 : TS_SEG_LEN ); 330 331 /* 332 TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n", 333 __func__, (u_long)rdlen ) ); 334 */ 335 336 assert( !buf_overrun( data, len, offset, rdlen, log ) ); 337 nrd = read_buf( fd, data + offset, rdlen, log ); 338 if( nrd <= 0 ) break; 339 340 /* if it's an RTP header, roll back and return 341 */ 342 rtp_end = RTP_verify( data + offset, nrd, log ); 343 if( 1 == rtp_end ) { 344 if( -1 == lseek( fd, (-1) * nrd, SEEK_CUR ) ) { 345 mperror( log, errno, "%s: lseek", __func__ ); 346 return -1; 347 } 348 349 break; 350 } 351 352 /* check if it is a TS packet and it's of the right size 353 */ 354 if( (-1 == ts_sigcheck( data[offset], offset, (u_long)TS_SEG_LEN, 355 log, __func__ )) || 356 (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN, nrd, 357 log, __func__ )) ) { 358 TRACE( hex_dump( "Data in question", data, offset + nrd, log ) ); 359 return -1; 360 } 361 362 offset += nrd; 363 } /* for */ 364 365 366 /* If it is not EOF and no RTP header for the next message is found, 367 * it is either our buffer is too small (to fit the whole message) 368 * or the stream is invalid 369 */ 370 if( !rtp_end && (0 != nrd) ) { 371 (void)tmfprintf( log, "%s: no RTP end after reading [%ld] bytes\n", 372 __func__, (long)offset ); 373 return -1; 374 } 375 376 return (nrd < 0) ? nrd : offset; 377} 378 379 380/* read record of one of the supported types from file 381 */ 382ssize_t 383read_frecord( int fd, char* data, const size_t len, 384 upxfmt_t* stream_type, FILE* log ) 385{ 386 upxfmt_t stype; 387 /* off_t where = -1, endmark = -1; */ 388 ssize_t nrd = -1; 389 390 assert( fd > 0 ); 391 assert( data && len ); 392 assert( stream_type && log ); 393 394 stype = *stream_type; 395 396 /* 397 where = lseek( fd, 0, SEEK_CUR ); 398 TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n", 399 __func__, (u_int)where, (u_int)where ) ); 400 */ 401 402 if( UPXDT_UNKNOWN == *stream_type ) { 403 stype = get_fstream_type( fd, log ); 404 405 if( UPXDT_UNKNOWN == stype ) { 406 (void)tmfprintf( log, "%s: Unsupported type\n", __func__ ); 407 return -1; 408 } 409 410 *stream_type = stype; 411 } /* UPXDT_UNKNOWN */ 412 413 if( UPXDT_TS == stype ) { 414 nrd = read_ts_file( fd, data, len, log ); 415 } 416 else if( UPXDT_RTP_TS == stype ) { 417 nrd = read_rtp_file( fd, data, len, log ); 418 } 419 else { 420 (void)tmfprintf( log, "%s: unknown stream type [%d]\n", 421 __func__, stype ); 422 return -1; 423 } 424 425 /* 426 if( nrd >= 0 ) { 427 endmark = lseek( fd, 0, SEEK_CUR ); 428 429 TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n", 430 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) ); 431 432 TRACE( sizecheck( "WARNING: Read file discrepancy", 433 where + nrd, endmark, 434 log, __func__ ) ); 435 } 436 */ 437 438 return nrd; 439} 440 441 442/* write data as a UDS record 443 */ 444static ssize_t 445write_uds_record( int fd, const char* data, size_t len, FILE* log ) 446{ 447 assert( (fd > 0) && data && len ); 448 (void)(data && len && fd); 449 (void)tmfprintf( log, "%s: UDS conversion not yet implemented\n", 450 __func__ ); 451 return -1; 452} 453 454 455/* write RTP record into TS stream 456 */ 457static ssize_t 458write_rtp2ts( int fd, const char* data, size_t len, FILE* log ) 459{ 460 void* buf = (void*)data; 461 size_t pldlen = len; 462 const int NO_VERIFY = 0; 463 int rc = 0; 464 465 assert( (fd > 0) && data && len && log ); 466 467 rc = RTP_process( &buf, &pldlen, NO_VERIFY, log ); 468 if( -1 == rc ) return -1; 469 470 assert( !buf_overrun( buf, len, 0, pldlen, log ) ); 471 return write_buf( fd, buf, pldlen, log ); 472} 473 474 475/* write record after converting it from source into destination 476 * format 477 */ 478ssize_t 479write_frecord( int fd, const char* data, size_t len, 480 upxfmt_t sfmt, upxfmt_t dfmt, FILE* log ) 481{ 482 ssize_t nwr = -1; 483 int fmt_ok = 0; 484 const char *str_from = NULL, *str_to = NULL; 485 486 if( UPXDT_UDS == dfmt ) { 487 fmt_ok = 1; 488 nwr = write_uds_record( fd, data, len, log ); 489 } 490 else if( UPXDT_TS == dfmt ) { 491 if( UPXDT_RTP_TS == sfmt ) { 492 fmt_ok = 1; 493 nwr = write_rtp2ts( fd, data, len, log ); 494 } 495 } 496 497 if( !fmt_ok ) { 498 str_from = fmt2str(sfmt); 499 str_to = fmt2str(dfmt); 500 (void)tmfprintf( log, "Conversion from [%s] into [%s] is not supported\n", 501 str_from, str_to ); 502 return -1; 503 } 504 505 return nwr; 506} 507 508 509/* reset packet-buffer registry in stream spec 510 */ 511void 512reset_pkt_registry( struct dstream_ctx* ds ) 513{ 514 assert( ds ); 515 516 ds->flags &= ~F_DROP_PACKET; 517 ds->pkt_count = 0; 518} 519 520 521/* release resources allocated for stream spec 522 */ 523void 524free_dstream_ctx( struct dstream_ctx* ds ) 525{ 526 assert( ds ); 527 528 if( NULL != ds->pkt ) { 529 free( ds->pkt ); 530 ds->pkt = NULL; 531 532 ds->pkt_count = ds->max_pkt = 0; 533 } 534} 535 536 537 538/* register received packet into registry (for scattered output) 539 */ 540static int 541register_packet( struct dstream_ctx* spc, char* buf, size_t len ) 542{ 543 struct iovec* new_pkt = NULL; 544 static const int DO_VERIFY = 1; 545 546 void* new_buf = buf; 547 size_t new_len = len; 548 549 assert( spc->max_pkt > 0 ); 550 551 /* enlarge packet registry if needed */ 552 if( spc->pkt_count >= spc->max_pkt ) { 553 spc->max_pkt <<= 1; 554 spc->pkt = realloc( spc->pkt, spc->max_pkt * sizeof(spc->pkt[0]) ); 555 if( NULL == spc->pkt ) { 556 mperror( g_flog, errno, "%s: realloc", __func__ ); 557 return -1; 558 } 559 560 TRACE( (void)tmfprintf( g_flog, "RTP packet registry " 561 "expanded to [%lu] records\n", (u_long)spc->max_pkt ) ); 562 } 563 564 /* put packet info into registry */ 565 566 new_pkt = &(spc->pkt[ spc->pkt_count ]); 567 568 /* 569 TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n", 570 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) ); 571 */ 572 573 if( 0 != RTP_process( &new_buf, &new_len, DO_VERIFY, g_flog ) ) { 574 TRACE( (void)tmfputs("register packet: dropping\n", g_flog) ); 575 spc->flags |= F_DROP_PACKET; 576 577 return 0; 578 } 579 580 new_pkt->iov_base = new_buf; 581 new_pkt->iov_len = new_len; 582 583 /* 584 TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n", 585 (u_long)spc->pkt_count, new_pkt->iov_base, 586 (u_long)new_pkt->iov_len ) ); 587 */ 588 589 spc->pkt_count++; 590 return 0; 591} 592 593 594/* read data from source, determine underlying protocol 595 * (if not already known); and process the packet 596 * if needed (for RTP - register packet) 597 * 598 * return the number of octets read from the source 599 * into the buffer 600 */ 601static ssize_t 602read_packet( struct dstream_ctx* spc, int fd, char* buf, size_t len ) 603{ 604 ssize_t n = -1; 605 size_t chunk_len = len; 606 607 assert( spc && buf && len ); 608 assert( fd > 0 ); 609 610 /* if *RAW* data specified - read AS IS 611 * and exit */ 612 if( UPXDT_RAW == spc->stype ) { 613 return read_buf( fd, buf, len, g_flog ); 614 } 615 616 /* if it is (or *could* be) RTP, read only MTU bytes 617 */ 618 if( (spc->stype == UPXDT_RTP_TS) || (spc->flags & F_CHECK_FMT) ) 619 chunk_len = (len > spc->mtu) ? spc->mtu : len; 620 621 if( spc->flags & F_FILE_INPUT ) { 622 assert( !buf_overrun( buf, len, 0, chunk_len, g_flog ) ); 623 n = read_frecord( fd, buf, chunk_len, &(spc->stype), g_flog ); 624 if( n <= 0 ) return n; 625 } 626 else { 627 assert( !buf_overrun(buf, len, 0, chunk_len, g_flog) ); 628 n = read_buf( fd, buf, chunk_len, g_flog ); 629 if( n <= 0 ) return n; 630 } 631 632 if( spc->flags & F_CHECK_FMT ) { 633 spc->stype = get_mstream_type( buf, n, g_flog ); 634 switch (spc->stype) { 635 case UPXDT_RTP_TS: 636 /* scattered: exclude RTP headers */ 637 spc->flags |= F_SCATTERED; break; 638 case UPXDT_TS: 639 spc->flags &= ~F_SCATTERED; break; 640 default: 641 spc->stype = UPXDT_RAW; 642 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog ) ); 643 break; 644 } /* switch */ 645 646 TRACE( (void)tmfprintf( g_flog, "Established stream as [%s]\n", 647 fmt2str( spc->stype ) ) ); 648 649 spc->flags &= ~F_CHECK_FMT; 650 } 651 652 if( spc->flags & F_SCATTERED ) 653 if( -1 == register_packet( spc, buf, n ) ) return -1; 654 655 return n; 656} 657 658 659/* read data from source of specified type (UDP socket or otherwise); 660 * read as many fragments as specified (max_frgs) into the buffer 661 */ 662ssize_t 663read_data( struct dstream_ctx* spc, int fd, char* data, 664 const ssize_t data_len, const struct rdata_opt* opt ) 665{ 666 int m = 0; 667 ssize_t n = 0, nrcv = -1; 668 time_t start_tm = time(NULL), cur_tm = 0; 669 time_t buftm_sec = 0; 670 671 assert( spc && (data_len > 0) && opt ); 672 673 /* if max_frgs < 0, read as many packets as can fit in the buffer, 674 * otherwise read no more than max_frgs packets 675 */ 676 677 for( m = 0, n = 0; ((opt->max_frgs < 0) ? 1 : (m < opt->max_frgs)); ++m ) { 678 nrcv = read_packet( spc, fd, data + n, data_len - n ); 679 if( nrcv <= 0 ) { 680 if( EAGAIN == errno ) { 681 (void)tmfprintf( g_flog, 682 "Receive on socket/file [%d] timed out\n", 683 fd); 684 } 685 686 if( 0 == nrcv ) (void)tmfprintf(g_flog, "%s - EOF\n",__func__); 687 else { 688 mperror(g_flog, errno, "%s: read/recv", __func__); 689 } 690 691 break; 692 } 693 694 if( spc->flags & F_DROP_PACKET ) { 695 spc->flags &= ~F_DROP_PACKET; 696 continue; 697 } 698 699 n += nrcv; 700 if( n >= (data_len - nrcv) ) break; 701 702 if( -1 != opt->buf_tmout ) { 703 cur_tm = time(NULL); 704 buftm_sec = cur_tm - start_tm; 705 if( buftm_sec >= opt->buf_tmout ) { 706 TRACE( (void)tmfprintf( g_flog, "%s: Buffer timed out " 707 "after [%ld] seconds\n", __func__, 708 (long)buftm_sec ) ); 709 break; 710 } 711 /* 712 else { 713 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ ); 714 } 715 */ 716 } 717 } /* for */ 718 719 if( (nrcv > 0) && !n ) { 720 TRACE( (void)tmfprintf( g_flog, "%s: no data to send " 721 "out of [%d] packets\n", __func__, m ) ); 722 return -1; 723 } 724 725 return (nrcv > 0) ? n : -1; 726} 727 728 729/* write data to destination(s) 730 */ 731ssize_t 732write_data( const struct dstream_ctx* spc, 733 const char* data, 734 const ssize_t len, 735 int fd ) 736{ 737 ssize_t n = 0, error = IO_ERR; 738 int32_t n_count = -1; 739 740 assert( spc && data && len ); 741 if( fd <= 0 ) return 0; 742 743 if( spc->flags & F_SCATTERED ) { 744 n_count = spc->pkt_count; 745 n = writev( fd, spc->pkt, n_count ); 746 if( n <= 0 ) { 747 if( EAGAIN == errno ) { 748 (void)tmfprintf( g_flog, "Write on fd=[%d] timed out\n", fd); 749 error = IO_BLK; 750 } 751 mperror( g_flog, errno, "%s: writev", __func__ ); 752 return error; 753 } 754 } 755 else { 756 n = write_buf( fd, data, len, g_flog ); 757 if( n < 0 ) 758 error = n; 759 } 760 761 return (n > 0) ? n : error; 762} 763 764 765/* initialize incoming-stream context: 766 * set data type (if possible) and flags 767 */ 768int 769init_dstream_ctx( struct dstream_ctx* ds, const char* cmd, const char* fname, 770 ssize_t nmsgs ) 771{ 772 extern const char CMD_UDP[]; 773 extern const size_t CMD_UDP_LEN; 774 extern const char CMD_RTP[]; 775 extern const size_t CMD_RTP_LEN; 776 777 assert( ds && cmd && (nmsgs > 0) ); 778 779 ds->flags = 0; 780 ds->pkt = NULL; 781 ds->max_pkt = ds->pkt_count = 0; 782 ds->mtu = ETHERNET_MTU; 783 784 if( NULL != fname ) { 785 ds->stype = UPXDT_UNKNOWN; 786 ds->flags |= (F_CHECK_FMT | F_FILE_INPUT); 787 TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog ) ); 788 } 789 else if( 0 == strncmp( cmd, CMD_UDP, CMD_UDP_LEN ) ) { 790 ds->stype = UPXDT_UNKNOWN; 791 ds->flags |= F_CHECK_FMT; 792 TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog ) ); 793 } 794 else if( 0 == strncmp( cmd, CMD_RTP, CMD_RTP_LEN ) ) { 795 ds->stype = UPXDT_RTP_TS; 796 ds->flags |= F_SCATTERED; 797 TRACE( (void)tmfputs( "RTP (over UDP) stream assumed," 798 " no checks\n", g_flog ) ); 799 } 800 else { 801 TRACE( (void)tmfprintf( g_flog, "%s: " 802 "Irrelevant command [%s]\n", __func__, cmd) ); 803 return -1; 804 } 805 806 ds->pkt = calloc( nmsgs, sizeof(ds->pkt[0]) ); 807 if( NULL == ds->pkt ) { 808 mperror( g_flog, errno, "%s: calloc", __func__ ); 809 return -1; 810 } 811 812 ds->max_pkt = nmsgs; 813 return 0; 814} 815 816 817 818/* __EOF__ */ 819