1/*
2 * Copyright 2022 Haiku Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Niels Sascha Reedijk, niels.reedijk@gmail.com
7 */
8
9#include <algorithm>
10#include <atomic>
11#include <deque>
12#include <list>
13#include <map>
14#include <optional>
15#include <vector>
16
17#include <AutoLocker.h>
18#include <DataIO.h>
19#include <ErrorsExt.h>
20#include <HttpFields.h>
21#include <HttpRequest.h>
22#include <HttpResult.h>
23#include <HttpSession.h>
24#include <Locker.h>
25#include <Messenger.h>
26#include <NetBuffer.h>
27#include <NetServicesDefs.h>
28#include <NetworkAddress.h>
29#include <OS.h>
30#include <SecureSocket.h>
31#include <Socket.h>
32#include <ZlibCompressionAlgorithm.h>
33
34#include "HttpBuffer.h"
35#include "HttpParser.h"
36#include "HttpResultPrivate.h"
37#include "HttpSerializer.h"
38#include "NetServicesPrivate.h"
39
40using namespace std::literals;
41using namespace BPrivate::Network;
42
43
/*!
	\brief Upper bound for the size of a single HTTP header line of a message.

	RFC 9112 does not define a maximum length for header lines. Without a bound, a peer that
	never sends the end-of-line sequence ("\r\n") could make the internal buffer keep growing
	while it waits for that sequence; this constant is meant to cap that growth.

	NOTE(review): this file does not reference the constant itself; confirm where the limit is
	actually enforced.
*/
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
51
52
53struct CounterDeleter {
54	void operator()(int32* counter) const noexcept { atomic_add(counter, -1); }
55};
56
57
/*!
	\brief Internal state of one HTTP request while it is processed by a BHttpSession.

	A Request is created by Impl::Execute(), connected by the control thread and then sent and
	received by the data thread. It owns the socket of the connection and shares the result
	object (HttpResultPrivate) with the BHttpResult that was returned to the caller.
*/
class BHttpSession::Request
{
public:
	Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);

	// Creates the follow-up request for a redirect. It takes over the BHttpRequest, the observer
	// and the shared result of \a original.
	Request(Request& original, const Redirect& redirect);

	// States
	// InitialState: not connected yet; Connected: set by OpenConnection(); RequestSent: set by
	// TransferRequest() once everything is written; ContentReceived: set by ReceiveResult() when
	// the response is complete.
	enum RequestState { InitialState, Connected, RequestSent, ContentReceived };
	RequestState State() const noexcept { return fRequestStatus; }

	// Result Helpers
	std::shared_ptr<HttpResultPrivate> Result() { return fResult; }
	void SetError(std::exception_ptr e);

	// Helpers for maintaining the connection count
	std::pair<BString, int> GetHost() const;
	void SetCounter(int32* counter) noexcept;

	// Operational methods
	void ResolveHostName();
	void OpenConnection();
	void TransferRequest();
	bool ReceiveResult();
	void Disconnect() noexcept;

	// Object information
	// Socket() dereferences fSocket: only call it after OpenConnection() created the socket.
	int Socket() const noexcept { return fSocket->Socket(); }
	int32 Id() const noexcept { return fResult->id; }
	// Forwarded to the shared result; the data thread stops receiving when this returns true.
	bool CanCancel() const noexcept { return fResult->CanCancel(); }

	// Message helper
	void SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc = nullptr) const;

private:
	BHttpRequest fRequest;

	// Request state/events
	RequestState fRequestStatus = InitialState;

	// Communication
	BMessenger fObserver;
	std::shared_ptr<HttpResultPrivate> fResult;

	// Connection
	BNetworkAddress fRemoteAddress;
	std::unique_ptr<BSocket> fSocket;

	// Sending and receiving
	HttpBuffer fBuffer;
	HttpSerializer fSerializer;
	HttpParser fParser;

	// Receive state
	BHttpStatus fStatus;
	BHttpFields fFields;

	// Redirection
	// fMightRedirect is set once the status line announces a redirect that can be followed.
	bool fMightRedirect = false;
	// Number of redirects that may still be followed; set by both constructors.
	int8 fRemainingRedirects;

	// Connection counter
	// Slot in the per-host connection count; CounterDeleter releases it when the request dies.
	std::unique_ptr<int32, CounterDeleter> fConnectionCounter;
};
122
123
/*!
	\brief Shared implementation of BHttpSession.

	Requests flow from Execute() onto the control queue. The control thread resolves and connects
	them (respecting the connection limits) and moves them to the data queue. The data thread then
	sends the requests and receives the responses on non-blocking sockets.
*/
class BHttpSession::Impl
{
public:
	Impl();
	~Impl() noexcept;

