// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <blobfs/blobfs.h>
#include <blobfs/writeback.h>

namespace blobfs {

// Adds a write request (in blobfs blocks) to this transaction, coalescing it
// with an existing request when it either overlaps the same vmo_offset or
// immediately follows a prior request in both VMO and device space.
// May only be called before the transaction is made Ready (bound to a vmoid).
void WriteTxn::Enqueue(zx_handle_t vmo, uint64_t relative_block, uint64_t absolute_block,
                       uint64_t nblocks) {
    ZX_DEBUG_ASSERT(!IsReady());

    for (size_t i = 0; i < requests_.size(); i++) {
        if (requests_[i].vmo != vmo) {
            continue;
        }

        if (requests_[i].vmo_offset == relative_block) {
            // Take the longer of the operations (if operating on the same blocks).
            requests_[i].length = (requests_[i].length > nblocks) ? requests_[i].length : nblocks;
            return;
        } else if ((requests_[i].vmo_offset + requests_[i].length == relative_block) &&
                   (requests_[i].dev_offset + requests_[i].length == absolute_block)) {
            // Combine with the previous request, if immediately following.
            requests_[i].length += nblocks;
            return;
        }
    }

    // No existing request could absorb this range; record a new one.
    write_request_t request;
    request.vmo = vmo;
    request.vmo_offset = relative_block;
    request.dev_offset = absolute_block;
    request.length = nblocks;
    requests_.push_back(fbl::move(request));
}

// Converts the queued requests from blobfs blocks into device blocks and
// issues them as a single batch to the underlying block device. Consumes
// (resets) the request list and vmoid; the transaction may not be reused
// without being re-enqueued and made Ready again.
// Returns the status of the device transaction.
zx_status_t WriteTxn::Flush() {
    ZX_ASSERT(IsReady());
    fs::Ticker ticker(bs_->CollectingMetrics());

    // Update all the outgoing transactions to be in disk blocks
    block_fifo_request_t blk_reqs[requests_.size()];
    const uint32_t kDiskBlocksPerBlobfsBlock = kBlobfsBlockSize / bs_->DeviceBlockSize();
    for (size_t i = 0; i < requests_.size(); i++) {
        blk_reqs[i].group = bs_->BlockGroupID();
        blk_reqs[i].vmoid = vmoid_;
        blk_reqs[i].opcode = BLOCKIO_WRITE;
        blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
        blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
        uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
        // TODO(ZX-2253): Requests this long, although unlikely, should be
        // handled more gracefully.
        ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
        blk_reqs[i].length = static_cast<uint32_t>(length);
    }

    // Actually send the operations to the underlying block device.
    zx_status_t status = bs_->Transaction(blk_reqs, requests_.size());

    if (bs_->CollectingMetrics()) {
        // Record total bytes written (in blobfs-block units) and elapsed time.
        uint64_t sum = 0;
        for (size_t i = 0; i < requests_.size(); i++) {
            sum += blk_reqs[i].length * kBlobfsBlockSize;
        }
        bs_->UpdateWritebackMetrics(sum, ticker.End());
    }

    requests_.reset();
    vmoid_ = VMOID_INVALID;
    return status;
}

// Returns the first (writeback-buffer-relative) block of this transaction.
// Only valid once the transaction is Ready and has at least one request.
size_t WriteTxn::BlkStart() const {
    ZX_DEBUG_ASSERT(IsReady());
    ZX_DEBUG_ASSERT(requests_.size() > 0);
    return requests_[0].vmo_offset;
}

// Returns the total number of blobfs blocks across all queued requests.
size_t WriteTxn::BlkCount() const {
    size_t blocks_needed = 0;
    for (size_t i = 0; i < requests_.size(); i++) {
        blocks_needed += requests_[i].length;
    }
    return blocks_needed;
}

WritebackWork::WritebackWork(Blobfs* bs, fbl::RefPtr<VnodeBlob> vn) :
    WriteTxn(bs), closure_(nullptr), sync_(false), vn_(fbl::move(vn)) {}

// Drops the closure and the vnode reference. Requires that all pending
// requests have already been flushed.
void WritebackWork::Reset() {
    ZX_DEBUG_ASSERT(Requests().is_empty());
    closure_ = nullptr;
    vn_ = nullptr;
}

// Marks this work item as the final write of a sync operation; on Complete(),
// the vnode will be notified via CompleteSync(). Requires a vnode to notify.
void WritebackWork::SetSyncComplete() {
    ZX_ASSERT(vn_);
    sync_ = true;
}

// Flushes the underlying transaction, notifies the vnode (if this work
// completes a sync), invokes the optional closure with the flush status, and
// resets this work item.
// NOTE(review): always returns ZX_OK, even when Flush() failed — the flush
// status is only propagated to the closure. The TODO below tracks making
// failure handling stricter.
zx_status_t WritebackWork::Complete() {
    zx_status_t status = Flush();

    //TODO(planders): On flush failure, convert fs to read-only
    if (status == ZX_OK && sync_) {
        vn_->CompleteSync();
    }

    if (closure_) {
        closure_(status);
    }

    Reset();
    return ZX_OK;
}

// Registers a callback to be invoked (with the flush status) when this work
// completes. Only one closure may be set per work item.
void WritebackWork::SetClosure(SyncCallback closure) {
    ZX_DEBUG_ASSERT(!closure_);
    closure_ = fbl::move(closure);
}

// Creates a WritebackBuffer backed by |buffer|, initializes the
// producer/consumer condition variables, starts the background writeback
// thread, and attaches the buffer's VMO to the block device.
// |buffer| must be a multiple of the blobfs block size.
zx_status_t WritebackBuffer::Create(Blobfs* bs, fbl::unique_ptr<fzl::MappedVmo> buffer,
                                    fbl::unique_ptr<WritebackBuffer>* out) {
    fbl::unique_ptr<WritebackBuffer> wb(new WritebackBuffer(bs, fbl::move(buffer)));
    if (wb->buffer_->GetSize() % kBlobfsBlockSize != 0) {
        return ZX_ERR_INVALID_ARGS;
    } else if (cnd_init(&wb->consumer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (cnd_init(&wb->producer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (thrd_create_with_name(&wb->writeback_thrd_,
                                     WritebackBuffer::WritebackThread, wb.get(),
                                     "blobfs-writeback") != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    }
    zx_status_t status = wb->bs_->AttachVmo(wb->buffer_->GetVmo(), &wb->buffer_vmoid_);
    if (status != ZX_OK) {
        return status;
    }

    *out = fbl::move(wb);
    return ZX_OK;
}

// Allocates a new WritebackWork associated with |vnode| (which may be null).
// Returns ZX_ERR_NO_MEMORY on allocation failure.
zx_status_t WritebackBuffer::GenerateWork(fbl::unique_ptr<WritebackWork>* out,
                                          fbl::RefPtr<VnodeBlob> vnode) {
    fbl::AllocChecker ac;
    fbl::unique_ptr<WritebackWork> wb(new (&ac) WritebackWork(bs_, fbl::move(vnode)));
    if (!ac.check()) {
        return ZX_ERR_NO_MEMORY;
    }

    *out = fbl::move(wb);
    return ZX_OK;
}

WritebackBuffer::WritebackBuffer(Blobfs* bs, fbl::unique_ptr<fzl::MappedVmo> buffer) :
    bs_(bs), unmounting_(false), buffer_(fbl::move(buffer)),
    cap_(buffer_->GetSize() / kBlobfsBlockSize) {}

WritebackBuffer::~WritebackBuffer() {
    // Block until the background thread completes itself.
    {
        fbl::AutoLock lock(&writeback_lock_);
        unmounting_ = true;
        cnd_signal(&consumer_cvar_);
    }
    int r;
    thrd_join(writeback_thrd_, &r);

    // Detach the buffer VMO from the block device, if it was ever attached.
    if (buffer_vmoid_ != VMOID_INVALID) {
        block_fifo_request_t request;
        request.group = bs_->BlockGroupID();
        request.vmoid = buffer_vmoid_;
        request.opcode = BLOCKIO_CLOSE_VMO;
        bs_->Transaction(&request, 1);
    }
}

// Blocks (releasing writeback_lock_ inside cnd_wait) until |blocks| blocks of
// space are available in the circular writeback buffer. Returns
// ZX_ERR_NO_RESOURCES if the request can never fit (|blocks| > cap_).
// Must be called with writeback_lock_ held.
zx_status_t WritebackBuffer::EnsureSpaceLocked(size_t blocks) {
    if (blocks > cap_) {
        // There will never be enough room in the writeback buffer for this request.
        return ZX_ERR_NO_RESOURCES;
    }
    while (len_ + blocks > cap_) {
        // Not enough room to write back work, yet. Wait until room is available.
        Waiter w;
        producer_queue_.push(&w);

        // NOTE(review): the inline comments describe exiting the wait only
        // when we are first in line AND space is available, but the `&&`
        // below makes cnd_wait stop as soon as EITHER holds (the outer while
        // re-checks space, so correctness appears to rest on that recheck and
        // on the queue discipline) — confirm against the minfs counterpart.
        do {
            cnd_wait(&producer_cvar_, writeback_lock_.GetInternal());
        } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
                 (len_ + blocks > cap_)); // ... and there is enough space for us.

        producer_queue_.pop();
    }
    return ZX_OK;
}

// Copies the data described by |txn|'s requests out of their source VMOs into
// the circular writeback buffer, rewriting each request in place to source
// from the buffer instead. A request that wraps past the end of the buffer is
// split in two. On return, |txn| is Ready (bound to the buffer's vmoid).
// Must be called with writeback_lock_ held and space already reserved via
// EnsureSpaceLocked.
void WritebackBuffer::CopyToBufferLocked(WriteTxn* txn) {
    auto& reqs = txn->Requests();
    ZX_DEBUG_ASSERT(!txn->IsReady());

    // Write back to the buffer
    for (size_t i = 0; i < reqs.size(); i++) {
        size_t vmo_offset = reqs[i].vmo_offset;
        size_t dev_offset = reqs[i].dev_offset;
        const size_t vmo_len = reqs[i].length;
        ZX_DEBUG_ASSERT(vmo_len > 0);
        // Position of this request within the circular buffer, and how much of
        // it fits before wrapping around to the start of the buffer.
        size_t wb_offset = (start_ + len_) % cap_;
        size_t wb_len = (wb_offset + vmo_len > cap_) ? cap_ - wb_offset : vmo_len;
        ZX_DEBUG_ASSERT(wb_len <= vmo_len);
        ZX_DEBUG_ASSERT(wb_offset < cap_);
        zx_handle_t vmo = reqs[i].vmo;

        void* ptr = (void*)((uintptr_t)(buffer_->GetData()) +
                            (uintptr_t)(wb_offset * kBlobfsBlockSize));
        zx_status_t status;
        // The copy destination must not overlap the in-flight region that
        // starts at start_.
        ZX_DEBUG_ASSERT((start_ <= wb_offset) ?
                        (start_ < wb_offset + wb_len) :
                        (wb_offset + wb_len <= start_)); // Wraparound
        ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
                      wb_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);

        len_ += wb_len;

        // Update the write_request to transfer from the writeback buffer out to disk, rather than
        // the supplied VMO

        reqs[i].vmo_offset = wb_offset;
        reqs[i].length = wb_len;

        if (wb_len != vmo_len) {
            // We wrapped around; write what remains from this request
            vmo_offset += wb_len;
            dev_offset += wb_len;
            wb_len = vmo_len - wb_len;
            ptr = buffer_->GetData();
            ZX_DEBUG_ASSERT((start_ == 0) ?
                            (start_ < wb_len) : (wb_len <= start_)); // Wraparound
            ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
                                  wb_len * kBlobfsBlockSize) == ZX_OK);

            len_ += wb_len;

            // Shift down all following write requests
            static_assert(fbl::is_pod<write_request_t>::value, "Can't memmove non-POD");
            // Insert the "new" request, which is the latter half of the last request
            write_request_t request;
            request.vmo = reqs[i].vmo;
            request.vmo_offset = 0;
            request.dev_offset = dev_offset;
            request.length = wb_len;
            i++;
            reqs.insert(i, request);
        }
    }

    txn->SetReady(buffer_vmoid_);
}

// Producer side: reserves space in the writeback buffer, copies |work|'s data
// into it, queues the work for the background thread, and wakes the consumer.
// Asserts (rather than failing) if the work can never fit in the buffer.
void WritebackBuffer::Enqueue(fbl::unique_ptr<WritebackWork> work) {
    TRACE_DURATION("blobfs", "WritebackBuffer::Enqueue", "work ptr", work.get());
    fbl::AutoLock lock(&writeback_lock_);
    size_t blocks = work->BlkCount();
    // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
    // operations which are too large to be fully contained by the buffer, or that the
    // worst-case operation will always fit within the buffer
    ZX_ASSERT_MSG(EnsureSpaceLocked(blocks) == ZX_OK,
                  "Requested txn (%zu blocks) larger than writeback buffer", blocks);
    CopyToBufferLocked(work.get());
    work_queue_.push(fbl::move(work));
    cnd_signal(&consumer_cvar_);
}

// Consumer side: background thread that drains work_queue_, completing each
// work item with the lock dropped, then reclaims its buffer space and wakes
// any blocked producers. Exits when unmounting_ is set and the queue is empty.
int WritebackBuffer::WritebackThread(void* arg) {
    WritebackBuffer* b = reinterpret_cast<WritebackBuffer*>(arg);

    b->writeback_lock_.Acquire();
    while (true) {
        while (!b->work_queue_.is_empty()) {
            auto work = b->work_queue_.pop();
            TRACE_DURATION("blobfs", "WritebackBuffer::WritebackThread", "work ptr", work.get());

            size_t blk_count = work->BlkCount();

            if (blk_count > 0) {
                // A non-empty work item must describe exactly the head of the
                // in-flight region of the circular buffer.
                ZX_ASSERT(work->BlkStart() == b->start_);
                ZX_ASSERT(blk_count <= b->len_);
            }

            // Stay unlocked while processing a unit of work
            b->writeback_lock_.Release();
            work->Complete();
            work = nullptr;
            b->writeback_lock_.Acquire();
            // Reclaim the buffer space consumed by the completed work and
            // notify producers waiting for room.
            b->start_ = (b->start_ + blk_count) % b->cap_;
            b->len_ -= blk_count;
            cnd_signal(&b->producer_cvar_);
        }

        // Before waiting, we should check if we're unmounting.
        if (b->unmounting_) {
            b->writeback_lock_.Release();
            return 0;
        }
        cnd_wait(&b->consumer_cvar_, b->writeback_lock_.GetInternal());
    }
}
} // namespace blobfs