1//
2// This file is part of the aMule Project.
3//
4// Copyright (c) 2004-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5// Copyright (c) 2004-2011 Angel Vidal ( kry@amule.org )
6//
7// Any parts of this program derived from the xMule, lMule or eMule project,
8// or contributed by third-party developers are copyrighted by their
9// respective authors.
10//
11// This program is free software; you can redistribute it and/or modify
12// it under the terms of the GNU General Public License as published by
13// the Free Software Foundation; either version 2 of the License, or
14// (at your option) any later version.
15//
16// This program is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19// GNU General Public License for more details.
20//
21// You should have received a copy of the GNU General Public License
22// along with this program; if not, write to the Free Software
23// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA
24//
25
26#include "ECSocket.h"
27
28#include <sstream>
29#include <iostream>
30#include <algorithm>
31
32using namespace std;
33
34#include "ECPacket.h"		// Needed for CECPacket
35#include "../../../Logger.h"
36#include <common/Format.h>	// Needed for CFormat
37
// zlib compression level handed to deflateInit() in CECSocket::WritePacket().
#define EC_COMPRESSION_LEVEL	Z_DEFAULT_COMPRESSION
// WritePacket() requests zlib compression only for packets whose
// GetPacketLength() exceeds this value (and only if EC_FLAG_ZLIB is enabled
// in the socket's own flags).
#define EC_MAX_UNCOMPRESSED	1024

// Compilers that do not define __GNUC__ get an empty __attribute__ macro, so
// the annotated declarations below still compile there.
#ifndef __GNUC__
#define __attribute__(x)
#endif

// Attribute annotations for the UTF-8 helpers defined further down.
// If your compiler gives errors on these lines, just remove them.
// NOTE(review): the first two declarations take wchar_t, while the
// definitions in this file take uint32_t. In C++ these look like distinct
// overloads, so the visibility attribute may never reach the functions that
// are actually defined -- please confirm.
int utf8_mbtowc(wchar_t *p, const unsigned char *s, int n) __attribute__((__visibility__("internal")));
int utf8_wctomb(unsigned char *s, wchar_t wc, int maxlen) __attribute__((__visibility__("internal")));
int utf8_mb_remain(char c) __attribute__((__pure__));
49
50/*----------=> Import from the Linux kernel <=----------*/
51/*
52 * linux/fs/nls_base.c
53 */
54
55/*
56 * Sample implementation from Unicode home page.
57 * http://www.stonehand.com/unicode/standard/fss-utf.html
58 */
/*
 * One row per supported sequence length (1..6 bytes).
 */
struct utf8_table {
	int     cmask;		/* mask selecting the length-marker bits of the lead byte */
	int     cval;		/* value those bits must have for this sequence length */
	int     shift;		/* bit position of the payload carried by the lead byte */
	uint32_t  lmask;	/* largest value encodable with this sequence length */
	uint32_t  lval;		/* smallest value accepted when decoding this length */
};

static const struct utf8_table utf8_table[] =
{
	/* cmask  cval   shift  lmask        lval      */
	{ 0x80,   0x00,  0,     0x7F,        0         },	/* 1 byte sequence */
	{ 0xE0,   0xC0,  6,     0x7FF,       0x80      },	/* 2 byte sequence */
	{ 0xF0,   0xE0,  12,    0xFFFF,      0x800     },	/* 3 byte sequence */
	{ 0xF8,   0xF0,  18,    0x1FFFFF,    0x10000   },	/* 4 byte sequence */
	{ 0xFC,   0xF8,  24,    0x3FFFFFF,   0x200000  },	/* 5 byte sequence */
	{ 0xFE,   0xFC,  30,    0x7FFFFFFF,  0x4000000 },	/* 6 byte sequence */
	{ 0,      0,     0,     0,           0         }	/* end of table    */
};

/*
 * Decodes one UTF-8 sequence starting at s (at most n bytes available).
 * On success stores the value in *p and returns the number of bytes used.
 * Returns -1 for a truncated sequence, a bad continuation byte, an
 * unrecognised lead byte, or a value below the row's minimum (overlong form).
 */