	BHttpResult Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
	void Cancel(int32 identifier);
	void SetMaxConnectionsPerHost(size_t maxConnections);
	// The parameter is the maximum number of concurrent hosts, despite its name.
	void SetMaxHosts(size_t maxConnections);

private:
	// Thread functions
	static status_t ControlThreadFunc(void* arg);
	static status_t DataThreadFunc(void* arg);

	// Helper functions
	std::vector<BHttpSession::Request> GetRequestsForControlThread();

private:
	// constants (can be accessed unlocked)
	const sem_id fControlQueueSem;
	const sem_id fDataQueueSem;
	const thread_id fControlThread;
	const thread_id fDataThread;

	// locking mechanism
	// fLock guards the queues and the cancel list below; fQuitting is set by the destructor
	// before the semaphores are deleted.
	BLocker fLock;
	std::atomic<bool> fQuitting = false;

	// queues & shared data
	std::list<BHttpSession::Request> fControlQueue;
	std::deque<BHttpSession::Request> fDataQueue;
	std::vector<int32> fCancelList;

	// data owned by the controlThread
	// Host is (host name, port) as returned by Request::GetHost()
	using Host = std::pair<BString, int>;
	std::map<Host, int32> fConnectionCount;

	// data that can only be accessed atomically
	std::atomic<size_t> fMaxConnectionsPerHost = 2;
	std::atomic<size_t> fMaxHosts = 10;

