1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "apr_redis.h"
18#include "apr_poll.h"
19#include "apr_version.h"
20#include <stdlib.h>
21#include <string.h>
22
23#define BUFFER_SIZE  512
24#define LILBUFF_SIZE 64
25struct apr_redis_conn_t
26{
27    char *buffer;
28    apr_size_t blen;
29    apr_pool_t *p;
30    apr_pool_t *tp;
31    apr_socket_t *sock;
32    apr_bucket_brigade *bb;
33    apr_bucket_brigade *tb;
34    apr_redis_server_t *rs;
35};
36
37/* Strings for Client Commands */
38
39#define RC_EOL "\r\n"
40#define RC_EOL_LEN (sizeof(RC_EOL)-1)
41
42#define RC_WS " "
43#define RC_WS_LEN (sizeof(RC_WS)-1)
44
45#define RC_RESP_1 "*1\r\n"
46#define RC_RESP_1_LEN (sizeof(RC_RESP_1)-1)
47
48#define RC_RESP_2 "*2\r\n"
49#define RC_RESP_2_LEN (sizeof(RC_RESP_2)-1)
50
51#define RC_RESP_3 "*3\r\n"
52#define RC_RESP_3_LEN (sizeof(RC_RESP_3)-1)
53
54#define RC_RESP_4 "*4\r\n"
55#define RC_RESP_4_LEN (sizeof(RC_RESP_4)-1)
56
57#define RC_GET "GET\r\n"
58#define RC_GET_LEN (sizeof(RC_GET)-1)
59
60#define RC_GET_SIZE "$3\r\n"
61#define RC_GET_SIZE_LEN (sizeof(RC_GET_SIZE)-1)
62
63#define RC_SET "SET\r\n"
64#define RC_SET_LEN (sizeof(RC_SET)-1)
65
66#define RC_SET_SIZE "$3\r\n"
67#define RC_SET_SIZE_LEN (sizeof(RC_SET_SIZE)-1)
68
69#define RC_SETEX "SETEX\r\n"
70#define RC_SETEX_LEN (sizeof(RC_SETEX)-1)
71
72#define RC_SETEX_SIZE "$5\r\n"
73#define RC_SETEX_SIZE_LEN (sizeof(RC_SETEX_SIZE)-1)
74
75#define RC_DEL "DEL\r\n"
76#define RC_DEL_LEN (sizeof(RC_DEL)-1)
77
78#define RC_DEL_SIZE "$3\r\n"
79#define RC_DEL_SIZE_LEN (sizeof(RC_DEL_SIZE)-1)
80
81#define RC_QUIT "QUIT\r\n"
82#define RC_QUIT_LEN (sizeof(RC_QUIT)-1)
83
84#define RC_QUIT_SIZE "$4\r\n"
85#define RC_QUIT_SIZE_LEN (sizeof(RC_QUIT_SIZE)-1)
86
87#define RC_PING "PING\r\n"
88#define RC_PING_LEN (sizeof(RC_PING)-1)
89
90#define RC_PING_SIZE "$4\r\n"
91#define RC_PING_SIZE_LEN (sizeof(RC_PING_SIZE)-1)
92
93#define RC_INFO "INFO\r\n"
94#define RC_INFO_LEN (sizeof(RC_INFO)-1)
95
96#define RC_INFO_SIZE "$4\r\n"
97#define RC_INFO_SIZE_LEN (sizeof(RC_INFO_SIZE)-1)
98
99/* Strings for Server Replies */
100
101#define RS_STORED "+OK"
102#define RS_STORED_LEN (sizeof(RS_STORED)-1)
103
104#define RS_NOT_STORED "$-1"
105#define RS_NOT_STORED_LEN (sizeof(RS_NOT_STORED)-1)
106
107#define RS_DELETED ":1"
108#define RS_DELETED_LEN (sizeof(RS_DELETED)-1)
109
110#define RS_NOT_FOUND_GET "$-1"
111#define RS_NOT_FOUND_GET_LEN (sizeof(RS_NOT_FOUND_GET)-1)
112
113#define RS_NOT_FOUND_DEL ":0"
114#define RS_NOT_FOUND_DEL_LEN (sizeof(RS_NOT_FOUND_DEL)-1)
115
116#define RS_TYPE_STRING "$"
117#define RS_TYPE_STRING_LEN (sizeof(RS_TYPE_STRING)-1)
118
119#define RS_END "\r\n"
120#define RS_END_LEN (sizeof(RS_END)-1)
121
122static apr_status_t make_server_dead(apr_redis_t *rc,
123                                     apr_redis_server_t *rs)
124{
125#if APR_HAS_THREADS
126    apr_thread_mutex_lock(rs->lock);
127#endif
128    rs->status = APR_RC_SERVER_DEAD;
129    rs->btime = apr_time_now();
130#if APR_HAS_THREADS
131    apr_thread_mutex_unlock(rs->lock);
132#endif
133    return APR_SUCCESS;
134}
135
136static apr_status_t make_server_live(apr_redis_t *rc,
137                                     apr_redis_server_t *rs)
138{
139    rs->status = APR_RC_SERVER_LIVE;
140    return APR_SUCCESS;
141}
142
143APU_DECLARE(apr_status_t) apr_redis_add_server(apr_redis_t *rc,
144                                               apr_redis_server_t *rs)
145{
146    apr_status_t rv = APR_SUCCESS;
147
148    if (rc->ntotal >= rc->nalloc) {
149        return APR_ENOMEM;
150    }
151    rc->live_servers[rc->ntotal] = rs;
152    rc->ntotal++;
153    make_server_live(rc, rs);
154    return rv;
155}
156
157APU_DECLARE(apr_redis_server_t *)
158apr_redis_find_server_hash(apr_redis_t *rc, const apr_uint32_t hash)
159{
160    if (rc->server_func) {
161        return rc->server_func(rc->server_baton, rc, hash);
162    }
163    else {
164        return apr_redis_find_server_hash_default(NULL, rc, hash);
165    }
166}
167
168APU_DECLARE(apr_redis_server_t *)
169apr_redis_find_server_hash_default(void *baton, apr_redis_t *rc,
170                                   const apr_uint32_t hash)
171{
172    apr_redis_server_t *rs = NULL;
173    apr_uint32_t h = hash ? hash : 1;
174    apr_uint32_t i = 0;
175    apr_time_t curtime = 0;
176
177    if (rc->ntotal == 0) {
178        return NULL;
179    }
180
181    do {
182        rs = rc->live_servers[h % rc->ntotal];
183        if (rs->status == APR_RC_SERVER_LIVE) {
184            break;
185        }
186        else {
187            if (curtime == 0) {
188                curtime = apr_time_now();
189            }
190#if APR_HAS_THREADS
191            apr_thread_mutex_lock(rs->lock);
192#endif
193            /* Try the dead server, every 5 seconds */
194            if (curtime - rs->btime > apr_time_from_sec(5)) {
195                rs->btime = curtime;
196                if (apr_redis_ping(rs) == APR_SUCCESS) {
197                    make_server_live(rc, rs);
198#if APR_HAS_THREADS
199                    apr_thread_mutex_unlock(rs->lock);
200#endif
201                    break;
202                }
203            }
204#if APR_HAS_THREADS
205            apr_thread_mutex_unlock(rs->lock);
206#endif
207        }
208        h++;
209        i++;
210    } while (i < rc->ntotal);
211
212    if (i == rc->ntotal) {
213        rs = NULL;
214    }
215
216    return rs;
217}
218
219APU_DECLARE(apr_redis_server_t *) apr_redis_find_server(apr_redis_t *rc,
220                                                        const char *host,
221                                                        apr_port_t port)
222{
223    int i;
224
225    for (i = 0; i < rc->ntotal; i++) {
226        if (strcmp(rc->live_servers[i]->host, host) == 0
227            && rc->live_servers[i]->port == port) {
228
229            return rc->live_servers[i];
230        }
231    }
232
233    return NULL;
234}
235
236static apr_status_t rs_find_conn(apr_redis_server_t *rs,
237                                 apr_redis_conn_t ** conn)
238{
239    apr_status_t rv;
240    apr_bucket_alloc_t *balloc;
241    apr_bucket *e;
242
243#if APR_HAS_THREADS
244    rv = apr_reslist_acquire(rs->conns, (void **) conn);
245#else
246    *conn = rs->conn;
247    rv = APR_SUCCESS;
248#endif
249
250    if (rv != APR_SUCCESS) {
251        return rv;
252    }
253
254    balloc = apr_bucket_alloc_create((*conn)->tp);
255    (*conn)->bb = apr_brigade_create((*conn)->tp, balloc);
256    (*conn)->tb = apr_brigade_create((*conn)->tp, balloc);
257
258    e = apr_bucket_socket_create((*conn)->sock, balloc);
259    APR_BRIGADE_INSERT_TAIL((*conn)->bb, e);
260
261    return rv;
262}
263
264static apr_status_t rs_bad_conn(apr_redis_server_t *rs,
265                                apr_redis_conn_t *conn)
266{
267#if APR_HAS_THREADS
268    return apr_reslist_invalidate(rs->conns, conn);
269#else
270    return APR_SUCCESS;
271#endif
272}
273
274static apr_status_t rs_release_conn(apr_redis_server_t *rs,
275                                    apr_redis_conn_t *conn)
276{
277    apr_pool_clear(conn->tp);
278#if APR_HAS_THREADS
279    return apr_reslist_release(rs->conns, conn);
280#else
281    return APR_SUCCESS;
282#endif
283}
284
285APU_DECLARE(apr_status_t) apr_redis_enable_server(apr_redis_t *rc,
286                                                  apr_redis_server_t *rs)
287{
288    apr_status_t rv = APR_SUCCESS;
289
290    if (rs->status == APR_RC_SERVER_LIVE) {
291        return rv;
292    }
293    rv = make_server_live(rc, rs);
294    return rv;
295}
296
297APU_DECLARE(apr_status_t) apr_redis_disable_server(apr_redis_t *rc,
298                                                   apr_redis_server_t *rs)
299{
300    return make_server_dead(rc, rs);
301}
302
303static apr_status_t conn_connect(apr_redis_conn_t *conn)
304{
305    apr_status_t rv = APR_SUCCESS;
306    apr_sockaddr_t *sa;
307#if APR_HAVE_SOCKADDR_UN
308    apr_int32_t family = conn->rs->host[0] != '/' ? APR_INET : APR_UNIX;
309#else
310    apr_int32_t family = APR_INET;
311#endif
312
313    rv = apr_sockaddr_info_get(&sa, conn->rs->host, family, conn->rs->port, 0,
314                               conn->p);
315    if (rv != APR_SUCCESS) {
316        return rv;
317    }
318
319    rv = apr_socket_timeout_set(conn->sock, 1 * APR_USEC_PER_SEC);
320    if (rv != APR_SUCCESS) {
321        return rv;
322    }
323
324    rv = apr_socket_connect(conn->sock, sa);
325    if (rv != APR_SUCCESS) {
326        return rv;
327    }
328
329    rv = apr_socket_timeout_set(conn->sock,
330                                conn->rs->rwto * APR_USEC_PER_SEC);
331    if (rv != APR_SUCCESS) {
332        return rv;
333    }
334
335    return rv;
336}
337
338static apr_status_t
339rc_conn_construct(void **conn_, void *params, apr_pool_t *pool)
340{
341    apr_status_t rv = APR_SUCCESS;
342    apr_redis_conn_t *conn;
343    apr_pool_t *np;
344    apr_pool_t *tp;
345    apr_redis_server_t *rs = params;
346#if APR_HAVE_SOCKADDR_UN
347    apr_int32_t family = rs->host[0] != '/' ? APR_INET : APR_UNIX;
348#else
349    apr_int32_t family = APR_INET;
350#endif
351
352    rv = apr_pool_create(&np, pool);
353    if (rv != APR_SUCCESS) {
354        return rv;
355    }
356
357    rv = apr_pool_create(&tp, np);
358    if (rv != APR_SUCCESS) {
359        apr_pool_destroy(np);
360        return rv;
361    }
362
363    conn = apr_palloc(np, sizeof(apr_redis_conn_t));
364
365    conn->p = np;
366    conn->tp = tp;
367
368    rv = apr_socket_create(&conn->sock, family, SOCK_STREAM, 0, np);
369
370    if (rv != APR_SUCCESS) {
371        apr_pool_destroy(np);
372        return rv;
373    }
374
375    conn->buffer = apr_palloc(conn->p, BUFFER_SIZE + 1);
376    conn->blen = 0;
377    conn->rs = rs;
378
379    rv = conn_connect(conn);
380    if (rv != APR_SUCCESS) {
381        apr_pool_destroy(np);
382    }
383    else {
384        *conn_ = conn;
385    }
386
387    return rv;
388}
389
390#if APR_HAS_THREADS
391static apr_status_t
392rc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
393{
394    apr_redis_conn_t *conn = (apr_redis_conn_t *) conn_;
395    struct iovec vec[3];
396    apr_size_t written;
397
398    /* send a quit message to the Redis server to be nice about it. */
399
400    /*
401     * RESP Command:
402     *   *1
403     *   $4
404     *   QUIT
405     */
406    vec[0].iov_base = RC_RESP_1;
407    vec[0].iov_len = RC_RESP_1_LEN;
408
409    vec[1].iov_base = RC_QUIT_SIZE;
410    vec[1].iov_len = RC_QUIT_SIZE_LEN;
411
412    vec[2].iov_base = RC_QUIT;
413    vec[2].iov_len = RC_QUIT_LEN;
414
415    /* Return values not checked, since we just want to make it go away. */
416    apr_socket_sendv(conn->sock, vec, 3, &written);
417    apr_socket_close(conn->sock);
418
419    apr_pool_destroy(conn->p);
420
421    return APR_SUCCESS;
422}
423#endif
424
425APU_DECLARE(apr_status_t) apr_redis_server_create(apr_pool_t *p,
426                                                  const char *host,
427                                                  apr_port_t port,
428                                                  apr_uint32_t min,
429                                                  apr_uint32_t smax,
430                                                  apr_uint32_t max,
431                                                  apr_uint32_t ttl,
432                                                  apr_uint32_t rwto,
433                                                  apr_redis_server_t **rs)
434{
435    apr_status_t rv = APR_SUCCESS;
436    apr_redis_server_t *server;
437    apr_pool_t *np;
438
439    rv = apr_pool_create(&np, p);
440
441    server = apr_palloc(np, sizeof(apr_redis_server_t));
442
443    server->p = np;
444    server->host = apr_pstrdup(np, host);
445    server->port = port;
446    server->status = APR_RC_SERVER_DEAD;
447    server->rwto = rwto;
448    server->version.major = 0;
449    server->version.minor = 0;
450    server->version.patch = 0;
451
452#if APR_HAS_THREADS
453    rv = apr_thread_mutex_create(&server->lock, APR_THREAD_MUTEX_DEFAULT, np);
454    if (rv != APR_SUCCESS) {
455        return rv;
456    }
457
458    rv = apr_reslist_create(&server->conns,
459                            min,        /* hard minimum */
460                            smax,       /* soft maximum */
461                            max,        /* hard maximum */
462                            ttl,        /* Time to live */
463                            rc_conn_construct,  /* Make a New Connection */
464                            rc_conn_destruct,   /* Kill Old Connection */
465                            server, np);
466    if (rv != APR_SUCCESS) {
467        return rv;
468    }
469
470    apr_reslist_cleanup_order_set(server->conns, APR_RESLIST_CLEANUP_FIRST);
471#else
472    rv = rc_conn_construct((void **) &(server->conn), server, np);
473    if (rv != APR_SUCCESS) {
474        return rv;
475    }
476#endif
477
478    *rs = server;
479
480    return rv;
481}
482
483APU_DECLARE(apr_status_t) apr_redis_create(apr_pool_t *p,
484                                           apr_uint16_t max_servers,
485                                           apr_uint32_t flags,
486                                           apr_redis_t **redis)
487{
488    apr_status_t rv = APR_SUCCESS;
489    apr_redis_t *rc;
490
491    rc = apr_palloc(p, sizeof(apr_redis_t));
492    rc->p = p;
493    rc->nalloc = max_servers;
494    rc->ntotal = 0;
495    rc->live_servers =
496        apr_palloc(p, rc->nalloc * sizeof(struct apr_redis_server_t *));
497    rc->hash_func = NULL;
498    rc->hash_baton = NULL;
499    rc->server_func = NULL;
500    rc->server_baton = NULL;
501    *redis = rc;
502    return rv;
503}
504
505
506/* The crc32 functions and data was originally written by Spencer
507 * Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
508 * tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
509 * src/usr.bin/cksum/crc32.c.
510 */
511
512static const apr_uint32_t crc32tab[256] = {
513    0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
514    0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
515    0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
516    0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
517    0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
518    0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
519    0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
520    0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
521    0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
522    0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
523    0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
524    0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
525    0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
526    0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
527    0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
528    0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
529    0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
530    0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
531    0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
532    0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
533    0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
534    0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
535    0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
536    0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
537    0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
538    0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
539    0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
540    0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
541    0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
542    0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
543    0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
544    0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
545    0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
546    0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
547    0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
548    0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
549    0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
550    0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
551    0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
552    0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
553    0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
554    0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
555    0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
556    0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
557    0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
558    0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
559    0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
560    0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
561    0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
562    0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
563    0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
564    0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
565    0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
566    0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
567    0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
568    0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
569    0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
570    0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
571    0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
572    0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
573    0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
574    0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
575    0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
576    0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
577};
578
579APU_DECLARE(apr_uint32_t) apr_redis_hash_crc32(void *baton,
580                                               const char *data,
581                                               const apr_size_t data_len)
582{
583    apr_uint32_t i;
584    apr_uint32_t crc;
585    crc = ~0;
586
587    for (i = 0; i < data_len; i++)
588        crc = (crc >> 8) ^ crc32tab[(crc ^ (data[i])) & 0xff];
589
590    return ~crc;
591}
592
593APU_DECLARE(apr_uint32_t) apr_redis_hash_default(void *baton,
594                                                 const char *data,
595                                                 const apr_size_t data_len)
596{
597    /* The default Perl Client doesn't actually use just crc32 -- it shifts it again
598     * like this....
599     */
600    return ((apr_redis_hash_crc32(baton, data, data_len) >> 16) & 0x7fff);
601}
602
603APU_DECLARE(apr_uint32_t) apr_redis_hash(apr_redis_t *rc,
604                                         const char *data,
605                                         const apr_size_t data_len)
606{
607    if (rc->hash_func) {
608        return rc->hash_func(rc->hash_baton, data, data_len);
609    }
610    else {
611        return apr_redis_hash_default(NULL, data, data_len);
612    }
613}
614
615static apr_status_t get_server_line(apr_redis_conn_t *conn)
616{
617    apr_size_t bsize = BUFFER_SIZE;
618    apr_status_t rv = APR_SUCCESS;
619
620    rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ,
621            BUFFER_SIZE);
622
623    if (rv != APR_SUCCESS) {
624        return rv;
625    }
626
627    rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
628
629    if (rv != APR_SUCCESS) {
630        return rv;
631    }
632
633    conn->blen = bsize;
634    conn->buffer[bsize] = '\0';
635
636    return apr_brigade_cleanup(conn->tb);
637}
638
639APU_DECLARE(apr_status_t) apr_redis_set(apr_redis_t *rc,
640                                        const char *key,
641                                        char *data,
642                                        const apr_size_t data_size,
643                                        apr_uint16_t flags)
644{
645    apr_uint32_t hash;
646    apr_redis_server_t *rs;
647    apr_redis_conn_t *conn;
648    apr_status_t rv;
649    apr_size_t written;
650    struct iovec vec[9];
651    char keysize_str[LILBUFF_SIZE];
652    char datasize_str[LILBUFF_SIZE];
653    apr_size_t len, klen;
654
655    klen = strlen(key);
656    hash = apr_redis_hash(rc, key, klen);
657
658    rs = apr_redis_find_server_hash(rc, hash);
659
660    if (rs == NULL)
661        return APR_NOTFOUND;
662
663    rv = rs_find_conn(rs, &conn);
664
665    if (rv != APR_SUCCESS) {
666        apr_redis_disable_server(rc, rs);
667        return rv;
668    }
669
670    /*
671     * RESP Command:
672     *   *3
673     *   $3
674     *   SET
675     *   $<keylen>
676     *   key
677     *   $<datalen>
678     *   data
679     */
680
681    vec[0].iov_base = RC_RESP_3;
682    vec[0].iov_len = RC_RESP_3_LEN;
683
684    vec[1].iov_base = RC_SET_SIZE;
685    vec[1].iov_len = RC_SET_SIZE_LEN;
686
687    vec[2].iov_base = RC_SET;
688    vec[2].iov_len = RC_SET_LEN;
689
690    len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
691    vec[3].iov_base = keysize_str;
692    vec[3].iov_len = len;
693
694    vec[4].iov_base = (void *) key;
695    vec[4].iov_len = klen;
696
697    vec[5].iov_base = RC_EOL;
698    vec[5].iov_len = RC_EOL_LEN;
699
700    len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
701                     data_size);
702    vec[6].iov_base = datasize_str;
703    vec[6].iov_len = len;
704
705    vec[7].iov_base = data;
706    vec[7].iov_len = data_size;
707
708    vec[8].iov_base = RC_EOL;
709    vec[8].iov_len = RC_EOL_LEN;
710
711    rv = apr_socket_sendv(conn->sock, vec, 9, &written);
712
713    if (rv != APR_SUCCESS) {
714        rs_bad_conn(rs, conn);
715        apr_redis_disable_server(rc, rs);
716        return rv;
717    }
718
719    rv = get_server_line(conn);
720    if (rv != APR_SUCCESS) {
721        rs_bad_conn(rs, conn);
722        apr_redis_disable_server(rc, rs);
723        return rv;
724    }
725
726    if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
727        rv = APR_SUCCESS;
728    }
729    else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
730        rv = APR_EEXIST;
731    }
732    else {
733        rv = APR_EGENERAL;
734    }
735
736    rs_release_conn(rs, conn);
737    return rv;
738}
739
740APU_DECLARE(apr_status_t) apr_redis_setex(apr_redis_t *rc,
741                                          const char *key,
742                                          char *data,
743                                          const apr_size_t data_size,
744                                          apr_uint32_t timeout,
745                                          apr_uint16_t flags)
746{
747    apr_uint32_t hash;
748    apr_redis_server_t *rs;
749    apr_redis_conn_t *conn;
750    apr_status_t rv;
751    apr_size_t written;
752    struct iovec vec[11];
753    char keysize_str[LILBUFF_SIZE];
754    char expire_str[LILBUFF_SIZE];
755    char expiresize_str[LILBUFF_SIZE];
756    char datasize_str[LILBUFF_SIZE];
757    apr_size_t len, klen, expire_len;
758
759
760    klen = strlen(key);
761    hash = apr_redis_hash(rc, key, klen);
762
763    rs = apr_redis_find_server_hash(rc, hash);
764
765    if (rs == NULL)
766        return APR_NOTFOUND;
767
768    rv = rs_find_conn(rs, &conn);
769
770    if (rv != APR_SUCCESS) {
771        apr_redis_disable_server(rc, rs);
772        return rv;
773    }
774
775    /*
776     * RESP Command:
777     *   *4
778     *   $5
779     *   SETEX
780     *   $<keylen>
781     *   key
782     *   $<expirelen>
783     *   expirey
784     *   $<datalen>
785     *   data
786     */
787
788    vec[0].iov_base = RC_RESP_4;
789    vec[0].iov_len = RC_RESP_4_LEN;
790
791    vec[1].iov_base = RC_SETEX_SIZE;
792    vec[1].iov_len = RC_SETEX_SIZE_LEN;
793
794    vec[2].iov_base = RC_SETEX;
795    vec[2].iov_len = RC_SETEX_LEN;
796
797    len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n", klen);
798    vec[3].iov_base = keysize_str;
799    vec[3].iov_len = len;
800
801    vec[4].iov_base = (void *) key;
802    vec[4].iov_len = klen;
803
804    vec[5].iov_base = RC_EOL;
805    vec[5].iov_len = RC_EOL_LEN;
806
807    expire_len = apr_snprintf(expire_str, LILBUFF_SIZE, "%u\r\n", timeout);
808    len = apr_snprintf(expiresize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
809                     expire_len - 2);
810    vec[6].iov_base = (void *) expiresize_str;
811    vec[6].iov_len = len;
812
813    vec[7].iov_base = (void *) expire_str;
814    vec[7].iov_len = expire_len;
815
816    len = apr_snprintf(datasize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
817                     data_size);
818    vec[8].iov_base = datasize_str;
819    vec[8].iov_len = len;
820
821    vec[9].iov_base = data;
822    vec[9].iov_len = data_size;
823
824    vec[10].iov_base = RC_EOL;
825    vec[10].iov_len = RC_EOL_LEN;
826
827    rv = apr_socket_sendv(conn->sock, vec, 11, &written);
828
829    if (rv != APR_SUCCESS) {
830        rs_bad_conn(rs, conn);
831        apr_redis_disable_server(rc, rs);
832        return rv;
833    }
834
835    rv = get_server_line(conn);
836    if (rv != APR_SUCCESS) {
837        rs_bad_conn(rs, conn);
838        apr_redis_disable_server(rc, rs);
839        return rv;
840    }
841
842    if (strcmp(conn->buffer, RS_STORED RC_EOL) == 0) {
843        rv = APR_SUCCESS;
844    }
845    else if (strcmp(conn->buffer, RS_NOT_STORED RC_EOL) == 0) {
846        rv = APR_EEXIST;
847    }
848    else {
849        rv = APR_EGENERAL;
850    }
851
852    rs_release_conn(rs, conn);
853    return rv;
854}
855
856static apr_status_t grab_bulk_resp(apr_redis_server_t *rs, apr_redis_t *rc,
857                                   apr_redis_conn_t *conn, apr_pool_t *p,
858                                   char **baton, apr_size_t *new_length)
859{
860    char *length;
861    char *last;
862    apr_status_t rv;
863    apr_size_t len = 0;
864    *new_length = 0;
865
866    length = apr_strtok(conn->buffer + 1, " ", &last);
867    if (length) {
868        len = strtol(length, (char **) NULL, 10);
869    }
870
871    if (len == 0) {
872        *new_length = 0;
873        *baton = NULL;
874    }
875    else {
876        apr_bucket_brigade *bbb;
877        apr_bucket *e;
878
879        /* eat the trailing \r\n */
880        rv = apr_brigade_partition(conn->bb, len + 2, &e);
881
882        if (rv != APR_SUCCESS) {
883            rs_bad_conn(rs, conn);
884            if (rc)
885                apr_redis_disable_server(rc, rs);
886            return rv;
887        }
888
889        bbb = apr_brigade_split(conn->bb, e);
890
891        rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
892
893        if (rv != APR_SUCCESS) {
894            rs_bad_conn(rs, conn);
895            if (rc)
896                apr_redis_disable_server(rc, rs);
897            return rv;
898        }
899
900        rv = apr_brigade_destroy(conn->bb);
901        if (rv != APR_SUCCESS) {
902            rs_bad_conn(rs, conn);
903            if (rc)
904                apr_redis_disable_server(rc, rs);
905            return rv;
906        }
907
908        conn->bb = bbb;
909
910        *new_length = len - 2;
911        (*baton)[*new_length] = '\0';
912    }
913    return APR_SUCCESS;
914
915}
916
917APU_DECLARE(apr_status_t) apr_redis_getp(apr_redis_t *rc,
918                                         apr_pool_t *p,
919                                         const char *key,
920                                         char **baton,
921                                         apr_size_t *new_length,
922                                         apr_uint16_t *flags)
923{
924    apr_status_t rv;
925    apr_redis_server_t *rs;
926    apr_redis_conn_t *conn;
927    apr_uint32_t hash;
928    apr_size_t written;
929    apr_size_t len, klen;
930    struct iovec vec[6];
931    char keysize_str[LILBUFF_SIZE];
932
933    klen = strlen(key);
934    hash = apr_redis_hash(rc, key, klen);
935    rs = apr_redis_find_server_hash(rc, hash);
936
937    if (rs == NULL)
938        return APR_NOTFOUND;
939
940    rv = rs_find_conn(rs, &conn);
941
942    if (rv != APR_SUCCESS) {
943        apr_redis_disable_server(rc, rs);
944        return rv;
945    }
946
947    /*
948     * RESP Command:
949     *   *2
950     *   $3
951     *   GET
952     *   $<keylen>
953     *   key
954     */
955    vec[0].iov_base = RC_RESP_2;
956    vec[0].iov_len = RC_RESP_2_LEN;
957
958    vec[1].iov_base = RC_GET_SIZE;
959    vec[1].iov_len = RC_GET_SIZE_LEN;
960
961    vec[2].iov_base = RC_GET;
962    vec[2].iov_len = RC_GET_LEN;
963
964    len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
965                     klen);
966    vec[3].iov_base = keysize_str;
967    vec[3].iov_len = len;
968
969    vec[4].iov_base = (void *) key;
970    vec[4].iov_len = klen;
971
972    vec[5].iov_base = RC_EOL;
973    vec[5].iov_len = RC_EOL_LEN;
974
975    rv = apr_socket_sendv(conn->sock, vec, 6, &written);
976
977
978    if (rv != APR_SUCCESS) {
979        rs_bad_conn(rs, conn);
980        apr_redis_disable_server(rc, rs);
981        return rv;
982    }
983
984    rv = get_server_line(conn);
985    if (rv != APR_SUCCESS) {
986        rs_bad_conn(rs, conn);
987        apr_redis_disable_server(rc, rs);
988        return rv;
989    }
990    if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
991        rv = APR_NOTFOUND;
992    }
993    else if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
994        rv = grab_bulk_resp(rs, rc, conn, p, baton, new_length);
995    }
996    else {
997        rv = APR_EGENERAL;
998    }
999
1000    rs_release_conn(rs, conn);
1001    return rv;
1002}
1003
1004APU_DECLARE(apr_status_t)
1005    apr_redis_delete(apr_redis_t *rc, const char *key, apr_uint32_t timeout)
1006{
1007    apr_status_t rv;
1008    apr_redis_server_t *rs;
1009    apr_redis_conn_t *conn;
1010    apr_uint32_t hash;
1011    apr_size_t written;
1012    struct iovec vec[6];
1013    apr_size_t len, klen;
1014    char keysize_str[LILBUFF_SIZE];
1015
1016    klen = strlen(key);
1017    hash = apr_redis_hash(rc, key, klen);
1018    rs = apr_redis_find_server_hash(rc, hash);
1019    if (rs == NULL)
1020        return APR_NOTFOUND;
1021
1022    rv = rs_find_conn(rs, &conn);
1023
1024    if (rv != APR_SUCCESS) {
1025        apr_redis_disable_server(rc, rs);
1026        return rv;
1027    }
1028
1029    /*
1030     * RESP Command:
1031     *   *2
1032     *   $3
1033     *   DEL
1034     *   $<keylen>
1035     *   key
1036     */
1037    vec[0].iov_base = RC_RESP_2;
1038    vec[0].iov_len = RC_RESP_2_LEN;
1039
1040    vec[1].iov_base = RC_DEL_SIZE;
1041    vec[1].iov_len = RC_DEL_SIZE_LEN;
1042
1043    vec[2].iov_base = RC_DEL;
1044    vec[2].iov_len = RC_DEL_LEN;
1045
1046    len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1047                     klen);
1048    vec[3].iov_base = keysize_str;
1049    vec[3].iov_len = len;
1050
1051    vec[4].iov_base = (void *) key;
1052    vec[4].iov_len = klen;
1053
1054    vec[5].iov_base = RC_EOL;
1055    vec[5].iov_len = RC_EOL_LEN;
1056
1057    rv = apr_socket_sendv(conn->sock, vec, 6, &written);
1058
1059    if (rv != APR_SUCCESS) {
1060        rs_bad_conn(rs, conn);
1061        apr_redis_disable_server(rc, rs);
1062        return rv;
1063    }
1064
1065    rv = get_server_line(conn);
1066    if (rv != APR_SUCCESS) {
1067        rs_bad_conn(rs, conn);
1068        apr_redis_disable_server(rc, rs);
1069        return rv;
1070    }
1071
1072    if (strncmp(RS_DELETED, conn->buffer, RS_DELETED_LEN) == 0) {
1073        rv = APR_SUCCESS;
1074    }
1075    else if (strncmp(RS_NOT_FOUND_DEL, conn->buffer, RS_NOT_FOUND_DEL_LEN) == 0) {
1076        rv = APR_NOTFOUND;
1077    }
1078    else {
1079        rv = APR_EGENERAL;
1080    }
1081
1082    rs_release_conn(rs, conn);
1083    return rv;
1084}
1085
1086APU_DECLARE(apr_status_t)
1087apr_redis_ping(apr_redis_server_t *rs)
1088{
1089    apr_status_t rv;
1090    apr_size_t written;
1091    struct iovec vec[3];
1092    apr_redis_conn_t *conn;
1093
1094    rv = rs_find_conn(rs, &conn);
1095
1096    if (rv != APR_SUCCESS) {
1097        return rv;
1098    }
1099
1100    /*
1101     * RESP Command:
1102     *   *1
1103     *   $4
1104     *   PING
1105     */
1106    vec[0].iov_base = RC_RESP_1;
1107    vec[0].iov_len = RC_RESP_1_LEN;
1108
1109    vec[1].iov_base = RC_PING_SIZE;
1110    vec[1].iov_len = RC_PING_SIZE_LEN;
1111
1112    vec[2].iov_base = RC_PING;
1113    vec[2].iov_len = RC_PING_LEN;
1114
1115    rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1116
1117    if (rv != APR_SUCCESS) {
1118        rs_bad_conn(rs, conn);
1119        return rv;
1120    }
1121
1122    rv = get_server_line(conn);
1123    if (rv == APR_SUCCESS) {
1124        /* we got *something*. Was it Redis? */
1125        if (strncmp(conn->buffer, "+PONG", sizeof("+PONG")-1) != 0) {
1126            rv = APR_EGENERAL;
1127        }
1128    }
1129    rs_release_conn(rs, conn);
1130    return rv;
1131}
1132
1133APU_DECLARE(apr_status_t)
1134apr_redis_info(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1135{
1136    apr_status_t rv;
1137    apr_redis_conn_t *conn;
1138    apr_size_t written;
1139    struct iovec vec[3];
1140
1141    rv = rs_find_conn(rs, &conn);
1142
1143    if (rv != APR_SUCCESS) {
1144        return rv;
1145    }
1146
1147    /*
1148     * RESP Command:
1149     *   *1
1150     *   $4
1151     *   INFO
1152     */
1153    vec[0].iov_base = RC_RESP_1;
1154    vec[0].iov_len = RC_RESP_1_LEN;
1155
1156    vec[1].iov_base = RC_INFO_SIZE;
1157    vec[1].iov_len = RC_INFO_SIZE_LEN;
1158
1159    vec[2].iov_base = RC_INFO;
1160    vec[2].iov_len = RC_INFO_LEN;
1161
1162    rv = apr_socket_sendv(conn->sock, vec, 3, &written);
1163
1164    if (rv != APR_SUCCESS) {
1165        rs_bad_conn(rs, conn);
1166        return rv;
1167    }
1168
1169    rv = get_server_line(conn);
1170    if (rv != APR_SUCCESS) {
1171        rs_bad_conn(rs, conn);
1172        return rv;
1173    }
1174
1175    if (strncmp(RS_TYPE_STRING, conn->buffer, RS_TYPE_STRING_LEN) == 0) {
1176        apr_size_t nl;
1177        rv = grab_bulk_resp(rs, NULL, conn, p, baton, &nl);
1178    } else {
1179        rs_bad_conn(rs, conn);
1180        rv = APR_EGENERAL;
1181    }
1182
1183    rs_release_conn(rs, conn);
1184    return rv;
1185}
1186
1187#define RV_FIELD "redis_version:"
1188APU_DECLARE(apr_status_t)
1189apr_redis_version(apr_redis_server_t *rs, apr_pool_t *p, char **baton)
1190{
1191    apr_status_t rv;
1192    char *ptr, *eptr;
1193    apr_pool_t *subpool;
1194
1195    /* Have we already obtained the version number? */
1196    if (rs->version.minor != 0) {
1197        if (baton)
1198            *baton = apr_pstrdup(p, rs->version.number);
1199        return APR_SUCCESS;
1200    }
1201    if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1202        /* well, we tried */
1203        subpool = p;
1204    }
1205    rv = apr_redis_info(rs, subpool, baton);
1206
1207    if (rv != APR_SUCCESS) {
1208        if (subpool != p) {
1209            apr_pool_destroy(subpool);
1210        }
1211        return rv;
1212    }
1213
1214    ptr = strstr(*baton, RV_FIELD);
1215    if (ptr) {
1216        rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1217        ptr = eptr + 1;
1218        rs->version.minor = strtol(ptr, &eptr, 10);
1219        ptr = eptr + 1;
1220        rs->version.patch = strtol(ptr, &eptr, 10);
1221        rs->version.number = apr_psprintf(rs->p, "%d.%d.%d",
1222                rs->version.major, rs->version.minor,
1223                rs->version.patch);
1224    }
1225    if (baton)
1226        *baton = apr_pstrdup(p, rs->version.number);
1227    if (subpool != p) {
1228        apr_pool_destroy(subpool);
1229    }
1230    return APR_SUCCESS;
1231}
1232
1233static apr_status_t plus_minus(apr_redis_t *rc,
1234                    int incr,
1235                    const char *key,
1236                    apr_int32_t inc,
1237                    apr_uint32_t *new_value)
1238{
1239    apr_status_t rv;
1240    apr_redis_server_t *rs;
1241    apr_redis_conn_t *conn;
1242    apr_uint32_t hash;
1243    apr_size_t written;
1244    apr_size_t len, klen;
1245    struct iovec vec[12];
1246    char keysize_str[LILBUFF_SIZE];
1247    char inc_str[LILBUFF_SIZE];
1248    char inc_str_len[LILBUFF_SIZE];
1249    int i = 0;
1250
1251    klen = strlen(key);
1252    hash = apr_redis_hash(rc, key, klen);
1253    rs = apr_redis_find_server_hash(rc, hash);
1254    if (rs == NULL)
1255        return APR_NOTFOUND;
1256
1257    rv = rs_find_conn(rs, &conn);
1258
1259    if (rv != APR_SUCCESS) {
1260        apr_redis_disable_server(rc, rs);
1261        return rv;
1262    }
1263
1264    /*
1265     * RESP Command:
1266     *   *2|*3
1267     *   $4|$6
1268     *   INCR/DECR|INCRBY/DECRBY
1269     *   $<keylen>
1270     *   key
1271     *   <:inc>
1272     */
1273    if (inc == 1) {
1274        vec[i].iov_base = RC_RESP_2;
1275        vec[i].iov_len = RC_RESP_2_LEN;
1276        i++;
1277
1278        vec[i].iov_base = "$4\r\n";
1279        vec[i].iov_len = sizeof("$4\r\n")-1;
1280        i++;
1281
1282        if (incr)
1283            vec[i].iov_base = "INCR\r\n";
1284        else
1285            vec[i].iov_base = "DECR\r\n";
1286        vec[i].iov_len = sizeof("INCR\r\n")-1;
1287        i++;
1288    }
1289    else {
1290        vec[i].iov_base = RC_RESP_3;
1291        vec[i].iov_len = RC_RESP_3_LEN;
1292        i++;
1293
1294        vec[i].iov_base = "$6\r\n";
1295        vec[i].iov_len = sizeof("$6\r\n")-1;
1296        i++;
1297
1298       if (incr)
1299            vec[i].iov_base = "INCRBY\r\n";
1300        else
1301            vec[i].iov_base = "DECRBY\r\n";
1302        vec[i].iov_len = sizeof("INCRBY\r\n")-1;
1303        i++;
1304    }
1305
1306    len = apr_snprintf(keysize_str, LILBUFF_SIZE, "$%" APR_SIZE_T_FMT "\r\n",
1307                     klen);
1308    vec[i].iov_base = keysize_str;
1309    vec[i].iov_len = len;
1310    i++;
1311
1312    vec[i].iov_base = (void *) key;
1313    vec[i].iov_len = klen;
1314    i++;
1315
1316    vec[i].iov_base = RC_EOL;
1317    vec[i].iov_len = RC_EOL_LEN;
1318    i++;
1319
1320    if (inc != 1) {
1321        len = apr_snprintf(inc_str, LILBUFF_SIZE, "%d\r\n", inc);
1322        klen = apr_snprintf(inc_str_len, LILBUFF_SIZE, "$%d\r\n", (int)(len-2));
1323        vec[i].iov_base = inc_str_len;
1324        vec[i].iov_len = klen;
1325        i++;
1326
1327        vec[i].iov_base = inc_str;
1328        vec[i].iov_len = len;
1329        i++;
1330
1331        vec[i].iov_base = RC_EOL;
1332        vec[i].iov_len = RC_EOL_LEN;
1333        i++;
1334    }
1335
1336    rv = apr_socket_sendv(conn->sock, vec, i, &written);
1337
1338    if (rv != APR_SUCCESS) {
1339        rs_bad_conn(rs, conn);
1340        apr_redis_disable_server(rc, rs);
1341        return rv;
1342    }
1343
1344    rv = get_server_line(conn);
1345    if (rv != APR_SUCCESS) {
1346        rs_bad_conn(rs, conn);
1347        apr_redis_disable_server(rc, rs);
1348        return rv;
1349    }
1350    if (strncmp(RS_NOT_FOUND_GET, conn->buffer, RS_NOT_FOUND_GET_LEN) == 0) {
1351        rv = APR_NOTFOUND;
1352    }
1353    else if (*conn->buffer == ':') {
1354        *new_value = atoi((const char *)(conn->buffer + 1));
1355        rv = APR_SUCCESS;
1356    }
1357    else {
1358        rv = APR_EGENERAL;
1359    }
1360    rs_release_conn(rs, conn);
1361    return rv;
1362}
1363
1364APU_DECLARE(apr_status_t)
1365apr_redis_incr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1366{
1367    return plus_minus(rc, 1, key, inc, new_value);
1368}
1369
1370APU_DECLARE(apr_status_t)
1371apr_redis_decr(apr_redis_t *rc, const char *key, apr_int32_t inc, apr_uint32_t *new_value)
1372{
1373    return plus_minus(rc, 0, key, inc, new_value);
1374}
1375
1376APU_DECLARE(apr_status_t)
1377apr_redis_multgetp(apr_redis_t *rc,
1378                   apr_pool_t *temp_pool,
1379                   apr_pool_t *data_pool,
1380                   apr_hash_t *values)
1381{
1382    return APR_ENOTIMPL;
1383}
1384
1385/**
1386 * Define all of the strings for stats
1387 */
1388
1389#define STAT_process_id "process_id:"
1390#define STAT_process_id_LEN (sizeof(STAT_process_id)-1)
1391
1392#define STAT_uptime_in_seconds "uptime_in_seconds:"
1393#define STAT_uptime_in_seconds_LEN (sizeof(STAT_uptime_in_seconds)-1)
1394
1395#define STAT_arch_bits "arch_bits:"
1396#define STAT_arch_bits_LEN (sizeof(STAT_arch_bits)-1)
1397
1398#define STAT_connected_clients "connected_clients:"
1399#define STAT_connected_clients_LEN (sizeof(STAT_connected_clients)-1)
1400
1401#define STAT_blocked_clients "blocked_clients:"
1402#define STAT_blocked_clients_LEN (sizeof(STAT_blocked_clients)-1)
1403
1404#define STAT_maxmemory "maxmemory:"
1405#define STAT_maxmemory_LEN (sizeof(STAT_maxmemory)-1)
1406
1407#define STAT_used_memory "used_memory:"
1408#define STAT_used_memory_LEN (sizeof(STAT_used_memory)-1)
1409
1410#define STAT_total_system_memory "total_system_memory:"
1411#define STAT_total_system_memory_LEN (sizeof(STAT_total_system_memory)-1)
1412
1413#define STAT_total_connections_received "total_connections_received:"
1414#define STAT_total_connections_received_LEN (sizeof(STAT_total_connections_received)-1)
1415
1416#define STAT_total_commands_processed "total_commands_processed:"
1417#define STAT_total_commands_processed_LEN (sizeof(STAT_total_commands_processed)-1)
1418
1419#define STAT_rejected_connections "rejected_connections:"
1420#define STAT_rejected_connections_LEN (sizeof(STAT_rejected_connections)-1)
1421
1422#define STAT_total_net_input_bytes "total_net_input_bytes:"
1423#define STAT_total_net_input_bytes_LEN (sizeof(STAT_total_net_input_bytes)-1)
1424
1425#define STAT_total_net_output_bytes "total_net_output_bytes:"
1426#define STAT_total_net_output_bytes_LEN (sizeof(STAT_total_net_output_bytes)-1)
1427
1428#define STAT_keyspace_hits "keyspace_hits:"
1429#define STAT_keyspace_hits_LEN (sizeof(STAT_keyspace_hits)-1)
1430
1431#define STAT_keyspace_misses "keyspace_misses:"
1432#define STAT_keyspace_misses_LEN (sizeof(STAT_keyspace_misses)-1)
1433
1434#define STAT_connected_slaves "connected_slaves:"
1435#define STAT_connected_slaves_LEN (sizeof(STAT_connected_slaves)-1)
1436
1437#define STAT_used_cpu_sys "used_cpu_sys:"
1438#define STAT_used_cpu_sys_LEN (sizeof(STAT_used_cpu_sys)-1)
1439
1440#define STAT_used_cpu_user "used_cpu_user:"
1441#define STAT_used_cpu_user_LEN (sizeof(STAT_used_cpu_user)-1)
1442
1443#define STAT_cluster_enabled "cluster_enabled:"
1444#define STAT_cluster_enabled_LEN (sizeof(STAT_cluster_enabled)-1)
1445
1446static apr_uint32_t stat_read_uint32( char *buf)
1447{
1448    return atoi(buf);
1449}
1450
1451static apr_uint64_t stat_read_uint64(char *buf)
1452{
1453    return apr_atoi64(buf);
1454}
1455
1456#define rc_do_stat(name, type) \
1457    if ((ptr = strstr(info , STAT_ ## name )) != NULL ) { \
1458        char *str = ptr + (STAT_ ## name ## _LEN ); \
1459        stats-> name = stat_read_ ## type (str); \
1460    }
1461
1462static void update_stats(char *info, apr_redis_stats_t *stats)
1463{
1464    char *ptr;
1465
1466    rc_do_stat(process_id, uint32);
1467    rc_do_stat(uptime_in_seconds, uint32);
1468    rc_do_stat(arch_bits, uint32);
1469    rc_do_stat(connected_clients, uint32);
1470    rc_do_stat(blocked_clients, uint32);
1471    rc_do_stat(maxmemory, uint64);
1472    rc_do_stat(used_memory, uint64);
1473    rc_do_stat(total_system_memory, uint64);
1474    rc_do_stat(total_connections_received, uint64);
1475    rc_do_stat(total_commands_processed, uint64);
1476    rc_do_stat(rejected_connections, uint64);
1477    rc_do_stat(total_net_input_bytes, uint64);
1478    rc_do_stat(total_net_output_bytes, uint64);
1479    rc_do_stat(keyspace_hits, uint64);
1480    rc_do_stat(keyspace_misses, uint64);
1481    rc_do_stat(connected_slaves, uint32);
1482    rc_do_stat(used_cpu_sys, uint32);
1483    rc_do_stat(used_cpu_user, uint32);
1484    rc_do_stat(cluster_enabled, uint32);
1485}
1486
1487APU_DECLARE(apr_status_t)
1488apr_redis_stats(apr_redis_server_t *rs,
1489                apr_pool_t *p,
1490                apr_redis_stats_t **stats)
1491{
1492    apr_status_t rv;
1493    char *info;
1494    apr_pool_t *subpool;
1495    apr_redis_stats_t *ret;
1496    char *ptr;
1497
1498    if (apr_pool_create(&subpool, p) != APR_SUCCESS) {
1499        /* well, we tried */
1500        subpool = p;
1501    }
1502    rv = apr_redis_info(rs, subpool, &info);
1503
1504    if (rv != APR_SUCCESS) {
1505        if (subpool != p) {
1506            apr_pool_destroy(subpool);
1507        }
1508        return rv;
1509    }
1510    ret = apr_pcalloc(p, sizeof(apr_redis_stats_t));
1511    /* Get the bulk of the stats */
1512    update_stats(info, ret);
1513
1514    /* Now the version number */
1515    if (rs->version.major != 0) {
1516        ret->major = rs->version.major;
1517        ret->minor = rs->version.minor;
1518        ret->patch = rs->version.patch;
1519    }
1520    else {
1521        char *eptr;
1522        ptr = strstr(info, RV_FIELD);
1523        if (ptr) {
1524            ret->major = rs->version.major = strtol(ptr + sizeof(RV_FIELD) - 1, &eptr, 10);
1525            ptr = eptr + 1;
1526            ret->minor = rs->version.minor = strtol(ptr, &eptr, 10);
1527            ptr = eptr + 1;
1528            ret->patch = rs->version.patch = strtol(ptr, &eptr, 10);
1529        }
1530    }
1531
1532    /* Finally, the role */
1533    ptr = strstr(info, "role:");
1534    if (!ptr) {
1535        ret->role = APR_RS_SERVER_UNKNOWN;
1536    }
1537    else if (!strncmp("master", ptr + sizeof("role:") - 1, sizeof("master")-1)) {
1538        ret->role = APR_RS_SERVER_MASTER;
1539    }
1540    else {
1541        ret->role = APR_RS_SERVER_SLAVE;
1542    }
1543    if (stats) {
1544        *stats = ret;
1545    }
1546
1547    return APR_SUCCESS;
1548}
1549