1// 2// This file is part of the aMule Project. 3// 4// Copyright (c) 2004-2011 aMule Team ( admin@amule.org / http://www.amule.org ) 5// Copyright (c) 2004-2011 Angel Vidal ( kry@amule.org ) 6// 7// Any parts of this program derived from the xMule, lMule or eMule project, 8// or contributed by third-party developers are copyrighted by their 9// respective authors. 10// 11// This program is free software; you can redistribute it and/or modify 12// it under the terms of the GNU General Public License as published by 13// the Free Software Foundation; either version 2 of the License, or 14// (at your option) any later version. 15// 16// This program is distributed in the hope that it will be useful, 17// but WITHOUT ANY WARRANTY; without even the implied warranty of 18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19// GNU General Public License for more details. 20// 21// You should have received a copy of the GNU General Public License 22// along with this program; if not, write to the Free Software 23// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA 24// 25 26#include "ECSocket.h" 27 28#include <sstream> 29#include <iostream> 30#include <algorithm> 31 32using namespace std; 33 34#include "ECPacket.h" // Needed for CECPacket 35#include "../../../Logger.h" 36#include <common/Format.h> // Needed for CFormat 37 38#define EC_COMPRESSION_LEVEL Z_DEFAULT_COMPRESSION 39#define EC_MAX_UNCOMPRESSED 1024 40 41#ifndef __GNUC__ 42#define __attribute__(x) 43#endif 44 45// If your compiler gives errors on these lines, just remove them. 
// Forward declarations that attach GCC attributes to the UTF-8 helpers.
// The parameter types have to match the definitions below exactly: in C++ a
// declaration taking wchar_t would introduce a separate overload, and the
// attributes would then not apply to the functions that are really defined.
int utf8_mbtowc(uint32_t *p, const unsigned char *s, int n) __attribute__((__visibility__("internal")));
int utf8_wctomb(unsigned char *s, uint32_t wc, int maxlen) __attribute__((__visibility__("internal")));
int utf8_mb_remain(char c) __attribute__((__pure__));

/*----------=> Import from the Linux kernel <=----------*/
/*
 * linux/fs/nls_base.c
 */

/*
 * Sample implementation from Unicode home page.
 * http://www.stonehand.com/unicode/standard/fss-utf.html
 */

// One row per UTF-8 sequence length.
//   cmask/cval : (lead & cmask) == cval identifies a lead byte of this length
//   shift      : bit position of the payload bits stored in the lead byte
//   lmask      : mask selecting the code point bits of the assembled value
//   lval       : smallest code point that is allowed to use this length
struct utf8_table {
	int cmask;
	int cval;
	int shift;
	uint32_t lmask;
	uint32_t lval;
};

static const struct utf8_table utf8_table[] =
{
	{0x80, 0x00, 0*6, 0x7F,       0,         /* 1 byte sequence */},
	{0xE0, 0xC0, 1*6, 0x7FF,      0x80,      /* 2 byte sequence */},
	{0xF0, 0xE0, 2*6, 0xFFFF,     0x800,     /* 3 byte sequence */},
	{0xF8, 0xF0, 3*6, 0x1FFFFF,   0x10000,   /* 4 byte sequence */},
	{0xFC, 0xF8, 4*6, 0x3FFFFFF,  0x200000,  /* 5 byte sequence */},
	{0xFE, 0xFC, 5*6, 0x7FFFFFFF, 0x4000000, /* 6 byte sequence */},
	{0,    0,    0,   0,          0,         /* end of table */}
};

