1158115Sume/*-
2158115Sume * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3158115Sume * All rights reserved.
4158115Sume *
5158115Sume * Redistribution and use in source and binary forms, with or without
6158115Sume * modification, are permitted provided that the following conditions
7158115Sume * are met:
8158115Sume * 1. Redistributions of source code must retain the above copyright
9158115Sume *    notice, this list of conditions and the following disclaimer.
10158115Sume * 2. Redistributions in binary form must reproduce the above copyright
11158115Sume *    notice, this list of conditions and the following disclaimer in the
12158115Sume *    documentation and/or other materials provided with the distribution.
13158115Sume *
14158115Sume * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15158115Sume * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16158115Sume * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17158115Sume * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18158115Sume * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19158115Sume * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20158115Sume * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21158115Sume * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22158115Sume * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23158115Sume * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24158115Sume * SUCH DAMAGE.
25158115Sume *
26158115Sume */
27158115Sume
28158115Sume#include <sys/cdefs.h>
29158115Sume__FBSDID("$FreeBSD$");
30158115Sume
31194089Sdes#include <sys/types.h>
32194089Sdes#include <sys/event.h>
33158115Sume#include <sys/socket.h>
34158115Sume#include <sys/time.h>
35194089Sdes
36158115Sume#include <assert.h>
37158115Sume#include <errno.h>
38194089Sdes#include <nsswitch.h>
39194089Sdes#include <stdio.h>
40158115Sume#include <stdlib.h>
41158115Sume#include <string.h>
42158115Sume
43158115Sume#include "cachelib.h"
44158115Sume#include "config.h"
45158115Sume#include "debug.h"
46158115Sume#include "log.h"
47158115Sume#include "query.h"
48158115Sume#include "mp_rs_query.h"
49158115Sume#include "mp_ws_query.h"
50158115Sume#include "singletons.h"
51158115Sume
/*
 * Forward declarations of the file-local handlers, listed in the order in
 * which a read session normally walks through them.
 * on_mp_read_session_request_read1 is defined below without the static
 * qualifier, so it is intentionally not redeclared here.
 */