	// data owned by the dataThread
	// connectionMap is keyed by socket descriptor. objectList is the wait list for
	// wait_for_objects(): entry 0 is fDataQueueSem, the other entries are the sockets.
	std::map<int, BHttpSession::Request> connectionMap;
	std::vector<object_wait_info> objectList;
};
171
172
/*!
	\brief Describes a redirect that is to be followed.

	Thrown by Request::ReceiveResult() and caught by the data thread, which creates the
	follow-up request from it and puts that request back on the control queue.
*/
struct BHttpSession::Redirect {
	BUrl url;
		// new location, resolved against the URL of the original request
	bool redirectToGet;
		// when true, a request that is not GET or HEAD is turned into a GET without a body
};
177
178
179// #pragma mark -- BHttpSession::Impl
180
181
182BHttpSession::Impl::Impl()
183	:
184	fControlQueueSem(create_sem(0, "http:control")),
185	fDataQueueSem(create_sem(0, "http:data")),
186	fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)),
187	fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this))
188{
189	// check initialization of semaphores
190	if (fControlQueueSem < 0)
191		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control queue semaphore");
192	if (fDataQueueSem < 0)
193		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data queue semaphore");
194
195	// set up internal threads
196	if (fControlThread < 0)
197		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create control thread");
198	if (resume_thread(fControlThread) != B_OK)
199		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume control thread");
200
201	if (fDataThread < 0)
202		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot create data thread");
203	if (resume_thread(fDataThread) != B_OK)
204		throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot resume data thread");
205}
206
207
208BHttpSession::Impl::~Impl() noexcept
209{
210	fQuitting.store(true);
211	delete_sem(fControlQueueSem);
212	delete_sem(fDataQueueSem);
213	status_t threadResult;
214	wait_for_thread(fControlThread, &threadResult);
215		// The control thread waits for the data thread
216}
217
218
219BHttpResult
220BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
221{
222	auto wRequest = Request(std::move(request), std::move(target), observer);
223
224	auto retval = BHttpResult(wRequest.Result());
225	auto lock = AutoLocker<BLocker>(fLock);
226	fControlQueue.push_back(std::move(wRequest));
227	release_sem(fControlQueueSem);
228	return retval;
229}
230
231
232void
233BHttpSession::Impl::Cancel(int32 identifier)
234{
235	auto lock = AutoLocker<BLocker>(fLock);
236	// Check if the item is on the control queue
237	fControlQueue.remove_if([&identifier](auto& request) {
238		if (request.Id() == identifier) {
239			try {
240				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
241			} catch (...) {
242				request.SetError(std::current_exception());
243			}
244			return true;
245		}
246		return false;
247	});
248
249	// Get it on the list for deletion in the data queue
250	fCancelList.push_back(identifier);
251	release_sem(fDataQueueSem);
252}
253
254
255void
256BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections)
257{
258	if (maxConnections <= 0 || maxConnections >= INT32_MAX) {
259		throw BRuntimeError(
260			__PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX");
261	}
262	fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed);
263}
264
265
266void
267BHttpSession::Impl::SetMaxHosts(size_t maxConnections)
268{
269	if (maxConnections <= 0)
270		throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more");
271	fMaxHosts.store(maxConnections, std::memory_order_relaxed);
272}
273
274
275/*static*/ status_t
276BHttpSession::Impl::ControlThreadFunc(void* arg)
277{
278	BHttpSession::Impl* impl = static_cast<BHttpSession::Impl*>(arg);
279
280	// Outer loop to use the fControlQueueSem when new items have entered the queue
281	while (true) {
282		if (auto status = acquire_sem(impl->fControlQueueSem); status == B_INTERRUPTED)
283			continue;
284		else if (status != B_OK) {
285			// Most likely B_BAD_SEM_ID indicating that the sem was deleted; go to cleanup
286			break;
287		}
288
289		// Check if we have woken up because we are quitting
290		if (impl->fQuitting.load())
291			break;
292
293		// Get items to process (locking done by the helper)
294		auto requests = impl->GetRequestsForControlThread();
295		if (requests.size() == 0)
296			continue;
297
298		for (auto& request: requests) {
299			bool hasError = false;
300			try {
301				request.ResolveHostName();
302				request.OpenConnection();
303			} catch (...) {
304				request.SetError(std::current_exception());
305				hasError = true;
306			}
307
308			if (hasError) {
309				// Do not add the request back to the queue; release the sem to do another round
310				// in case there is another item waiting because the limits of concurrent requests
311				// were reached
312				release_sem(impl->fControlQueueSem);
313				continue;
314			}
315
316			impl->fLock.Lock();
317			impl->fDataQueue.push_back(std::move(request));
318			impl->fLock.Unlock();
319			release_sem(impl->fDataQueueSem);
320		}
321	}
322
323	// Clean up and make sure we are quitting
324	if (impl->fQuitting.load()) {
325		// First wait for the data thread to complete
326		status_t threadResult;
327		wait_for_thread(impl->fDataThread, &threadResult);
328		// Cancel all requests
329		for (auto& request: impl->fControlQueue) {
330			try {
331				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
332			} catch (...) {
333				request.SetError(std::current_exception());
334			}
335		}
336	} else {
337		throw BRuntimeError(
338			__PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted");
339	}
340
341	// Cleanup: wait for data thread
342	return B_OK;
343}
344
345
346static constexpr uint16 EVENT_CANCELLED = 0x4000;
347
348
349/*static*/ status_t
350BHttpSession::Impl::DataThreadFunc(void* arg)
351{
352	BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);
353
354	// initial initialization of wait list
355	data->objectList.push_back(
356		object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});
357
358	while (true) {
359		if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
360			status == B_INTERRUPTED)
361			continue;
362		else if (status < 0) {
363			// Something went inexplicably wrong
364			throw BSystemError("wait_for_objects()", status);
365		}
366
367		// First check if the change is in acquiring the sem, meaning that
368		// there are new requests to be scheduled
369		if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
370			if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
371				continue;
372			else if (status != B_OK) {
373				// Most likely B_BAD_SEM_ID indicating that the sem was deleted
374				break;
375			}
376
377			// Process the cancelList and dataQueue. Note that there might
378			// be a situation where a request is cancelled and added in the
379			// same iteration, but that is taken care by this algorithm.
380			data->fLock.Lock();
381			while (!data->fDataQueue.empty()) {
382				auto request = std::move(data->fDataQueue.front());
383				data->fDataQueue.pop_front();
384				auto socket = request.Socket();
385
386				data->connectionMap.insert(std::make_pair(socket, std::move(request)));
387
388				// Add to objectList
389				data->objectList.push_back(
390					object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE});
391			}
392
393			for (auto id: data->fCancelList) {
394				// To cancel, we set a special event status on the
395				// object_wait_info list so that we can handle it below.
396				// Also: the first item in the waitlist is always the semaphore
397				// so the fun starts at offset 1.
398				size_t offset = 0;
399				for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend();
400					it++) {
401					offset++;
402					if (it->second.Id() == id) {
403						data->objectList[offset].events = EVENT_CANCELLED;
404						break;
405					}
406				}
407			}
408			data->fCancelList.clear();
409			data->fLock.Unlock();
410		} else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
411			// The semaphore has been deleted. Start the cleanup
412			break;
413		}
414
415		// Process all objects that are ready
416		bool resizeObjectList = false;
417		for (auto& item: data->objectList) {
418			if (item.type != B_OBJECT_TYPE_FD)
419				continue;
420			if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
421				auto& request = data->connectionMap.find(item.object)->second;
422				auto error = false;
423				try {
424					request.TransferRequest();
425				} catch (...) {
426					request.SetError(std::current_exception());
427					error = true;
428				}
429
430				// End failed writes
431				if (error) {
432					request.Disconnect();
433					data->connectionMap.erase(item.object);
434					release_sem(data->fControlQueueSem);
435						// wake up control thread; there may queued requests unblocked.
436					resizeObjectList = true;
437				}
438			} else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
439				auto& request = data->connectionMap.find(item.object)->second;
440				auto finished = false;
441				try {
442					if (request.CanCancel())
443						finished = true;
444					else
445						finished = request.ReceiveResult();
446				} catch (const Redirect& r) {
447					// Request is redirected, send back to the controlThread
448					// Move existing request into a new request and hand over to the control queue
449					auto lock = AutoLocker<BLocker>(data->fLock);
450					data->fControlQueue.emplace_back(request, r);
451					release_sem(data->fControlQueueSem);
452
453					finished = true;
454				} catch (...) {
455					request.SetError(std::current_exception());
456					finished = true;
457				}
458
459				if (finished) {
460					// Clean up finished requests; including redirected requests
461					request.Disconnect();
462					data->connectionMap.erase(item.object);
463					release_sem(data->fControlQueueSem);
464						// wake up control thread; there may queued requests unblocked.
465					resizeObjectList = true;
466				}
467			} else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
468				auto& request = data->connectionMap.find(item.object)->second;
469				try {
470					throw BNetworkRequestError(
471						__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
472				} catch (...) {
473					request.SetError(std::current_exception());
474				}
475				data->connectionMap.erase(item.object);
476				resizeObjectList = true;
477			} else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
478				auto& request = data->connectionMap.find(item.object)->second;
479				request.Disconnect();
480				try {
481					throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
482				} catch (...) {
483					request.SetError(std::current_exception());
484				}
485				data->connectionMap.erase(item.object);
486				release_sem(data->fControlQueueSem);
487					// wake up control thread; there may queued requests unblocked.
488				resizeObjectList = true;
489			} else if (item.events == 0) {
490				// No events for this item, skip
491				continue;
492			} else {
493				// Likely to be B_EVENT_INVALID. This should not happen
494				auto& request = data->connectionMap.find(item.object)->second;
495				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
496					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
497					msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?");
498				});
499				throw BRuntimeError(
500					__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
501			}
502		}
503
504		// Reset objectList
505		data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
506		if (resizeObjectList)
507			data->objectList.resize(data->connectionMap.size() + 1);
508
509		auto i = 1;
510		for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
511			data->objectList[i].object = it->first;
512			if (it->second.State() == Request::InitialState)
513				throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
514			else if (it->second.State() == Request::Connected)
515				data->objectList[i].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
516			else
517				data->objectList[i].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
518			i++;
519		}
520	}
521	// Clean up and make sure we are quitting
522	if (data->fQuitting.load()) {
523		// Cancel all requests
524		for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) {
525			try {
526				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
527			} catch (...) {
528				it->second.SetError(std::current_exception());
529			}
530		}
531	} else {
532		throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted");
533	}
534
535	return B_OK;
536}
537
538
539/*!
540	\brief Internal helper that filters the lists of requests to guard against the concurrent
541		requests limit.
542
543	This method will do the locking of the internal structure.
544*/
545std::vector<BHttpSession::Request>
546BHttpSession::Impl::GetRequestsForControlThread()
547{
548	std::vector<BHttpSession::Request> requests;
549
550	// Clean up connection list if it is at the max number of hosts
551	if (fConnectionCount.size() >= fMaxHosts.load()) {
552		for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) {
553			if (atomic_get(std::addressof(it->second)) == 0) {
554				it = fConnectionCount.erase(it);
555			} else {
556				it++;
557			}
558		}
559	}
560
561	// Process the list of pending requests and review if they can be started.
562	auto lock = AutoLocker<BLocker>(fLock);
563	fControlQueue.remove_if([this, &requests](auto& request) {
564		auto host = request.GetHost();
565		auto it = fConnectionCount.find(host);
566		if (it != fConnectionCount.end()) {
567			if (static_cast<size_t>(atomic_get(std::addressof(it->second)))
568				>= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) {
569				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
570					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
571					msg.AddString(UrlEventData::DebugMessage,
572						"Request is queued: too many active connections for host");
573				});
574				return false;
575			} else {
576				atomic_add(std::addressof(it->second), 1);
577				request.SetCounter(std::addressof(it->second));
578			}
579		} else {
580			if (fConnectionCount.size() == fMaxHosts.load()) {
581				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
582					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
583					msg.AddString(UrlEventData::DebugMessage,
584						"Request is queued: maximum number of concurrent hosts");
585				});
586				return false;
587			}
588			auto [newIt, success] = fConnectionCount.insert({host, 1});
589			if (!success) {
590				throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount");
591			}
592			request.SetCounter(std::addressof(newIt->second));
593		}
594		requests.emplace_back(std::move(request));
595		return true;
596	});
597	return requests;
598}
599
600
601// #pragma mark -- BHttpSession (public interface)
602
603
604BHttpSession::BHttpSession()
605{
606	fImpl = std::make_shared<BHttpSession::Impl>();
607}
608
609
// The session state lives in fImpl, which the constructor assigns from std::make_shared().
// The defaulted copy operations copy that pointer. Presumably copies therefore share one session
// (NOTE(review): confirm against the declaration of fImpl in HttpSession.h).
BHttpSession::~BHttpSession() = default;