// Decodes one UTF-8 sequence.
//
// p - receives the decoded value on success
// s - points at the lead byte of the sequence
// n - number of bytes readable at s
//
// Returns the number of bytes consumed, or -1 if the input is empty, ends
// before the sequence is complete, contains a byte that is not a continuation
// byte where one is required, uses a longer encoding than the value needs, or
// starts with a byte that matches no row of utf8_table.
int utf8_mbtowc(uint32_t *p, const unsigned char *s, int n)
{
	// The lead byte is read unconditionally below, so an empty input has to
	// be rejected first.
	if (n <= 0) {
		return -1;
	}

	const int lead = *s;
	uint32_t value = lead;
	int used = 0;

	for (const struct utf8_table *row = utf8_table; row->cmask; ++row) {
		++used;
		if ((lead & row->cmask) == row->cval) {
			value &= row->lmask;
			if (value < row->lval) {
				// Value would have fitted into a shorter sequence.
				return -1;
			}
			*p = value;
			return used;
		}
		if (n <= used) {
			// A further byte is required but not available.
			return -1;
		}
		++s;
		// Flipping the top bit turns a valid continuation byte (10xxxxxx)
		// into its 6 payload bits; anything else leaves bit 6 or 7 set.
		const int cont = (*s ^ 0x80) & 0xFF;
		if (cont & 0xC0) {
			return -1;
		}
		value = (value << 6) | cont;
	}
	return -1;
}

