// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <blobfs/blobfs.h>
#include <blobfs/writeback.h>

namespace blobfs {

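// Enqueues a write from |vmo| into this transaction, coalescing with an
// existing request when the new blocks overlap or immediately follow it.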
void WriteTxn::Enqueue(zx_handle_t vmo, uint64_t relative_block, uint64_t absolute_block,
                       uint64_t nblocks) {
    ZX_DEBUG_ASSERT(!IsReady());

    for (size_t i = 0; i < requests_.size(); i++) {
        if (requests_[i].vmo != vmo) {
            continue;
        }

        if (requests_[i].vmo_offset == relative_block) {
            // Take the longer of the two operations when they start at the same block.
            requests_[i].length = (requests_[i].length > nblocks) ? requests_[i].length : nblocks;
            return;
        } else if ((requests_[i].vmo_offset + requests_[i].length == relative_block) &&
                   (requests_[i].dev_offset + requests_[i].length == absolute_block)) {
            // Extend the existing request if the new blocks immediately follow it.
            requests_[i].length += nblocks;
            return;
        }
    }

    write_request_t request;
    request.vmo = vmo;
    request.vmo_offset = relative_block;
    request.dev_offset = absolute_block;
    request.length = nblocks;
    requests_.push_back(fbl::move(request));
}

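// Converts the pending requests from blobfs blocks into device blocks and
// submits them to the underlying block device in a single FIFO transaction.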
zx_status_t WriteTxn::Flush() {
    ZX_ASSERT(IsReady());
    fs::Ticker ticker(bs_->CollectingMetrics());

    // Update all the outgoing transactions to be in disk blocks
    block_fifo_request_t blk_reqs[requests_.size()];
    const uint32_t kDiskBlocksPerBlobfsBlock = kBlobfsBlockSize / bs_->DeviceBlockSize();
    for (size_t i = 0; i < requests_.size(); i++) {
        blk_reqs[i].group = bs_->BlockGroupID();
        blk_reqs[i].vmoid = vmoid_;
        blk_reqs[i].opcode = BLOCKIO_WRITE;
        blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
        blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
        uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
        // TODO(ZX-2253): Requests this long, although unlikely, should be
        // handled more gracefully.
        ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
        blk_reqs[i].length = static_cast<uint32_t>(length);
    }

    // Actually send the operations to the underlying block device.
    zx_status_t status = bs_->Transaction(blk_reqs, requests_.size());

    if (bs_->CollectingMetrics()) {
        uint64_t sum = 0;
        for (size_t i = 0; i < requests_.size(); i++) {
            // requests_[i].length is measured in blobfs blocks, so the sum is in bytes.
            sum += requests_[i].length * kBlobfsBlockSize;
        }
        bs_->UpdateWritebackMetrics(sum, ticker.End());
    }

    requests_.reset();
    vmoid_ = VMOID_INVALID;
    return status;
}

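// Returns the starting block of the transaction within the writeback buffer.
// Only valid once the transaction has been marked ready.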
size_t WriteTxn::BlkStart() const {
    ZX_DEBUG_ASSERT(IsReady());
    ZX_DEBUG_ASSERT(requests_.size() > 0);
    return requests_[0].vmo_offset;
}

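// Returns the total number of blocks spanned by all requests in the transaction.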
size_t WriteTxn::BlkCount() const {
    size_t blocks_needed = 0;
    for (size_t i = 0; i < requests_.size(); i++) {
        blocks_needed += requests_[i].length;
    }
    return blocks_needed;
}

WritebackWork::WritebackWork(Blobfs* bs, fbl::RefPtr<VnodeBlob> vn) :
    WriteTxn(bs), closure_(nullptr), sync_(false), vn_(fbl::move(vn)) {}

void WritebackWork::Reset() {
    ZX_DEBUG_ASSERT(Requests().is_empty());
    closure_ = nullptr;
    vn_ = nullptr;
}

void WritebackWork::SetSyncComplete() {
    ZX_ASSERT(vn_);
    sync_ = true;
}

// Flushes the transaction to disk, completes any pending sync on the vnode,
// and notifies the caller via the closure (if one was set).
zx_status_t WritebackWork::Complete() {
    zx_status_t status = Flush();

    // TODO(planders): On flush failure, convert fs to read-only
    if (status == ZX_OK && sync_) {
        vn_->CompleteSync();
    }

    if (closure_) {
        closure_(status);
    }

    Reset();
    return ZX_OK;
}

void WritebackWork::SetClosure(SyncCallback closure) {
    ZX_DEBUG_ASSERT(!closure_);
    closure_ = fbl::move(closure);
}

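// Creates a WritebackBuffer backed by |buffer|, initializing the condition
// variables, launching the background writeback thread, and attaching the
// buffer's VMO to the block device.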
zx_status_t WritebackBuffer::Create(Blobfs* bs, fbl::unique_ptr<fzl::MappedVmo> buffer,
                                    fbl::unique_ptr<WritebackBuffer>* out) {
    fbl::unique_ptr<WritebackBuffer> wb(new WritebackBuffer(bs, fbl::move(buffer)));
    if (wb->buffer_->GetSize() % kBlobfsBlockSize != 0) {
        return ZX_ERR_INVALID_ARGS;
    } else if (cnd_init(&wb->consumer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (cnd_init(&wb->producer_cvar_) != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    } else if (thrd_create_with_name(&wb->writeback_thrd_,
                                     WritebackBuffer::WritebackThread, wb.get(),
                                     "blobfs-writeback") != thrd_success) {
        return ZX_ERR_NO_RESOURCES;
    }
    zx_status_t status = wb->bs_->AttachVmo(wb->buffer_->GetVmo(), &wb->buffer_vmoid_);
    if (status != ZX_OK) {
        return status;
    }

    *out = fbl::move(wb);
    return ZX_OK;
}

zx_status_t WritebackBuffer::GenerateWork(fbl::unique_ptr<WritebackWork>* out,
                                          fbl::RefPtr<VnodeBlob> vnode) {
    fbl::AllocChecker ac;
    fbl::unique_ptr<WritebackWork> wb(new (&ac) WritebackWork(bs_, fbl::move(vnode)));
    if (!ac.check()) {
        return ZX_ERR_NO_MEMORY;
    }

    *out = fbl::move(wb);
    return ZX_OK;
}

WritebackBuffer::WritebackBuffer(Blobfs* bs, fbl::unique_ptr<fzl::MappedVmo> buffer) :
    bs_(bs), unmounting_(false), buffer_(fbl::move(buffer)),
    cap_(buffer_->GetSize() / kBlobfsBlockSize) {}

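// Signals the background thread to exit, waits for it to terminate, and
// detaches the writeback buffer's VMO from the block device.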
WritebackBuffer::~WritebackBuffer() {
    // Block until the background thread completes itself.
    {
        fbl::AutoLock lock(&writeback_lock_);
        unmounting_ = true;
        cnd_signal(&consumer_cvar_);
    }
    int r;
    thrd_join(writeback_thrd_, &r);

    if (buffer_vmoid_ != VMOID_INVALID) {
        block_fifo_request_t request;
        request.group = bs_->BlockGroupID();
        request.vmoid = buffer_vmoid_;
        request.opcode = BLOCKIO_CLOSE_VMO;
        bs_->Transaction(&request, 1);
    }
}

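// Blocks until |blocks| blocks of space are available in the writeback buffer,
// or fails immediately if |blocks| exceeds the buffer's total capacity.
// Requires writeback_lock_ to be held.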
zx_status_t WritebackBuffer::EnsureSpaceLocked(size_t blocks) {
    if (blocks > cap_) {
        // There will never be enough room in the writeback buffer for this request.
        return ZX_ERR_NO_RESOURCES;
    }
    while (len_ + blocks > cap_) {
        // Not enough room to write back this work yet. Wait until space is available.
        Waiter w;
        producer_queue_.push(&w);

        do {
            cnd_wait(&producer_cvar_, writeback_lock_.GetInternal());
        } while ((&producer_queue_.front() != &w) && // Wait while we are not first in line...
                 (len_ + blocks > cap_));            // ... and there is still not enough space.

        producer_queue_.pop();
    }
    return ZX_OK;
}

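// Copies the transaction's data from its source VMOs into the circular
// writeback buffer, splitting any request that wraps around the end of the
// buffer, and retargets the transaction at the buffer's VMO.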
void WritebackBuffer::CopyToBufferLocked(WriteTxn* txn) {
    auto& reqs = txn->Requests();
    ZX_DEBUG_ASSERT(!txn->IsReady());

    // Copy each request into the writeback buffer.
    for (size_t i = 0; i < reqs.size(); i++) {
        size_t vmo_offset = reqs[i].vmo_offset;
        size_t dev_offset = reqs[i].dev_offset;
        const size_t vmo_len = reqs[i].length;
        ZX_DEBUG_ASSERT(vmo_len > 0);
        size_t wb_offset = (start_ + len_) % cap_;
        size_t wb_len = (wb_offset + vmo_len > cap_) ? cap_ - wb_offset : vmo_len;
        ZX_DEBUG_ASSERT(wb_len <= vmo_len);
        ZX_DEBUG_ASSERT(wb_offset < cap_);
        zx_handle_t vmo = reqs[i].vmo;

        void* ptr = (void*)((uintptr_t)(buffer_->GetData()) +
                            (uintptr_t)(wb_offset * kBlobfsBlockSize));
        zx_status_t status;
        ZX_DEBUG_ASSERT((start_ <= wb_offset) ?
                        (start_ < wb_offset + wb_len) :
                        (wb_offset + wb_len <= start_)); // Wraparound
        ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
                      wb_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);

        len_ += wb_len;

        // Update the write request to transfer from the writeback buffer out to
        // disk, rather than from the supplied VMO.
        reqs[i].vmo_offset = wb_offset;
        reqs[i].length = wb_len;

        if (wb_len != vmo_len) {
            // We wrapped around; copy what remains of this request to the start
            // of the buffer.
            vmo_offset += wb_len;
            dev_offset += wb_len;
            wb_len = vmo_len - wb_len;
            ptr = buffer_->GetData();
            ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < wb_len) : (wb_len <= start_)); // Wraparound
            ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
                                  wb_len * kBlobfsBlockSize) == ZX_OK);

            len_ += wb_len;

            // Insert the "new" request, which is the latter half of the original
            // request; insert() may memmove all following write requests.
            static_assert(fbl::is_pod<write_request_t>::value, "Can't memmove non-POD");
            write_request_t request;
            request.vmo = reqs[i].vmo;
            request.vmo_offset = 0;
            request.dev_offset = dev_offset;
            request.length = wb_len;
            i++;
            reqs.insert(i, request);
        }
    }

    txn->SetReady(buffer_vmoid_);
}

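// Reserves space in the writeback buffer, copies |work| into it, and queues
// the work for the background thread to flush to disk.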
void WritebackBuffer::Enqueue(fbl::unique_ptr<WritebackWork> work) {
    TRACE_DURATION("blobfs", "WritebackBuffer::Enqueue", "work ptr", work.get());
    fbl::AutoLock lock(&writeback_lock_);
    size_t blocks = work->BlkCount();
    // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
    // operations which are too large to be fully contained by the buffer, or that the
    // worst-case operation will always fit within the buffer
    ZX_ASSERT_MSG(EnsureSpaceLocked(blocks) == ZX_OK,
                  "Requested txn (%zu blocks) larger than writeback buffer", blocks);
    CopyToBufferLocked(work.get());
    work_queue_.push(fbl::move(work));
    cnd_signal(&consumer_cvar_);
}

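// Background thread which drains the work queue: each unit of work is flushed
// to disk with the lock dropped, after which its blocks are released from the
// writeback buffer and any waiting producers are signaled.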
int WritebackBuffer::WritebackThread(void* arg) {
    WritebackBuffer* b = reinterpret_cast<WritebackBuffer*>(arg);

    b->writeback_lock_.Acquire();
    while (true) {
        while (!b->work_queue_.is_empty()) {
            auto work = b->work_queue_.pop();
            TRACE_DURATION("blobfs", "WritebackBuffer::WritebackThread", "work ptr", work.get());

            size_t blk_count = work->BlkCount();

            if (blk_count > 0) {
                ZX_ASSERT(work->BlkStart() == b->start_);
                ZX_ASSERT(blk_count <= b->len_);
            }

            // Stay unlocked while processing a unit of work
            b->writeback_lock_.Release();
            work->Complete();
            work = nullptr;
            b->writeback_lock_.Acquire();
            b->start_ = (b->start_ + blk_count) % b->cap_;
            b->len_ -= blk_count;
            cnd_signal(&b->producer_cvar_);
        }

        // Before waiting, we should check if we're unmounting.
        if (b->unmounting_) {
            b->writeback_lock_.Release();
            return 0;
        }
        cnd_wait(&b->consumer_cvar_, b->writeback_lock_.GetInternal());
    }
}
} // namespace blobfs