1158115Sume/*-
2158115Sume * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3158115Sume * All rights reserved.
4158115Sume *
5158115Sume * Redistribution and use in source and binary forms, with or without
6158115Sume * modification, are permitted provided that the following conditions
7158115Sume * are met:
8158115Sume * 1. Redistributions of source code must retain the above copyright
9158115Sume *    notice, this list of conditions and the following disclaimer.
10158115Sume * 2. Redistributions in binary form must reproduce the above copyright
11158115Sume *    notice, this list of conditions and the following disclaimer in the
12158115Sume *    documentation and/or other materials provided with the distribution.
13158115Sume *
14158115Sume * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15158115Sume * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16158115Sume * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17158115Sume * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18158115Sume * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19158115Sume * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20158115Sume * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21158115Sume * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22158115Sume * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23158115Sume * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24158115Sume * SUCH DAMAGE.
25158115Sume *
26158115Sume */
27158115Sume
28158115Sume#include <sys/cdefs.h>
29158115Sume__FBSDID("$FreeBSD$");
30158115Sume
31194093Sdes#include <sys/types.h>
32194093Sdes#include <sys/event.h>
33158115Sume#include <sys/socket.h>
34158115Sume#include <sys/time.h>
35194093Sdes
36158115Sume#include <assert.h>
37158115Sume#include <errno.h>
38194093Sdes#include <stdio.h>
39158115Sume#include <stdlib.h>
40158115Sume#include <string.h>
41158115Sume
42158115Sume#include "cachelib.h"
43158115Sume#include "config.h"
44158115Sume#include "debug.h"
45158115Sume#include "log.h"
46158115Sume#include "query.h"
47158115Sume#include "mp_ws_query.h"
48158115Sume#include "singletons.h"
49158115Sume
/*
 * Prototypes of the file-local handlers, grouped by the stage of the
 * session they serve.  on_mp_write_session_request_read1 is not listed:
 * it is defined below without the static qualifier.
 */

/* Session lifetime: cleanup hook, dispatcher, end-of-session handlers. */
static void on_mp_write_session_destroy(struct query_state *);
static int on_mp_write_session_mapper(struct query_state *);
static int on_mp_write_session_abandon_notification(struct query_state *);
static int on_mp_write_session_close_notification(struct query_state *);

/* Session initiation request. */
static int on_mp_write_session_request_read2(struct query_state *);
static int on_mp_write_session_request_process(struct query_state *);
static int on_mp_write_session_response_write1(struct query_state *);

