mp_rs_query.c revision 194104
168349Sobrien/*- 2133359Sobrien * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3133359Sobrien * All rights reserved. 4133359Sobrien * 5133359Sobrien * Redistribution and use in source and binary forms, with or without 6133359Sobrien * modification, are permitted provided that the following conditions 7133359Sobrien * are met: 8133359Sobrien * 1. Redistributions of source code must retain the above copyright 9133359Sobrien * notice, this list of conditions and the following disclaimer. 10133359Sobrien * 2. Redistributions in binary form must reproduce the above copyright 11133359Sobrien * notice, this list of conditions and the following disclaimer in the 12133359Sobrien * documentation and/or other materials provided with the distribution. 13133359Sobrien * 14133359Sobrien * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15133359Sobrien * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16133359Sobrien * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17133359Sobrien * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18133359Sobrien * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19133359Sobrien * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20133359Sobrien * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21133359Sobrien * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22133359Sobrien * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23133359Sobrien * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24133359Sobrien * SUCH DAMAGE. 25133359Sobrien * 26133359Sobrien */ 27133359Sobrien 28133359Sobrien#include <sys/cdefs.h> 2968349Sobrien__FBSDID("$FreeBSD: head/usr.sbin/nscd/mp_rs_query.c 194104 2009-06-13 13:07:56Z des $"); 3068349Sobrien 3168349Sobrien#include <sys/types.h> 3268349Sobrien#include <sys/event.h> 3368349Sobrien#include <sys/socket.h> 3468349Sobrien#include <sys/time.h> 3568349Sobrien 36191736Sobrien#include <assert.h> 37191736Sobrien#include <errno.h> 38191736Sobrien#include <nsswitch.h> 39191736Sobrien#include <stdio.h> 40191736Sobrien#include <stdlib.h> 41133359Sobrien#include <string.h> 4268349Sobrien 4368349Sobrien#include "cachelib.h" 4468349Sobrien#include "config.h" 4568349Sobrien#include "debug.h" 4668349Sobrien#include "log.h" 47133359Sobrien#include "query.h" 48169942Sobrien#include "mp_rs_query.h" 4968349Sobrien#include "mp_ws_query.h" 5068349Sobrien#include "singletons.h" 5168349Sobrien 52169962Sobrienstatic int on_mp_read_session_close_notification(struct query_state *); 53169962Sobrienstatic void on_mp_read_session_destroy(struct query_state *); 54169962Sobrienstatic int on_mp_read_session_mapper(struct query_state *); 55175296Sobrien/* int on_mp_read_session_request_read1(struct query_state *); */ 56175296Sobrienstatic int on_mp_read_session_request_read2(struct query_state *); 57103373Sobrienstatic int on_mp_read_session_request_process(struct query_state *); 58103373Sobrienstatic int on_mp_read_session_response_write1(struct query_state *); 59103373Sobrienstatic int on_mp_read_session_read_request_process(struct query_state *); 60186690Sobrienstatic int on_mp_read_session_read_response_write1(struct query_state *); 61186690Sobrienstatic int on_mp_read_session_read_response_write2(struct query_state *); 62133359Sobrien 63186690Sobrien/* 64133359Sobrien * This function is used as the query_state's destroy_func to make the 6568349Sobrien * proper cleanup in case of errors. 6668349Sobrien */ 6775937Sobrienstatic void 6875937Sobrienon_mp_read_session_destroy(struct query_state *qstate) 6975937Sobrien{ 7068349Sobrien TRACE_IN(on_mp_read_session_destroy); 7168349Sobrien finalize_comm_element(&qstate->request); 7268349Sobrien finalize_comm_element(&qstate->response); 7368349Sobrien 7468349Sobrien if (qstate->mdata != NULL) { 75159764Sobrien configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 76159764Sobrien close_cache_mp_read_session( 7780588Sobrien (cache_mp_read_session)qstate->mdata); 78191736Sobrien configuration_unlock_entry(qstate->config_entry, 7968349Sobrien CELT_MULTIPART); 8068349Sobrien } 81169962Sobrien TRACE_OUT(on_mp_read_session_destroy); 8268349Sobrien} 83169942Sobrien 8468349Sobrien/* 85169942Sobrien * The functions below are used to process multipart read session initiation 86133359Sobrien * requests. 87159764Sobrien * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 88159764Sobrien * the request itself 89175296Sobrien * - on_mp_read_session_request_process processes it 90133359Sobrien * - on_mp_read_session_response_write1 sends the response 91133359Sobrien */ 92103373Sobrienint 9368349Sobrienon_mp_read_session_request_read1(struct query_state *qstate) 94133359Sobrien{ 95169962Sobrien struct cache_mp_read_session_request *c_mp_rs_request; 96169962Sobrien ssize_t result; 9768349Sobrien 98133359Sobrien TRACE_IN(on_mp_read_session_request_read1); 99133359Sobrien if (qstate->kevent_watermark == 0) 100133359Sobrien qstate->kevent_watermark = sizeof(size_t); 101175296Sobrien else { 10268349Sobrien init_comm_element(&qstate->request, 103133359Sobrien CET_MP_READ_SESSION_REQUEST); 104133359Sobrien c_mp_rs_request = get_cache_mp_read_session_request( 105133359Sobrien &qstate->request); 10668349Sobrien 10768349Sobrien result = qstate->read_func(qstate, 10868349Sobrien &c_mp_rs_request->entry_length, sizeof(size_t)); 10968349Sobrien 110159764Sobrien if (result != sizeof(size_t)) { 111169942Sobrien TRACE_OUT(on_mp_read_session_request_read1); 112133359Sobrien return (-1); 113133359Sobrien } 114169962Sobrien 115133359Sobrien if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 116175296Sobrien TRACE_OUT(on_mp_read_session_request_read1); 117175296Sobrien return (-1); 118175296Sobrien } 119175296Sobrien 120175296Sobrien c_mp_rs_request->entry = calloc(1, 121175296Sobrien c_mp_rs_request->entry_length + 1); 122175296Sobrien assert(c_mp_rs_request->entry != NULL); 123175296Sobrien 124175296Sobrien qstate->kevent_watermark = c_mp_rs_request->entry_length; 125133359Sobrien qstate->process_func = on_mp_read_session_request_read2; 126175296Sobrien } 127175296Sobrien TRACE_OUT(on_mp_read_session_request_read1); 128133359Sobrien return (0); 129133359Sobrien} 130133359Sobrien 13168349Sobrienstatic int 13268349Sobrienon_mp_read_session_request_read2(struct query_state *qstate) 133133359Sobrien{ 134133359Sobrien struct cache_mp_read_session_request *c_mp_rs_request; 135133359Sobrien ssize_t result; 136133359Sobrien 137133359Sobrien TRACE_IN(on_mp_read_session_request_read2); 13868349Sobrien c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 13968349Sobrien 14075937Sobrien result = qstate->read_func(qstate, c_mp_rs_request->entry, 14175937Sobrien c_mp_rs_request->entry_length); 14275937Sobrien 143133359Sobrien if (result < 0 || (size_t)result != qstate->kevent_watermark) { 144103373Sobrien LOG_ERR_3("on_mp_read_session_request_read2", 14575937Sobrien "read failed"); 14675937Sobrien TRACE_OUT(on_mp_read_session_request_read2); 14775937Sobrien return (-1); 14868349Sobrien } 14975937Sobrien 15075937Sobrien qstate->kevent_watermark = 0; 15175937Sobrien qstate->process_func = on_mp_read_session_request_process; 15275937Sobrien TRACE_OUT(on_mp_read_session_request_read2); 15375937Sobrien return (0); 15475937Sobrien} 15575937Sobrien 15675937Sobrienstatic int 157103373Sobrienon_mp_read_session_request_process(struct query_state *qstate) 15875937Sobrien{ 15975937Sobrien struct cache_mp_read_session_request *c_mp_rs_request; 16075937Sobrien struct cache_mp_read_session_response *c_mp_rs_response; 16175937Sobrien cache_mp_read_session rs; 16275937Sobrien cache_entry c_entry; 16375937Sobrien char *dec_cache_entry_name; 16475937Sobrien 16575937Sobrien char *buffer; 16675937Sobrien size_t buffer_size; 16775937Sobrien cache_mp_write_session ws; 168169942Sobrien struct agent *lookup_agent; 169169962Sobrien struct multipart_agent *mp_agent; 17075937Sobrien void *mdata; 171169962Sobrien int res; 172169942Sobrien 173169942Sobrien TRACE_IN(on_mp_read_session_request_process); 174169942Sobrien init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 17575937Sobrien c_mp_rs_response = get_cache_mp_read_session_response( 17675937Sobrien &qstate->response); 177169942Sobrien c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 178169942Sobrien 179169942Sobrien qstate->config_entry = configuration_find_entry( 180169942Sobrien s_configuration, c_mp_rs_request->entry); 181169962Sobrien if (qstate->config_entry == NULL) { 182169942Sobrien c_mp_rs_response->error_code = ENOENT; 183169962Sobrien 184169942Sobrien LOG_ERR_2("read_session_request", 185169942Sobrien "can't find configuration entry '%s'." 186169962Sobrien " aborting request", c_mp_rs_request->entry); 187169942Sobrien goto fin; 188169942Sobrien } 189169942Sobrien 190169942Sobrien if (qstate->config_entry->enabled == 0) { 191169942Sobrien c_mp_rs_response->error_code = EACCES; 192169942Sobrien 193169942Sobrien LOG_ERR_2("read_session_request", 194169942Sobrien "configuration entry '%s' is disabled", 195169962Sobrien c_mp_rs_request->entry); 196169962Sobrien goto fin; 197169942Sobrien } 198169942Sobrien 199169962Sobrien if (qstate->config_entry->perform_actual_lookups != 0) 200169942Sobrien dec_cache_entry_name = strdup( 201169962Sobrien qstate->config_entry->mp_cache_params.cep.entry_name); 202169962Sobrien else { 203169942Sobrien#ifdef NS_NSCD_EID_CHECKING 204169942Sobrien if (check_query_eids(qstate) != 0) { 205169942Sobrien c_mp_rs_response->error_code = EPERM; 206169942Sobrien goto fin; 207169942Sobrien } 208169942Sobrien#endif 209169942Sobrien 210169942Sobrien asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 211169942Sobrien qstate->config_entry->mp_cache_params.cep.entry_name); 212169942Sobrien } 213169942Sobrien 214169942Sobrien assert(dec_cache_entry_name != NULL); 21575937Sobrien 216169942Sobrien configuration_lock_rdlock(s_configuration); 21775937Sobrien c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 21875937Sobrien configuration_unlock(s_configuration); 21975937Sobrien 22075937Sobrien if ((c_entry == INVALID_CACHE) && 221103373Sobrien (qstate->config_entry->perform_actual_lookups != 0)) 222103373Sobrien c_entry = register_new_mp_cache_entry(qstate, 22375937Sobrien dec_cache_entry_name); 22475937Sobrien 22575937Sobrien free(dec_cache_entry_name); 22675937Sobrien 22775937Sobrien if (c_entry != INVALID_CACHE_ENTRY) { 22875937Sobrien configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 22975937Sobrien rs = open_cache_mp_read_session(c_entry); 23075937Sobrien configuration_unlock_entry(qstate->config_entry, 23175937Sobrien CELT_MULTIPART); 232133359Sobrien 233133359Sobrien if ((rs == INVALID_CACHE_MP_READ_SESSION) && 234133359Sobrien (qstate->config_entry->perform_actual_lookups != 0)) { 235103373Sobrien lookup_agent = find_agent(s_agent_table, 236103373Sobrien c_mp_rs_request->entry, MULTIPART_AGENT); 237103373Sobrien 238103373Sobrien if ((lookup_agent != NULL) && 239191736Sobrien (lookup_agent->type == MULTIPART_AGENT)) { 240103373Sobrien mp_agent = (struct multipart_agent *) 241103373Sobrien lookup_agent; 242103373Sobrien mdata = mp_agent->mp_init_func(); 243103373Sobrien 244103373Sobrien /* 245103373Sobrien * Multipart agents read the whole snapshot 246103373Sobrien * of the data at one time. 247103373Sobrien */ 248103373Sobrien configuration_lock_entry(qstate->config_entry, 249103373Sobrien CELT_MULTIPART); 250103373Sobrien ws = open_cache_mp_write_session(c_entry); 251103373Sobrien configuration_unlock_entry(qstate->config_entry, 252103373Sobrien CELT_MULTIPART); 253103373Sobrien if (ws != NULL) { 254103373Sobrien do { 255133359Sobrien buffer = NULL; 256133359Sobrien res = mp_agent->mp_lookup_func(&buffer, 257133359Sobrien &buffer_size, 258103373Sobrien mdata); 259103373Sobrien 260133359Sobrien if ((res & NS_TERMINATE) && 261103373Sobrien (buffer != NULL)) { 262103373Sobrien configuration_lock_entry( 263169962Sobrien qstate->config_entry, 264133359Sobrien CELT_MULTIPART); 265103373Sobrien if (cache_mp_write(ws, buffer, 266103373Sobrien buffer_size) != 0) { 267103373Sobrien abandon_cache_mp_write_session(ws); 268103373Sobrien ws = NULL; 269103373Sobrien } 270133359Sobrien configuration_unlock_entry( 271133359Sobrien qstate->config_entry, 272103373Sobrien CELT_MULTIPART); 273103373Sobrien 274103373Sobrien free(buffer); 275133359Sobrien buffer = NULL; 276133359Sobrien } else { 277103373Sobrien configuration_lock_entry( 278103373Sobrien qstate->config_entry, 279103373Sobrien CELT_MULTIPART); 280103373Sobrien close_cache_mp_write_session(ws); 281103373Sobrien configuration_unlock_entry( 282103373Sobrien qstate->config_entry, 283103373Sobrien CELT_MULTIPART); 284103373Sobrien 285133359Sobrien free(buffer); 286133359Sobrien buffer = NULL; 287103373Sobrien } 288103373Sobrien } while ((res & NS_TERMINATE) && 289103373Sobrien (ws != NULL)); 290133359Sobrien } 291133359Sobrien 292103373Sobrien configuration_lock_entry(qstate->config_entry, 293103373Sobrien CELT_MULTIPART); 294103373Sobrien rs = open_cache_mp_read_session(c_entry); 295103373Sobrien configuration_unlock_entry(qstate->config_entry, 296175296Sobrien CELT_MULTIPART); 297103373Sobrien } 298103373Sobrien } 299103373Sobrien 300103373Sobrien if (rs == INVALID_CACHE_MP_READ_SESSION) 301103373Sobrien c_mp_rs_response->error_code = -1; 302103373Sobrien else { 303133359Sobrien qstate->mdata = rs; 304133359Sobrien qstate->destroy_func = on_mp_read_session_destroy; 305133359Sobrien 30668349Sobrien configuration_lock_entry(qstate->config_entry, 307103373Sobrien CELT_MULTIPART); 308133359Sobrien if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 309103373Sobrien (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 310103373Sobrien memcpy(&qstate->timeout, 311103373Sobrien &qstate->config_entry->mp_query_timeout, 312133359Sobrien sizeof(struct timeval)); 313133359Sobrien configuration_unlock_entry(qstate->config_entry, 314133359Sobrien CELT_MULTIPART); 315103373Sobrien } 316133359Sobrien } else 317103373Sobrien c_mp_rs_response->error_code = -1; 318133359Sobrien 319103373Sobrienfin: 320103373Sobrien qstate->process_func = on_mp_read_session_response_write1; 321103373Sobrien qstate->kevent_watermark = sizeof(int); 322103373Sobrien qstate->kevent_filter = EVFILT_WRITE; 323133359Sobrien 324103373Sobrien TRACE_OUT(on_mp_read_session_request_process); 325103373Sobrien return (0); 326103373Sobrien} 327103373Sobrien 328103373Sobrienstatic int 329103373Sobrienon_mp_read_session_response_write1(struct query_state *qstate) 330133359Sobrien{ 331133359Sobrien struct cache_mp_read_session_response *c_mp_rs_response; 332186690Sobrien ssize_t result; 333103373Sobrien 334103373Sobrien TRACE_IN(on_mp_read_session_response_write1); 335103373Sobrien c_mp_rs_response = get_cache_mp_read_session_response( 336133359Sobrien &qstate->response); 337133359Sobrien result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 338133359Sobrien sizeof(int)); 339103373Sobrien 340103373Sobrien if (result != sizeof(int)) { 341103373Sobrien LOG_ERR_3("on_mp_read_session_response_write1", 342103373Sobrien "write failed"); 343103373Sobrien TRACE_OUT(on_mp_read_session_response_write1); 344103373Sobrien return (-1); 345103373Sobrien } 346103373Sobrien 347103373Sobrien if (c_mp_rs_response->error_code == 0) { 348133359Sobrien qstate->kevent_watermark = sizeof(int); 349103373Sobrien qstate->process_func = on_mp_read_session_mapper; 350103373Sobrien qstate->kevent_filter = EVFILT_READ; 351103373Sobrien } else { 352103373Sobrien qstate->kevent_watermark = 0; 353103373Sobrien qstate->process_func = NULL; 354133359Sobrien } 355103373Sobrien TRACE_OUT(on_mp_read_session_response_write1); 356103373Sobrien return (0); 357103373Sobrien} 358133359Sobrien 359169962Sobrien/* 360103373Sobrien * Mapper function is used to avoid multiple connections for each session 361103373Sobrien * write or read requests. After processing the request, it does not close 362169942Sobrien * the connection, but waits for the next request. 363103373Sobrien */ 364103373Sobrienstatic int 365103373Sobrienon_mp_read_session_mapper(struct query_state *qstate) 366103373Sobrien{ 367103373Sobrien ssize_t result; 368133359Sobrien int elem_type; 369159764Sobrien 370159764Sobrien TRACE_IN(on_mp_read_session_mapper); 371103373Sobrien if (qstate->kevent_watermark == 0) { 37268349Sobrien qstate->kevent_watermark = sizeof(int); 373133359Sobrien } else { 37468349Sobrien result = qstate->read_func(qstate, &elem_type, sizeof(int)); 375175296Sobrien if (result != sizeof(int)) { 376186690Sobrien LOG_ERR_3("on_mp_read_session_mapper", 377103373Sobrien "read failed"); 378133359Sobrien TRACE_OUT(on_mp_read_session_mapper); 379103373Sobrien return (-1); 380159764Sobrien } 381159764Sobrien 382103373Sobrien switch (elem_type) { 383159764Sobrien case CET_MP_READ_SESSION_READ_REQUEST: 384133359Sobrien qstate->kevent_watermark = 0; 385169942Sobrien qstate->process_func = 38668349Sobrien on_mp_read_session_read_request_process; 38768349Sobrien break; 38868349Sobrien case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 38968349Sobrien qstate->kevent_watermark = 0; 390159764Sobrien qstate->process_func = 391159764Sobrien on_mp_read_session_close_notification; 392159764Sobrien break; 393159764Sobrien default: 394159764Sobrien qstate->kevent_watermark = 0; 395159764Sobrien qstate->process_func = NULL; 396159764Sobrien LOG_ERR_3("on_mp_read_session_mapper", 397159764Sobrien "unknown element type"); 39868349Sobrien TRACE_OUT(on_mp_read_session_mapper); 39968349Sobrien return (-1); 40068349Sobrien } 40168349Sobrien } 40268349Sobrien TRACE_OUT(on_mp_read_session_mapper); 403159764Sobrien return (0); 40468349Sobrien} 405159764Sobrien 406159764Sobrien/* 40768349Sobrien * The functions below are used to process multipart read sessions read 408169962Sobrien * requests. User doesn't have to pass any kind of data, besides the 409169962Sobrien * request identificator itself. So we don't need any XXX_read functions and 410159764Sobrien * start with the XXX_process function. 411159764Sobrien * - on_mp_read_session_read_request_process processes it 412159764Sobrien * - on_mp_read_session_read_response_write1 and 413159764Sobrien * on_mp_read_session_read_response_write2 sends the response 41468349Sobrien */ 41568349Sobrienstatic int 41668349Sobrienon_mp_read_session_read_request_process(struct query_state *qstate) 417133359Sobrien{ 418169942Sobrien struct cache_mp_read_session_read_response *read_response; 41968349Sobrien 42068349Sobrien TRACE_IN(on_mp_read_session_response_process); 42168349Sobrien init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 422159764Sobrien read_response = get_cache_mp_read_session_read_response( 423159764Sobrien &qstate->response); 424159764Sobrien 425159764Sobrien configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 426159764Sobrien read_response->error_code = cache_mp_read( 427159764Sobrien (cache_mp_read_session)qstate->mdata, NULL, 428159764Sobrien &read_response->data_size); 429159764Sobrien 430159764Sobrien if (read_response->error_code == 0) { 431169962Sobrien read_response->data = malloc(read_response->data_size); 432159764Sobrien assert(read_response != NULL); 433159764Sobrien read_response->error_code = cache_mp_read( 434159764Sobrien (cache_mp_read_session)qstate->mdata, 435159764Sobrien read_response->data, 436159764Sobrien &read_response->data_size); 437159764Sobrien } 438159764Sobrien configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 439159764Sobrien 440159764Sobrien if (read_response->error_code == 0) 441159764Sobrien qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 442159764Sobrien else 443159764Sobrien qstate->kevent_watermark = sizeof(int); 444159764Sobrien qstate->process_func = on_mp_read_session_read_response_write1; 445159764Sobrien qstate->kevent_filter = EVFILT_WRITE; 446159764Sobrien 447133359Sobrien TRACE_OUT(on_mp_read_session_response_process); 448159764Sobrien return (0); 449133359Sobrien} 450159764Sobrien 451159764Sobrienstatic int 452159764Sobrienon_mp_read_session_read_response_write1(struct query_state *qstate) 453159764Sobrien{ 454159764Sobrien struct cache_mp_read_session_read_response *read_response; 455159764Sobrien ssize_t result; 456133359Sobrien 457103373Sobrien TRACE_IN(on_mp_read_session_read_response_write1); 458159764Sobrien read_response = get_cache_mp_read_session_read_response( 459159764Sobrien &qstate->response); 460159764Sobrien 461159764Sobrien result = qstate->write_func(qstate, &read_response->error_code, 46275937Sobrien sizeof(int)); 46375937Sobrien if (read_response->error_code == 0) { 46475937Sobrien result += qstate->write_func(qstate, &read_response->data_size, 465169962Sobrien sizeof(size_t)); 466159764Sobrien if (result < 0 || (size_t)result != qstate->kevent_watermark) { 467159764Sobrien TRACE_OUT(on_mp_read_session_read_response_write1); 468159764Sobrien LOG_ERR_3("on_mp_read_session_read_response_write1", 469159764Sobrien "write failed"); 47068349Sobrien return (-1); 47175937Sobrien } 472133359Sobrien 47375937Sobrien qstate->kevent_watermark = read_response->data_size; 474133359Sobrien qstate->process_func = on_mp_read_session_read_response_write2; 475133359Sobrien } else { 47668349Sobrien if (result < 0 || (size_t)result != qstate->kevent_watermark) { 477103373Sobrien LOG_ERR_3("on_mp_read_session_read_response_write1", 478169942Sobrien "write failed"); 47975937Sobrien TRACE_OUT(on_mp_read_session_read_response_write1); 48075937Sobrien return (-1); 48175937Sobrien } 48268349Sobrien 483133359Sobrien qstate->kevent_watermark = 0; 484133359Sobrien qstate->process_func = NULL; 485133359Sobrien } 486133359Sobrien 487133359Sobrien TRACE_OUT(on_mp_read_session_read_response_write1); 488133359Sobrien return (0); 48968349Sobrien} 49068349Sobrien 49168349Sobrienstatic int 492on_mp_read_session_read_response_write2(struct query_state *qstate) 493{ 494 struct cache_mp_read_session_read_response *read_response; 495 ssize_t result; 496 497 TRACE_IN(on_mp_read_session_read_response_write2); 498 read_response = get_cache_mp_read_session_read_response( 499 &qstate->response); 500 result = qstate->write_func(qstate, read_response->data, 501 read_response->data_size); 502 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 503 LOG_ERR_3("on_mp_read_session_read_response_write2", 504 "write failed"); 505 TRACE_OUT(on_mp_read_session_read_response_write2); 506 return (-1); 507 } 508 509 finalize_comm_element(&qstate->request); 510 finalize_comm_element(&qstate->response); 511 512 qstate->kevent_watermark = sizeof(int); 513 qstate->process_func = on_mp_read_session_mapper; 514 qstate->kevent_filter = EVFILT_READ; 515 516 TRACE_OUT(on_mp_read_session_read_response_write2); 517 return (0); 518} 519 520/* 521 * Handles session close notification by calling close_cache_mp_read_session 522 * function. 523 */ 524static int 525on_mp_read_session_close_notification(struct query_state *qstate) 526{ 527 528 TRACE_IN(on_mp_read_session_close_notification); 529 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 530 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 531 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 532 qstate->mdata = NULL; 533 qstate->kevent_watermark = 0; 534 qstate->process_func = NULL; 535 TRACE_OUT(on_mp_read_session_close_notification); 536 return (0); 537} 538