• Home
  • History
  • Annotate
  • Raw
  • Download
  • only in /macosx-10.9.5/libdispatch-339.92.1/src/

Lines Matching defs:op

51 static void _dispatch_operation_enqueue(dispatch_operation_t op,
54 dispatch_operation_t op);
85 static void _dispatch_operation_advise(dispatch_operation_t op,
87 static int _dispatch_operation_perform(dispatch_operation_t op);
88 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
295 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
300 if (op) {
301 channel = op->channel;
310 err = op ? op->fd_entry->err : channel->err;
753 dispatch_operation_t op;
754 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
756 if (op) {
759 _dispatch_operation_enqueue(op, DOP_DIR_READ,
787 dispatch_operation_t op;
788 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
790 if (op) {
793 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
854 dispatch_operation_t op =
868 if (op) {
869 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
925 dispatch_operation_t op =
938 if (op) {
939 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
991 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
993 op->do_next = DISPATCH_OBJECT_LISTLESS;
994 op->do_xref_cnt = -1; // operation object is not exposed externally
995 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
996 op->op_q->do_targetq = queue;
998 op->active = false;
999 op->direction = direction;
1000 op->offset = offset + channel->f_ptr;
1001 op->length = length;
1002 op->handler = _dispatch_io_Block_copy(handler);
1004 op->channel = channel;
1005 op->params = channel->params;
1008 dispatch_queue_t targetq = op->channel->do_targetq;
1012 op->do_targetq = targetq;
1013 _dispatch_object_debug(op, "%s", __func__);
1014 return op;
1018 _dispatch_operation_dispose(dispatch_operation_t op)
1020 _dispatch_object_debug(op, "%s", __func__);
1022 if (op->fd_entry) {
1023 _dispatch_operation_deliver_data(op, DOP_DONE);
1024 dispatch_group_leave(op->fd_entry->barrier_group);
1025 _dispatch_fd_entry_release(op->fd_entry);
1027 if (op->channel) {
1028 _dispatch_release(op->channel);
1030 if (op->timer) {
1031 dispatch_release(op->timer);
1033 // For write operations, op->buf is owned by op->buf_data
1034 if (op->buf && op->direction == DOP_DIR_READ) {
1035 free(op->buf);
1037 if (op->buf_data) {
1038 _dispatch_io_data_release(op->buf_data);
1040 if (op->data) {
1041 _dispatch_io_data_release(op->data);
1043 if (op->op_q) {
1044 dispatch_release(op->op_q);
1046 Block_release(op->handler);
1050 _dispatch_operation_enqueue(dispatch_operation_t op,
1056 int err = _dispatch_io_get_error(NULL, op->channel, false);
1058 dispatch_io_handler_t handler = op->handler;
1059 dispatch_async(op->op_q, ^{
1069 _dispatch_release(op);
1073 op->fd_entry = op->channel->fd_entry;
1074 _dispatch_fd_entry_retain(op->fd_entry);
1075 dispatch_group_enter(op->fd_entry->barrier_group);
1076 dispatch_disk_t disk = op->fd_entry->disk;
1078 dispatch_stream_t stream = op->fd_entry->streams[direction];
1080 _dispatch_stream_enqueue_operation(stream, op, data);
1085 _dispatch_disk_enqueue_operation(disk, op, data);
1092 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1096 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1098 op->data = data;
1099 int err = _dispatch_io_get_error(op, NULL, true);
1101 op->err = err;
1103 _dispatch_release(op);
1106 if (op->params.interval) {
1107 dispatch_resume(_dispatch_operation_timer(tq, op));
1113 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1116 if (op->timer) {
1117 return op->timer;
1122 (int64_t)op->params.interval), op->params.interval, 0);
1130 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1135 if ((op->active) && (flags & DOP_DELIVER)) {
1136 op->flags = flags;
1138 _dispatch_operation_deliver_data(op, flags);
1141 op->timer = timer;
1142 return op->timer;
1641 dispatch_operation_t op, dispatch_data_t data)
1643 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1646 _dispatch_object_debug(op, "%s", __func__);
1648 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1656 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1659 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1662 _dispatch_object_debug(op, "%s", __func__);
1663 if (op->params.type == DISPATCH_IO_STREAM) {
1664 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1665 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1667 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1669 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1676 dispatch_operation_t op)
1679 _dispatch_object_debug(op, "%s", __func__);
1680 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1681 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1682 if (op == stream->op) {
1683 stream->op = NULL;
1685 if (op->timer) {
1686 dispatch_source_cancel(op->timer);
1689 _dispatch_release(op);
1693 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1696 _dispatch_object_debug(op, "%s", __func__);
1697 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1698 // Current request is always the last op returned
1699 if (disk->cur_rq == op) {
1700 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1703 if (op->params.type == DISPATCH_IO_STREAM) {
1705 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1706 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1711 TAILQ_REMOVE(&disk->operations, op, operation_list);
1712 if (op->timer) {
1713 dispatch_source_cancel(op->timer);
1716 _dispatch_release(op);
1721 dispatch_operation_t op)
1724 if (!op) {
1727 return op;
1730 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1732 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1734 return op;
1736 if (op->params.type == DISPATCH_IO_STREAM) {
1739 return op;
1742 if (op->params.type == DISPATCH_IO_RANDOM) {
1743 op = TAILQ_NEXT(op, operation_list);
1744 if (!op) {
1745 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1747 return op;
1756 dispatch_operation_t op;
1759 op = TAILQ_FIRST(&disk->operations);
1761 op = disk->cur_rq;
1763 op = TAILQ_NEXT(op, operation_list);
1764 if (!op) {
1765 op = TAILQ_FIRST(&disk->operations);
1768 } while (op->active && op != disk->cur_rq);
1770 if (!op->active) {
1771 disk->cur_rq = op;
1772 return op;
1783 dispatch_operation_t op, tmp;
1786 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1787 if (!channel || op->channel == channel) {
1788 _dispatch_stream_complete_operation(stream, op);
1792 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1793 if (!channel || op->channel == channel) {
1794 _dispatch_stream_complete_operation(stream, op);
1807 dispatch_operation_t op, tmp;
1808 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1809 if (!channel || op->channel == channel) {
1810 _dispatch_disk_complete_operation(disk, op);
1819 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1825 dispatch_fd_t fd = op->fd_entry->fd;
1828 if (op->direction == DOP_DIR_READ) {
1831 } else if (op->direction == DOP_DIR_WRITE) {
1835 dispatch_assert(op->direction < DOP_DIR_MAX);
1843 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1879 dispatch_operation_t op;
1881 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1882 if (!op) {
1886 int err = _dispatch_io_get_error(op, NULL, true);
1888 op->err = err;
1889 _dispatch_stream_complete_operation(stream, op);
1892 stream->op = op;
1893 _dispatch_fd_debug("stream handler", op->fd_entry->fd);
1894 dispatch_fd_entry_t fd_entry = op->fd_entry;
1897 if (!op->total && dispatch_io_defaults.initial_delivery) {
1899 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1900 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1903 int result = _dispatch_operation_perform(op);
1912 _dispatch_operation_deliver_data(op, flags);
1916 _dispatch_stream_complete_operation(stream, op);
1924 _dispatch_stream_complete_operation(stream, op);
1929 dispatch_resume(_dispatch_stream_source(stream, op));
1933 _dispatch_stream_cleanup_operations(stream, op->channel);
1958 dispatch_operation_t op;
1965 (op = _dispatch_disk_pick_next_operation(disk))) {
1966 int err = _dispatch_io_get_error(op, NULL, true);
1968 op->err = err;
1969 _dispatch_disk_complete_operation(disk, op);
1972 _dispatch_retain(op);
1973 disk->advise_list[i%disk->advise_list_depth] = op;
1974 op->active = true;
1975 _dispatch_object_debug(op, "%s", __func__);
1983 op = disk->advise_list[disk->req_idx];
1984 if (op) {
1986 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1996 dispatch_operation_t op;
2002 op = disk->advise_list[i%disk->advise_list_depth];
2003 if (!op) {
2008 if (op->direction == DOP_DIR_WRITE) {
2012 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2013 op->channel)) {
2017 if (!op->total && dispatch_io_defaults.initial_delivery) {
2019 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2020 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2025 !op->advise_offset) {
2028 _dispatch_operation_advise(op, chunk_size);
2031 op = disk->advise_list[disk->req_idx];
2032 int result = _dispatch_operation_perform(op);
2038 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2041 _dispatch_disk_complete_operation(disk, op);
2044 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2045 _dispatch_disk_complete_operation(disk, op);
2048 _dispatch_disk_cleanup_operations(disk, op->channel);
2057 op->active = false;
2060 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2063 _dispatch_release(op);
2071 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2077 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2081 _dispatch_object_debug(op, "%s", __func__);
2083 if (!op->advise_offset) {
2084 op->advise_offset = op->offset;
2087 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2090 advise.ra_offset = op->advise_offset;
2091 op->advise_offset += advise.ra_count;
2093 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2102 _dispatch_operation_perform(dispatch_operation_t op)
2104 int err = _dispatch_io_get_error(op, NULL, true);
2108 _dispatch_object_debug(op, "%s", __func__);
2109 if (!op->buf) {
2110 size_t max_buf_siz = op->params.high;
2112 if (op->direction == DOP_DIR_READ) {
2115 size_t data_siz = dispatch_data_get_size(op->data);
2123 if (op->length < SIZE_MAX) {
2124 op->buf_siz = op->length - op->total;
2125 if (op->buf_siz > max_buf_siz) {
2126 op->buf_siz = max_buf_siz;
2129 op->buf_siz = max_buf_siz;
2131 op->buf = valloc(op->buf_siz);
2132 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2133 } else if (op->direction == DOP_DIR_WRITE) {
2139 op->buf_siz = 0;
2140 dispatch_data_apply(op->data,
2144 size_t siz = op->buf_siz + len;
2145 if (!op->buf_siz || siz <= chunk_siz) {
2146 op->buf_siz = siz;
2150 if (op->buf_siz > max_buf_siz) {
2151 op->buf_siz = max_buf_siz;
2154 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2155 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2158 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2161 if (op->fd_entry->fd == -1) {
2162 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2167 void *buf = op->buf + op->buf_len;
2168 size_t len = op->buf_siz - op->buf_len;
2169 off_t off = (off_t)((size_t)op->offset + op->total);
2172 if (op->direction == DOP_DIR_READ) {
2173 if (op->params.type == DISPATCH_IO_STREAM) {
2174 processed = read(op->fd_entry->fd, buf, len);
2175 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2176 processed = pread(op->fd_entry->fd, buf, len, off);
2178 } else if (op->direction == DOP_DIR_WRITE) {
2179 if (op->params.type == DISPATCH_IO_STREAM) {
2180 processed = write(op->fd_entry->fd, buf, len);
2181 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2182 processed = pwrite(op->fd_entry->fd, buf, len, off);
2195 _dispatch_fd_debug("EOF", op->fd_entry->fd);
2198 op->buf_len += (size_t)processed;
2199 op->total += (size_t)processed;
2200 if (op->total == op->length) {
2210 dispatch_assert(!op->fd_entry->disk);
2211 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2212 if (op->direction == DOP_DIR_READ && op->total &&
2213 op->channel == op->fd_entry->convenience_channel) {
2219 op->err = err;
2224 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2232 _dispatch_operation_deliver_data(dispatch_operation_t op,
2235 // Either called from stream resp. pick queue or when op is finalized
2238 size_t undelivered = op->undelivered + op->buf_len;
2240 (op->flags & DOP_DELIVER);
2241 op->flags = DOP_DEFAULT;
2244 if (undelivered >= op->params.low) {
2246 } else if (op->buf_len < op->buf_siz) {
2248 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2252 err = op->err;
2253 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2255 op->err = err;
2259 if (op->direction == DOP_DIR_READ) {
2260 if (op->buf_len) {
2261 void *buf = op->buf;
2262 data = dispatch_data_create(buf, op->buf_len, NULL,
2264 op->buf = NULL;
2265 op->buf_len = 0;
2266 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2267 _dispatch_io_data_release(op->data);
2271 data = op->data;
2273 op->data = deliver ? dispatch_data_empty : data;
2274 } else if (op->direction == DOP_DIR_WRITE) {
2276 data = dispatch_data_create_subrange(op->data, op->buf_len,
2277 op->length);
2279 if (op->buf_data && op->buf_len == op->buf_siz) {
2280 _dispatch_io_data_release(op->buf_data);
2281 op->buf_data = NULL;
2282 op->buf = NULL;
2283 op->buf_len = 0;
2290 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2291 op->length);
2293 _dispatch_io_data_release(op->data);
2294 op->data = d;
2297 dispatch_assert(op->direction < DOP_DIR_MAX);
2301 op->undelivered = undelivered;
2302 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2305 op->undelivered = 0;
2306 _dispatch_object_debug(op, "%s", __func__);
2307 _dispatch_fd_debug("deliver data", op->fd_entry->fd);
2308 dispatch_op_direction_t direction = op->direction;
2309 dispatch_io_handler_t handler = op->handler;
2311 int fd = op->fd_entry->fd;
2313 dispatch_fd_entry_t fd_entry = op->fd_entry;
2315 dispatch_io_t channel = op->channel;
2318 dispatch_async(op->op_q, ^{
2375 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2378 dispatch_queue_t target = op->do_targetq;
2379 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2384 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2385 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2386 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2387 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2389 target->dq_label : "", target, op->offset, op->length, op->total,
2390 op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2391 op->params.high, op->params.interval_flags &
2392 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2396 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2400 dx_kind(op), op);
2401 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2402 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);