static void on_mp_read_session_destroy(struct query_state *);
static int on_mp_read_session_request_read2(struct query_state *);
static int on_mp_read_session_request_process(struct query_state *);
static int on_mp_read_session_response_write1(struct query_state *);
static int on_mp_read_session_mapper(struct query_state *);
static int on_mp_read_session_read_request_process(struct query_state *);
static int on_mp_read_session_read_response_write1(struct query_state *);
static int on_mp_read_session_read_response_write2(struct query_state *);
static int on_mp_read_session_close_notification(struct query_state *);
62158115Sume
63158115Sume/*
64158115Sume * This function is used as the query_state's destroy_func to make the
65158115Sume * proper cleanup in case of errors.
66158115Sume */
67158115Sumestatic void
68158115Sumeon_mp_read_session_destroy(struct query_state *qstate)
69158115Sume{
70158115Sume	TRACE_IN(on_mp_read_session_destroy);
71158115Sume	finalize_comm_element(&qstate->request);
72158115Sume	finalize_comm_element(&qstate->response);
73158115Sume
74158115Sume	if (qstate->mdata != NULL) {
75158115Sume		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
76158115Sume		close_cache_mp_read_session(
77158115Sume	    		(cache_mp_read_session)qstate->mdata);
78158115Sume		configuration_unlock_entry(qstate->config_entry,
79158115Sume			CELT_MULTIPART);
80158115Sume	}
81158115Sume	TRACE_OUT(on_mp_read_session_destroy);
82158115Sume}
83158115Sume
84158115Sume/*
85158115Sume * The functions below are used to process multipart read session initiation
86158115Sume * requests.
87158115Sume * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
88158115Sume *   the request itself
89158115Sume * - on_mp_read_session_request_process processes it
90158115Sume * - on_mp_read_session_response_write1 sends the response
91158115Sume */
92158115Sumeint
93158115Sumeon_mp_read_session_request_read1(struct query_state *qstate)
94158115Sume{
95158115Sume	struct cache_mp_read_session_request	*c_mp_rs_request;
96158115Sume	ssize_t	result;
97158115Sume
98158115Sume	TRACE_IN(on_mp_read_session_request_read1);
99158115Sume	if (qstate->kevent_watermark == 0)
100158115Sume		qstate->kevent_watermark = sizeof(size_t);
101158115Sume	else {
102158115Sume		init_comm_element(&qstate->request,
103158115Sume	    		CET_MP_READ_SESSION_REQUEST);
104158115Sume		c_mp_rs_request = get_cache_mp_read_session_request(
105158115Sume	    		&qstate->request);
106158115Sume
107158115Sume		result = qstate->read_func(qstate,
108158115Sume	    		&c_mp_rs_request->entry_length, sizeof(size_t));
109158115Sume
110158115Sume		if (result != sizeof(size_t)) {
111158115Sume			TRACE_OUT(on_mp_read_session_request_read1);
112158115Sume			return (-1);
113158115Sume		}
114158115Sume
115158115Sume		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
116158115Sume			TRACE_OUT(on_mp_read_session_request_read1);
117158115Sume			return (-1);
118158115Sume		}
119158115Sume
120194104Sdes		c_mp_rs_request->entry = calloc(1,
121158115Sume			c_mp_rs_request->entry_length + 1);
122158115Sume		assert(c_mp_rs_request->entry != NULL);
123158115Sume
124158115Sume		qstate->kevent_watermark = c_mp_rs_request->entry_length;
125158115Sume		qstate->process_func = on_mp_read_session_request_read2;
126158115Sume	}
127158115Sume	TRACE_OUT(on_mp_read_session_request_read1);
128158115Sume	return (0);
129158115Sume}
130158115Sume
131158115Sumestatic int
132158115Sumeon_mp_read_session_request_read2(struct query_state *qstate)
133158115Sume{
134158115Sume	struct cache_mp_read_session_request	*c_mp_rs_request;
135158115Sume	ssize_t	result;
136158115Sume
137158115Sume	TRACE_IN(on_mp_read_session_request_read2);
138158115Sume	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
139158115Sume
140158115Sume	result = qstate->read_func(qstate, c_mp_rs_request->entry,
141158115Sume		c_mp_rs_request->entry_length);
142158115Sume
143194096Sdes	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
144158115Sume		LOG_ERR_3("on_mp_read_session_request_read2",
145158115Sume			"read failed");
146158115Sume		TRACE_OUT(on_mp_read_session_request_read2);
147158115Sume		return (-1);
148158115Sume	}
149158115Sume
150158115Sume	qstate->kevent_watermark = 0;
151158115Sume	qstate->process_func = on_mp_read_session_request_process;
152158115Sume	TRACE_OUT(on_mp_read_session_request_read2);
153158115Sume	return (0);
154158115Sume}
155158115Sume
156158115Sumestatic int
157158115Sumeon_mp_read_session_request_process(struct query_state *qstate)
158158115Sume{
159158115Sume	struct cache_mp_read_session_request	*c_mp_rs_request;
160158115Sume	struct cache_mp_read_session_response	*c_mp_rs_response;
161158115Sume	cache_mp_read_session	rs;
162158115Sume	cache_entry	c_entry;
163158115Sume	char	*dec_cache_entry_name;
164158115Sume
165158115Sume	char *buffer;
166158115Sume	size_t buffer_size;
167158115Sume	cache_mp_write_session ws;
168158115Sume	struct agent	*lookup_agent;
169158115Sume	struct multipart_agent *mp_agent;
170158115Sume	void *mdata;
171158115Sume	int res;
172158115Sume
173158115Sume	TRACE_IN(on_mp_read_session_request_process);
174158115Sume	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
175158115Sume	c_mp_rs_response = get_cache_mp_read_session_response(
176158115Sume		&qstate->response);
177158115Sume	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178158115Sume
179158115Sume	qstate->config_entry = configuration_find_entry(
180158115Sume		s_configuration, c_mp_rs_request->entry);
181158115Sume	if (qstate->config_entry == NULL) {
182158115Sume		c_mp_rs_response->error_code = ENOENT;
183158115Sume
184158115Sume		LOG_ERR_2("read_session_request",
185158115Sume			"can't find configuration entry '%s'."
186158115Sume			" aborting request", c_mp_rs_request->entry);
187158115Sume		goto fin;
188158115Sume	}
189158115Sume
190158115Sume	if (qstate->config_entry->enabled == 0) {
191158115Sume		c_mp_rs_response->error_code = EACCES;
192158115Sume
193158115Sume		LOG_ERR_2("read_session_request",
194158115Sume			"configuration entry '%s' is disabled",
195158115Sume			c_mp_rs_request->entry);
196158115Sume		goto fin;
197158115Sume	}
198158115Sume
199158115Sume	if (qstate->config_entry->perform_actual_lookups != 0)
200158115Sume		dec_cache_entry_name = strdup(
201194097Sdes			qstate->config_entry->mp_cache_params.cep.entry_name);
202158115Sume	else {
203171795Sbushman#ifdef NS_NSCD_EID_CHECKING
204158115Sume		if (check_query_eids(qstate) != 0) {
205158115Sume			c_mp_rs_response->error_code = EPERM;
206158115Sume			goto fin;
207158115Sume		}
208158115Sume#endif
209158115Sume
210158115Sume		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211194097Sdes			qstate->config_entry->mp_cache_params.cep.entry_name);
212158115Sume	}
213158115Sume
214158115Sume	assert(dec_cache_entry_name != NULL);
215158115Sume
216158115Sume	configuration_lock_rdlock(s_configuration);
217158115Sume	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
218158115Sume	configuration_unlock(s_configuration);
219158115Sume
220158115Sume	if ((c_entry == INVALID_CACHE) &&
221158115Sume	   (qstate->config_entry->perform_actual_lookups != 0))
222158115Sume		c_entry = register_new_mp_cache_entry(qstate,
223158115Sume			dec_cache_entry_name);
224158115Sume
225158115Sume	free(dec_cache_entry_name);
226158115Sume
227158115Sume	if (c_entry != INVALID_CACHE_ENTRY) {
228158115Sume		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
229158115Sume		rs = open_cache_mp_read_session(c_entry);
230158115Sume		configuration_unlock_entry(qstate->config_entry,
231158115Sume			CELT_MULTIPART);
232158115Sume
233158115Sume		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234158115Sume		   (qstate->config_entry->perform_actual_lookups != 0)) {
235158115Sume			lookup_agent = find_agent(s_agent_table,
236158115Sume				c_mp_rs_request->entry, MULTIPART_AGENT);
237158115Sume
238158115Sume			if ((lookup_agent != NULL) &&
239158115Sume			(lookup_agent->type == MULTIPART_AGENT)) {
240158115Sume				mp_agent = (struct multipart_agent *)
241158115Sume					lookup_agent;
242158115Sume				mdata = mp_agent->mp_init_func();
243158115Sume
244158115Sume				/*
245158115Sume				 * Multipart agents read the whole snapshot
246158115Sume				 * of the data at one time.
247158115Sume				 */
248158115Sume				configuration_lock_entry(qstate->config_entry,
249158115Sume					CELT_MULTIPART);
250158115Sume				ws = open_cache_mp_write_session(c_entry);
251158115Sume				configuration_unlock_entry(qstate->config_entry,
252158115Sume					CELT_MULTIPART);
253158115Sume				if (ws != NULL) {
254158115Sume				    do {
255158115Sume					buffer = NULL;
256158115Sume					res = mp_agent->mp_lookup_func(&buffer,
257158115Sume						&buffer_size,
258158115Sume						mdata);
259158115Sume
260158115Sume					if ((res & NS_TERMINATE) &&
261158115Sume					   (buffer != NULL)) {
262158115Sume						configuration_lock_entry(
263158115Sume							qstate->config_entry,
264158115Sume						   	CELT_MULTIPART);
265158115Sume						if (cache_mp_write(ws, buffer,
266158115Sume						    buffer_size) != 0) {
267158115Sume							abandon_cache_mp_write_session(ws);
268158115Sume							ws = NULL;
269158115Sume						}
270158115Sume						configuration_unlock_entry(
271158115Sume							qstate->config_entry,
272158115Sume							CELT_MULTIPART);
273158115Sume
274158115Sume						free(buffer);
275158115Sume						buffer = NULL;
276158115Sume					} else {
277158115Sume						configuration_lock_entry(
278158115Sume							qstate->config_entry,
279158115Sume							CELT_MULTIPART);
280158115Sume						close_cache_mp_write_session(ws);
281158115Sume						configuration_unlock_entry(
282158115Sume							qstate->config_entry,
283158115Sume							CELT_MULTIPART);
284158115Sume
285158115Sume						free(buffer);
286158115Sume						buffer = NULL;
287158115Sume					}
288158115Sume				    } while ((res & NS_TERMINATE) &&
289158115Sume				    	    (ws != NULL));
290158115Sume				}
291158115Sume
292158115Sume				configuration_lock_entry(qstate->config_entry,
293158115Sume					CELT_MULTIPART);
294158115Sume				rs = open_cache_mp_read_session(c_entry);
295158115Sume				configuration_unlock_entry(qstate->config_entry,
296158115Sume					CELT_MULTIPART);
297158115Sume			}
298158115Sume		}
299158115Sume
300158115Sume		if (rs == INVALID_CACHE_MP_READ_SESSION)
301158115Sume			c_mp_rs_response->error_code = -1;
302158115Sume		else {
303158115Sume		    qstate->mdata = rs;
304158115Sume		    qstate->destroy_func = on_mp_read_session_destroy;
305158115Sume
306158115Sume		    configuration_lock_entry(qstate->config_entry,
307158115Sume			CELT_MULTIPART);
308158115Sume		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309158115Sume		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310158115Sume			memcpy(&qstate->timeout,
311158115Sume			    &qstate->config_entry->mp_query_timeout,
312158115Sume			    sizeof(struct timeval));
313158115Sume		    configuration_unlock_entry(qstate->config_entry,
314158115Sume			CELT_MULTIPART);
315158115Sume		}
316158115Sume	} else
317158115Sume		c_mp_rs_response->error_code = -1;
318158115Sume
319158115Sumefin:
320158115Sume	qstate->process_func = on_mp_read_session_response_write1;
321158115Sume	qstate->kevent_watermark = sizeof(int);
322158115Sume	qstate->kevent_filter = EVFILT_WRITE;
323158115Sume
324158115Sume	TRACE_OUT(on_mp_read_session_request_process);
325158115Sume	return (0);
326158115Sume}
327158115Sume
328158115Sumestatic int
329158115Sumeon_mp_read_session_response_write1(struct query_state *qstate)
330158115Sume{
331158115Sume	struct cache_mp_read_session_response	*c_mp_rs_response;
332158115Sume	ssize_t	result;
333158115Sume
334158115Sume	TRACE_IN(on_mp_read_session_response_write1);
335158115Sume	c_mp_rs_response = get_cache_mp_read_session_response(
336158115Sume		&qstate->response);
337158115Sume	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338158115Sume		sizeof(int));
339158115Sume
340158115Sume	if (result != sizeof(int)) {
341158115Sume		LOG_ERR_3("on_mp_read_session_response_write1",
342158115Sume			"write failed");
343158115Sume		TRACE_OUT(on_mp_read_session_response_write1);
344158115Sume		return (-1);
345158115Sume	}
346158115Sume
347158115Sume	if (c_mp_rs_response->error_code == 0) {
348158115Sume		qstate->kevent_watermark = sizeof(int);
349158115Sume		qstate->process_func = on_mp_read_session_mapper;
350158115Sume		qstate->kevent_filter = EVFILT_READ;
351158115Sume	} else {
352158115Sume		qstate->kevent_watermark = 0;
353158115Sume		qstate->process_func = NULL;
354158115Sume	}
355158115Sume	TRACE_OUT(on_mp_read_session_response_write1);
356158115Sume	return (0);
357158115Sume}
358158115Sume
359158115Sume/*
360158115Sume * Mapper function is used to avoid multiple connections for each session
361158115Sume * write or read requests. After processing the request, it does not close
362158115Sume * the connection, but waits for the next request.
363158115Sume */
364158115Sumestatic int
365158115Sumeon_mp_read_session_mapper(struct query_state *qstate)
366158115Sume{
367158115Sume	ssize_t	result;
368158115Sume	int elem_type;
369158115Sume
370158115Sume	TRACE_IN(on_mp_read_session_mapper);
371158115Sume	if (qstate->kevent_watermark == 0) {
372158115Sume		qstate->kevent_watermark = sizeof(int);
373158115Sume	} else {
374158115Sume		result = qstate->read_func(qstate, &elem_type, sizeof(int));
375158115Sume		if (result != sizeof(int)) {
376158115Sume			LOG_ERR_3("on_mp_read_session_mapper",
377158115Sume				"read failed");
378158115Sume			TRACE_OUT(on_mp_read_session_mapper);
379158115Sume			return (-1);
380158115Sume		}
381158115Sume
382158115Sume		switch (elem_type) {
383158115Sume		case CET_MP_READ_SESSION_READ_REQUEST:
384158115Sume			qstate->kevent_watermark = 0;
385158115Sume			qstate->process_func =
386158115Sume				on_mp_read_session_read_request_process;
387158115Sume			break;
388158115Sume		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
389158115Sume			qstate->kevent_watermark = 0;
390158115Sume			qstate->process_func =
391158115Sume				on_mp_read_session_close_notification;
392158115Sume			break;
393158115Sume		default:
394158115Sume			qstate->kevent_watermark = 0;
395158115Sume			qstate->process_func = NULL;
396158115Sume			LOG_ERR_3("on_mp_read_session_mapper",
397158115Sume				"unknown element type");
398158115Sume			TRACE_OUT(on_mp_read_session_mapper);
399158115Sume			return (-1);
400158115Sume		}
401158115Sume	}
402158115Sume	TRACE_OUT(on_mp_read_session_mapper);
403158115Sume	return (0);
404158115Sume}
405158115Sume
406158115Sume/*
407158115Sume * The functions below are used to process multipart read sessions read
408158115Sume * requests. User doesn't have to pass any kind of data, besides the
409158115Sume * request identificator itself. So we don't need any XXX_read functions and
410158115Sume * start with the XXX_process function.
411158115Sume * - on_mp_read_session_read_request_process processes it
412158115Sume * - on_mp_read_session_read_response_write1 and
413158115Sume *   on_mp_read_session_read_response_write2 sends the response
414158115Sume */
415158115Sumestatic int
416158115Sumeon_mp_read_session_read_request_process(struct query_state *qstate)
417158115Sume{
418158115Sume	struct cache_mp_read_session_read_response	*read_response;
419158115Sume
420158115Sume	TRACE_IN(on_mp_read_session_response_process);
421158115Sume	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422158115Sume	read_response = get_cache_mp_read_session_read_response(
423158115Sume		&qstate->response);
424158115Sume
425158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426158115Sume	read_response->error_code = cache_mp_read(
427158115Sume		(cache_mp_read_session)qstate->mdata, NULL,
428158115Sume		&read_response->data_size);
429158115Sume
430158115Sume	if (read_response->error_code == 0) {
431194104Sdes		read_response->data = malloc(read_response->data_size);
432158115Sume		assert(read_response != NULL);
433158115Sume		read_response->error_code = cache_mp_read(
434158115Sume			(cache_mp_read_session)qstate->mdata,
435158115Sume	    		read_response->data,
436158115Sume			&read_response->data_size);
437158115Sume	}
438158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439158115Sume
440158115Sume	if (read_response->error_code == 0)
441158115Sume		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442158115Sume	else
443158115Sume		qstate->kevent_watermark = sizeof(int);
444158115Sume	qstate->process_func = on_mp_read_session_read_response_write1;
445158115Sume	qstate->kevent_filter = EVFILT_WRITE;
446158115Sume
447158115Sume	TRACE_OUT(on_mp_read_session_response_process);
448158115Sume	return (0);
449158115Sume}
450158115Sume
451158115Sumestatic int
452158115Sumeon_mp_read_session_read_response_write1(struct query_state *qstate)
453158115Sume{
454158115Sume	struct cache_mp_read_session_read_response	*read_response;
455158115Sume	ssize_t	result;
456158115Sume
457158115Sume	TRACE_IN(on_mp_read_session_read_response_write1);
458158115Sume	read_response = get_cache_mp_read_session_read_response(
459158115Sume		&qstate->response);
460158115Sume
461158115Sume	result = qstate->write_func(qstate, &read_response->error_code,
462158115Sume		sizeof(int));
463158115Sume	if (read_response->error_code == 0) {
464158115Sume		result += qstate->write_func(qstate, &read_response->data_size,
465158115Sume			sizeof(size_t));
466194096Sdes		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
467158115Sume			TRACE_OUT(on_mp_read_session_read_response_write1);
468158115Sume			LOG_ERR_3("on_mp_read_session_read_response_write1",
469158115Sume				"write failed");
470158115Sume			return (-1);
471158115Sume		}
472158115Sume
473158115Sume		qstate->kevent_watermark = read_response->data_size;
474158115Sume		qstate->process_func = on_mp_read_session_read_response_write2;
475158115Sume	} else {
476194096Sdes		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
477158115Sume			LOG_ERR_3("on_mp_read_session_read_response_write1",
478158115Sume				"write failed");
479158115Sume			TRACE_OUT(on_mp_read_session_read_response_write1);
480158115Sume			return (-1);
481158115Sume		}
482158115Sume
483158115Sume		qstate->kevent_watermark = 0;
484158115Sume		qstate->process_func = NULL;
485158115Sume	}
486158115Sume
487158115Sume	TRACE_OUT(on_mp_read_session_read_response_write1);
488158115Sume	return (0);
489158115Sume}
490158115Sume
491158115Sumestatic int
492158115Sumeon_mp_read_session_read_response_write2(struct query_state *qstate)
493158115Sume{
494158115Sume	struct cache_mp_read_session_read_response *read_response;
495158115Sume	ssize_t	result;
496158115Sume
497158115Sume	TRACE_IN(on_mp_read_session_read_response_write2);
498158115Sume	read_response = get_cache_mp_read_session_read_response(
499158115Sume		&qstate->response);
500158115Sume	result = qstate->write_func(qstate, read_response->data,
501158115Sume		read_response->data_size);
502194096Sdes	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
503158115Sume		LOG_ERR_3("on_mp_read_session_read_response_write2",
504158115Sume			"write failed");
505158115Sume		TRACE_OUT(on_mp_read_session_read_response_write2);
506158115Sume		return (-1);
507158115Sume	}
508158115Sume
509158115Sume	finalize_comm_element(&qstate->request);
510158115Sume	finalize_comm_element(&qstate->response);
511158115Sume
512158115Sume	qstate->kevent_watermark = sizeof(int);
513158115Sume	qstate->process_func = on_mp_read_session_mapper;
514158115Sume	qstate->kevent_filter = EVFILT_READ;
515158115Sume
516158115Sume	TRACE_OUT(on_mp_read_session_read_response_write2);
517158115Sume	return (0);
518158115Sume}
519158115Sume
520158115Sume/*
521158115Sume * Handles session close notification by calling close_cache_mp_read_session
522158115Sume * function.
523158115Sume */
524158115Sumestatic int
525158115Sumeon_mp_read_session_close_notification(struct query_state *qstate)
526158115Sume{
527158115Sume
528158115Sume	TRACE_IN(on_mp_read_session_close_notification);
529158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530158115Sume	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532158115Sume	qstate->mdata = NULL;
533158115Sume	qstate->kevent_watermark = 0;
534158115Sume	qstate->process_func = NULL;
535158115Sume	TRACE_OUT(on_mp_read_session_close_notification);
536158115Sume	return (0);
537158115Sume}
538