1/*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28#include <sys/types.h>
29#include <sys/event.h>
30#include <sys/socket.h>
31#include <sys/time.h>
32
33#include <assert.h>
34#include <errno.h>
35#include <nsswitch.h>
36#include <stdio.h>
37#include <stdlib.h>
38#include <string.h>
39
40#include "cachelib.h"
41#include "config.h"
42#include "debug.h"
43#include "log.h"
44#include "query.h"
45#include "mp_rs_query.h"
46#include "mp_ws_query.h"
47#include "singletons.h"
48
49static int on_mp_read_session_close_notification(struct query_state *);
50static void on_mp_read_session_destroy(struct query_state *);
51static int on_mp_read_session_mapper(struct query_state *);
52/* int on_mp_read_session_request_read1(struct query_state *); */
53static int on_mp_read_session_request_read2(struct query_state *);
54static int on_mp_read_session_request_process(struct query_state *);
55static int on_mp_read_session_response_write1(struct query_state *);
56static int on_mp_read_session_read_request_process(struct query_state *);
57static int on_mp_read_session_read_response_write1(struct query_state *);
58static int on_mp_read_session_read_response_write2(struct query_state *);
59
60/*
61 * This function is used as the query_state's destroy_func to make the
62 * proper cleanup in case of errors.
63 */
64static void
65on_mp_read_session_destroy(struct query_state *qstate)
66{
67	TRACE_IN(on_mp_read_session_destroy);
68	finalize_comm_element(&qstate->request);
69	finalize_comm_element(&qstate->response);
70
71	if (qstate->mdata != NULL) {
72		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
73		close_cache_mp_read_session(
74	    		(cache_mp_read_session)qstate->mdata);
75		configuration_unlock_entry(qstate->config_entry,
76			CELT_MULTIPART);
77	}
78	TRACE_OUT(on_mp_read_session_destroy);
79}
80
81/*
82 * The functions below are used to process multipart read session initiation
83 * requests.
84 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
85 *   the request itself
86 * - on_mp_read_session_request_process processes it
87 * - on_mp_read_session_response_write1 sends the response
88 */
89int
90on_mp_read_session_request_read1(struct query_state *qstate)
91{
92	struct cache_mp_read_session_request	*c_mp_rs_request;
93	ssize_t	result;
94
95	TRACE_IN(on_mp_read_session_request_read1);
96	if (qstate->kevent_watermark == 0)
97		qstate->kevent_watermark = sizeof(size_t);
98	else {
99		init_comm_element(&qstate->request,
100	    		CET_MP_READ_SESSION_REQUEST);
101		c_mp_rs_request = get_cache_mp_read_session_request(
102	    		&qstate->request);
103
104		result = qstate->read_func(qstate,
105	    		&c_mp_rs_request->entry_length, sizeof(size_t));
106
107		if (result != sizeof(size_t)) {
108			TRACE_OUT(on_mp_read_session_request_read1);
109			return (-1);
110		}
111
112		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
113			TRACE_OUT(on_mp_read_session_request_read1);
114			return (-1);
115		}
116
117		c_mp_rs_request->entry = calloc(1,
118			c_mp_rs_request->entry_length + 1);
119		assert(c_mp_rs_request->entry != NULL);
120
121		qstate->kevent_watermark = c_mp_rs_request->entry_length;
122		qstate->process_func = on_mp_read_session_request_read2;
123	}
124	TRACE_OUT(on_mp_read_session_request_read1);
125	return (0);
126}
127
128static int
129on_mp_read_session_request_read2(struct query_state *qstate)
130{
131	struct cache_mp_read_session_request	*c_mp_rs_request;
132	ssize_t	result;
133
134	TRACE_IN(on_mp_read_session_request_read2);
135	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
136
137	result = qstate->read_func(qstate, c_mp_rs_request->entry,
138		c_mp_rs_request->entry_length);
139
140	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
141		LOG_ERR_3("on_mp_read_session_request_read2",
142			"read failed");
143		TRACE_OUT(on_mp_read_session_request_read2);
144		return (-1);
145	}
146
147	qstate->kevent_watermark = 0;
148	qstate->process_func = on_mp_read_session_request_process;
149	TRACE_OUT(on_mp_read_session_request_read2);
150	return (0);
151}
152
153static int
154on_mp_read_session_request_process(struct query_state *qstate)
155{
156	struct cache_mp_read_session_request	*c_mp_rs_request;
157	struct cache_mp_read_session_response	*c_mp_rs_response;
158	cache_mp_read_session	rs;
159	cache_entry	c_entry;
160	char	*dec_cache_entry_name;
161
162	char *buffer;
163	size_t buffer_size;
164	cache_mp_write_session ws;
165	struct agent	*lookup_agent;
166	struct multipart_agent *mp_agent;
167	void *mdata;
168	int res;
169
170	TRACE_IN(on_mp_read_session_request_process);
171	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
172	c_mp_rs_response = get_cache_mp_read_session_response(
173		&qstate->response);
174	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
175
176	qstate->config_entry = configuration_find_entry(
177		s_configuration, c_mp_rs_request->entry);
178	if (qstate->config_entry == NULL) {
179		c_mp_rs_response->error_code = ENOENT;
180
181		LOG_ERR_2("read_session_request",
182			"can't find configuration entry '%s'."
183			" aborting request", c_mp_rs_request->entry);
184		goto fin;
185	}
186
187	if (qstate->config_entry->enabled == 0) {
188		c_mp_rs_response->error_code = EACCES;
189
190		LOG_ERR_2("read_session_request",
191			"configuration entry '%s' is disabled",
192			c_mp_rs_request->entry);
193		goto fin;
194	}
195
196	if (qstate->config_entry->perform_actual_lookups != 0)
197		dec_cache_entry_name = strdup(
198			qstate->config_entry->mp_cache_params.cep.entry_name);
199	else {
200#ifdef NS_NSCD_EID_CHECKING
201		if (check_query_eids(qstate) != 0) {
202			c_mp_rs_response->error_code = EPERM;
203			goto fin;
204		}
205#endif
206
207		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
208			qstate->config_entry->mp_cache_params.cep.entry_name);
209	}
210
211	assert(dec_cache_entry_name != NULL);
212
213	configuration_lock_rdlock(s_configuration);
214	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
215	configuration_unlock(s_configuration);
216
217	if ((c_entry == INVALID_CACHE) &&
218	   (qstate->config_entry->perform_actual_lookups != 0))
219		c_entry = register_new_mp_cache_entry(qstate,
220			dec_cache_entry_name);
221
222	free(dec_cache_entry_name);
223
224	if (c_entry != INVALID_CACHE_ENTRY) {
225		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
226		rs = open_cache_mp_read_session(c_entry);
227		configuration_unlock_entry(qstate->config_entry,
228			CELT_MULTIPART);
229
230		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
231		   (qstate->config_entry->perform_actual_lookups != 0)) {
232			lookup_agent = find_agent(s_agent_table,
233				c_mp_rs_request->entry, MULTIPART_AGENT);
234
235			if ((lookup_agent != NULL) &&
236			(lookup_agent->type == MULTIPART_AGENT)) {
237				mp_agent = (struct multipart_agent *)
238					lookup_agent;
239				mdata = mp_agent->mp_init_func();
240
241				/*
242				 * Multipart agents read the whole snapshot
243				 * of the data at one time.
244				 */
245				configuration_lock_entry(qstate->config_entry,
246					CELT_MULTIPART);
247				ws = open_cache_mp_write_session(c_entry);
248				configuration_unlock_entry(qstate->config_entry,
249					CELT_MULTIPART);
250				if (ws != NULL) {
251				    do {
252					buffer = NULL;
253					res = mp_agent->mp_lookup_func(&buffer,
254						&buffer_size,
255						mdata);
256
257					if ((res & NS_TERMINATE) &&
258					   (buffer != NULL)) {
259						configuration_lock_entry(
260							qstate->config_entry,
261						   	CELT_MULTIPART);
262						if (cache_mp_write(ws, buffer,
263						    buffer_size) != 0) {
264							abandon_cache_mp_write_session(ws);
265							ws = NULL;
266						}
267						configuration_unlock_entry(
268							qstate->config_entry,
269							CELT_MULTIPART);
270
271						free(buffer);
272						buffer = NULL;
273					} else {
274						configuration_lock_entry(
275							qstate->config_entry,
276							CELT_MULTIPART);
277						close_cache_mp_write_session(ws);
278						configuration_unlock_entry(
279							qstate->config_entry,
280							CELT_MULTIPART);
281
282						free(buffer);
283						buffer = NULL;
284					}
285				    } while ((res & NS_TERMINATE) &&
286				    	    (ws != NULL));
287				}
288
289				configuration_lock_entry(qstate->config_entry,
290					CELT_MULTIPART);
291				rs = open_cache_mp_read_session(c_entry);
292				configuration_unlock_entry(qstate->config_entry,
293					CELT_MULTIPART);
294			}
295		}
296
297		if (rs == INVALID_CACHE_MP_READ_SESSION)
298			c_mp_rs_response->error_code = -1;
299		else {
300		    qstate->mdata = rs;
301		    qstate->destroy_func = on_mp_read_session_destroy;
302
303		    configuration_lock_entry(qstate->config_entry,
304			CELT_MULTIPART);
305		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
306		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
307			memcpy(&qstate->timeout,
308			    &qstate->config_entry->mp_query_timeout,
309			    sizeof(struct timeval));
310		    configuration_unlock_entry(qstate->config_entry,
311			CELT_MULTIPART);
312		}
313	} else
314		c_mp_rs_response->error_code = -1;
315
316fin:
317	qstate->process_func = on_mp_read_session_response_write1;
318	qstate->kevent_watermark = sizeof(int);
319	qstate->kevent_filter = EVFILT_WRITE;
320
321	TRACE_OUT(on_mp_read_session_request_process);
322	return (0);
323}
324
325static int
326on_mp_read_session_response_write1(struct query_state *qstate)
327{
328	struct cache_mp_read_session_response	*c_mp_rs_response;
329	ssize_t	result;
330
331	TRACE_IN(on_mp_read_session_response_write1);
332	c_mp_rs_response = get_cache_mp_read_session_response(
333		&qstate->response);
334	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
335		sizeof(int));
336
337	if (result != sizeof(int)) {
338		LOG_ERR_3("on_mp_read_session_response_write1",
339			"write failed");
340		TRACE_OUT(on_mp_read_session_response_write1);
341		return (-1);
342	}
343
344	if (c_mp_rs_response->error_code == 0) {
345		qstate->kevent_watermark = sizeof(int);
346		qstate->process_func = on_mp_read_session_mapper;
347		qstate->kevent_filter = EVFILT_READ;
348	} else {
349		qstate->kevent_watermark = 0;
350		qstate->process_func = NULL;
351	}
352	TRACE_OUT(on_mp_read_session_response_write1);
353	return (0);
354}
355
356/*
357 * Mapper function is used to avoid multiple connections for each session
358 * write or read requests. After processing the request, it does not close
359 * the connection, but waits for the next request.
360 */
361static int
362on_mp_read_session_mapper(struct query_state *qstate)
363{
364	ssize_t	result;
365	int elem_type;
366
367	TRACE_IN(on_mp_read_session_mapper);
368	if (qstate->kevent_watermark == 0) {
369		qstate->kevent_watermark = sizeof(int);
370	} else {
371		result = qstate->read_func(qstate, &elem_type, sizeof(int));
372		if (result != sizeof(int)) {
373			LOG_ERR_3("on_mp_read_session_mapper",
374				"read failed");
375			TRACE_OUT(on_mp_read_session_mapper);
376			return (-1);
377		}
378
379		switch (elem_type) {
380		case CET_MP_READ_SESSION_READ_REQUEST:
381			qstate->kevent_watermark = 0;
382			qstate->process_func =
383				on_mp_read_session_read_request_process;
384			break;
385		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
386			qstate->kevent_watermark = 0;
387			qstate->process_func =
388				on_mp_read_session_close_notification;
389			break;
390		default:
391			qstate->kevent_watermark = 0;
392			qstate->process_func = NULL;
393			LOG_ERR_3("on_mp_read_session_mapper",
394				"unknown element type");
395			TRACE_OUT(on_mp_read_session_mapper);
396			return (-1);
397		}
398	}
399	TRACE_OUT(on_mp_read_session_mapper);
400	return (0);
401}
402
403/*
404 * The functions below are used to process multipart read sessions read
405 * requests. User doesn't have to pass any kind of data, besides the
406 * request identificator itself. So we don't need any XXX_read functions and
407 * start with the XXX_process function.
408 * - on_mp_read_session_read_request_process processes it
409 * - on_mp_read_session_read_response_write1 and
410 *   on_mp_read_session_read_response_write2 sends the response
411 */
412static int
413on_mp_read_session_read_request_process(struct query_state *qstate)
414{
415	struct cache_mp_read_session_read_response	*read_response;
416
417	TRACE_IN(on_mp_read_session_response_process);
418	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
419	read_response = get_cache_mp_read_session_read_response(
420		&qstate->response);
421
422	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
423	read_response->error_code = cache_mp_read(
424		(cache_mp_read_session)qstate->mdata, NULL,
425		&read_response->data_size);
426
427	if (read_response->error_code == 0) {
428		read_response->data = malloc(read_response->data_size);
429		assert(read_response != NULL);
430		read_response->error_code = cache_mp_read(
431			(cache_mp_read_session)qstate->mdata,
432	    		read_response->data,
433			&read_response->data_size);
434	}
435	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
436
437	if (read_response->error_code == 0)
438		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
439	else
440		qstate->kevent_watermark = sizeof(int);
441	qstate->process_func = on_mp_read_session_read_response_write1;
442	qstate->kevent_filter = EVFILT_WRITE;
443
444	TRACE_OUT(on_mp_read_session_response_process);
445	return (0);
446}
447
448static int
449on_mp_read_session_read_response_write1(struct query_state *qstate)
450{
451	struct cache_mp_read_session_read_response	*read_response;
452	ssize_t	result;
453
454	TRACE_IN(on_mp_read_session_read_response_write1);
455	read_response = get_cache_mp_read_session_read_response(
456		&qstate->response);
457
458	result = qstate->write_func(qstate, &read_response->error_code,
459		sizeof(int));
460	if (read_response->error_code == 0) {
461		result += qstate->write_func(qstate, &read_response->data_size,
462			sizeof(size_t));
463		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
464			TRACE_OUT(on_mp_read_session_read_response_write1);
465			LOG_ERR_3("on_mp_read_session_read_response_write1",
466				"write failed");
467			return (-1);
468		}
469
470		qstate->kevent_watermark = read_response->data_size;
471		qstate->process_func = on_mp_read_session_read_response_write2;
472	} else {
473		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
474			LOG_ERR_3("on_mp_read_session_read_response_write1",
475				"write failed");
476			TRACE_OUT(on_mp_read_session_read_response_write1);
477			return (-1);
478		}
479
480		qstate->kevent_watermark = 0;
481		qstate->process_func = NULL;
482	}
483
484	TRACE_OUT(on_mp_read_session_read_response_write1);
485	return (0);
486}
487
488static int
489on_mp_read_session_read_response_write2(struct query_state *qstate)
490{
491	struct cache_mp_read_session_read_response *read_response;
492	ssize_t	result;
493
494	TRACE_IN(on_mp_read_session_read_response_write2);
495	read_response = get_cache_mp_read_session_read_response(
496		&qstate->response);
497	result = qstate->write_func(qstate, read_response->data,
498		read_response->data_size);
499	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
500		LOG_ERR_3("on_mp_read_session_read_response_write2",
501			"write failed");
502		TRACE_OUT(on_mp_read_session_read_response_write2);
503		return (-1);
504	}
505
506	finalize_comm_element(&qstate->request);
507	finalize_comm_element(&qstate->response);
508
509	qstate->kevent_watermark = sizeof(int);
510	qstate->process_func = on_mp_read_session_mapper;
511	qstate->kevent_filter = EVFILT_READ;
512
513	TRACE_OUT(on_mp_read_session_read_response_write2);
514	return (0);
515}
516
517/*
518 * Handles session close notification by calling close_cache_mp_read_session
519 * function.
520 */
521static int
522on_mp_read_session_close_notification(struct query_state *qstate)
523{
524
525	TRACE_IN(on_mp_read_session_close_notification);
526	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
527	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
528	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
529	qstate->mdata = NULL;
530	qstate->kevent_watermark = 0;
531	qstate->process_func = NULL;
532	TRACE_OUT(on_mp_read_session_close_notification);
533	return (0);
534}
535