// Encodes wc as UTF-8 into s.
//
// s      - output buffer
// wc     - value to encode (at most 0x7FFFFFFF)
// maxlen - number of bytes writable at s
//
// Returns the number of bytes written, or -1 if the value does not fit into
// maxlen bytes (including maxlen <= 0) or is too large for a 6 byte sequence.
int utf8_wctomb(unsigned char *s, uint32_t wc, int maxlen)
{
	int length = 0;

	// "maxlen > 0" rather than "maxlen != 0": a negative limit must not be
	// treated as "unlimited".
	for (const struct utf8_table *row = utf8_table; row->cmask && maxlen > 0; ++row, --maxlen) {
		++length;
		if (wc <= row->lmask) {
			int shift = row->shift;
			*s = static_cast<unsigned char>(row->cval | (wc >> shift));
			while (shift > 0) {
				shift -= 6;
				++s;
				*s = static_cast<unsigned char>(0x80 | ((wc >> shift) & 0x3F));
			}
			return length;
		}
	}
	return -1;
}
130/*----------=> End of Import <=----------*/ 131 132int utf8_mb_remain(char c) 133{ 134 int i; 135 for (i = 0; i < 5; ++i) { 136 if ((c & utf8_table[i].cmask) == utf8_table[i].cval) break; 137 } 138 return i; 139} 140 141 142void CQueuedData::Write(const void *data, size_t len) 143{ 144 const size_t canWrite = std::min(GetRemLength(), len); 145 wxASSERT(len == canWrite); 146 147 memcpy(m_wr_ptr, data, canWrite); 148 m_wr_ptr += canWrite; 149} 150 151 152void CQueuedData::WriteAt(const void *data, size_t len, size_t offset) 153{ 154 wxASSERT(len + offset <= m_data.size()); 155 if (offset > m_data.size()) { 156 return; 157 } else if (offset + len > m_data.size()) { 158 len = m_data.size() - offset; 159 } 160 161 memcpy(&m_data[0] + offset, data, len); 162} 163 164 165void CQueuedData::Read(void *data, size_t len) 166{ 167 const size_t canRead = std::min(GetUnreadDataLength(), len); 168 wxASSERT(len == canRead); 169 170 memcpy(data, m_rd_ptr, canRead); 171 m_rd_ptr += canRead; 172} 173 174 175void CQueuedData::WriteToSocket(CECSocket *sock) 176{ 177 wxCHECK_RET(m_rd_ptr < m_wr_ptr, 178 wxT("Reading past written data in WriteToSocket")); 179 180 sock->SocketWrite(m_rd_ptr, GetUnreadDataLength()); 181 m_rd_ptr += sock->GetLastCount(); 182} 183 184 185void CQueuedData::ReadFromSocket(CECSocket *sock, size_t len) 186{ 187 const size_t canWrite = std::min(GetRemLength(), len); 188 wxASSERT(len == canWrite); 189 190 sock->SocketRead(m_wr_ptr, canWrite); 191 m_wr_ptr += sock->GetLastCount(); 192} 193 194 195size_t CQueuedData::ReadFromSocketAll(CECSocket *sock, size_t len) 196{ 197 size_t read_rem = std::min(GetRemLength(), len); 198 wxASSERT(read_rem == len); 199 200 // We get here when socket is truly blocking 201 do { 202 // Give socket a 10 sec chance to recv more data. 
203 if ( !sock->WaitSocketRead(10, 0) ) { 204 AddDebugLogLineN(logEC, wxT("ReadFromSocketAll: socket is blocking")); 205 break; 206 } 207 208 wxASSERT(m_wr_ptr + read_rem <= &m_data[0] + m_data.size()); 209 sock->SocketRead(m_wr_ptr, read_rem); 210 m_wr_ptr += sock->GetLastCount(); 211 read_rem -= sock->GetLastCount(); 212 213 if (sock->SocketRealError()) { 214 AddDebugLogLineN(logEC, wxT("ReadFromSocketAll: socket error")); 215 break; 216 } 217 } while (read_rem); 218 219 return len - read_rem; 220} 221 222 223size_t CQueuedData::GetLength() const 224{ 225 return m_data.size(); 226} 227 228 229size_t CQueuedData::GetDataLength() const 230{ 231 const size_t len = m_wr_ptr - &m_data[0]; 232 wxCHECK_MSG(len <= m_data.size(), m_data.size(), 233 wxT("Write-pointer past end of buffer")); 234 235 return len; 236} 237 238 239size_t CQueuedData::GetRemLength() const 240{ 241 return m_data.size() - GetDataLength(); 242} 243 244 245size_t CQueuedData::GetUnreadDataLength() const 246{ 247 wxCHECK_MSG(m_wr_ptr >= m_rd_ptr, 0, 248 wxT("Read position past write position.")); 249 250 return m_wr_ptr - m_rd_ptr; 251} 252 253 254 255// 256// CECSocket API - User interface functions 257// 258 259CECSocket::CECSocket(bool use_events) 260: 261m_use_events(use_events), 262m_in_ptr(EC_SOCKET_BUFFER_SIZE), 263m_out_ptr(EC_SOCKET_BUFFER_SIZE), 264m_curr_rx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE)), 265m_curr_tx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE)), 266m_rx_flags(0), 267m_tx_flags(0), 268// setup initial state: 4 flags + 4 length 269m_bytes_needed(EC_HEADER_SIZE), 270m_in_header(true), 271m_my_flags(0x20), 272m_haveNotificationSupport(false) 273{ 274 275} 276 277CECSocket::~CECSocket() 278{ 279 while (!m_output_queue.empty()) { 280 CQueuedData *data = m_output_queue.front(); 281 m_output_queue.pop_front(); 282 delete data; 283 } 284} 285 286bool CECSocket::ConnectSocket(uint32_t ip, uint16_t port) 287{ 288 bool res = InternalConnect(ip, port, !m_use_events); 289 return 
!SocketError() && res; 290} 291 292void CECSocket::SendPacket(const CECPacket *packet) 293{ 294 uint32 len = WritePacket(packet); 295 packet->DebugPrint(false, len); 296 OnOutput(); 297} 298 299const CECPacket *CECSocket::SendRecvPacket(const CECPacket *packet) 300{ 301 SendPacket(packet); 302 303 if (m_curr_rx_data->ReadFromSocketAll(this, EC_HEADER_SIZE) != EC_HEADER_SIZE 304 || SocketError() // This is a synchronous read, so WouldBlock is an error too. 305 || !ReadHeader()) { 306 OnError(); 307 AddDebugLogLineN(logEC, wxT("SendRecvPacket: error")); 308 return 0; 309 } 310 if (m_curr_rx_data->ReadFromSocketAll(this, m_curr_packet_len) != m_curr_packet_len 311 || SocketError()) { 312 OnError(); 313 AddDebugLogLineN(logEC, wxT("SendRecvPacket: error")); 314 return 0; 315 } 316 const CECPacket *reply = ReadPacket(); 317 m_curr_rx_data->Rewind(); 318 return reply; 319} 320 321std::string CECSocket::GetLastErrorMsg() 322{ 323 int code = InternalGetLastError(); 324 switch(code) { 325 case EC_ERROR_NOERROR: 326 return "No error happened"; 327 case EC_ERROR_INVOP: 328 return "Invalid operation"; 329 case EC_ERROR_IOERR: 330 return "Input/Output error"; 331 case EC_ERROR_INVADDR: 332 return "Invalid address passed to wxSocket"; 333 case EC_ERROR_INVSOCK: 334 return "Invalid socket (uninitialized)"; 335 case EC_ERROR_NOHOST: 336 return "No corresponding host"; 337 case EC_ERROR_INVPORT: 338 return "Invalid port"; 339 case EC_ERROR_WOULDBLOCK: 340 return "The socket is non-blocking and the operation would block"; 341 case EC_ERROR_TIMEDOUT: 342 return "The timeout for this operation expired"; 343 case EC_ERROR_MEMERR: 344 return "Memory exhausted"; 345 } 346 ostringstream error_string; 347 error_string << "Error code " << code << " unknown."; 348 return error_string.str(); 349} 350 351bool CECSocket::SocketRealError() 352{ 353 bool ret = false; 354 if (InternalError()) { 355 int lastError = InternalGetLastError(); 356 ret = lastError != EC_ERROR_NOERROR && lastError != 
EC_ERROR_WOULDBLOCK; 357 } 358 return ret; 359} 360 361void CECSocket::OnError() 362{ 363#ifdef __DEBUG__ 364 cout << GetLastErrorMsg() << endl; 365#endif 366} 367 368void CECSocket::OnLost() 369{ 370} 371 372// 373// Event handlers 374// 375void CECSocket::OnConnect() 376{ 377} 378 379void CECSocket::OnInput() 380{ 381 size_t bytes_rx = 0; 382 do { 383 m_curr_rx_data->ReadFromSocket(this, m_bytes_needed); 384 if (SocketRealError()) { 385 AddDebugLogLineN(logEC, wxT("OnInput: socket error")); 386 OnError(); 387 // socket already disconnected in this point 388 return; 389 } 390 bytes_rx = GetLastCount(); 391 m_bytes_needed -= bytes_rx; 392 393 if (m_bytes_needed == 0) { 394 if (m_in_header) { 395 m_in_header = false; 396 if (!ReadHeader()) { 397 AddDebugLogLineN(logEC, wxT("OnInput: header error")); 398 return; 399 } 400 } else { 401 std::auto_ptr<const CECPacket> packet(ReadPacket()); 402 m_curr_rx_data->Rewind(); 403 if (packet.get()) { 404 std::auto_ptr<const CECPacket> reply(OnPacketReceived(packet.get(), m_curr_packet_len)); 405 if (reply.get()) { 406 SendPacket(reply.get()); 407 } 408 } else { 409 AddDebugLogLineN(logEC, wxT("OnInput: no packet")); 410 } 411 m_bytes_needed = EC_HEADER_SIZE; 412 m_in_header = true; 413 } 414 } 415 } while (bytes_rx); 416} 417 418void CECSocket::OnOutput() 419{ 420 while (!m_output_queue.empty()) { 421 CQueuedData* data = m_output_queue.front(); 422 data->WriteToSocket(this); 423 if (!data->GetUnreadDataLength()) { 424 m_output_queue.pop_front(); 425 delete data; 426 } 427 if (SocketError()) { 428 if (!WouldBlock()) { 429 // real error, abort 430 AddDebugLogLineN(logEC, wxT("OnOutput: socket error")); 431 OnError(); 432 return; 433 } 434 // Now it's just a blocked socket. 435 if ( m_use_events ) { 436 // Event driven logic: return, OnOutput() will be called again later 437 return; 438 } 439 // Syncronous call: wait (for max 10 secs) 440 if ( !WaitSocketWrite(10, 0) ) { 441 // Still not through ? 
442 if (WouldBlock()) { 443 // WouldBlock() is only EAGAIN or EWOULD_BLOCK, 444 // and those shouldn't create an infinite wait. 445 // So give it another chance. 446 continue; 447 } else { 448 AddDebugLogLineN(logEC, wxT("OnOutput: socket error in sync wait")); 449 OnError(); 450 break; 451 } 452 } 453 } 454 } 455 // 456 // All outstanding data sent to socket 457 // (used for push clients) 458 // 459 WriteDoneAndQueueEmpty(); 460} 461 462bool CECSocket::DataPending() 463{ 464 return !m_output_queue.empty(); 465} 466 467// 468// Socket I/O 469// 470 471size_t CECSocket::ReadBufferFromSocket(void *buffer, size_t required_len) 472{ 473 wxASSERT(required_len); 474 475 if (m_curr_rx_data->GetUnreadDataLength() < required_len) { 476 // need more data that we have. Looks like nothing will help here 477 AddDebugLogLineN(logEC, CFormat(wxT("ReadBufferFromSocket: not enough data (%d < %d)")) 478 % m_curr_rx_data->GetUnreadDataLength() % required_len); 479 return 0; 480 } 481 m_curr_rx_data->Read(buffer, required_len); 482 return required_len; 483} 484 485void CECSocket::WriteBufferToSocket(const void *buffer, size_t len) 486{ 487 unsigned char *wr_ptr = (unsigned char *)buffer; 488 while ( len ) { 489 size_t curr_free = m_curr_tx_data->GetRemLength(); 490 if ( len > curr_free ) { 491 492 m_curr_tx_data->Write(wr_ptr, curr_free); 493 len -= curr_free; 494 wr_ptr += curr_free; 495 m_output_queue.push_back(m_curr_tx_data.release()); 496 m_curr_tx_data.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE)); 497 } else { 498 m_curr_tx_data->Write(wr_ptr, len); 499 break; 500 } 501 } 502} 503 504 505// 506// ZLib "error handler" 507// 508 509void ShowZError(int zerror, z_streamp strm) 510{ 511 const char *p = NULL; 512 513 switch (zerror) { 514 case Z_STREAM_END: p = "Z_STREAM_END"; break; 515 case Z_NEED_DICT: p = "Z_NEED_DICT"; break; 516 case Z_ERRNO: p = "Z_ERRNO"; break; 517 case Z_STREAM_ERROR: p = "Z_STREAM_ERROR"; break; 518 case Z_DATA_ERROR: p = "Z_DATA_ERROR"; break; 519 case 
Z_MEM_ERROR: p = "Z_MEM_ERROR"; break; 520 case Z_BUF_ERROR: p = "Z_BUF_ERROR"; break; 521 case Z_VERSION_ERROR: p = "Z_VERSION_ERROR"; break; 522 } 523 printf("ZLib operation returned %s\n", p); 524 printf("ZLib error message: %s\n", strm->msg); 525 printf("zstream state:\n\tnext_in=%p\n\tavail_in=%u\n\ttotal_in=%lu\n\tnext_out=%p\n\tavail_out=%u\n\ttotal_out=%lu\n", 526 strm->next_in, strm->avail_in, strm->total_in, strm->next_out, strm->avail_out, strm->total_out); 527 AddDebugLogLineN(logEC, wxT("ZLib error")); 528} 529 530 531bool CECSocket::ReadHeader() 532{ 533 m_curr_rx_data->Read(&m_rx_flags, 4); 534 m_rx_flags = ENDIAN_NTOHL(m_rx_flags); 535 m_curr_rx_data->Read(&m_curr_packet_len, 4); 536 m_curr_packet_len = ENDIAN_NTOHL(m_curr_packet_len); 537 m_bytes_needed = m_curr_packet_len; 538 // packet bigger that 16Mb looks more like broken request 539 if (m_bytes_needed > 16*1024*1024) { 540 AddDebugLogLineN(logEC, CFormat(wxT("ReadHeader: packet too big: %d")) % m_bytes_needed); 541 CloseSocket(); 542 return false; 543 } 544 m_curr_rx_data->Rewind(); 545 size_t currLength = m_curr_rx_data->GetLength(); 546 // resize input buffer if 547 // a) too small or 548 if (currLength < m_bytes_needed 549 // b) way too large (free data again after receiving huge packets) 550 || m_bytes_needed + EC_SOCKET_BUFFER_SIZE * 10 < currLength) { 551 // Client socket: IsAuthorized() is always true 552 // Server socket: do not allow growing of internal buffers before succesfull login. 553 // Otherwise sending a simple header with bogus length of 16MB-1 will crash an embedded 554 // client with memory exhaustion. 
555 if (!IsAuthorized()) { 556 AddDebugLogLineN(logEC, CFormat(wxT("ReadHeader: resize (%d -> %d) on non autorized socket")) % currLength % m_bytes_needed); 557 CloseSocket(); 558 return false; 559 } 560 // Don't make buffer smaller than EC_SOCKET_BUFFER_SIZE 561 size_t bufSize = m_bytes_needed; 562 if (bufSize < EC_SOCKET_BUFFER_SIZE) { 563 bufSize = EC_SOCKET_BUFFER_SIZE; 564 } 565 m_curr_rx_data.reset(new CQueuedData(bufSize)); 566 } 567 if (ECLogIsEnabled()) { 568 DoECLogLine(CFormat(wxT("< %d ...")) % m_bytes_needed); 569 } 570 return true; 571} 572 573 574bool CECSocket::ReadNumber(void *buffer, size_t len) 575{ 576 if (m_rx_flags & EC_FLAG_UTF8_NUMBERS) { 577 unsigned char mb[6]; 578 uint32_t wc; 579 if (!ReadBuffer(mb, 1)) return false; 580 int remains = utf8_mb_remain(mb[0]); 581 if (remains) if (!ReadBuffer(&(mb[1]), remains)) return false; 582 if (utf8_mbtowc(&wc, mb, 6) == -1) return false; // Invalid UTF-8 code sequence 583 switch (len) { 584 case 1: PokeUInt8( buffer, wc ); break; 585 case 2: RawPokeUInt16( buffer, wc ); break; 586 case 4: RawPokeUInt32( buffer, wc ); break; 587 } 588 } else { 589 if ( !ReadBuffer(buffer, len) ) { 590 return false; 591 } 592 switch (len) { 593 case 2: 594 RawPokeUInt16( buffer, ENDIAN_NTOHS( RawPeekUInt16( buffer ) ) ); 595 break; 596 case 4: 597 RawPokeUInt32( buffer, ENDIAN_NTOHL( RawPeekUInt32( buffer ) ) ); 598 break; 599 } 600 } 601 return true; 602} 603 604bool CECSocket::WriteNumber(const void *buffer, size_t len) 605{ 606 if (m_tx_flags & EC_FLAG_UTF8_NUMBERS) { 607 unsigned char mb[6]; 608 uint32_t wc = 0; 609 int mb_len; 610 switch (len) { 611 case 1: wc = PeekUInt8( buffer ); break; 612 case 2: wc = RawPeekUInt16( buffer ); break; 613 case 4: wc = RawPeekUInt32( buffer ); break; 614 default: return false; 615 } 616 if ((mb_len = utf8_wctomb(mb, wc, 6)) == -1) return false; // Something is terribly wrong... 
617 return WriteBuffer(mb, mb_len); 618 } else { 619 char tmp[8]; 620 621 switch (len) { 622 case 1: PokeUInt8( tmp, PeekUInt8( buffer ) ); break; 623 case 2: RawPokeUInt16( tmp, ENDIAN_NTOHS( RawPeekUInt16( buffer ) ) ); break; 624 case 4: RawPokeUInt32( tmp, ENDIAN_NTOHL( RawPeekUInt32( buffer ) ) ); break; 625 } 626 return WriteBuffer(tmp, len); 627 } 628} 629 630bool CECSocket::ReadBuffer(void *buffer, size_t len) 631{ 632 if (m_rx_flags & EC_FLAG_ZLIB) { 633 if ( !m_z.avail_in ) { 634 // no reason for this situation: all packet should be 635 // buffered by now 636 AddDebugLogLineN(logEC, wxT("ReadBuffer: ZLib error")); 637 return false; 638 } 639 m_z.avail_out = (uInt)len; 640 m_z.next_out = (Bytef*)buffer; 641 int zerror = inflate(&m_z, Z_SYNC_FLUSH); 642 if ((zerror != Z_OK) && (zerror != Z_STREAM_END)) { 643 ShowZError(zerror, &m_z); 644 AddDebugLogLineN(logEC, wxT("ReadBuffer: ZLib error")); 645 return false; 646 } 647 return true; 648 } else { 649 // using uncompressed buffered i/o 650 size_t read = ReadBufferFromSocket(buffer, len); 651 if (read == len) { 652 return true; 653 } else { 654 AddDebugLogLineN(logEC, CFormat(wxT("ReadBuffer: %d < %d")) % read % len); 655 return false; 656 } 657 } 658} 659 660bool CECSocket::WriteBuffer(const void *buffer, size_t len) 661{ 662 if (m_tx_flags & EC_FLAG_ZLIB) { 663 664 unsigned char *rd_ptr = (unsigned char *)buffer; 665 do { 666 unsigned int remain_in = EC_SOCKET_BUFFER_SIZE - m_z.avail_in; 667 if ( remain_in >= len ) { 668 memcpy(m_z.next_in+m_z.avail_in, rd_ptr, len); 669 m_z.avail_in += (uInt)len; 670 len = 0; 671 } else { 672 memcpy(m_z.next_in+m_z.avail_in, rd_ptr, remain_in); 673 m_z.avail_in += remain_in; 674 len -= remain_in; 675 rd_ptr += remain_in; 676 // buffer is full, calling zlib 677 do { 678 m_z.next_out = &m_out_ptr[0]; 679 m_z.avail_out = EC_SOCKET_BUFFER_SIZE; 680 int zerror = deflate(&m_z, Z_NO_FLUSH); 681 if ( zerror != Z_OK ) { 682 AddDebugLogLineN(logEC, wxT("WriteBuffer: ZLib error")); 
683 ShowZError(zerror, &m_z); 684 return false; 685 } 686 WriteBufferToSocket(&m_out_ptr[0], 687 EC_SOCKET_BUFFER_SIZE - m_z.avail_out); 688 } while ( m_z.avail_out == 0 ); 689 // all input should be used by now 690 wxASSERT(m_z.avail_in == 0); 691 m_z.next_in = &m_in_ptr[0]; 692 } 693 } while ( len ); 694 return true; 695 } else { 696 // using uncompressed buffered i/o 697 WriteBufferToSocket(buffer, len); 698 return true; 699 } 700} 701 702bool CECSocket::FlushBuffers() 703{ 704 if (m_tx_flags & EC_FLAG_ZLIB) { 705 do { 706 m_z.next_out = &m_out_ptr[0]; 707 m_z.avail_out = EC_SOCKET_BUFFER_SIZE; 708 int zerror = deflate(&m_z, Z_FINISH); 709 if ( zerror == Z_STREAM_ERROR ) { 710 AddDebugLogLineN(logEC, wxT("FlushBuffers: ZLib error")); 711 ShowZError(zerror, &m_z); 712 return false; 713 } 714 WriteBufferToSocket(&m_out_ptr[0], 715 EC_SOCKET_BUFFER_SIZE - m_z.avail_out); 716 } while ( m_z.avail_out == 0 ); 717 } 718 if ( m_curr_tx_data->GetDataLength() ) { 719 m_output_queue.push_back(m_curr_tx_data.release()); 720 m_curr_tx_data.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE)); 721 } 722 return true; 723} 724 725// 726// Packet I/O 727// 728 729uint32 CECSocket::WritePacket(const CECPacket *packet) 730{ 731 if (SocketRealError()) { 732 OnError(); 733 return 0; 734 } 735 // Check if output queue is empty. If not, memorize the current end. 
736 std::list<CQueuedData*>::iterator outputStart = m_output_queue.begin(); 737 uint32 outputQueueSize = m_output_queue.size(); 738 for (uint32 i = 1; i < outputQueueSize; i++) { 739 outputStart++; 740 } 741 742 uint32_t flags = 0x20; 743 744 if (packet->GetPacketLength() > EC_MAX_UNCOMPRESSED 745 && ((m_my_flags & EC_FLAG_ZLIB) > 0)) { 746 flags |= EC_FLAG_ZLIB; 747 } else { 748 flags |= EC_FLAG_UTF8_NUMBERS; 749 } 750 751 flags &= m_my_flags; 752 m_tx_flags = flags; 753 754 if (flags & EC_FLAG_ZLIB) { 755 m_z.zalloc = Z_NULL; 756 m_z.zfree = Z_NULL; 757 m_z.opaque = Z_NULL; 758 m_z.avail_in = 0; 759 m_z.next_in = &m_in_ptr[0]; 760 int zerror = deflateInit(&m_z, EC_COMPRESSION_LEVEL); 761 if (zerror != Z_OK) { 762 // don't use zlib if init failed 763 flags &= ~EC_FLAG_ZLIB; 764 ShowZError(zerror, &m_z); 765 } 766 } 767 768 uint32_t tmp_flags = ENDIAN_HTONL(flags); 769 WriteBufferToSocket(&tmp_flags, sizeof(uint32)); 770 771 // preallocate 4 bytes in buffer for packet length 772 uint32_t packet_len = 0; 773 WriteBufferToSocket(&packet_len, sizeof(uint32)); 774 775 packet->WritePacket(*this); 776 777 // Finalize zlib compression and move current data to outout queue 778 FlushBuffers(); 779 780 // find the beginning of our data in the output queue 781 if (outputQueueSize) { 782 outputStart++; 783 } else { 784 outputStart = m_output_queue.begin(); 785 } 786 // now calculate actual size of data 787 for(std::list<CQueuedData*>::iterator it = outputStart; it != m_output_queue.end(); it++) { 788 packet_len += (uint32_t)(*it)->GetDataLength(); 789 } 790 // header size is not counted 791 packet_len -= EC_HEADER_SIZE; 792 // now write actual length at offset 4 793 uint32 packet_len_E = ENDIAN_HTONL(packet_len); 794 (*outputStart)->WriteAt(&packet_len_E, 4, 4); 795 796 if (flags & EC_FLAG_ZLIB) { 797 int zerror = deflateEnd(&m_z); 798 if ( zerror != Z_OK ) { 799 AddDebugLogLineN(logEC, wxT("WritePacket: ZLib error")); 800 ShowZError(zerror, &m_z); 801 } 802 } 803 return 
packet_len; 804} 805 806 807const CECPacket *CECSocket::ReadPacket() 808{ 809 CECPacket *packet = 0; 810 811 uint32_t flags = m_rx_flags; 812 813 if ( ((flags & 0x60) != 0x20) || (flags & EC_FLAG_UNKNOWN_MASK) ) { 814 // Protocol error - other end might use an older protocol 815 AddDebugLogLineN(logEC, wxT("ReadPacket: protocol error")); 816 cout << "ReadPacket: packet have invalid flags " << flags << endl; 817 CloseSocket(); 818 return 0; 819 } 820 821 if (flags & EC_FLAG_ZLIB) { 822 823 m_z.zalloc = Z_NULL; 824 m_z.zfree = Z_NULL; 825 m_z.opaque = Z_NULL; 826 m_z.avail_in = 0; 827 m_z.next_in = 0; 828 829 int zerror = inflateInit(&m_z); 830 if (zerror != Z_OK) { 831 AddDebugLogLineN(logEC, wxT("ReadPacket: zlib error")); 832 ShowZError(zerror, &m_z); 833 cout << "ReadPacket: failed zlib init" << endl; 834 CloseSocket(); 835 return 0; 836 } 837 } 838 839 m_curr_rx_data->ToZlib(m_z); 840 packet = new CECPacket(); 841 842 if (!packet->ReadFromSocket(*this)) { 843 AddDebugLogLineN(logEC, wxT("ReadPacket: error in packet read")); 844 cout << "ReadPacket: error in packet read" << endl; 845 delete packet; 846 packet = NULL; 847 CloseSocket(); 848 } 849 850 if (flags & EC_FLAG_ZLIB) { 851 int zerror = inflateEnd(&m_z); 852 if ( zerror != Z_OK ) { 853 AddDebugLogLineN(logEC, wxT("ReadPacket: zlib error")); 854 ShowZError(zerror, &m_z); 855 cout << "ReadPacket: failed zlib free" << endl; 856 CloseSocket(); 857 } 858 } 859 860 return packet; 861} 862 863const CECPacket *CECSocket::OnPacketReceived(const CECPacket *, uint32) 864{ 865 return 0; 866} 867// File_checked_for_headers 868