1/*
2 * streams.c :  stream encapsulation routines for the ra_svn protocol
3 *
4 * ====================================================================
5 *    Licensed to the Apache Software Foundation (ASF) under one
6 *    or more contributor license agreements.  See the NOTICE file
7 *    distributed with this work for additional information
8 *    regarding copyright ownership.  The ASF licenses this file
9 *    to you under the Apache License, Version 2.0 (the
10 *    "License"); you may not use this file except in compliance
11 *    with the License.  You may obtain a copy of the License at
12 *
13 *      http://www.apache.org/licenses/LICENSE-2.0
14 *
15 *    Unless required by applicable law or agreed to in writing,
16 *    software distributed under the License is distributed on an
17 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 *    KIND, either express or implied.  See the License for the
19 *    specific language governing permissions and limitations
20 *    under the License.
21 * ====================================================================
22 */
23
24
25
26#include <apr_general.h>
27#include <apr_network_io.h>
28#include <apr_poll.h>
29
30#include "svn_types.h"
31#include "svn_error.h"
32#include "svn_pools.h"
33#include "svn_io.h"
34#include "svn_private_config.h"
35
36#include "ra_svn.h"
37
38struct svn_ra_svn__stream_st {
39  svn_stream_t *stream;
40  void *baton;
41  ra_svn_pending_fn_t pending_fn;
42  ra_svn_timeout_fn_t timeout_fn;
43};
44
45typedef struct sock_baton_t {
46  apr_socket_t *sock;
47  apr_pool_t *pool;
48} sock_baton_t;
49
50typedef struct file_baton_t {
51  apr_file_t *in_file;
52  apr_file_t *out_file;
53  apr_pool_t *pool;
54} file_baton_t;
55
56/* Returns TRUE if PFD has pending data, FALSE otherwise. */
57static svn_boolean_t pending(apr_pollfd_t *pfd, apr_pool_t *pool)
58{
59  apr_status_t status;
60  int n;
61
62  pfd->p = pool;
63  pfd->reqevents = APR_POLLIN;
64  status = apr_poll(pfd, 1, &n, 0);
65  return (status == APR_SUCCESS && n);
66}
67
68/* Functions to implement a file backed svn_ra_svn__stream_t. */
69
70/* Implements svn_read_fn_t */
71static svn_error_t *
72file_read_cb(void *baton, char *buffer, apr_size_t *len)
73{
74  file_baton_t *b = baton;
75  apr_status_t status = apr_file_read(b->in_file, buffer, len);
76
77  if (status && !APR_STATUS_IS_EOF(status))
78    return svn_error_wrap_apr(status, _("Can't read from connection"));
79  if (*len == 0)
80    return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
81  return SVN_NO_ERROR;
82}
83
84/* Implements svn_write_fn_t */
85static svn_error_t *
86file_write_cb(void *baton, const char *buffer, apr_size_t *len)
87{
88  file_baton_t *b = baton;
89  apr_status_t status = apr_file_write(b->out_file, buffer, len);
90  if (status)
91    return svn_error_wrap_apr(status, _("Can't write to connection"));
92  return SVN_NO_ERROR;
93}
94
95/* Implements ra_svn_timeout_fn_t */
96static void
97file_timeout_cb(void *baton, apr_interval_time_t interval)
98{
99  file_baton_t *b = baton;
100  apr_file_pipe_timeout_set(b->out_file, interval);
101}
102
103/* Implements ra_svn_pending_fn_t */
104static svn_boolean_t
105file_pending_cb(void *baton)
106{
107  file_baton_t *b = baton;
108  apr_pollfd_t pfd;
109
110  pfd.desc_type = APR_POLL_FILE;
111  pfd.desc.f = b->in_file;
112
113  return pending(&pfd, b->pool);
114}
115
116svn_ra_svn__stream_t *
117svn_ra_svn__stream_from_files(apr_file_t *in_file,
118                              apr_file_t *out_file,
119                              apr_pool_t *pool)
120{
121  file_baton_t *b = apr_palloc(pool, sizeof(*b));
122
123  b->in_file = in_file;
124  b->out_file = out_file;
125  b->pool = pool;
126
127  return svn_ra_svn__stream_create(b, file_read_cb, file_write_cb,
128                                   file_timeout_cb, file_pending_cb,
129                                   pool);
130}
131
132/* Functions to implement a socket backed svn_ra_svn__stream_t. */
133
134/* Implements svn_read_fn_t */
135static svn_error_t *
136sock_read_cb(void *baton, char *buffer, apr_size_t *len)
137{
138  sock_baton_t *b = baton;
139  apr_status_t status;
140  apr_interval_time_t interval;
141
142  status = apr_socket_timeout_get(b->sock, &interval);
143  if (status)
144    return svn_error_wrap_apr(status, _("Can't get socket timeout"));
145
146  /* Always block on read.
147   * During pipelining, we set the timeout to 0 for some write
148   * operations so that we can try them without blocking. If APR had
149   * separate timeouts for read and write, we would only set the
150   * write timeout, but it doesn't. So here, we revert back to blocking.
151   */
152  apr_socket_timeout_set(b->sock, -1);
153  status = apr_socket_recv(b->sock, buffer, len);
154  apr_socket_timeout_set(b->sock, interval);
155
156  if (status && !APR_STATUS_IS_EOF(status))
157    return svn_error_wrap_apr(status, _("Can't read from connection"));
158  if (*len == 0)
159    return svn_error_create(SVN_ERR_RA_SVN_CONNECTION_CLOSED, NULL, NULL);
160  return SVN_NO_ERROR;
161}
162
163/* Implements svn_write_fn_t */
164static svn_error_t *
165sock_write_cb(void *baton, const char *buffer, apr_size_t *len)
166{
167  sock_baton_t *b = baton;
168  apr_status_t status = apr_socket_send(b->sock, buffer, len);
169  if (status)
170    return svn_error_wrap_apr(status, _("Can't write to connection"));
171  return SVN_NO_ERROR;
172}
173
174/* Implements ra_svn_timeout_fn_t */
175static void
176sock_timeout_cb(void *baton, apr_interval_time_t interval)
177{
178  sock_baton_t *b = baton;
179  apr_socket_timeout_set(b->sock, interval);
180}
181
182/* Implements ra_svn_pending_fn_t */
183static svn_boolean_t
184sock_pending_cb(void *baton)
185{
186  sock_baton_t *b = baton;
187  apr_pollfd_t pfd;
188
189  pfd.desc_type = APR_POLL_SOCKET;
190  pfd.desc.s = b->sock;
191
192  return pending(&pfd, b->pool);
193}
194
195svn_ra_svn__stream_t *
196svn_ra_svn__stream_from_sock(apr_socket_t *sock,
197                             apr_pool_t *pool)
198{
199  sock_baton_t *b = apr_palloc(pool, sizeof(*b));
200
201  b->sock = sock;
202  b->pool = pool;
203
204  return svn_ra_svn__stream_create(b, sock_read_cb, sock_write_cb,
205                                   sock_timeout_cb, sock_pending_cb,
206                                   pool);
207}
208
209svn_ra_svn__stream_t *
210svn_ra_svn__stream_create(void *baton,
211                          svn_read_fn_t read_cb,
212                          svn_write_fn_t write_cb,
213                          ra_svn_timeout_fn_t timeout_cb,
214                          ra_svn_pending_fn_t pending_cb,
215                          apr_pool_t *pool)
216{
217  svn_ra_svn__stream_t *s = apr_palloc(pool, sizeof(*s));
218  s->stream = svn_stream_empty(pool);
219  svn_stream_set_baton(s->stream, baton);
220  if (read_cb)
221    svn_stream_set_read(s->stream, read_cb);
222  if (write_cb)
223    svn_stream_set_write(s->stream, write_cb);
224  s->baton = baton;
225  s->timeout_fn = timeout_cb;
226  s->pending_fn = pending_cb;
227  return s;
228}
229
230svn_error_t *
231svn_ra_svn__stream_write(svn_ra_svn__stream_t *stream,
232                         const char *data, apr_size_t *len)
233{
234  return svn_stream_write(stream->stream, data, len);
235}
236
237svn_error_t *
238svn_ra_svn__stream_read(svn_ra_svn__stream_t *stream, char *data,
239                        apr_size_t *len)
240{
241  return svn_stream_read(stream->stream, data, len);
242}
243
244void
245svn_ra_svn__stream_timeout(svn_ra_svn__stream_t *stream,
246                           apr_interval_time_t interval)
247{
248  stream->timeout_fn(stream->baton, interval);
249}
250
251svn_boolean_t
252svn_ra_svn__stream_pending(svn_ra_svn__stream_t *stream)
253{
254  return stream->pending_fn(stream->baton);
255}
256