mp_rs_query.c revision 194104
/*-
 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */
27133359Sobrien
28133359Sobrien#include <sys/cdefs.h>
2968349Sobrien__FBSDID("$FreeBSD: head/usr.sbin/nscd/mp_rs_query.c 194104 2009-06-13 13:07:56Z des $");
3068349Sobrien
3168349Sobrien#include <sys/types.h>
3268349Sobrien#include <sys/event.h>
3368349Sobrien#include <sys/socket.h>
3468349Sobrien#include <sys/time.h>
3568349Sobrien
36191736Sobrien#include <assert.h>
37191736Sobrien#include <errno.h>
38191736Sobrien#include <nsswitch.h>
39191736Sobrien#include <stdio.h>
40191736Sobrien#include <stdlib.h>
41133359Sobrien#include <string.h>
4268349Sobrien
4368349Sobrien#include "cachelib.h"
4468349Sobrien#include "config.h"
4568349Sobrien#include "debug.h"
4668349Sobrien#include "log.h"
47133359Sobrien#include "query.h"
48169942Sobrien#include "mp_rs_query.h"
4968349Sobrien#include "mp_ws_query.h"
5068349Sobrien#include "singletons.h"
5168349Sobrien
/*
 * Forward declarations of the file-local state handlers, listed roughly in
 * the order the query state machine visits them.
 *
 * NOTE(review): on_mp_read_session_request_read1 is not static and has no
 * prototype here; presumably it is declared in mp_rs_query.h -- confirm.
 */

/* Session initiation. */
static int on_mp_read_session_request_read2(struct query_state *);
static int on_mp_read_session_request_process(struct query_state *);
static int on_mp_read_session_response_write1(struct query_state *);

/* Request dispatching inside an established session. */
static int on_mp_read_session_mapper(struct query_state *);

/* Session read requests. */
static int on_mp_read_session_read_request_process(struct query_state *);
static int on_mp_read_session_read_response_write1(struct query_state *);
static int on_mp_read_session_read_response_write2(struct query_state *);

