1//===-- ThreadedCommunication.cpp -----------------------------------------===// 2// 3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4// See https://llvm.org/LICENSE.txt for license information. 5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6// 7//===----------------------------------------------------------------------===// 8 9#include "lldb/Core/ThreadedCommunication.h" 10 11#include "lldb/Host/ThreadLauncher.h" 12#include "lldb/Utility/Connection.h" 13#include "lldb/Utility/ConstString.h" 14#include "lldb/Utility/Event.h" 15#include "lldb/Utility/LLDBLog.h" 16#include "lldb/Utility/Listener.h" 17#include "lldb/Utility/Log.h" 18#include "lldb/Utility/Status.h" 19 20#include "llvm/Support/Compiler.h" 21 22#include <algorithm> 23#include <chrono> 24#include <cstring> 25#include <memory> 26#include <shared_mutex> 27 28#include <cerrno> 29#include <cinttypes> 30#include <cstdio> 31 32using namespace lldb; 33using namespace lldb_private; 34 35ConstString &ThreadedCommunication::GetStaticBroadcasterClass() { 36 static ConstString class_name("lldb.communication"); 37 return class_name; 38} 39 40ThreadedCommunication::ThreadedCommunication(const char *name) 41 : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false), 42 m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(), 43 m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) { 44 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 45 "{0} ThreadedCommunication::ThreadedCommunication (name = {1})", 46 this, name); 47 48 SetEventName(eBroadcastBitDisconnected, "disconnected"); 49 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 50 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 51 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 52 SetEventName(eBroadcastBitPacketAvailable, "packet available"); 53 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 54 55 CheckInWithManager(); 56} 57 58ThreadedCommunication::~ThreadedCommunication() { 59 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 60 "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})", 61 this, GetBroadcasterName()); 62} 63 64void ThreadedCommunication::Clear() { 65 SetReadThreadBytesReceivedCallback(nullptr, nullptr); 66 StopReadThread(nullptr); 67 Communication::Clear(); 68} 69 70ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) { 71 assert((!m_read_thread_enabled || m_read_thread_did_exit) && 72 "Disconnecting while the read thread is running is racy!"); 73 return Communication::Disconnect(error_ptr); 74} 75 76size_t ThreadedCommunication::Read(void *dst, size_t dst_len, 77 const Timeout<std::micro> &timeout, 78 ConnectionStatus &status, 79 Status *error_ptr) { 80 Log *log = GetLog(LLDBLog::Communication); 81 LLDB_LOG( 82 log, 83 "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}", 84 this, dst, dst_len, timeout, m_connection_sp.get()); 85 86 if (m_read_thread_enabled) { 87 // We have a dedicated read thread that is getting data for us 88 size_t cached_bytes = GetCachedBytes(dst, dst_len); 89 if (cached_bytes > 0) { 90 status = eConnectionStatusSuccess; 91 return cached_bytes; 92 } 93 if (timeout && timeout->count() == 0) { 94 if (error_ptr) 95 error_ptr->SetErrorString("Timed out."); 96 status = eConnectionStatusTimedOut; 97 return 0; 98 } 99 100 if (!m_connection_sp) { 101 if (error_ptr) 102 error_ptr->SetErrorString("Invalid connection."); 103 status = eConnectionStatusNoConnection; 104 return 0; 105 } 106 107 // No data yet, we have to start listening. 108 ListenerSP listener_sp( 109 Listener::MakeListener("ThreadedCommunication::Read")); 110 listener_sp->StartListeningForEvents( 111 this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 112 113 // Re-check for data, as it might have arrived while we were setting up our 114 // listener. 115 cached_bytes = GetCachedBytes(dst, dst_len); 116 if (cached_bytes > 0) { 117 status = eConnectionStatusSuccess; 118 return cached_bytes; 119 } 120 121 EventSP event_sp; 122 // Explicitly check for the thread exit, for the same reason. 123 if (m_read_thread_did_exit) { 124 // We've missed the event, lets just conjure one up. 125 event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit); 126 } else { 127 if (!listener_sp->GetEvent(event_sp, timeout)) { 128 if (error_ptr) 129 error_ptr->SetErrorString("Timed out."); 130 status = eConnectionStatusTimedOut; 131 return 0; 132 } 133 } 134 const uint32_t event_type = event_sp->GetType(); 135 if (event_type & eBroadcastBitReadThreadGotBytes) { 136 return GetCachedBytes(dst, dst_len); 137 } 138 139 if (event_type & eBroadcastBitReadThreadDidExit) { 140 // If the thread exited of its own accord, it either means it 141 // hit an end-of-file condition or an error. 142 status = m_pass_status; 143 if (error_ptr) 144 *error_ptr = std::move(m_pass_error); 145 146 if (GetCloseOnEOF()) 147 Disconnect(nullptr); 148 return 0; 149 } 150 llvm_unreachable("Got unexpected event type!"); 151 } 152 153 // We aren't using a read thread, just read the data synchronously in this 154 // thread. 155 return Communication::Read(dst, dst_len, timeout, status, error_ptr); 156} 157 158bool ThreadedCommunication::StartReadThread(Status *error_ptr) { 159 std::lock_guard<std::mutex> lock(m_read_thread_mutex); 160 161 if (error_ptr) 162 error_ptr->Clear(); 163 164 if (m_read_thread.IsJoinable()) 165 return true; 166 167 LLDB_LOG(GetLog(LLDBLog::Communication), 168 "{0} ThreadedCommunication::StartReadThread ()", this); 169 170 const std::string thread_name = 171 llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName()); 172 173 m_read_thread_enabled = true; 174 m_read_thread_did_exit = false; 175 auto maybe_thread = ThreadLauncher::LaunchThread( 176 thread_name, [this] { return ReadThread(); }); 177 if (maybe_thread) { 178 m_read_thread = *maybe_thread; 179 } else { 180 if (error_ptr) 181 *error_ptr = Status(maybe_thread.takeError()); 182 else { 183 LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(), 184 "failed to launch host thread: {0}"); 185 } 186 } 187 188 if (!m_read_thread.IsJoinable()) 189 m_read_thread_enabled = false; 190 191 return m_read_thread_enabled; 192} 193 194bool ThreadedCommunication::StopReadThread(Status *error_ptr) { 195 std::lock_guard<std::mutex> lock(m_read_thread_mutex); 196 197 if (!m_read_thread.IsJoinable()) 198 return true; 199 200 LLDB_LOG(GetLog(LLDBLog::Communication), 201 "{0} ThreadedCommunication::StopReadThread ()", this); 202 203 m_read_thread_enabled = false; 204 205 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 206 207 Status error = m_read_thread.Join(nullptr); 208 return error.Success(); 209} 210 211bool ThreadedCommunication::JoinReadThread(Status *error_ptr) { 212 std::lock_guard<std::mutex> lock(m_read_thread_mutex); 213 214 if (!m_read_thread.IsJoinable()) 215 return true; 216 217 Status error = m_read_thread.Join(nullptr); 218 return error.Success(); 219} 220 221size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) { 222 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 223 if (!m_bytes.empty()) { 224 // If DST is nullptr and we have a thread, then return the number of bytes 225 // that are available so the caller can call again 226 if (dst == nullptr) 227 return m_bytes.size(); 228 229 const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 230 231 ::memcpy(dst, m_bytes.c_str(), len); 232 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 233 234 return len; 235 } 236 return 0; 237} 238 239void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len, 240 bool broadcast, 241 ConnectionStatus status) { 242 LLDB_LOG(GetLog(LLDBLog::Communication), 243 "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len " 244 "= {2}, " 245 "broadcast = {3})", 246 this, bytes, (uint64_t)len, broadcast); 247 if ((bytes == nullptr || len == 0) && 248 (status != lldb::eConnectionStatusEndOfFile)) 249 return; 250 if (m_callback) { 251 // If the user registered a callback, then call it and do not broadcast 252 m_callback(m_callback_baton, bytes, len); 253 } else if (bytes != nullptr && len > 0) { 254 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 255 m_bytes.append((const char *)bytes, len); 256 if (broadcast) 257 BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); 258 } 259} 260 261bool ThreadedCommunication::ReadThreadIsRunning() { 262 return m_read_thread_enabled; 263} 264 265lldb::thread_result_t ThreadedCommunication::ReadThread() { 266 Log *log = GetLog(LLDBLog::Communication); 267 268 LLDB_LOG(log, "Communication({0}) thread starting...", this); 269 270 uint8_t buf[1024]; 271 272 Status error; 273 ConnectionStatus status = eConnectionStatusSuccess; 274 bool done = false; 275 bool disconnect = false; 276 while (!done && m_read_thread_enabled) { 277 size_t bytes_read = ReadFromConnection( 278 buf, sizeof(buf), std::chrono::seconds(5), status, &error); 279 if (bytes_read > 0 || status == eConnectionStatusEndOfFile) 280 AppendBytesToCache(buf, bytes_read, true, status); 281 282 switch (status) { 283 case eConnectionStatusSuccess: 284 break; 285 286 case eConnectionStatusEndOfFile: 287 done = true; 288 disconnect = GetCloseOnEOF(); 289 break; 290 case eConnectionStatusError: // Check GetError() for details 291 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { 292 // EIO on a pipe is usually caused by remote shutdown 293 disconnect = GetCloseOnEOF(); 294 done = true; 295 } 296 if (error.Fail()) 297 LLDB_LOG(log, "error: {0}, status = {1}", error, 298 ThreadedCommunication::ConnectionStatusAsString(status)); 299 break; 300 case eConnectionStatusInterrupted: // Synchronization signal from 301 // SynchronizeWithReadThread() 302 // The connection returns eConnectionStatusInterrupted only when there is 303 // no input pending to be read, so we can signal that. 304 BroadcastEvent(eBroadcastBitNoMorePendingInput); 305 break; 306 case eConnectionStatusNoConnection: // No connection 307 case eConnectionStatusLostConnection: // Lost connection while connected to 308 // a valid connection 309 done = true; 310 [[fallthrough]]; 311 case eConnectionStatusTimedOut: // Request timed out 312 if (error.Fail()) 313 LLDB_LOG(log, "error: {0}, status = {1}", error, 314 ThreadedCommunication::ConnectionStatusAsString(status)); 315 break; 316 } 317 } 318 m_pass_status = status; 319 m_pass_error = std::move(error); 320 LLDB_LOG(log, "Communication({0}) thread exiting...", this); 321 322 // Start shutting down. We need to do this in a very specific order to ensure 323 // we don't race with threads wanting to read/synchronize with us. 324 325 // First, we signal our intent to exit. This ensures no new thread start 326 // waiting on events from us. 327 m_read_thread_did_exit = true; 328 329 // Unblock any existing thread waiting for the synchronization signal. 330 BroadcastEvent(eBroadcastBitNoMorePendingInput); 331 332 { 333 // Wait for the synchronization thread to finish... 334 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 335 // ... and disconnect. 336 if (disconnect) 337 Disconnect(); 338 } 339 340 // Finally, unblock any readers waiting for us to exit. 341 BroadcastEvent(eBroadcastBitReadThreadDidExit); 342 return {}; 343} 344 345void ThreadedCommunication::SetReadThreadBytesReceivedCallback( 346 ReadThreadBytesReceived callback, void *callback_baton) { 347 m_callback = callback; 348 m_callback_baton = callback_baton; 349} 350 351void ThreadedCommunication::SynchronizeWithReadThread() { 352 // Only one thread can do the synchronization dance at a time. 353 std::lock_guard<std::mutex> guard(m_synchronize_mutex); 354 355 // First start listening for the synchronization event. 356 ListenerSP listener_sp(Listener::MakeListener( 357 "ThreadedCommunication::SyncronizeWithReadThread")); 358 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 359 360 // If the thread is not running, there is no point in synchronizing. 361 if (!m_read_thread_enabled || m_read_thread_did_exit) 362 return; 363 364 // Notify the read thread. 365 m_connection_sp->InterruptRead(); 366 367 // Wait for the synchronization event. 368 EventSP event_sp; 369 listener_sp->GetEvent(event_sp, std::nullopt); 370} 371 372void ThreadedCommunication::SetConnection( 373 std::unique_ptr<Connection> connection) { 374 StopReadThread(nullptr); 375 Communication::SetConnection(std::move(connection)); 376} 377