BHttpSession::BHttpSession(const BHttpSession&) noexcept = default;


BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default;
617
618
619BHttpResult
620BHttpSession::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
621{
622	return fImpl->Execute(std::move(request), std::move(target), observer);
623}
624
625
626void
627BHttpSession::Cancel(int32 identifier)
628{
629	fImpl->Cancel(identifier);
630}
631
632
633void
634BHttpSession::Cancel(const BHttpResult& request)
635{
636	fImpl->Cancel(request.Identity());
637}
638
639
640void
641BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections)
642{
643	fImpl->SetMaxConnectionsPerHost(maxConnections);
644}
645
646
647void
648BHttpSession::SetMaxHosts(size_t maxConnections)
649{
650	fImpl->SetMaxHosts(maxConnections);
651}
652
653
654// #pragma mark -- BHttpSession::Request (helpers)
655BHttpSession::Request::Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
656	:
657	fRequest(std::move(request)),
658	fObserver(observer)
659{
660	auto identifier = get_netservices_request_identifier();
661
662	// interpret the remaining redirects
663	fRemainingRedirects = fRequest.MaxRedirections();
664
665	// create shared data
666	fResult = std::make_shared<HttpResultPrivate>(identifier);
667
668	// check if there is a target
669	if (target.HasValue())
670		fResult->bodyTarget = std::move(target);
671
672	// inform the parser when we do a HEAD request, so not to expect content
673	if (fRequest.Method() == BHttpMethod::Head)
674		fParser.SetNoContent();
675}
676
677
678BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect)
679	:
680	fRequest(std::move(original.fRequest)),
681	fObserver(original.fObserver),
682	fResult(original.fResult)
683{
684	// update the original request with the new location
685	fRequest.SetUrl(redirect.url);
686
687	if (redirect.redirectToGet
688		&& (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) {
689		fRequest.SetMethod(BHttpMethod::Get);
690		fRequest.ClearRequestBody();
691	}
692
693	fRemainingRedirects = original.fRemainingRedirects--;
694
695	// inform the parser when we do a HEAD request, so not to expect content
696	if (fRequest.Method() == BHttpMethod::Head)
697		fParser.SetNoContent();
698}
699
700
701/*!
702	\brief Helper that sets the error in the result to \a e and notifies the listeners.
703*/
704void
705BHttpSession::Request::SetError(std::exception_ptr e)
706{
707	fResult->SetError(e);
708	SendMessage(UrlEvent::DebugMessage, [&e](BMessage& msg) {
709		msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
710		try {
711			std::rethrow_exception(e);
712		} catch (BError& error) {
713			msg.AddString(UrlEventData::DebugMessage, error.DebugMessage());
714		} catch (std::exception& error) {
715			msg.AddString(UrlEventData::DebugMessage, error.what());
716		} catch (...) {
717			msg.AddString(UrlEventData::DebugMessage, "Unknown exception");
718		}
719	});
720	SendMessage(UrlEvent::RequestCompleted,
721		[](BMessage& msg) { msg.AddBool(UrlEventData::Success, false); });
722}
723
724
725std::pair<BString, int>
726BHttpSession::Request::GetHost() const
727{
728	return {fRequest.Url().Host(), fRequest.Url().Port()};
729}
730
731
732void
733BHttpSession::Request::SetCounter(int32* counter) noexcept
734{
735	fConnectionCounter = std::unique_ptr<int32, CounterDeleter>(counter);
736}
737
738
739/*!
740	\brief Resolve the hostname for a request
741*/
742void
743BHttpSession::Request::ResolveHostName()
744{
745	int port;
746	if (fRequest.Url().HasPort())
747		port = fRequest.Url().Port();
748	else if (fRequest.Url().Protocol() == "https")
749		port = 443;
750	else
751		port = 80;
752
753	// TODO: proxy
754	if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) {
755		throw BNetworkRequestError(
756			"BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status);
757	}
758
759	SendMessage(UrlEvent::HostNameResolved,
760		[this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); });
761}
762
763
764/*!
765	\brief Open the connection and make the socket non-blocking after opening it
766*/
767void
768BHttpSession::Request::OpenConnection()
769{
770	// Set up the socket
771	if (fRequest.Url().Protocol() == "https") {
772		// To do: secure socket with callbacks to check certificates
773		fSocket = std::make_unique<BSecureSocket>();
774	} else {
775		fSocket = std::make_unique<BSocket>();
776	}
777
778	// Set timeout
779	fSocket->SetTimeout(fRequest.Timeout());
780
781	// Open connection
782	if (auto status = fSocket->Connect(fRemoteAddress); status != B_OK) {
783		// TODO: inform listeners that the connection failed
784		throw BNetworkRequestError(
785			"BSocket::Connect()", BNetworkRequestError::NetworkError, status);
786	}
787
788	// Make the rest of the interaction non-blocking
789	auto flags = fcntl(fSocket->Socket(), F_GETFL, 0);
790	if (flags == -1)
791		throw BRuntimeError("fcntl()", "Error getting socket flags");
792	if (fcntl(fSocket->Socket(), F_SETFL, flags | O_NONBLOCK) != 0)
793		throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket");
794
795	SendMessage(UrlEvent::ConnectionOpened);
796
797	fRequestStatus = Connected;
798}
799
800
801/*!
802	\brief Transfer data from the request to the socket.
803
804	\returns \c true if the request is complete, or false if there is more.
805*/
806void
807BHttpSession::Request::TransferRequest()
808{
809	// Assert that we are in the right state
810	if (fRequestStatus != Connected)
811		throw BRuntimeError(
812			__PRETTY_FUNCTION__, "Write request for object that is not in the Connected state");
813
814	if (!fSerializer.IsInitialized())
815		fSerializer.SetTo(fBuffer, fRequest);
816
817	auto currentBytesWritten = fSerializer.Serialize(fBuffer, fSocket.get());
818
819	if (currentBytesWritten > 0) {
820		SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) {
821			msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred());
822			if (auto totalSize = fSerializer.BodyBytesTotal())
823				msg.AddInt64(UrlEventData::TotalBytes, totalSize.value());
824		});
825	}
826
827	if (fSerializer.Complete())
828		fRequestStatus = RequestSent;
829}
830
831
832/*!
833	\brief Transfer data from the socket and parse the result.
834
835	\returns \c true if the request is complete, or false if there is more.
836*/
837bool
838BHttpSession::Request::ReceiveResult()
839{
840	// First: stream data from the socket
841	auto bytesRead = fBuffer.ReadFrom(fSocket.get());
842
843	if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED)
844		return false;
845
846	auto readEnd = bytesRead == 0;
847
848	// Parse the content in the buffer
849	switch (fParser.State()) {
850		case HttpInputStreamState::StatusLine:
851		{
852			if (fBuffer.RemainingBytes() == static_cast<size_t>(bytesRead)) {
853				// In the initial run, the bytes in the buffer will match the bytes read to indicate
854				// the response has started.
855				SendMessage(UrlEvent::ResponseStarted);
856			}
857
858			if (fParser.ParseStatus(fBuffer, fStatus)) {
859				// the status headers are now received, decide what to do next
860
861				// Determine if we can handle redirects; else notify of receiving status
862				if (fRemainingRedirects > 0) {
863					switch (fStatus.StatusCode()) {
864						case BHttpStatusCode::MovedPermanently:
865						case BHttpStatusCode::TemporaryRedirect:
866						case BHttpStatusCode::PermanentRedirect:
867							// These redirects require the request body to be sent again. It this is
868							// possible, BHttpRequest::RewindBody() will return true in which case
869							// we can handle the redirect.
870							if (!fRequest.RewindBody())
871								break;
872							[[fallthrough]];
873						case BHttpStatusCode::Found:
874						case BHttpStatusCode::SeeOther:
875							// These redirects redirect to GET, so we don't care if we can rewind
876							// the body; in this case redirect
877							fMightRedirect = true;
878							break;
879						default:
880							break;
881					}
882				}
883
884				if ((fStatus.StatusClass() == BHttpStatusClass::ClientError
885						|| fStatus.StatusClass() == BHttpStatusClass::ServerError)
886					&& fRequest.StopOnError()) {
887					fRequestStatus = ContentReceived;
888					fResult->SetStatus(std::move(fStatus));
889					fResult->SetFields(BHttpFields());
890					fResult->SetBody();
891					SendMessage(UrlEvent::RequestCompleted,
892						[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
893					return true;
894				}
895
896				if (!fMightRedirect) {
897					// we are not redirecting and there is no error, so inform listeners
898					SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
899						msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
900					});
901					fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
902				}
903			} else {
904				// We do not have enough data for the status line yet
905				if (readEnd) {
906					throw BNetworkRequestError(__PRETTY_FUNCTION__,
907						BNetworkRequestError::ProtocolError,
908						"Response did not include a complete status line");
909				}
910				return false;
911			}
912			[[fallthrough]];
913		}
914		case HttpInputStreamState::Fields:
915		{
916			if (!fParser.ParseFields(fBuffer, fFields)) {
917				// there may be more headers to receive, throw an error if there will be no more
918				if (readEnd) {
919					throw BNetworkRequestError(__PRETTY_FUNCTION__,
920						BNetworkRequestError::ProtocolError,
921						"Response did not include a complete header section");
922				}
923				break;
924			}
925
926			// The headers have been received, now set up the rest of the response handling
927
928			// Handle redirects
929			if (fMightRedirect) {
930				auto redirectToGet = false;
931				switch (fStatus.StatusCode()) {
932					case BHttpStatusCode::Found:
933					case BHttpStatusCode::SeeOther:
934						// 302 and 303 redirections convert all requests to GET request, except for
935						// HEAD
936						redirectToGet = true;
937						[[fallthrough]];
938					case BHttpStatusCode::MovedPermanently:
939					case BHttpStatusCode::TemporaryRedirect:
940					case BHttpStatusCode::PermanentRedirect:
941					{
942						auto locationField = fFields.FindField("Location");
943						if (locationField == fFields.end()) {
944							throw BNetworkRequestError(__PRETTY_FUNCTION__,
945								BNetworkRequestError::ProtocolError,
946								"Redirect; the Location field must be present and cannot be found");
947						}
948						auto locationString = BString(
949							(*locationField).Value().data(), (*locationField).Value().size());
950						auto redirect = BHttpSession::Redirect{
951							BUrl(fRequest.Url(), locationString), redirectToGet};
952						if (!redirect.url.IsValid()) {
953							throw BNetworkRequestError(__PRETTY_FUNCTION__,
954								BNetworkRequestError::ProtocolError,
955								"Redirect; invalid URL in the Location field");
956						}
957
958						// Notify of redirect
959						SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) {
960							msg.AddString(UrlEventData::HttpRedirectUrl, locationString);
961						});
962						throw redirect;
963					}
964					default:
965						// ignore other status codes and continue regular processing
966						SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
967							msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
968						});
969						fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
970						break;
971				}
972			}
973
974			// TODO: Parse received cookies
975
976			// Move headers to the result and inform listener
977			fResult->SetFields(std::move(fFields));
978			SendMessage(UrlEvent::HttpFields);
979
980			if (!fParser.HasContent()) {
981				// Any requests with not content are finished
982				fResult->SetBody();
983				SendMessage(UrlEvent::RequestCompleted,
984					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
985				fRequestStatus = ContentReceived;
986				return true;
987			}
988			[[fallthrough]];
989		}
990		case HttpInputStreamState::Body:
991		{
992			size_t bytesWrittenToBody;
993				// The bytesWrittenToBody may differ from the bytes parsed from the buffer when
994				// there is compression on the incoming stream.
995			bytesRead = fParser.ParseBody(
996				fBuffer,
997				[this, &bytesWrittenToBody](const std::byte* buffer, size_t size) {
998					bytesWrittenToBody = fResult->WriteToBody(buffer, size);
999					return bytesWrittenToBody;
1000				},
1001				readEnd);
1002
1003			SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) {
1004				msg.AddInt64(UrlEventData::NumBytes, bytesRead);
1005				if (fParser.BodyBytesTotal())
1006					msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value());
1007			});
1008
1009			if (bytesWrittenToBody > 0) {
1010				SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) {
1011					msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody);
1012				});
1013			}
1014
1015			if (fParser.Complete()) {
1016				fResult->SetBody();
1017				SendMessage(UrlEvent::RequestCompleted,
1018					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
1019				fRequestStatus = ContentReceived;
1020				return true;
1021			} else if (readEnd) {
1022				// the parsing of the body is not complete but we are at the end of the data
1023				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError,
1024					"Unexpected end of data: more data was expected");
1025			}
1026
1027			break;
1028		}
1029		default:
1030			throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable");
1031	}
1032
1033	// There is more to receive
1034	return false;
1035}
1036
1037
1038/*!
1039	\brief Disconnect the socket. Does not validate if it actually succeeded.
1040*/
1041void
1042BHttpSession::Request::Disconnect() noexcept
1043{
1044	fSocket->Disconnect();
1045}
1046
1047
1048/*!
1049	\brief Send a message to the observer, if one is present
1050
1051	\param what The code of the message to be sent
1052	\param dataFunc Optional function that adds additional data to the message.
1053*/
1054void
1055BHttpSession::Request::SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc) const
1056{
1057	if (fObserver.IsValid()) {
1058		BMessage msg(what);
1059		msg.AddInt32(UrlEventData::Id, fResult->id);
1060		if (dataFunc)
1061			dataFunc(msg);
1062		fObserver.SendMessage(&msg);
1063	}
1064}
1065
1066
1067// #pragma mark -- Message constants
1068
1069
namespace BPrivate::Network::UrlEventData {

// Field names of the HTTP specific data in UrlEvent messages sent to observers.
const char* HttpRedirectUrl = "url:httpredirecturl";
const char* HttpStatusCode = "url:httpstatuscode";

// Field names reserved for TLS related data.
const char* SSLCertificate = "url:sslcertificate";
const char* SSLMessage = "url:sslmessage";

} // namespace BPrivate::Network::UrlEventData
1076