1/*
2 * streams.c :  stream encapsulation routines for the ra_svn protocol
3 *
4 * ====================================================================
5 *    Licensed to the Apache Software Foundation (ASF) under one
6 *    or more contributor license agreements.  See the NOTICE file
7 *    distributed with this work for additional information
8 *    regarding copyright ownership.  The ASF licenses this file
9 *    to you under the Apache License, Version 2.0 (the
10 *    "License"); you may not use this file except in compliance
11 *    with the License.  You may obtain a copy of the License at
12 *
13 *      http://www.apache.org/licenses/LICENSE-2.0
14 *
15 *    Unless required by applicable law or agreed to in writing,
16 *    software distributed under the License is distributed on an
17 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 *    KIND, either express or implied.  See the License for the
19 *    specific language governing permissions and limitations
20 *    under the License.
21 * ====================================================================
22 */
23
24
25
26#include <apr_general.h>
27#include <apr_network_io.h>
28#include <apr_poll.h>
29
30#include "svn_types.h"
31#include "svn_error.h"
32#include "svn_pools.h"
33#include "svn_io.h"
34#include "svn_private_config.h"
35
36#include "private/svn_io_private.h"
37
38#include "ra_svn.h"
39
40struct svn_ra_svn__stream_st {
41  svn_stream_t *in_stream;
42  svn_stream_t *out_stream;
43  void *timeout_baton;
44  ra_svn_timeout_fn_t timeout_fn;
45};
46
47typedef struct sock_baton_t {
48  apr_socket_t *sock;
49  apr_pool_t *pool;
50} sock_baton_t;
51
52
53/* Returns TRUE if PFD has pending data, FALSE otherwise. */
54static svn_boolean_t pending(apr_pollfd_t *pfd, apr_pool_t *pool)
55{
56  apr_status_t status;
57  int n;
58
59  pfd->p = pool;
60  pfd->reqevents = APR_POLLIN;
61  status = apr_poll(pfd, 1, &n, 0);
62  return (status == APR_SUCCESS && n);
63}
64
65/* Functions to implement a file backed svn_ra_svn__stream_t. */
66
67/* Implements ra_svn_timeout_fn_t */
68static void
69file_timeout_cb(void *baton, apr_interval_time_t interval)
70{
71  apr_file_t *f = baton;
72
73  if (f)
74    apr_file_pipe_timeout_set(f, interval);
75}
76
77svn_ra_svn__stream_t *
78svn_ra_svn__stream_from_streams(svn_stream_t *in_stream,
79                                svn_stream_t *out_stream,
80                                apr_pool_t *pool)
81{
82  apr_file_t *file;
83
84  /* If out_stream is backed by an apr_file (e.g. an PIPE) we
85     provide a working callback, otherwise the callback ignores
86     the timeout.
87
88     The callback is used to make the write non-blocking on
89     some error scenarios. ### This (legacy) usage
90     breaks the stream promise */
91  file = svn_stream__aprfile(out_stream);
92
93  return svn_ra_svn__stream_create(in_stream, out_stream,
94                                   file, file_timeout_cb,
95                                   pool);
96}
97
98/* Functions to implement a socket backed svn_ra_svn__stream_t. */
99
100/* Implements svn_read_fn_t */
101static svn_error_t *
102sock_read_cb(void *baton, char *buffer, apr_size_t *len)
103{
104  sock_baton_t *b = baton;
105  apr_status_t status;
106  apr_interval_time_t interval;
107
108  status = apr_socket_timeout_get(b->sock, &interval);
109  if (status)
110    return svn_error_wrap_apr(status, _("Can't get socket timeout"));
111
112  /* Always block on read.
113   * During pipelining, we set the timeout to 0 for some write
114   * operations so that we can try them without blocking. If APR had
115   * separate timeouts for read and write, we would only set the
116   * write timeout, but it doesn't. So here, we revert back to blocking.
117   */
118  apr_socket_timeout_set(b->sock, -1);
119  status = apr_socket_recv(b->sock, buffer, len);
120  apr_socket_timeout_set(b->sock, interval);
121
122  if (status && !APR_STATUS_IS_EOF(status))
123    return svn_error_wrap_apr(status, _("Can't read from connection"));
124  return SVN_NO_ERROR;
125}
126
127/* Implements svn_write_fn_t */
128static svn_error_t *
129sock_write_cb(void *baton, const char *buffer, apr_size_t *len)
130{
131  sock_baton_t *b = baton;
132  apr_status_t status = apr_socket_send(b->sock, buffer, len);
133  if (status)
134    return svn_error_wrap_apr(status, _("Can't write to connection"));
135  return SVN_NO_ERROR;
136}
137
138/* Implements ra_svn_timeout_fn_t */
139static void
140sock_timeout_cb(void *baton, apr_interval_time_t interval)
141{
142  sock_baton_t *b = baton;
143  apr_socket_timeout_set(b->sock, interval);
144}
145
146/* Implements svn_stream_data_available_fn_t */
147static svn_error_t *
148sock_pending_cb(void *baton,
149                svn_boolean_t *data_available)
150{
151  sock_baton_t *b = baton;
152  apr_pollfd_t pfd;
153
154  pfd.desc_type = APR_POLL_SOCKET;
155  pfd.desc.s = b->sock;
156
157  *data_available = pending(&pfd, b->pool);
158
159  svn_pool_clear(b->pool);
160
161  return SVN_NO_ERROR;
162}
163
164svn_ra_svn__stream_t *
165svn_ra_svn__stream_from_sock(apr_socket_t *sock,
166                             apr_pool_t *result_pool)
167{
168  sock_baton_t *b = apr_palloc(result_pool, sizeof(*b));
169  svn_stream_t *sock_stream;
170
171  b->sock = sock;
172  b->pool = svn_pool_create(result_pool);
173
174  sock_stream = svn_stream_create(b, result_pool);
175
176  svn_stream_set_read2(sock_stream, sock_read_cb, NULL /* use default */);
177  svn_stream_set_write(sock_stream, sock_write_cb);
178  svn_stream_set_data_available(sock_stream, sock_pending_cb);
179
180  return svn_ra_svn__stream_create(sock_stream, sock_stream,
181                                   b, sock_timeout_cb, result_pool);
182}
183
184svn_ra_svn__stream_t *
185svn_ra_svn__stream_create(svn_stream_t *in_stream,
186                          svn_stream_t *out_stream,
187                          void *timeout_baton,
188                          ra_svn_timeout_fn_t timeout_cb,
189                          apr_pool_t *pool)
190{
191  svn_ra_svn__stream_t *s = apr_palloc(pool, sizeof(*s));
192  s->in_stream = in_stream;
193  s->out_stream = out_stream;
194  s->timeout_baton = timeout_baton;
195  s->timeout_fn = timeout_cb;
196  return s;
197}
198
199svn_error_t *
200svn_ra_svn__stream_write(svn_ra_svn__stream_t *stream,
201                         const char *data, apr_size_t *len)
202{
203  return svn_error_trace(svn_stream_write(stream->out_stream, data, len));
204}
205
206svn_error_t *
207svn_ra_svn__stream_read(svn_ra_svn__stream_t *stream, char *data,
208                        apr_size_t *len)
209{
210  SVN_ERR(svn_stream_read2(stream->in_stream, data, len));
211
212  if (*len == 0)
213    return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
214
215  return SVN_NO_ERROR;
216}
217
218void
219svn_ra_svn__stream_timeout(svn_ra_svn__stream_t *stream,
220                           apr_interval_time_t interval)
221{
222  stream->timeout_fn(stream->timeout_baton, interval);
223}
224
225svn_error_t *
226svn_ra_svn__stream_data_available(svn_ra_svn__stream_t *stream,
227                                  svn_boolean_t *data_available)
228{
229  return svn_error_trace(
230          svn_stream_data_available(stream->in_stream,
231                                    data_available));
232}
233