/* Session teardown. */
static int on_mp_read_session_close_notification(struct query_state *);
static void on_mp_read_session_destroy(struct query_state *);
62133359Sobrien
63186690Sobrien/*
64133359Sobrien * This function is used as the query_state's destroy_func to make the
6568349Sobrien * proper cleanup in case of errors.
6668349Sobrien */
6775937Sobrienstatic void
6875937Sobrienon_mp_read_session_destroy(struct query_state *qstate)
6975937Sobrien{
7068349Sobrien	TRACE_IN(on_mp_read_session_destroy);
7168349Sobrien	finalize_comm_element(&qstate->request);
7268349Sobrien	finalize_comm_element(&qstate->response);
7368349Sobrien
7468349Sobrien	if (qstate->mdata != NULL) {
75159764Sobrien		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
76159764Sobrien		close_cache_mp_read_session(
7780588Sobrien	    		(cache_mp_read_session)qstate->mdata);
78191736Sobrien		configuration_unlock_entry(qstate->config_entry,
7968349Sobrien			CELT_MULTIPART);
8068349Sobrien	}
81169962Sobrien	TRACE_OUT(on_mp_read_session_destroy);
8268349Sobrien}
83169942Sobrien
8468349Sobrien/*
85169942Sobrien * The functions below are used to process multipart read session initiation
86133359Sobrien * requests.
87159764Sobrien * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
88159764Sobrien *   the request itself
89175296Sobrien * - on_mp_read_session_request_process processes it
90133359Sobrien * - on_mp_read_session_response_write1 sends the response
91133359Sobrien */
92103373Sobrienint
9368349Sobrienon_mp_read_session_request_read1(struct query_state *qstate)
94133359Sobrien{
95169962Sobrien	struct cache_mp_read_session_request	*c_mp_rs_request;
96169962Sobrien	ssize_t	result;
9768349Sobrien
98133359Sobrien	TRACE_IN(on_mp_read_session_request_read1);
99133359Sobrien	if (qstate->kevent_watermark == 0)
100133359Sobrien		qstate->kevent_watermark = sizeof(size_t);
101175296Sobrien	else {
10268349Sobrien		init_comm_element(&qstate->request,
103133359Sobrien	    		CET_MP_READ_SESSION_REQUEST);
104133359Sobrien		c_mp_rs_request = get_cache_mp_read_session_request(
105133359Sobrien	    		&qstate->request);
10668349Sobrien
10768349Sobrien		result = qstate->read_func(qstate,
10868349Sobrien	    		&c_mp_rs_request->entry_length, sizeof(size_t));
10968349Sobrien
110159764Sobrien		if (result != sizeof(size_t)) {
111169942Sobrien			TRACE_OUT(on_mp_read_session_request_read1);
112133359Sobrien			return (-1);
113133359Sobrien		}
114169962Sobrien
115133359Sobrien		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
116175296Sobrien			TRACE_OUT(on_mp_read_session_request_read1);
117175296Sobrien			return (-1);
118175296Sobrien		}
119175296Sobrien
120175296Sobrien		c_mp_rs_request->entry = calloc(1,
121175296Sobrien			c_mp_rs_request->entry_length + 1);
122175296Sobrien		assert(c_mp_rs_request->entry != NULL);
123175296Sobrien
124175296Sobrien		qstate->kevent_watermark = c_mp_rs_request->entry_length;
125133359Sobrien		qstate->process_func = on_mp_read_session_request_read2;
126175296Sobrien	}
127175296Sobrien	TRACE_OUT(on_mp_read_session_request_read1);
128133359Sobrien	return (0);
129133359Sobrien}
130133359Sobrien
13168349Sobrienstatic int
13268349Sobrienon_mp_read_session_request_read2(struct query_state *qstate)
133133359Sobrien{
134133359Sobrien	struct cache_mp_read_session_request	*c_mp_rs_request;
135133359Sobrien	ssize_t	result;
136133359Sobrien
137133359Sobrien	TRACE_IN(on_mp_read_session_request_read2);
13868349Sobrien	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
13968349Sobrien
14075937Sobrien	result = qstate->read_func(qstate, c_mp_rs_request->entry,
14175937Sobrien		c_mp_rs_request->entry_length);
14275937Sobrien
143133359Sobrien	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
144103373Sobrien		LOG_ERR_3("on_mp_read_session_request_read2",
14575937Sobrien			"read failed");
14675937Sobrien		TRACE_OUT(on_mp_read_session_request_read2);
14775937Sobrien		return (-1);
14868349Sobrien	}
14975937Sobrien
15075937Sobrien	qstate->kevent_watermark = 0;
15175937Sobrien	qstate->process_func = on_mp_read_session_request_process;
15275937Sobrien	TRACE_OUT(on_mp_read_session_request_read2);
15375937Sobrien	return (0);
15475937Sobrien}
15575937Sobrien
15675937Sobrienstatic int
157103373Sobrienon_mp_read_session_request_process(struct query_state *qstate)
15875937Sobrien{
15975937Sobrien	struct cache_mp_read_session_request	*c_mp_rs_request;
16075937Sobrien	struct cache_mp_read_session_response	*c_mp_rs_response;
16175937Sobrien	cache_mp_read_session	rs;
16275937Sobrien	cache_entry	c_entry;
16375937Sobrien	char	*dec_cache_entry_name;
16475937Sobrien
16575937Sobrien	char *buffer;
16675937Sobrien	size_t buffer_size;
16775937Sobrien	cache_mp_write_session ws;
168169942Sobrien	struct agent	*lookup_agent;
169169962Sobrien	struct multipart_agent *mp_agent;
17075937Sobrien	void *mdata;
171169962Sobrien	int res;
172169942Sobrien
173169942Sobrien	TRACE_IN(on_mp_read_session_request_process);
174169942Sobrien	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
17575937Sobrien	c_mp_rs_response = get_cache_mp_read_session_response(
17675937Sobrien		&qstate->response);
177169942Sobrien	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178169942Sobrien
179169942Sobrien	qstate->config_entry = configuration_find_entry(
180169942Sobrien		s_configuration, c_mp_rs_request->entry);
181169962Sobrien	if (qstate->config_entry == NULL) {
182169942Sobrien		c_mp_rs_response->error_code = ENOENT;
183169962Sobrien
184169942Sobrien		LOG_ERR_2("read_session_request",
185169942Sobrien			"can't find configuration entry '%s'."
186169962Sobrien			" aborting request", c_mp_rs_request->entry);
187169942Sobrien		goto fin;
188169942Sobrien	}
189169942Sobrien
190169942Sobrien	if (qstate->config_entry->enabled == 0) {
191169942Sobrien		c_mp_rs_response->error_code = EACCES;
192169942Sobrien
193169942Sobrien		LOG_ERR_2("read_session_request",
194169942Sobrien			"configuration entry '%s' is disabled",
195169962Sobrien			c_mp_rs_request->entry);
196169962Sobrien		goto fin;
197169942Sobrien	}
198169942Sobrien
199169962Sobrien	if (qstate->config_entry->perform_actual_lookups != 0)
200169942Sobrien		dec_cache_entry_name = strdup(
201169962Sobrien			qstate->config_entry->mp_cache_params.cep.entry_name);
202169962Sobrien	else {
203169942Sobrien#ifdef NS_NSCD_EID_CHECKING
204169942Sobrien		if (check_query_eids(qstate) != 0) {
205169942Sobrien			c_mp_rs_response->error_code = EPERM;
206169942Sobrien			goto fin;
207169942Sobrien		}
208169942Sobrien#endif
209169942Sobrien
210169942Sobrien		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211169942Sobrien			qstate->config_entry->mp_cache_params.cep.entry_name);
212169942Sobrien	}
213169942Sobrien
214169942Sobrien	assert(dec_cache_entry_name != NULL);
21575937Sobrien
216169942Sobrien	configuration_lock_rdlock(s_configuration);
21775937Sobrien	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
21875937Sobrien	configuration_unlock(s_configuration);
21975937Sobrien
22075937Sobrien	if ((c_entry == INVALID_CACHE) &&
221103373Sobrien	   (qstate->config_entry->perform_actual_lookups != 0))
222103373Sobrien		c_entry = register_new_mp_cache_entry(qstate,
22375937Sobrien			dec_cache_entry_name);
22475937Sobrien
22575937Sobrien	free(dec_cache_entry_name);
22675937Sobrien
22775937Sobrien	if (c_entry != INVALID_CACHE_ENTRY) {
22875937Sobrien		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
22975937Sobrien		rs = open_cache_mp_read_session(c_entry);
23075937Sobrien		configuration_unlock_entry(qstate->config_entry,
23175937Sobrien			CELT_MULTIPART);
232133359Sobrien
233133359Sobrien		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234133359Sobrien		   (qstate->config_entry->perform_actual_lookups != 0)) {
235103373Sobrien			lookup_agent = find_agent(s_agent_table,
236103373Sobrien				c_mp_rs_request->entry, MULTIPART_AGENT);
237103373Sobrien
238103373Sobrien			if ((lookup_agent != NULL) &&
239191736Sobrien			(lookup_agent->type == MULTIPART_AGENT)) {
240103373Sobrien				mp_agent = (struct multipart_agent *)
241103373Sobrien					lookup_agent;
242103373Sobrien				mdata = mp_agent->mp_init_func();
243103373Sobrien
244103373Sobrien				/*
245103373Sobrien				 * Multipart agents read the whole snapshot
246103373Sobrien				 * of the data at one time.
247103373Sobrien				 */
248103373Sobrien				configuration_lock_entry(qstate->config_entry,
249103373Sobrien					CELT_MULTIPART);
250103373Sobrien				ws = open_cache_mp_write_session(c_entry);
251103373Sobrien				configuration_unlock_entry(qstate->config_entry,
252103373Sobrien					CELT_MULTIPART);
253103373Sobrien				if (ws != NULL) {
254103373Sobrien				    do {
255133359Sobrien					buffer = NULL;
256133359Sobrien					res = mp_agent->mp_lookup_func(&buffer,
257133359Sobrien						&buffer_size,
258103373Sobrien						mdata);
259103373Sobrien
260133359Sobrien					if ((res & NS_TERMINATE) &&
261103373Sobrien					   (buffer != NULL)) {
262103373Sobrien						configuration_lock_entry(
263169962Sobrien							qstate->config_entry,
264133359Sobrien						   	CELT_MULTIPART);
265103373Sobrien						if (cache_mp_write(ws, buffer,
266103373Sobrien						    buffer_size) != 0) {
267103373Sobrien							abandon_cache_mp_write_session(ws);
268103373Sobrien							ws = NULL;
269103373Sobrien						}
270133359Sobrien						configuration_unlock_entry(
271133359Sobrien							qstate->config_entry,
272103373Sobrien							CELT_MULTIPART);
273103373Sobrien
274103373Sobrien						free(buffer);
275133359Sobrien						buffer = NULL;
276133359Sobrien					} else {
277103373Sobrien						configuration_lock_entry(
278103373Sobrien							qstate->config_entry,
279103373Sobrien							CELT_MULTIPART);
280103373Sobrien						close_cache_mp_write_session(ws);
281103373Sobrien						configuration_unlock_entry(
282103373Sobrien							qstate->config_entry,
283103373Sobrien							CELT_MULTIPART);
284103373Sobrien
285133359Sobrien						free(buffer);
286133359Sobrien						buffer = NULL;
287103373Sobrien					}
288103373Sobrien				    } while ((res & NS_TERMINATE) &&
289103373Sobrien				    	    (ws != NULL));
290133359Sobrien				}
291133359Sobrien
292103373Sobrien				configuration_lock_entry(qstate->config_entry,
293103373Sobrien					CELT_MULTIPART);
294103373Sobrien				rs = open_cache_mp_read_session(c_entry);
295103373Sobrien				configuration_unlock_entry(qstate->config_entry,
296175296Sobrien					CELT_MULTIPART);
297103373Sobrien			}
298103373Sobrien		}
299103373Sobrien
300103373Sobrien		if (rs == INVALID_CACHE_MP_READ_SESSION)
301103373Sobrien			c_mp_rs_response->error_code = -1;
302103373Sobrien		else {
303133359Sobrien		    qstate->mdata = rs;
304133359Sobrien		    qstate->destroy_func = on_mp_read_session_destroy;
305133359Sobrien
30668349Sobrien		    configuration_lock_entry(qstate->config_entry,
307103373Sobrien			CELT_MULTIPART);
308133359Sobrien		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309103373Sobrien		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310103373Sobrien			memcpy(&qstate->timeout,
311103373Sobrien			    &qstate->config_entry->mp_query_timeout,
312133359Sobrien			    sizeof(struct timeval));
313133359Sobrien		    configuration_unlock_entry(qstate->config_entry,
314133359Sobrien			CELT_MULTIPART);
315103373Sobrien		}
316133359Sobrien	} else
317103373Sobrien		c_mp_rs_response->error_code = -1;
318133359Sobrien
319103373Sobrienfin:
320103373Sobrien	qstate->process_func = on_mp_read_session_response_write1;
321103373Sobrien	qstate->kevent_watermark = sizeof(int);
322103373Sobrien	qstate->kevent_filter = EVFILT_WRITE;
323133359Sobrien
324103373Sobrien	TRACE_OUT(on_mp_read_session_request_process);
325103373Sobrien	return (0);
326103373Sobrien}
327103373Sobrien
328103373Sobrienstatic int
329103373Sobrienon_mp_read_session_response_write1(struct query_state *qstate)
330133359Sobrien{
331133359Sobrien	struct cache_mp_read_session_response	*c_mp_rs_response;
332186690Sobrien	ssize_t	result;
333103373Sobrien
334103373Sobrien	TRACE_IN(on_mp_read_session_response_write1);
335103373Sobrien	c_mp_rs_response = get_cache_mp_read_session_response(
336133359Sobrien		&qstate->response);
337133359Sobrien	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338133359Sobrien		sizeof(int));
339103373Sobrien
340103373Sobrien	if (result != sizeof(int)) {
341103373Sobrien		LOG_ERR_3("on_mp_read_session_response_write1",
342103373Sobrien			"write failed");
343103373Sobrien		TRACE_OUT(on_mp_read_session_response_write1);
344103373Sobrien		return (-1);
345103373Sobrien	}
346103373Sobrien
347103373Sobrien	if (c_mp_rs_response->error_code == 0) {
348133359Sobrien		qstate->kevent_watermark = sizeof(int);
349103373Sobrien		qstate->process_func = on_mp_read_session_mapper;
350103373Sobrien		qstate->kevent_filter = EVFILT_READ;
351103373Sobrien	} else {
352103373Sobrien		qstate->kevent_watermark = 0;
353103373Sobrien		qstate->process_func = NULL;
354133359Sobrien	}
355103373Sobrien	TRACE_OUT(on_mp_read_session_response_write1);
356103373Sobrien	return (0);
357103373Sobrien}
358133359Sobrien
359169962Sobrien/*
360103373Sobrien * Mapper function is used to avoid multiple connections for each session
361103373Sobrien * write or read requests. After processing the request, it does not close
362169942Sobrien * the connection, but waits for the next request.
363103373Sobrien */
364103373Sobrienstatic int
365103373Sobrienon_mp_read_session_mapper(struct query_state *qstate)
366103373Sobrien{
367103373Sobrien	ssize_t	result;
368133359Sobrien	int elem_type;
369159764Sobrien
370159764Sobrien	TRACE_IN(on_mp_read_session_mapper);
371103373Sobrien	if (qstate->kevent_watermark == 0) {
37268349Sobrien		qstate->kevent_watermark = sizeof(int);
373133359Sobrien	} else {
37468349Sobrien		result = qstate->read_func(qstate, &elem_type, sizeof(int));
375175296Sobrien		if (result != sizeof(int)) {
376186690Sobrien			LOG_ERR_3("on_mp_read_session_mapper",
377103373Sobrien				"read failed");
378133359Sobrien			TRACE_OUT(on_mp_read_session_mapper);
379103373Sobrien			return (-1);
380159764Sobrien		}
381159764Sobrien
382103373Sobrien		switch (elem_type) {
383159764Sobrien		case CET_MP_READ_SESSION_READ_REQUEST:
384133359Sobrien			qstate->kevent_watermark = 0;
385169942Sobrien			qstate->process_func =
38668349Sobrien				on_mp_read_session_read_request_process;
38768349Sobrien			break;
38868349Sobrien		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
38968349Sobrien			qstate->kevent_watermark = 0;
390159764Sobrien			qstate->process_func =
391159764Sobrien				on_mp_read_session_close_notification;
392159764Sobrien			break;
393159764Sobrien		default:
394159764Sobrien			qstate->kevent_watermark = 0;
395159764Sobrien			qstate->process_func = NULL;
396159764Sobrien			LOG_ERR_3("on_mp_read_session_mapper",
397159764Sobrien				"unknown element type");
39868349Sobrien			TRACE_OUT(on_mp_read_session_mapper);
39968349Sobrien			return (-1);
40068349Sobrien		}
40168349Sobrien	}
40268349Sobrien	TRACE_OUT(on_mp_read_session_mapper);
403159764Sobrien	return (0);
40468349Sobrien}
405159764Sobrien
406159764Sobrien/*
40768349Sobrien * The functions below are used to process multipart read sessions read
408169962Sobrien * requests. User doesn't have to pass any kind of data, besides the
409169962Sobrien * request identificator itself. So we don't need any XXX_read functions and
410159764Sobrien * start with the XXX_process function.
411159764Sobrien * - on_mp_read_session_read_request_process processes it
412159764Sobrien * - on_mp_read_session_read_response_write1 and
413159764Sobrien *   on_mp_read_session_read_response_write2 sends the response
41468349Sobrien */
41568349Sobrienstatic int
41668349Sobrienon_mp_read_session_read_request_process(struct query_state *qstate)
417133359Sobrien{
418169942Sobrien	struct cache_mp_read_session_read_response	*read_response;
41968349Sobrien
42068349Sobrien	TRACE_IN(on_mp_read_session_response_process);
42168349Sobrien	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422159764Sobrien	read_response = get_cache_mp_read_session_read_response(
423159764Sobrien		&qstate->response);
424159764Sobrien
425159764Sobrien	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426159764Sobrien	read_response->error_code = cache_mp_read(
427159764Sobrien		(cache_mp_read_session)qstate->mdata, NULL,
428159764Sobrien		&read_response->data_size);
429159764Sobrien
430159764Sobrien	if (read_response->error_code == 0) {
431169962Sobrien		read_response->data = malloc(read_response->data_size);
432159764Sobrien		assert(read_response != NULL);
433159764Sobrien		read_response->error_code = cache_mp_read(
434159764Sobrien			(cache_mp_read_session)qstate->mdata,
435159764Sobrien	    		read_response->data,
436159764Sobrien			&read_response->data_size);
437159764Sobrien	}
438159764Sobrien	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439159764Sobrien
440159764Sobrien	if (read_response->error_code == 0)
441159764Sobrien		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442159764Sobrien	else
443159764Sobrien		qstate->kevent_watermark = sizeof(int);
444159764Sobrien	qstate->process_func = on_mp_read_session_read_response_write1;
445159764Sobrien	qstate->kevent_filter = EVFILT_WRITE;
446159764Sobrien
447133359Sobrien	TRACE_OUT(on_mp_read_session_response_process);
448159764Sobrien	return (0);
449133359Sobrien}
450159764Sobrien
451159764Sobrienstatic int
452159764Sobrienon_mp_read_session_read_response_write1(struct query_state *qstate)
453159764Sobrien{
454159764Sobrien	struct cache_mp_read_session_read_response	*read_response;
455159764Sobrien	ssize_t	result;
456133359Sobrien
457103373Sobrien	TRACE_IN(on_mp_read_session_read_response_write1);
458159764Sobrien	read_response = get_cache_mp_read_session_read_response(
459159764Sobrien		&qstate->response);
460159764Sobrien
461159764Sobrien	result = qstate->write_func(qstate, &read_response->error_code,
46275937Sobrien		sizeof(int));
46375937Sobrien	if (read_response->error_code == 0) {
46475937Sobrien		result += qstate->write_func(qstate, &read_response->data_size,
465169962Sobrien			sizeof(size_t));
466159764Sobrien		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
467159764Sobrien			TRACE_OUT(on_mp_read_session_read_response_write1);
468159764Sobrien			LOG_ERR_3("on_mp_read_session_read_response_write1",
469159764Sobrien				"write failed");
47068349Sobrien			return (-1);
47175937Sobrien		}
472133359Sobrien
47375937Sobrien		qstate->kevent_watermark = read_response->data_size;
474133359Sobrien		qstate->process_func = on_mp_read_session_read_response_write2;
475133359Sobrien	} else {
47668349Sobrien		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
477103373Sobrien			LOG_ERR_3("on_mp_read_session_read_response_write1",
478169942Sobrien				"write failed");
47975937Sobrien			TRACE_OUT(on_mp_read_session_read_response_write1);
48075937Sobrien			return (-1);
48175937Sobrien		}
48268349Sobrien
483133359Sobrien		qstate->kevent_watermark = 0;
484133359Sobrien		qstate->process_func = NULL;
485133359Sobrien	}
486133359Sobrien
487133359Sobrien	TRACE_OUT(on_mp_read_session_read_response_write1);
488133359Sobrien	return (0);
48968349Sobrien}
49068349Sobrien
49168349Sobrienstatic int
492on_mp_read_session_read_response_write2(struct query_state *qstate)
493{
494	struct cache_mp_read_session_read_response *read_response;
495	ssize_t	result;
496
497	TRACE_IN(on_mp_read_session_read_response_write2);
498	read_response = get_cache_mp_read_session_read_response(
499		&qstate->response);
500	result = qstate->write_func(qstate, read_response->data,
501		read_response->data_size);
502	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
503		LOG_ERR_3("on_mp_read_session_read_response_write2",
504			"write failed");
505		TRACE_OUT(on_mp_read_session_read_response_write2);
506		return (-1);
507	}
508
509	finalize_comm_element(&qstate->request);
510	finalize_comm_element(&qstate->response);
511
512	qstate->kevent_watermark = sizeof(int);
513	qstate->process_func = on_mp_read_session_mapper;
514	qstate->kevent_filter = EVFILT_READ;
515
516	TRACE_OUT(on_mp_read_session_read_response_write2);
517	return (0);
518}
519
520/*
521 * Handles session close notification by calling close_cache_mp_read_session
522 * function.
523 */
524static int
525on_mp_read_session_close_notification(struct query_state *qstate)
526{
527
528	TRACE_IN(on_mp_read_session_close_notification);
529	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532	qstate->mdata = NULL;
533	qstate->kevent_watermark = 0;
534	qstate->process_func = NULL;
535	TRACE_OUT(on_mp_read_session_close_notification);
536	return (0);
537}
538