int utf8_mbtowc(uint32_t *p, const unsigned char *s, int n)
{
	const int lead = s[0];
	uint32_t value = lead;
	int used = 1;

	for (const struct utf8_table *row = utf8_table; row->cmask != 0; ++row, ++used) {
		if ((lead & row->cmask) == row->cval) {
			/* The lead byte announces exactly `used` bytes. */
			value &= row->lmask;
			if (value < row->lval) {
				return -1;
			}
			*p = value;
			return used;
		}
		/* Need one more byte; give up if the input ends here. */
		if (n <= used) {
			return -1;
		}
		/* A continuation byte looks like 10xxxxxx. */
		const int cont = (s[used] ^ 0x80) & 0xFF;
		if (cont & 0xC0) {
			return -1;
		}
		value = (value << 6) | cont;
	}
	return -1;
}

/*
 * Encodes wc as UTF-8 into s, writing at most maxlen bytes.
 * Returns the number of bytes written, or -1 if the value does not fit
 * into maxlen bytes (or into the longest form the table knows).
 */
int utf8_wctomb(unsigned char *s, uint32_t wc, int maxlen)
{
	int length = 0;

	for (const struct utf8_table *row = utf8_table; row->cmask && maxlen; ++row, --maxlen) {
		++length;
		if (wc > row->lmask) {
			/* Too large for this sequence length, try the next one. */
			continue;
		}
		int bits = row->shift;
		*s = row->cval | (wc >> bits);
		while (bits > 0) {
			bits -= 6;
			*++s = 0x80 | ((wc >> bits) & 0x3F);
		}
		return length;
	}
	return -1;
}
/*----------=> End of Import <=----------*/

/*
 * Returns how many bytes follow the lead byte c. Only the first five table
 * rows are tested, so every byte that matches none of them yields 5.
 */