/* Write request issued inside an open session. */
static int on_mp_write_session_write_request_read1(struct query_state *);
static int on_mp_write_session_write_request_read2(struct query_state *);
static int on_mp_write_session_write_request_process(struct query_state *);
static int on_mp_write_session_write_response_write1(struct query_state *);
62158115Sume
63158115Sume/*
64158115Sume * This function is used as the query_state's destroy_func to make the
65158115Sume * proper cleanup in case of errors.
66158115Sume */
67158115Sumestatic void
68158115Sumeon_mp_write_session_destroy(struct query_state *qstate)
69158115Sume{
70158115Sume
71158115Sume	TRACE_IN(on_mp_write_session_destroy);
72158115Sume	finalize_comm_element(&qstate->request);
73158115Sume	finalize_comm_element(&qstate->response);
74158115Sume
75158115Sume	if (qstate->mdata != NULL) {
76158115Sume		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
77158115Sume		abandon_cache_mp_write_session(
78158115Sume	    		(cache_mp_write_session)qstate->mdata);
79158115Sume		configuration_unlock_entry(qstate->config_entry,
80158115Sume			CELT_MULTIPART);
81158115Sume	}
82158115Sume	TRACE_OUT(on_mp_write_session_destroy);
83158115Sume}
84158115Sume
85158115Sume/*
86158115Sume * The functions below are used to process multipart write session initiation
87158115Sume * requests.
88158115Sume * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
89158115Sume *   read the request itself
90158115Sume * - on_mp_write_session_request_process processes it
91158115Sume * - on_mp_write_session_response_write1 sends the response
92158115Sume */
93158115Sumeint
94158115Sumeon_mp_write_session_request_read1(struct query_state *qstate)
95158115Sume{
96158115Sume	struct cache_mp_write_session_request	*c_mp_ws_request;
97158115Sume	ssize_t	result;
98158115Sume
99158115Sume	TRACE_IN(on_mp_write_session_request_read1);
100158115Sume	if (qstate->kevent_watermark == 0)
101158115Sume		qstate->kevent_watermark = sizeof(size_t);
102158115Sume	else {
103158115Sume		init_comm_element(&qstate->request,
104158115Sume	    		CET_MP_WRITE_SESSION_REQUEST);
105158115Sume		c_mp_ws_request = get_cache_mp_write_session_request(
106158115Sume	    		&qstate->request);
107158115Sume
108158115Sume		result = qstate->read_func(qstate,
109158115Sume	    		&c_mp_ws_request->entry_length, sizeof(size_t));
110158115Sume
111158115Sume		if (result != sizeof(size_t)) {
112158115Sume			LOG_ERR_3("on_mp_write_session_request_read1",
113158115Sume				"read failed");
114158115Sume			TRACE_OUT(on_mp_write_session_request_read1);
115158115Sume			return (-1);
116158115Sume		}
117158115Sume
118158115Sume		if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
119158115Sume			LOG_ERR_3("on_mp_write_session_request_read1",
120158115Sume				"invalid entry_length value");
121158115Sume			TRACE_OUT(on_mp_write_session_request_read1);
122158115Sume			return (-1);
123158115Sume		}
124158115Sume
125194104Sdes		c_mp_ws_request->entry = calloc(1,
126158115Sume			c_mp_ws_request->entry_length + 1);
127158115Sume		assert(c_mp_ws_request->entry != NULL);
128158115Sume
129158115Sume		qstate->kevent_watermark = c_mp_ws_request->entry_length;
130158115Sume		qstate->process_func = on_mp_write_session_request_read2;
131158115Sume	}
132158115Sume	TRACE_OUT(on_mp_write_session_request_read1);
133158115Sume	return (0);
134158115Sume}
135158115Sume
136158115Sumestatic int
137158115Sumeon_mp_write_session_request_read2(struct query_state *qstate)
138158115Sume{
139158115Sume	struct cache_mp_write_session_request	*c_mp_ws_request;
140158115Sume	ssize_t	result;
141158115Sume
142158115Sume	TRACE_IN(on_mp_write_session_request_read2);
143158115Sume	c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
144158115Sume
145158115Sume	result = qstate->read_func(qstate, c_mp_ws_request->entry,
146158115Sume		c_mp_ws_request->entry_length);
147158115Sume
148194096Sdes	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
149158115Sume		LOG_ERR_3("on_mp_write_session_request_read2",
150158115Sume			"read failed");
151158115Sume		TRACE_OUT(on_mp_write_session_request_read2);
152158115Sume		return (-1);
153158115Sume	}
154158115Sume
155158115Sume	qstate->kevent_watermark = 0;
156158115Sume	qstate->process_func = on_mp_write_session_request_process;
157158115Sume
158158115Sume	TRACE_OUT(on_mp_write_session_request_read2);
159158115Sume	return (0);
160158115Sume}
161158115Sume
162158115Sumestatic int
163158115Sumeon_mp_write_session_request_process(struct query_state *qstate)
164158115Sume{
165158115Sume	struct cache_mp_write_session_request	*c_mp_ws_request;
166158115Sume	struct cache_mp_write_session_response	*c_mp_ws_response;
167158115Sume	cache_mp_write_session	ws;
168158115Sume	cache_entry	c_entry;
169158115Sume	char	*dec_cache_entry_name;
170158115Sume
171158115Sume	TRACE_IN(on_mp_write_session_request_process);
172158115Sume	init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
173158115Sume	c_mp_ws_response = get_cache_mp_write_session_response(
174158115Sume		&qstate->response);
175158115Sume	c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
176158115Sume
177158115Sume	qstate->config_entry = configuration_find_entry(
178158115Sume		s_configuration, c_mp_ws_request->entry);
179158115Sume	if (qstate->config_entry == NULL) {
180158115Sume		c_mp_ws_response->error_code = ENOENT;
181158115Sume
182158115Sume		LOG_ERR_2("write_session_request",
183158115Sume			"can't find configuration entry '%s'. "
184158115Sume	    		"aborting request", c_mp_ws_request->entry);
185158115Sume	    	goto fin;
186158115Sume	}
187158115Sume
188158115Sume	if (qstate->config_entry->enabled == 0) {
189158115Sume		c_mp_ws_response->error_code = EACCES;
190158115Sume
191158115Sume		LOG_ERR_2("write_session_request",
192158115Sume			"configuration entry '%s' is disabled",
193158115Sume			c_mp_ws_request->entry);
194158115Sume		goto fin;
195158115Sume	}
196158115Sume
197158115Sume	if (qstate->config_entry->perform_actual_lookups != 0) {
198158115Sume		c_mp_ws_response->error_code = EOPNOTSUPP;
199158115Sume
200158115Sume		LOG_ERR_2("write_session_request",
201158115Sume			"entry '%s' performs lookups by itself: "
202158115Sume			"can't write to it", c_mp_ws_request->entry);
203158115Sume		goto fin;
204158115Sume	} else {
205171795Sbushman#ifdef NS_NSCD_EID_CHECKING
206158115Sume		if (check_query_eids(qstate) != 0) {
207158115Sume			c_mp_ws_response->error_code = EPERM;
208158115Sume			goto fin;
209158115Sume		}
210158115Sume#endif
211158115Sume	}
212158115Sume
213158115Sume	/*
214158115Sume	 * All multipart entries are separated by their name decorations.
215158115Sume	 * For one configuration entry there will be a lot of multipart
216158115Sume	 * cache entries - each with its own decorated name.
217158115Sume	 */
218158115Sume	asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
219194097Sdes		qstate->config_entry->mp_cache_params.cep.entry_name);
220158115Sume	assert(dec_cache_entry_name != NULL);
221158115Sume
222158115Sume	configuration_lock_rdlock(s_configuration);
223158115Sume	c_entry = find_cache_entry(s_cache,
224158115Sume		dec_cache_entry_name);
225158115Sume	configuration_unlock(s_configuration);
226158115Sume
227158115Sume	if (c_entry == INVALID_CACHE_ENTRY)
228158115Sume		c_entry = register_new_mp_cache_entry(qstate,
229158115Sume			dec_cache_entry_name);
230158115Sume
231158115Sume	free(dec_cache_entry_name);
232158115Sume
233158115Sume	assert(c_entry != NULL);
234158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
235158115Sume	ws = open_cache_mp_write_session(c_entry);
236158115Sume	if (ws == INVALID_CACHE_MP_WRITE_SESSION)
237158115Sume		c_mp_ws_response->error_code = -1;
238158115Sume	else {
239158115Sume		qstate->mdata = ws;
240158115Sume		qstate->destroy_func = on_mp_write_session_destroy;
241158115Sume
242158115Sume		if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
243158115Sume		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
244158115Sume			memcpy(&qstate->timeout,
245158115Sume				&qstate->config_entry->mp_query_timeout,
246158115Sume				sizeof(struct timeval));
247158115Sume	}
248158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
249158115Sume
250158115Sumefin:
251158115Sume	qstate->process_func = on_mp_write_session_response_write1;
252158115Sume	qstate->kevent_watermark = sizeof(int);
253158115Sume	qstate->kevent_filter = EVFILT_WRITE;
254158115Sume
255158115Sume	TRACE_OUT(on_mp_write_session_request_process);
256158115Sume	return (0);
257158115Sume}
258158115Sume
259158115Sumestatic int
260158115Sumeon_mp_write_session_response_write1(struct query_state *qstate)
261158115Sume{
262158115Sume	struct cache_mp_write_session_response	*c_mp_ws_response;
263158115Sume	ssize_t	result;
264158115Sume
265158115Sume	TRACE_IN(on_mp_write_session_response_write1);
266158115Sume	c_mp_ws_response = get_cache_mp_write_session_response(
267158115Sume		&qstate->response);
268158115Sume	result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
269158115Sume		sizeof(int));
270158115Sume	if (result != sizeof(int)) {
271158115Sume		LOG_ERR_3("on_mp_write_session_response_write1",
272158115Sume			"write failed");
273158115Sume		TRACE_OUT(on_mp_write_session_response_write1);
274158115Sume		return (-1);
275158115Sume	}
276158115Sume
277158115Sume	if (c_mp_ws_response->error_code == 0) {
278158115Sume		qstate->kevent_watermark = sizeof(int);
279158115Sume		qstate->process_func = on_mp_write_session_mapper;
280158115Sume		qstate->kevent_filter = EVFILT_READ;
281158115Sume	} else {
282158115Sume		qstate->kevent_watermark = 0;
283158115Sume		qstate->process_func = NULL;
284158115Sume	}
285158115Sume	TRACE_OUT(on_mp_write_session_response_write1);
286158115Sume	return (0);
287158115Sume}
288158115Sume
289158115Sume/*
290158115Sume * Mapper function is used to avoid multiple connections for each session
291158115Sume * write or read requests. After processing the request, it does not close
292158115Sume * the connection, but waits for the next request.
293158115Sume */
294158115Sumestatic int
295158115Sumeon_mp_write_session_mapper(struct query_state *qstate)
296158115Sume{
297158115Sume	ssize_t	result;
298158115Sume	int		elem_type;
299158115Sume
300158115Sume	TRACE_IN(on_mp_write_session_mapper);
301158115Sume	if (qstate->kevent_watermark == 0) {
302158115Sume		qstate->kevent_watermark = sizeof(int);
303158115Sume	} else {
304158115Sume		result = qstate->read_func(qstate, &elem_type, sizeof(int));
305158115Sume		if (result != sizeof(int)) {
306158115Sume			LOG_ERR_3("on_mp_write_session_mapper",
307158115Sume				"read failed");
308158115Sume			TRACE_OUT(on_mp_write_session_mapper);
309158115Sume			return (-1);
310158115Sume		}
311158115Sume
312158115Sume		switch (elem_type) {
313158115Sume		case CET_MP_WRITE_SESSION_WRITE_REQUEST:
314158115Sume			qstate->kevent_watermark = sizeof(size_t);
315158115Sume			qstate->process_func =
316158115Sume				on_mp_write_session_write_request_read1;
317158115Sume			break;
318158115Sume		case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
319158115Sume			qstate->kevent_watermark = 0;
320158115Sume			qstate->process_func =
321158115Sume				on_mp_write_session_abandon_notification;
322158115Sume			break;
323158115Sume		case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
324158115Sume			qstate->kevent_watermark = 0;
325158115Sume			qstate->process_func =
326158115Sume				on_mp_write_session_close_notification;
327158115Sume			break;
328158115Sume		default:
329158115Sume			qstate->kevent_watermark = 0;
330158115Sume			qstate->process_func = NULL;
331158115Sume			LOG_ERR_2("on_mp_write_session_mapper",
332158115Sume				"unknown element type");
333158115Sume			TRACE_OUT(on_mp_write_session_mapper);
334158115Sume			return (-1);
335158115Sume		}
336158115Sume	}
337158115Sume	TRACE_OUT(on_mp_write_session_mapper);
338158115Sume	return (0);
339158115Sume}
340158115Sume
341158115Sume/*
342158115Sume * The functions below are used to process multipart write sessions write
343158115Sume * requests.
344158115Sume * - on_mp_write_session_write_request_read1 and
345158115Sume *   on_mp_write_session_write_request_read2 read the request itself
346158115Sume * - on_mp_write_session_write_request_process processes it
347158115Sume * - on_mp_write_session_write_response_write1 sends the response
348158115Sume */
349158115Sumestatic int
350158115Sumeon_mp_write_session_write_request_read1(struct query_state *qstate)
351158115Sume{
352158115Sume	struct cache_mp_write_session_write_request	*write_request;
353158115Sume	ssize_t	result;
354158115Sume
355158115Sume	TRACE_IN(on_mp_write_session_write_request_read1);
356158115Sume	init_comm_element(&qstate->request,
357158115Sume		CET_MP_WRITE_SESSION_WRITE_REQUEST);
358158115Sume	write_request = get_cache_mp_write_session_write_request(
359158115Sume		&qstate->request);
360158115Sume
361158115Sume	result = qstate->read_func(qstate, &write_request->data_size,
362158115Sume		sizeof(size_t));
363158115Sume
364158115Sume	if (result != sizeof(size_t)) {
365158115Sume		LOG_ERR_3("on_mp_write_session_write_request_read1",
366158115Sume			"read failed");
367158115Sume		TRACE_OUT(on_mp_write_session_write_request_read1);
368158115Sume		return (-1);
369158115Sume	}
370158115Sume
371158115Sume	if (BUFSIZE_INVALID(write_request->data_size)) {
372158115Sume		LOG_ERR_3("on_mp_write_session_write_request_read1",
373158115Sume			"invalid data_size value");
374158115Sume		TRACE_OUT(on_mp_write_session_write_request_read1);
375158115Sume		return (-1);
376158115Sume	}
377158115Sume
378194104Sdes	write_request->data = calloc(1, write_request->data_size);
379158115Sume	assert(write_request->data != NULL);
380158115Sume
381158115Sume	qstate->kevent_watermark = write_request->data_size;
382158115Sume	qstate->process_func = on_mp_write_session_write_request_read2;
383158115Sume	TRACE_OUT(on_mp_write_session_write_request_read1);
384158115Sume	return (0);
385158115Sume}
386158115Sume
387158115Sumestatic int
388158115Sumeon_mp_write_session_write_request_read2(struct query_state *qstate)
389158115Sume{
390158115Sume	struct cache_mp_write_session_write_request	*write_request;
391158115Sume	ssize_t	result;
392158115Sume
393158115Sume	TRACE_IN(on_mp_write_session_write_request_read2);
394158115Sume	write_request = get_cache_mp_write_session_write_request(
395158115Sume		&qstate->request);
396158115Sume
397158115Sume	result = qstate->read_func(qstate, write_request->data,
398158115Sume		write_request->data_size);
399158115Sume
400194096Sdes	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
401158115Sume		LOG_ERR_3("on_mp_write_session_write_request_read2",
402158115Sume			"read failed");
403158115Sume		TRACE_OUT(on_mp_write_session_write_request_read2);
404158115Sume		return (-1);
405158115Sume	}
406158115Sume
407158115Sume	qstate->kevent_watermark = 0;
408158115Sume	qstate->process_func = on_mp_write_session_write_request_process;
409158115Sume	TRACE_OUT(on_mp_write_session_write_request_read2);
410158115Sume	return (0);
411158115Sume}
412158115Sume
413158115Sumestatic int
414158115Sumeon_mp_write_session_write_request_process(struct query_state *qstate)
415158115Sume{
416158115Sume	struct cache_mp_write_session_write_request	*write_request;
417158115Sume	struct cache_mp_write_session_write_response	*write_response;
418158115Sume
419158115Sume	TRACE_IN(on_mp_write_session_write_request_process);
420158115Sume	init_comm_element(&qstate->response,
421158115Sume		CET_MP_WRITE_SESSION_WRITE_RESPONSE);
422158115Sume	write_response = get_cache_mp_write_session_write_response(
423158115Sume		&qstate->response);
424158115Sume	write_request = get_cache_mp_write_session_write_request(
425158115Sume		&qstate->request);
426158115Sume
427158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
428158115Sume	write_response->error_code = cache_mp_write(
429158115Sume		(cache_mp_write_session)qstate->mdata,
430158115Sume		write_request->data,
431158115Sume		write_request->data_size);
432158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
433158115Sume
434158115Sume	qstate->kevent_watermark = sizeof(int);
435158115Sume	qstate->process_func = on_mp_write_session_write_response_write1;
436158115Sume	qstate->kevent_filter = EVFILT_WRITE;
437158115Sume
438158115Sume	TRACE_OUT(on_mp_write_session_write_request_process);
439158115Sume	return (0);
440158115Sume}
441158115Sume
442158115Sumestatic int
443158115Sumeon_mp_write_session_write_response_write1(struct query_state *qstate)
444158115Sume{
445158115Sume	struct cache_mp_write_session_write_response	*write_response;
446158115Sume	ssize_t	result;
447158115Sume
448158115Sume	TRACE_IN(on_mp_write_session_write_response_write1);
449158115Sume	write_response = get_cache_mp_write_session_write_response(
450158115Sume		&qstate->response);
451158115Sume	result = qstate->write_func(qstate, &write_response->error_code,
452158115Sume		sizeof(int));
453158115Sume	if (result != sizeof(int)) {
454158115Sume		LOG_ERR_3("on_mp_write_session_write_response_write1",
455158115Sume			"write failed");
456158115Sume		TRACE_OUT(on_mp_write_session_write_response_write1);
457158115Sume		return (-1);
458158115Sume	}
459158115Sume
460158115Sume	if (write_response->error_code == 0) {
461158115Sume		finalize_comm_element(&qstate->request);
462158115Sume		finalize_comm_element(&qstate->response);
463158115Sume
464158115Sume		qstate->kevent_watermark = sizeof(int);
465158115Sume		qstate->process_func = on_mp_write_session_mapper;
466158115Sume		qstate->kevent_filter = EVFILT_READ;
467158115Sume	} else {
468158115Sume		qstate->kevent_watermark = 0;
469158115Sume		qstate->process_func = 0;
470158115Sume	}
471158115Sume
472158115Sume	TRACE_OUT(on_mp_write_session_write_response_write1);
473158115Sume	return (0);
474158115Sume}
475158115Sume
476158115Sume/*
477158115Sume * Handles abandon notifications. Destroys the session by calling the
478158115Sume * abandon_cache_mp_write_session.
479158115Sume */
480158115Sumestatic int
481158115Sumeon_mp_write_session_abandon_notification(struct query_state *qstate)
482158115Sume{
483158115Sume	TRACE_IN(on_mp_write_session_abandon_notification);
484158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
485158115Sume	abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
486158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
487158115Sume	qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
488158115Sume
489158115Sume	qstate->kevent_watermark = 0;
490158115Sume	qstate->process_func = NULL;
491158115Sume	TRACE_OUT(on_mp_write_session_abandon_notification);
492158115Sume	return (0);
493158115Sume}
494158115Sume
495158115Sume/*
496158115Sume * Handles close notifications. Commits the session by calling
497158115Sume * the close_cache_mp_write_session.
498158115Sume */
499158115Sumestatic int
500158115Sumeon_mp_write_session_close_notification(struct query_state *qstate)
501158115Sume{
502158115Sume	TRACE_IN(on_mp_write_session_close_notification);
503158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
504158115Sume	close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
505158115Sume	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
506158115Sume	qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
507158115Sume
508158115Sume	qstate->kevent_watermark = 0;
509158115Sume	qstate->process_func = NULL;
510158115Sume	TRACE_OUT(on_mp_write_session_close_notification);
511158115Sume	return (0);
512158115Sume}
513158115Sume
514158115Sumecache_entry register_new_mp_cache_entry(struct query_state *qstate,
515158115Sume	const char *dec_cache_entry_name)
516158115Sume{
517158115Sume	cache_entry c_entry;
518158115Sume	char *en_bkp;
519158115Sume
520158115Sume	TRACE_IN(register_new_mp_cache_entry);
521158115Sume	c_entry = INVALID_CACHE_ENTRY;
522158115Sume	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
523158115Sume
524158115Sume	configuration_lock_wrlock(s_configuration);
525194097Sdes	en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
526194097Sdes	qstate->config_entry->mp_cache_params.cep.entry_name =
527158115Sume		(char *)dec_cache_entry_name;
528158115Sume	register_cache_entry(s_cache, (struct cache_entry_params *)
529158115Sume		&qstate->config_entry->mp_cache_params);
530194097Sdes	qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
531158115Sume	configuration_unlock(s_configuration);
532158115Sume
533158115Sume	configuration_lock_rdlock(s_configuration);
534158115Sume	c_entry = find_cache_entry(s_cache,
535158115Sume		dec_cache_entry_name);
536158115Sume	configuration_unlock(s_configuration);
537158115Sume
538158115Sume	configuration_entry_add_mp_cache_entry(qstate->config_entry,
539158115Sume		c_entry);
540158115Sume
541158115Sume	configuration_unlock_entry(qstate->config_entry,
542158115Sume		CELT_MULTIPART);
543158115Sume
544158115Sume	TRACE_OUT(register_new_mp_cache_entry);
545158115Sume	return (c_entry);
546158115Sume}
547