int utf8_mb_remain(char c)
{
	int extra = 0;
	while (extra < 5 && (c & utf8_table[extra].cmask) != utf8_table[extra].cval) {
		++extra;
	}
	return extra;
}
140
141
142void CQueuedData::Write(const void *data, size_t len)
143{
144	const size_t canWrite = std::min(GetRemLength(), len);
145	wxASSERT(len == canWrite);
146
147	memcpy(m_wr_ptr, data, canWrite);
148	m_wr_ptr += canWrite;
149}
150
151
152void CQueuedData::WriteAt(const void *data, size_t len, size_t offset)
153{
154	wxASSERT(len + offset <= m_data.size());
155	if (offset > m_data.size()) {
156		return;
157	} else if (offset + len > m_data.size()) {
158		len = m_data.size() - offset;
159	}
160
161	memcpy(&m_data[0] + offset, data, len);
162}
163
164
165void CQueuedData::Read(void *data, size_t len)
166{
167	const size_t canRead = std::min(GetUnreadDataLength(), len);
168	wxASSERT(len == canRead);
169
170	memcpy(data, m_rd_ptr, canRead);
171	m_rd_ptr += canRead;
172}
173
174
175void CQueuedData::WriteToSocket(CECSocket *sock)
176{
177	wxCHECK_RET(m_rd_ptr < m_wr_ptr,
178		wxT("Reading past written data in WriteToSocket"));
179
180	sock->SocketWrite(m_rd_ptr, GetUnreadDataLength());
181	m_rd_ptr += sock->GetLastCount();
182}
183
184
185void CQueuedData::ReadFromSocket(CECSocket *sock, size_t len)
186{
187	const size_t canWrite = std::min(GetRemLength(), len);
188	wxASSERT(len == canWrite);
189
190	sock->SocketRead(m_wr_ptr, canWrite);
191	m_wr_ptr += sock->GetLastCount();
192}
193
194
195size_t CQueuedData::ReadFromSocketAll(CECSocket *sock, size_t len)
196{
197	size_t read_rem = std::min(GetRemLength(), len);
198	wxASSERT(read_rem == len);
199
200	//  We get here when socket is truly blocking
201	do {
202		// Give socket a 10 sec chance to recv more data.
203		if ( !sock->WaitSocketRead(10, 0) ) {
204			AddDebugLogLineN(logEC, wxT("ReadFromSocketAll: socket is blocking"));
205			break;
206		}
207
208		wxASSERT(m_wr_ptr + read_rem <= &m_data[0] + m_data.size());
209		sock->SocketRead(m_wr_ptr, read_rem);
210		m_wr_ptr += sock->GetLastCount();
211		read_rem -= sock->GetLastCount();
212
213		if (sock->SocketRealError()) {
214			AddDebugLogLineN(logEC, wxT("ReadFromSocketAll: socket error"));
215			break;
216		}
217	} while (read_rem);
218
219	return len - read_rem;
220}
221
222
// Returns the total size of the underlying buffer (m_data), regardless of
// how much of it has been written or read.
size_t CQueuedData::GetLength() const
{
	return m_data.size();
}
227
228
229size_t CQueuedData::GetDataLength() const
230{
231	const size_t len = m_wr_ptr - &m_data[0];
232	wxCHECK_MSG(len <= m_data.size(), m_data.size(),
233		wxT("Write-pointer past end of buffer"));
234
235	return len;
236}
237
238
239size_t CQueuedData::GetRemLength() const
240{
241	return m_data.size() - GetDataLength();
242}
243
244
245size_t CQueuedData::GetUnreadDataLength() const
246{
247	wxCHECK_MSG(m_wr_ptr >= m_rd_ptr, 0,
248		wxT("Read position past write position."));
249
250	return m_wr_ptr - m_rd_ptr;
251}
252
253
254
255//
256// CECSocket API - User interface functions
257//
258
259CECSocket::CECSocket(bool use_events)
260:
261m_use_events(use_events),
262m_in_ptr(EC_SOCKET_BUFFER_SIZE),
263m_out_ptr(EC_SOCKET_BUFFER_SIZE),
264m_curr_rx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE)),
265m_curr_tx_data(new CQueuedData(EC_SOCKET_BUFFER_SIZE)),
266m_rx_flags(0),
267m_tx_flags(0),
268// setup initial state: 4 flags + 4 length
269m_bytes_needed(EC_HEADER_SIZE),
270m_in_header(true),
271m_my_flags(0x20),
272m_haveNotificationSupport(false)
273{
274
275}
276
277CECSocket::~CECSocket()
278{
279	while (!m_output_queue.empty()) {
280		CQueuedData *data = m_output_queue.front();
281		m_output_queue.pop_front();
282		delete data;
283	}
284}
285
286bool CECSocket::ConnectSocket(uint32_t ip, uint16_t port)
287{
288	bool res = InternalConnect(ip, port, !m_use_events);
289	return !SocketError() && res;
290}
291
292void CECSocket::SendPacket(const CECPacket *packet)
293{
294	uint32 len = WritePacket(packet);
295	packet->DebugPrint(false, len);
296	OnOutput();
297}
298
299const CECPacket *CECSocket::SendRecvPacket(const CECPacket *packet)
300{
301	SendPacket(packet);
302
303	if (m_curr_rx_data->ReadFromSocketAll(this, EC_HEADER_SIZE) != EC_HEADER_SIZE
304		|| SocketError()		// This is a synchronous read, so WouldBlock is an error too.
305		|| !ReadHeader()) {
306		OnError();
307		AddDebugLogLineN(logEC, wxT("SendRecvPacket: error"));
308		return 0;
309	}
310	if (m_curr_rx_data->ReadFromSocketAll(this, m_curr_packet_len) != m_curr_packet_len
311		|| SocketError()) {
312		OnError();
313		AddDebugLogLineN(logEC, wxT("SendRecvPacket: error"));
314		return 0;
315	}
316	const CECPacket *reply = ReadPacket();
317	m_curr_rx_data->Rewind();
318	return reply;
319}
320
321std::string CECSocket::GetLastErrorMsg()
322{
323	int code = InternalGetLastError();
324	switch(code) {
325		case EC_ERROR_NOERROR:
326			return "No error happened";
327		case EC_ERROR_INVOP:
328			return "Invalid operation";
329		case EC_ERROR_IOERR:
330			return "Input/Output error";
331		case EC_ERROR_INVADDR:
332			return "Invalid address passed to wxSocket";
333		case EC_ERROR_INVSOCK:
334			return "Invalid socket (uninitialized)";
335		case EC_ERROR_NOHOST:
336			return "No corresponding host";
337		case EC_ERROR_INVPORT:
338			return "Invalid port";
339		case EC_ERROR_WOULDBLOCK:
340			return "The socket is non-blocking and the operation would block";
341		case EC_ERROR_TIMEDOUT:
342			return "The timeout for this operation expired";
343		case EC_ERROR_MEMERR:
344			return "Memory exhausted";
345	}
346	ostringstream error_string;
347	error_string << "Error code " << code <<  " unknown.";
348	return error_string.str();
349}
350
351bool CECSocket::SocketRealError()
352{
353	bool ret = false;
354	if (InternalError()) {
355		int lastError = InternalGetLastError();
356		ret = lastError != EC_ERROR_NOERROR && lastError != EC_ERROR_WOULDBLOCK;
357	}
358	return ret;
359}
360
// Error hook called by the I/O routines in this file when a socket operation
// fails. This implementation only prints the last error message to stdout,
// and only when __DEBUG__ is defined; otherwise it does nothing.
// NOTE(review): presumably meant to be overridden by derived sockets --
// confirm against the class declaration.
void CECSocket::OnError()
{
#ifdef __DEBUG__
	cout << GetLastErrorMsg() << endl;
#endif
}
367
// Connection-lost hook; this implementation intentionally does nothing.
// NOTE(review): presumably overridden by derived sockets -- confirm against
// the class declaration.
void CECSocket::OnLost()
{
}
371
372//
373// Event handlers
374//
// Connection-established hook; this implementation intentionally does
// nothing.
// NOTE(review): presumably overridden by derived sockets -- confirm against
// the class declaration.
void CECSocket::OnConnect()
{
}
378
379void CECSocket::OnInput()
380{
381	size_t bytes_rx = 0;
382	do {
383		m_curr_rx_data->ReadFromSocket(this, m_bytes_needed);
384		if (SocketRealError()) {
385			AddDebugLogLineN(logEC, wxT("OnInput: socket error"));
386			OnError();
387			// socket already disconnected in this point
388			return;
389		}
390		bytes_rx = GetLastCount();
391		m_bytes_needed -= bytes_rx;
392
393		if (m_bytes_needed == 0) {
394			if (m_in_header) {
395				m_in_header = false;
396				if (!ReadHeader()) {
397					AddDebugLogLineN(logEC, wxT("OnInput: header error"));
398					return;
399				}
400			} else {
401				std::auto_ptr<const CECPacket> packet(ReadPacket());
402				m_curr_rx_data->Rewind();
403				if (packet.get()) {
404					std::auto_ptr<const CECPacket> reply(OnPacketReceived(packet.get(), m_curr_packet_len));
405					if (reply.get()) {
406						SendPacket(reply.get());
407					}
408				} else {
409					AddDebugLogLineN(logEC, wxT("OnInput: no packet"));
410				}
411				m_bytes_needed = EC_HEADER_SIZE;
412				m_in_header = true;
413			}
414		}
415	} while (bytes_rx);
416}
417
418void CECSocket::OnOutput()
419{
420	while (!m_output_queue.empty()) {
421		CQueuedData* data = m_output_queue.front();
422		data->WriteToSocket(this);
423		if (!data->GetUnreadDataLength()) {
424			m_output_queue.pop_front();
425			delete data;
426		}
427		if (SocketError()) {
428			if (!WouldBlock()) {
429				// real error, abort
430				AddDebugLogLineN(logEC, wxT("OnOutput: socket error"));
431				OnError();
432				return;
433			}
434			// Now it's just a blocked socket.
435			if ( m_use_events ) {
436				// Event driven logic: return, OnOutput() will be called again later
437				return;
438			}
439			// Syncronous call: wait (for max 10 secs)
440			if ( !WaitSocketWrite(10, 0) ) {
441				// Still not through ?
442				if (WouldBlock()) {
443					// WouldBlock() is only EAGAIN or EWOULD_BLOCK,
444					// and those shouldn't create an infinite wait.
445					// So give it another chance.
446					continue;
447				} else {
448					AddDebugLogLineN(logEC, wxT("OnOutput: socket error in sync wait"));
449					OnError();
450					break;
451				}
452			}
453		}
454	}
455	//
456	// All outstanding data sent to socket
457	// (used for push clients)
458	//
459	WriteDoneAndQueueEmpty();
460}
461
// Returns true while at least one data block is still waiting in the output
// queue.
bool CECSocket::DataPending()
{
	return !m_output_queue.empty();
}
466
467//
468// Socket I/O
469//
470
471size_t CECSocket::ReadBufferFromSocket(void *buffer, size_t required_len)
472{
473	wxASSERT(required_len);
474
475	if (m_curr_rx_data->GetUnreadDataLength() < required_len) {
476		// need more data that we have. Looks like nothing will help here
477		AddDebugLogLineN(logEC, CFormat(wxT("ReadBufferFromSocket: not enough data (%d < %d)"))
478			% m_curr_rx_data->GetUnreadDataLength() % required_len);
479		return 0;
480	}
481	m_curr_rx_data->Read(buffer, required_len);
482	return required_len;
483}
484
485void CECSocket::WriteBufferToSocket(const void *buffer, size_t len)
486{
487	unsigned char *wr_ptr = (unsigned char *)buffer;
488	while ( len ) {
489		size_t curr_free = m_curr_tx_data->GetRemLength();
490		if ( len > curr_free ) {
491
492			m_curr_tx_data->Write(wr_ptr, curr_free);
493			len -= curr_free;
494			wr_ptr += curr_free;
495			m_output_queue.push_back(m_curr_tx_data.release());
496			m_curr_tx_data.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE));
497		} else {
498			m_curr_tx_data->Write(wr_ptr, len);
499			break;
500		}
501	}
502}
503
504
505//
506// ZLib "error handler"
507//
508
509void ShowZError(int zerror, z_streamp strm)
510{
511	const char *p = NULL;
512
513	switch (zerror) {
514		case Z_STREAM_END: p = "Z_STREAM_END"; break;
515		case Z_NEED_DICT: p = "Z_NEED_DICT"; break;
516		case Z_ERRNO: p = "Z_ERRNO"; break;
517		case Z_STREAM_ERROR: p = "Z_STREAM_ERROR"; break;
518		case Z_DATA_ERROR: p = "Z_DATA_ERROR"; break;
519		case Z_MEM_ERROR: p = "Z_MEM_ERROR"; break;
520		case Z_BUF_ERROR: p = "Z_BUF_ERROR"; break;
521		case Z_VERSION_ERROR: p = "Z_VERSION_ERROR"; break;
522	}
523	printf("ZLib operation returned %s\n", p);
524	printf("ZLib error message: %s\n", strm->msg);
525	printf("zstream state:\n\tnext_in=%p\n\tavail_in=%u\n\ttotal_in=%lu\n\tnext_out=%p\n\tavail_out=%u\n\ttotal_out=%lu\n",
526		strm->next_in, strm->avail_in, strm->total_in, strm->next_out, strm->avail_out, strm->total_out);
527	AddDebugLogLineN(logEC, wxT("ZLib error"));
528}
529
530
531bool CECSocket::ReadHeader()
532{
533	m_curr_rx_data->Read(&m_rx_flags, 4);
534	m_rx_flags = ENDIAN_NTOHL(m_rx_flags);
535	m_curr_rx_data->Read(&m_curr_packet_len, 4);
536	m_curr_packet_len = ENDIAN_NTOHL(m_curr_packet_len);
537	m_bytes_needed = m_curr_packet_len;
538	// packet bigger that 16Mb looks more like broken request
539	if (m_bytes_needed > 16*1024*1024) {
540		AddDebugLogLineN(logEC, CFormat(wxT("ReadHeader: packet too big: %d")) % m_bytes_needed);
541		CloseSocket();
542		return false;
543	}
544	m_curr_rx_data->Rewind();
545	size_t currLength = m_curr_rx_data->GetLength();
546	// resize input buffer if
547	// a) too small or
548	if (currLength < m_bytes_needed
549	// b) way too large (free data again after receiving huge packets)
550		|| m_bytes_needed + EC_SOCKET_BUFFER_SIZE * 10 < currLength) {
551		// Client socket: IsAuthorized() is always true
552		// Server socket: do not allow growing of internal buffers before succesfull login.
553		// Otherwise sending a simple header with bogus length of 16MB-1 will crash an embedded
554		// client with memory exhaustion.
555		if (!IsAuthorized()) {
556			AddDebugLogLineN(logEC, CFormat(wxT("ReadHeader: resize (%d -> %d) on non autorized socket")) % currLength % m_bytes_needed);
557			CloseSocket();
558			return false;
559		}
560		// Don't make buffer smaller than EC_SOCKET_BUFFER_SIZE
561		size_t bufSize = m_bytes_needed;
562		if (bufSize < EC_SOCKET_BUFFER_SIZE) {
563			bufSize = EC_SOCKET_BUFFER_SIZE;
564		}
565		m_curr_rx_data.reset(new CQueuedData(bufSize));
566	}
567	if (ECLogIsEnabled()) {
568		DoECLogLine(CFormat(wxT("< %d ...")) % m_bytes_needed);
569	}
570	return true;
571}
572
573
574bool CECSocket::ReadNumber(void *buffer, size_t len)
575{
576	if (m_rx_flags & EC_FLAG_UTF8_NUMBERS) {
577		unsigned char mb[6];
578		uint32_t wc;
579		if (!ReadBuffer(mb, 1)) return false;
580		int remains = utf8_mb_remain(mb[0]);
581		if (remains) if (!ReadBuffer(&(mb[1]), remains)) return false;
582		if (utf8_mbtowc(&wc, mb, 6) == -1) return false;	// Invalid UTF-8 code sequence
583		switch (len) {
584			case 1: PokeUInt8( buffer,  wc ); break;
585			case 2: RawPokeUInt16( buffer, wc ); break;
586			case 4: RawPokeUInt32( buffer, wc ); break;
587		}
588	} else {
589		if ( !ReadBuffer(buffer, len) ) {
590			return false;
591		}
592		switch (len) {
593			case 2:
594				RawPokeUInt16( buffer, ENDIAN_NTOHS( RawPeekUInt16( buffer ) ) );
595				break;
596			case 4:
597				RawPokeUInt32( buffer, ENDIAN_NTOHL( RawPeekUInt32( buffer ) ) );
598				break;
599		}
600	}
601	return true;
602}
603
604bool CECSocket::WriteNumber(const void *buffer, size_t len)
605{
606	if (m_tx_flags & EC_FLAG_UTF8_NUMBERS) {
607		unsigned char mb[6];
608		uint32_t wc = 0;
609		int mb_len;
610		switch (len) {
611			case 1: wc = PeekUInt8( buffer ); break;
612			case 2: wc = RawPeekUInt16( buffer ); break;
613			case 4: wc = RawPeekUInt32( buffer ); break;
614			default: return false;
615		}
616		if ((mb_len = utf8_wctomb(mb, wc, 6)) == -1) return false;	// Something is terribly wrong...
617		return WriteBuffer(mb, mb_len);
618	} else {
619		char tmp[8];
620
621		switch (len) {
622			case 1: PokeUInt8( tmp, PeekUInt8( buffer ) ); break;
623			case 2: RawPokeUInt16( tmp, ENDIAN_NTOHS( RawPeekUInt16( buffer ) ) ); break;
624			case 4: RawPokeUInt32( tmp, ENDIAN_NTOHL( RawPeekUInt32( buffer ) ) ); break;
625		}
626		return WriteBuffer(tmp, len);
627	}
628}
629
630bool CECSocket::ReadBuffer(void *buffer, size_t len)
631{
632	if (m_rx_flags & EC_FLAG_ZLIB) {
633		if ( !m_z.avail_in ) {
634			// no reason for this situation: all packet should be
635			// buffered by now
636			AddDebugLogLineN(logEC, wxT("ReadBuffer: ZLib error"));
637			return false;
638		}
639		m_z.avail_out = (uInt)len;
640		m_z.next_out = (Bytef*)buffer;
641		int zerror = inflate(&m_z, Z_SYNC_FLUSH);
642		if ((zerror != Z_OK) && (zerror != Z_STREAM_END)) {
643			ShowZError(zerror, &m_z);
644			AddDebugLogLineN(logEC, wxT("ReadBuffer: ZLib error"));
645			return false;
646		}
647		return true;
648	} else {
649		// using uncompressed buffered i/o
650		size_t read = ReadBufferFromSocket(buffer, len);
651		if (read == len) {
652			return true;
653		} else {
654			AddDebugLogLineN(logEC, CFormat(wxT("ReadBuffer: %d < %d")) % read % len);
655			return false;
656		}
657	}
658}
659
660bool CECSocket::WriteBuffer(const void *buffer, size_t len)
661{
662	if (m_tx_flags & EC_FLAG_ZLIB) {
663
664		unsigned char *rd_ptr = (unsigned char *)buffer;
665		do {
666			unsigned int remain_in = EC_SOCKET_BUFFER_SIZE - m_z.avail_in;
667			if ( remain_in >= len ) {
668				memcpy(m_z.next_in+m_z.avail_in, rd_ptr, len);
669				m_z.avail_in += (uInt)len;
670				len = 0;
671			} else {
672				memcpy(m_z.next_in+m_z.avail_in, rd_ptr, remain_in);
673				m_z.avail_in += remain_in;
674				len -= remain_in;
675				rd_ptr += remain_in;
676				// buffer is full, calling zlib
677				do {
678				    m_z.next_out = &m_out_ptr[0];
679				    m_z.avail_out = EC_SOCKET_BUFFER_SIZE;
680					int zerror = deflate(&m_z, Z_NO_FLUSH);
681					if ( zerror != Z_OK ) {
682						AddDebugLogLineN(logEC, wxT("WriteBuffer: ZLib error"));
683						ShowZError(zerror, &m_z);
684						return false;
685					}
686					WriteBufferToSocket(&m_out_ptr[0],
687						EC_SOCKET_BUFFER_SIZE - m_z.avail_out);
688				} while ( m_z.avail_out == 0 );
689				// all input should be used by now
690				wxASSERT(m_z.avail_in == 0);
691				m_z.next_in = &m_in_ptr[0];
692			}
693		} while ( len );
694		return true;
695	} else {
696		// using uncompressed buffered i/o
697		WriteBufferToSocket(buffer, len);
698		return true;
699	}
700}
701
702bool CECSocket::FlushBuffers()
703{
704	if (m_tx_flags & EC_FLAG_ZLIB) {
705		do {
706		    m_z.next_out = &m_out_ptr[0];
707		    m_z.avail_out = EC_SOCKET_BUFFER_SIZE;
708			int zerror = deflate(&m_z, Z_FINISH);
709			if ( zerror == Z_STREAM_ERROR ) {
710				AddDebugLogLineN(logEC, wxT("FlushBuffers: ZLib error"));
711				ShowZError(zerror, &m_z);
712				return false;
713			}
714			WriteBufferToSocket(&m_out_ptr[0],
715				EC_SOCKET_BUFFER_SIZE - m_z.avail_out);
716		} while ( m_z.avail_out == 0 );
717	}
718	if ( m_curr_tx_data->GetDataLength() ) {
719		m_output_queue.push_back(m_curr_tx_data.release());
720		m_curr_tx_data.reset(new CQueuedData(EC_SOCKET_BUFFER_SIZE));
721	}
722	return true;
723}
724
725//
726// Packet I/O
727//
728
729uint32 CECSocket::WritePacket(const CECPacket *packet)
730{
731	if (SocketRealError()) {
732		OnError();
733		return 0;
734	}
735	// Check if output queue is empty. If not, memorize the current end.
736	std::list<CQueuedData*>::iterator outputStart = m_output_queue.begin();
737	uint32 outputQueueSize = m_output_queue.size();
738	for (uint32 i = 1; i < outputQueueSize; i++) {
739		outputStart++;
740	}
741
742	uint32_t flags = 0x20;
743
744	if (packet->GetPacketLength() > EC_MAX_UNCOMPRESSED
745		&& ((m_my_flags & EC_FLAG_ZLIB) > 0)) {
746		flags |= EC_FLAG_ZLIB;
747	} else {
748		flags |= EC_FLAG_UTF8_NUMBERS;
749	}
750
751	flags &= m_my_flags;
752	m_tx_flags = flags;
753
754	if (flags & EC_FLAG_ZLIB) {
755		m_z.zalloc = Z_NULL;
756		m_z.zfree = Z_NULL;
757		m_z.opaque = Z_NULL;
758		m_z.avail_in = 0;
759		m_z.next_in = &m_in_ptr[0];
760		int zerror = deflateInit(&m_z, EC_COMPRESSION_LEVEL);
761		if (zerror != Z_OK) {
762			// don't use zlib if init failed
763			flags &= ~EC_FLAG_ZLIB;
764			ShowZError(zerror, &m_z);
765		}
766	}
767
768	uint32_t tmp_flags = ENDIAN_HTONL(flags);
769	WriteBufferToSocket(&tmp_flags, sizeof(uint32));
770
771	// preallocate 4 bytes in buffer for packet length
772	uint32_t packet_len = 0;
773	WriteBufferToSocket(&packet_len, sizeof(uint32));
774
775	packet->WritePacket(*this);
776
777	// Finalize zlib compression and move current data to outout queue
778	FlushBuffers();
779
780	// find the beginning of our data in the output queue
781	if (outputQueueSize) {
782		outputStart++;
783	} else {
784		outputStart = m_output_queue.begin();
785	}
786	// now calculate actual size of data
787	for(std::list<CQueuedData*>::iterator it = outputStart; it != m_output_queue.end(); it++) {
788		packet_len += (uint32_t)(*it)->GetDataLength();
789	}
790	// header size is not counted
791	packet_len -= EC_HEADER_SIZE;
792	// now write actual length at offset 4
793	uint32 packet_len_E = ENDIAN_HTONL(packet_len);
794	(*outputStart)->WriteAt(&packet_len_E, 4, 4);
795
796	if (flags & EC_FLAG_ZLIB) {
797		int zerror = deflateEnd(&m_z);
798		if ( zerror != Z_OK ) {
799			AddDebugLogLineN(logEC, wxT("WritePacket: ZLib error"));
800			ShowZError(zerror, &m_z);
801		}
802	}
803	return packet_len;
804}
805
806
807const CECPacket *CECSocket::ReadPacket()
808{
809	CECPacket *packet = 0;
810
811	uint32_t flags = m_rx_flags;
812
813	if ( ((flags & 0x60) != 0x20) || (flags & EC_FLAG_UNKNOWN_MASK) ) {
814		// Protocol error - other end might use an older protocol
815		AddDebugLogLineN(logEC, wxT("ReadPacket: protocol error"));
816		cout << "ReadPacket: packet have invalid flags " << flags << endl;
817		CloseSocket();
818		return 0;
819	}
820
821	if (flags & EC_FLAG_ZLIB) {
822
823	    m_z.zalloc = Z_NULL;
824	    m_z.zfree = Z_NULL;
825	    m_z.opaque = Z_NULL;
826	    m_z.avail_in = 0;
827	    m_z.next_in = 0;
828
829		int zerror = inflateInit(&m_z);
830		if (zerror != Z_OK) {
831			AddDebugLogLineN(logEC, wxT("ReadPacket: zlib error"));
832			ShowZError(zerror, &m_z);
833			cout << "ReadPacket: failed zlib init" << endl;
834			CloseSocket();
835			return 0;
836		}
837	}
838
839	m_curr_rx_data->ToZlib(m_z);
840	packet = new CECPacket();
841
842	if (!packet->ReadFromSocket(*this)) {
843		AddDebugLogLineN(logEC, wxT("ReadPacket: error in packet read"));
844		cout << "ReadPacket: error in packet read" << endl;
845		delete packet;
846		packet = NULL;
847		CloseSocket();
848	}
849
850	if (flags & EC_FLAG_ZLIB) {
851		int zerror = inflateEnd(&m_z);
852		if ( zerror != Z_OK ) {
853			AddDebugLogLineN(logEC, wxT("ReadPacket: zlib error"));
854			ShowZError(zerror, &m_z);
855			cout << "ReadPacket: failed zlib free" << endl;
856			CloseSocket();
857		}
858	}
859
860	return packet;
861}
862
// Called by OnInput() for every packet that was parsed successfully, with
// the packet and the payload length taken from its header. The returned
// packet, if any, is sent back as the reply and then freed by OnInput().
// This implementation ignores its arguments and returns 0, i.e. no reply.
// NOTE(review): presumably overridden by derived sockets -- confirm against
// the class declaration.
const CECPacket *CECSocket::OnPacketReceived(const CECPacket *, uint32)
{
	return 0;
}
867// File_checked_for_headers
868