11832Swollman/* $NetBSD$ */ 239429Sobrien 31832Swollman/*- 41832Swollman * Copyright (c)2010,2011 YAMAMOTO Takashi, 51832Swollman * All rights reserved. 632624Swpaul * 71832Swollman * Redistribution and use in source and binary forms, with or without 826208Swpaul * modification, are permitted provided that the following conditions 916123Swpaul * are met: 1026208Swpaul * 1. Redistributions of source code must retain the above copyright 1126208Swpaul * notice, this list of conditions and the following disclaimer. 1226208Swpaul * 2. Redistributions in binary form must reproduce the above copyright 131832Swollman * notice, this list of conditions and the following disclaimer in the 1426208Swpaul * documentation and/or other materials provided with the distribution. 1526208Swpaul * 1626208Swpaul * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 171832Swollman * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 181832Swollman * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 191832Swollman * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 201832Swollman * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 211832Swollman * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2229504Sbde * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2326248Swpaul * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2417953Speter * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2517953Speter * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2628542Sbde * SUCH DAMAGE. 2728524Sjmg */ 2833298Sbde 2933298Sbde/* 3028524Sjmg * backend db operations 311832Swollman */ 3239429Sobrien 331832Swollman#include <sys/cdefs.h> 3432551Sbde#ifndef lint 3532551Sbde__RCSID("$NetBSD$"); 361832Swollman#endif /* not lint */ 37 38#include <assert.h> 39#include <err.h> 40#include <errno.h> 41#include <inttypes.h> 42#include <puffs.h> 43#include <stdbool.h> 44#include <stdarg.h> 45#include <stdio.h> 46#include <stdlib.h> 47#include <util.h> 48 49#include <libpq-fe.h> 50 51#include "pgfs_db.h" 52#include "pgfs_waitq.h" 53#include "pgfs_debug.h" 54 55bool pgfs_dosync = false; 56 57struct Xconn { 58 TAILQ_ENTRY(Xconn) list; 59 PGconn *conn; 60 struct puffs_cc *blocker; 61 struct puffs_cc *owner; 62 bool in_trans; 63 int id; 64}; 65 66static void 67dumperror(struct Xconn *xc, const PGresult *res) 68{ 69 static const struct { 70 const char *name; 71 int code; 72 } fields[] = { 73#define F(x) { .name = #x, .code = x, } 74 F(PG_DIAG_SEVERITY), 75 F(PG_DIAG_SQLSTATE), 76 F(PG_DIAG_MESSAGE_PRIMARY), 77 F(PG_DIAG_MESSAGE_DETAIL), 78 F(PG_DIAG_MESSAGE_HINT), 79 F(PG_DIAG_STATEMENT_POSITION), 80 F(PG_DIAG_INTERNAL_POSITION), 81 F(PG_DIAG_INTERNAL_QUERY), 82 F(PG_DIAG_CONTEXT), 83 F(PG_DIAG_SOURCE_FILE), 84 F(PG_DIAG_SOURCE_LINE), 85 F(PG_DIAG_SOURCE_FUNCTION), 86#undef F 87 }; 88 unsigned int i; 89 90 if (!pgfs_dodprintf) { 91 return; 92 } 93 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR || 94 PQresultStatus(res) == PGRES_FATAL_ERROR); 95 for (i = 0; i < __arraycount(fields); i++) { 96 const char *val = PQresultErrorField(res, fields[i].code); 97 98 if (val == NULL) { 99 continue; 100 } 101 fprintf(stderr, "%s: %s\n", fields[i].name, val); 102 } 103} 104 105TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist); 106struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq); 107 108static struct Xconn * 109getxc(struct puffs_cc *cc) 110{ 111 struct Xconn *xc; 112 113 assert(cc != NULL); 114retry: 115 TAILQ_FOREACH(xc, &xclist, list) { 116 if (xc->blocker == NULL) { 117 assert(xc->owner == NULL); 118 xc->owner = cc; 119 DPRINTF("xc %p acquire %p\n", xc, cc); 120 return xc; 121 } else { 122 assert(xc->owner == xc->blocker); 123 } 124 } 125 DPRINTF("no free conn %p\n", cc); 126 waiton(&xcwaitq, cc); 127 goto retry; 128} 129 130static void 131relxc(struct Xconn *xc) 132{ 133 134 assert(xc->in_trans); 135 assert(xc->owner != NULL); 136 xc->in_trans = false; 137 xc->owner = NULL; 138 wakeup_one(&xcwaitq); 139} 140 141static void 142pqwait(struct Xconn *xc) 143{ 144 PGconn *conn = xc->conn; 145 struct puffs_cc *cc = xc->owner; 146 147 if (PQflush(conn)) { 148 errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn)); 149 } 150 if (!PQisBusy(conn)) { 151 return; 152 } 153 assert(xc->blocker == NULL); 154 xc->blocker = cc; 155 DPRINTF("yielding %p\n", cc); 156 /* XXX is it safe to yield before entering mainloop? */ 157 puffs_cc_yield(cc); 158 DPRINTF("yield returned %p\n", cc); 159 assert(xc->owner == cc); 160 assert(xc->blocker == cc); 161 xc->blocker = NULL; 162} 163 164static int 165sqltoerrno(const char *sqlstate) 166{ 167 /* 168 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle 169 * "tuple concurrently updated" errors for lowrite/lo_truncate. 170 * 171 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN? 172 */ 173 static const struct { 174 char sqlstate[5]; 175 int error; 176 } map[] = { 177 { "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */ 178 { "02000", ENOENT, }, /* ERRCODE_NO_DATA */ 179 { "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */ 180 { "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */ 181 { "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */ 182 { "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */ 183 { "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */ 184 { "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */ 185 { "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */ 186 { "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */ 187 }; 188 unsigned int i; 189 190 for (i = 0; i < __arraycount(map); i++) { 191 if (!memcmp(map[i].sqlstate, sqlstate, 5)) { 192 const int error = map[i].error; 193 194 if (error != 0) { 195 DPRINTF("sqlstate %5s mapped to error %d\n", 196 sqlstate, error); 197 } 198 if (error == EINVAL) { 199 /* 200 * sounds like a bug. 201 */ 202 abort(); 203 } 204 return error; 205 } 206 } 207 DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate); 208 return EIO; 209} 210 211struct cmd { 212 char name[32]; /* name of prepared statement */ 213 char *cmd; /* query string */ 214 unsigned int nparams; 215 Oid *paramtypes; 216 uint32_t prepared_mask; /* for which connections this is prepared? */ 217 unsigned int flags; /* CMD_ flags */ 218}; 219 220#define CMD_NOPREPARE 1 /* don't prepare this command */ 221 222struct cmd * 223createcmd(const char *cmd, unsigned int flags, ...) 224{ 225 struct cmd *c; 226 va_list ap; 227 const char *cp; 228 unsigned int i; 229 static unsigned int cmdid; 230 231 c = emalloc(sizeof(*c)); 232 c->cmd = estrdup(cmd); 233 c->nparams = 0; 234 va_start(ap, flags); 235 for (cp = cmd; *cp != 0; cp++) { 236 if (*cp == '$') { /* XXX */ 237 c->nparams++; 238 } 239 } 240 c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes)); 241 for (i = 0; i < c->nparams; i++) { 242 Oid type = va_arg(ap, Oid); 243 assert(type == BYTEA || 244 type == INT4OID || type == INT8OID || type == OIDOID || 245 type == TEXTOID || type == TIMESTAMPTZOID); 246 c->paramtypes[i] = type; 247 } 248 va_end(ap); 249 snprintf(c->name, sizeof(c->name), "%u", cmdid++); 250 if ((flags & CMD_NOPREPARE) != 0) { 251 c->prepared_mask = ~0; 252 } else { 253 c->prepared_mask = 0; 254 } 255 c->flags = flags; 256 return c; 257} 258 259static void 260freecmd(struct cmd *c) 261{ 262 263 free(c->paramtypes); 264 free(c->cmd); 265 free(c); 266} 267 268static int 269fetch_noresult(struct Xconn *xc) 270{ 271 PGresult *res; 272 ExecStatusType status; 273 PGconn *conn = xc->conn; 274 int error; 275 276 pqwait(xc); 277 res = PQgetResult(conn); 278 if (res == NULL) { 279 return ENOENT; 280 } 281 status = PQresultStatus(res); 282 if (status == PGRES_COMMAND_OK) { 283 assert(PQnfields(res) == 0); 284 assert(PQntuples(res) == 0); 285 if (!strcmp(PQcmdTuples(res), "0")) { 286 error = ENOENT; 287 } else { 288 error = 0; 289 } 290 } else if (status == PGRES_FATAL_ERROR) { 291 error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE)); 292 assert(error != 0); 293 dumperror(xc, res); 294 } else { 295 errx(1, "%s not command_ok: %d: %s", __func__, 296 (int)status, 297 PQerrorMessage(conn)); 298 } 299 PQclear(res); 300 res = PQgetResult(conn); 301 assert(res == NULL); 302 if (error != 0) { 303 DPRINTF("error %d\n", error); 304 } 305 return error; 306} 307 308static int 309preparecmd(struct Xconn *xc, struct cmd *c) 310{ 311 PGconn *conn = xc->conn; 312 const uint32_t mask = 1 << xc->id; 313 int error; 314 int ret; 315 316 if ((c->prepared_mask & mask) != 0) { 317 return 0; 318 } 319 assert((c->flags & CMD_NOPREPARE) == 0); 320 DPRINTF("PREPARE: '%s'\n", c->cmd); 321 ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes); 322 if (!ret) { 323 errx(EXIT_FAILURE, "PQsendPrepare: %s", 324 PQerrorMessage(conn)); 325 } 326 error = fetch_noresult(xc); 327 if (error != 0) { 328 return error; 329 } 330 c->prepared_mask |= mask; 331 return 0; 332} 333 334/* 335 * vsendcmd: 336 * 337 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared. 338 * 0 for text and 1 for binary. 339 */ 340 341static int 342vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap) 343{ 344 PGconn *conn = xc->conn; 345 char **paramvalues; 346 int *paramlengths; 347 int *paramformats; 348 unsigned int i; 349 int error; 350 int ret; 351 352 assert(xc->owner != NULL); 353 assert(xc->blocker == NULL); 354 error = preparecmd(xc, c); 355 if (error != 0) { 356 return error; 357 } 358 paramvalues = emalloc(c->nparams * sizeof(*paramvalues)); 359 paramlengths = NULL; 360 paramformats = NULL; 361 DPRINTF("CMD: '%s'\n", c->cmd); 362 for (i = 0; i < c->nparams; i++) { 363 Oid type = c->paramtypes[i]; 364 char tmpstore[1024]; 365 const char *buf = NULL; 366 intmax_t v = 0; /* XXXgcc */ 367 int sz; 368 bool binary = false; 369 370 switch (type) { 371 case BYTEA: 372 buf = va_arg(ap, const void *); 373 sz = (int)va_arg(ap, size_t); 374 binary = true; 375 break; 376 case INT8OID: 377 case OIDOID: 378 case INT4OID: 379 switch (type) { 380 case INT8OID: 381 v = (intmax_t)va_arg(ap, int64_t); 382 break; 383 case OIDOID: 384 v = (intmax_t)va_arg(ap, Oid); 385 break; 386 case INT4OID: 387 v = (intmax_t)va_arg(ap, int32_t); 388 break; 389 default: 390 errx(EXIT_FAILURE, "unknown integer oid %u", 391 type); 392 } 393 buf = tmpstore; 394 sz = snprintf(tmpstore, sizeof(tmpstore), 395 "%jd", v); 396 assert(sz != -1); 397 assert((size_t)sz < sizeof(tmpstore)); 398 sz += 1; 399 break; 400 case TEXTOID: 401 case TIMESTAMPTZOID: 402 buf = va_arg(ap, char *); 403 sz = strlen(buf) + 1; 404 break; 405 default: 406 errx(EXIT_FAILURE, "%s: unknown param type %u", 407 __func__, type); 408 } 409 if (binary) { 410 if (paramlengths == NULL) { 411 paramlengths = 412 emalloc(c->nparams * sizeof(*paramformats)); 413 } 414 if (paramformats == NULL) { 415 paramformats = ecalloc(1, 416 c->nparams * sizeof(*paramformats)); 417 } 418 paramformats[i] = 1; 419 paramlengths[i] = sz; 420 } 421 paramvalues[i] = emalloc(sz); 422 memcpy(paramvalues[i], buf, sz); 423 if (binary) { 424 DPRINTF("\t[%u]=<BINARY>\n", i); 425 } else { 426 DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]); 427 } 428 } 429 if ((c->flags & CMD_NOPREPARE) != 0) { 430 ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes, 431 (const char * const *)paramvalues, paramlengths, 432 paramformats, resultmode); 433 } else { 434 ret = PQsendQueryPrepared(conn, c->name, c->nparams, 435 (const char * const *)paramvalues, paramlengths, 436 paramformats, resultmode); 437 } 438 for (i = 0; i < c->nparams; i++) { 439 free(paramvalues[i]); 440 } 441 free(paramvalues); 442 free(paramlengths); 443 free(paramformats); 444 if (!ret) { 445 errx(EXIT_FAILURE, "PQsendQueryPrepared: %s", 446 PQerrorMessage(conn)); 447 } 448 return 0; 449} 450 451int 452sendcmd(struct Xconn *xc, struct cmd *c, ...) 453{ 454 va_list ap; 455 int error; 456 457 va_start(ap, c); 458 error = vsendcmd(xc, 0, c, ap); 459 va_end(ap); 460 return error; 461} 462 463int 464sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...) 465{ 466 va_list ap; 467 int error; 468 469 va_start(ap, c); 470 error = vsendcmd(xc, resultmode, c, ap); 471 va_end(ap); 472 return error; 473} 474 475/* 476 * simplecmd: a convenient routine to execute a command which returns 477 * no rows synchronously. 478 */ 479 480int 481simplecmd(struct Xconn *xc, struct cmd *c, ...) 482{ 483 va_list ap; 484 int error; 485 486 va_start(ap, c); 487 error = vsendcmd(xc, 0, c, ap); 488 va_end(ap); 489 if (error != 0) { 490 return error; 491 } 492 return fetch_noresult(xc); 493} 494 495void 496fetchinit(struct fetchstatus *s, struct Xconn *xc) 497{ 498 s->xc = xc; 499 s->res = NULL; 500 s->cur = 0; 501 s->nrows = 0; 502 s->done = false; 503} 504 505static intmax_t 506getint(const char *str) 507{ 508 intmax_t i; 509 char *ep; 510 511 errno = 0; 512 i = strtoimax(str, &ep, 10); 513 assert(errno == 0); 514 assert(str[0] != 0); 515 assert(*ep == 0); 516 return i; 517} 518 519static int 520vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap) 521{ 522 PGconn *conn = s->xc->conn; 523 unsigned int i; 524 525 assert(conn != NULL); 526 if (s->res == NULL) { 527 ExecStatusType status; 528 int error; 529 530 pqwait(s->xc); 531 s->res = PQgetResult(conn); 532 if (s->res == NULL) { 533 s->done = true; 534 return ENOENT; 535 } 536 status = PQresultStatus(s->res); 537 if (status == PGRES_FATAL_ERROR) { 538 error = sqltoerrno( 539 PQresultErrorField(s->res, PG_DIAG_SQLSTATE)); 540 assert(error != 0); 541 dumperror(s->xc, s->res); 542 return error; 543 } 544 if (status != PGRES_TUPLES_OK) { 545 errx(1, "not tuples_ok: %s", 546 PQerrorMessage(conn)); 547 } 548 assert((unsigned int)PQnfields(s->res) == n); 549 s->nrows = PQntuples(s->res); 550 if (s->nrows == 0) { 551 DPRINTF("no rows\n"); 552 return ENOENT; 553 } 554 assert(s->nrows >= 1); 555 s->cur = 0; 556 } 557 for (i = 0; i < n; i++) { 558 size_t size; 559 560 assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0)); 561 DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n", 562 i, PQftype(s->res, i), types[i], 563 PQgetisnull(s->res, s->cur, i) ? "<NULL>" : 564 PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) : 565 "<BINARY>"); 566 assert(PQftype(s->res, i) == types[i]); 567 assert(!PQgetisnull(s->res, s->cur, i)); 568 switch(types[i]) { 569 case INT8OID: 570 *va_arg(ap, int64_t *) = 571 getint(PQgetvalue(s->res, s->cur, i)); 572 break; 573 case OIDOID: 574 *va_arg(ap, Oid *) = 575 getint(PQgetvalue(s->res, s->cur, i)); 576 break; 577 case INT4OID: 578 *va_arg(ap, int32_t *) = 579 getint(PQgetvalue(s->res, s->cur, i)); 580 break; 581 case TEXTOID: 582 *va_arg(ap, char **) = 583 estrdup(PQgetvalue(s->res, s->cur, i)); 584 break; 585 case BYTEA: 586 size = PQgetlength(s->res, s->cur, i); 587 memcpy(va_arg(ap, void *), 588 PQgetvalue(s->res, s->cur, i), size); 589 *va_arg(ap, size_t *) = size; 590 break; 591 default: 592 errx(EXIT_FAILURE, "%s unknown type %u", __func__, 593 types[i]); 594 } 595 } 596 s->cur++; 597 if (s->cur == s->nrows) { 598 PQclear(s->res); 599 s->res = NULL; 600 } 601 return 0; 602} 603 604int 605fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...) 606{ 607 va_list ap; 608 int error; 609 610 va_start(ap, types); 611 error = vfetchnext(s, n, types, ap); 612 va_end(ap); 613 return error; 614} 615 616void 617fetchdone(struct fetchstatus *s) 618{ 619 620 if (s->res != NULL) { 621 PQclear(s->res); 622 s->res = NULL; 623 } 624 if (!s->done) { 625 PGresult *res; 626 unsigned int n; 627 628 n = 0; 629 while ((res = PQgetResult(s->xc->conn)) != NULL) { 630 PQclear(res); 631 n++; 632 } 633 if (n > 0) { 634 DPRINTF("%u rows dropped\n", n); 635 } 636 } 637} 638 639int 640simplefetch(struct Xconn *xc, Oid type, ...) 641{ 642 struct fetchstatus s; 643 va_list ap; 644 int error; 645 646 fetchinit(&s, xc); 647 va_start(ap, type); 648 error = vfetchnext(&s, 1, &type, ap); 649 va_end(ap); 650 assert(error != 0 || s.res == NULL); 651 fetchdone(&s); 652 return error; 653} 654 655struct Xconn * 656begin(struct puffs_usermount *pu) 657{ 658 struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 659 static struct cmd *c; 660 int error; 661 662 CREATECMD_NOPARAM(c, "BEGIN"); 663 assert(!xc->in_trans); 664 error = simplecmd(xc, c); 665 assert(error == 0); 666 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 667 xc->in_trans = true; 668 return xc; 669} 670 671struct Xconn * 672begin_readonly(struct puffs_usermount *pu) 673{ 674 struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 675 static struct cmd *c; 676 int error; 677 678 CREATECMD_NOPARAM(c, "BEGIN READ ONLY"); 679 assert(!xc->in_trans); 680 error = simplecmd(xc, c); 681 assert(error == 0); 682 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 683 xc->in_trans = true; 684 return xc; 685} 686 687void 688rollback(struct Xconn *xc) 689{ 690 PGTransactionStatusType status; 691 692 /* 693 * check the status as we are not sure the status of our transaction 694 * after a failed commit. 695 */ 696 status = PQtransactionStatus(xc->conn); 697 assert(status != PQTRANS_ACTIVE); 698 assert(status != PQTRANS_UNKNOWN); 699 if (status != PQTRANS_IDLE) { 700 static struct cmd *c; 701 int error; 702 703 assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR); 704 CREATECMD_NOPARAM(c, "ROLLBACK"); 705 error = simplecmd(xc, c); 706 assert(error == 0); 707 } 708 DPRINTF("xc %p rollback %p\n", xc, xc->owner); 709 relxc(xc); 710} 711 712int 713commit(struct Xconn *xc) 714{ 715 static struct cmd *c; 716 int error; 717 718 CREATECMD_NOPARAM(c, "COMMIT"); 719 error = simplecmd(xc, c); 720 if (error == 0) { 721 DPRINTF("xc %p commit %p\n", xc, xc->owner); 722 relxc(xc); 723 } 724 return error; 725} 726 727int 728commit_sync(struct Xconn *xc) 729{ 730 static struct cmd *c; 731 int error; 732 733 assert(!pgfs_dosync); 734 CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON"); 735 error = simplecmd(xc, c); 736 assert(error == 0); 737 return commit(xc); 738} 739 740static void 741pgfs_notice_receiver(void *vp, const PGresult *res) 742{ 743 struct Xconn *xc = vp; 744 745 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR); 746 fprintf(stderr, "got a notice on %p\n", xc); 747 dumperror(xc, res); 748} 749 750static int 751pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf, 752 int fd, int *done) 753{ 754 struct Xconn *xc; 755 PGconn *conn; 756 757 TAILQ_FOREACH(xc, &xclist, list) { 758 if (PQsocket(xc->conn) == fd) { 759 break; 760 } 761 } 762 assert(xc != NULL); 763 conn = xc->conn; 764 PQconsumeInput(conn); 765 if (!PQisBusy(conn)) { 766 if (xc->blocker != NULL) { 767 DPRINTF("schedule %p\n", xc->blocker); 768 puffs_cc_schedule(xc->blocker); 769 } else { 770 DPRINTF("no blockers\n"); 771 } 772 } 773 *done = 0; 774 return 0; 775} 776 777int 778pgfs_connectdb(struct puffs_usermount *pu, const char *dbname, 779 const char *dbuser, bool debug, bool synchronous, unsigned int nconn) 780{ 781 const char *keywords[3+1]; 782 const char *values[3]; 783 unsigned int i; 784 785 if (nconn > 32) { 786 /* 787 * limit from sizeof(cmd->prepared_mask) 788 */ 789 return EINVAL; 790 } 791 if (debug) { 792 pgfs_dodprintf = true; 793 } 794 if (synchronous) { 795 pgfs_dosync = true; 796 } 797 i = 0; 798 if (dbname != NULL) { 799 keywords[i] = "dbname"; 800 values[i] = dbname; 801 i++; 802 } 803 if (dbuser != NULL) { 804 keywords[i] = "user"; 805 values[i] = dbuser; 806 i++; 807 } 808 keywords[i] = "application_name"; 809 values[i] = "pgfs"; 810 i++; 811 keywords[i] = NULL; 812 puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL); 813 for (i = 0; i < nconn; i++) { 814 struct Xconn *xc; 815 struct Xconn *xc2; 816 static int xcid; 817 PGconn *conn; 818 struct cmd *c; 819 int error; 820 821 conn = PQconnectdbParams(keywords, values, 0); 822 if (conn == NULL) { 823 errx(EXIT_FAILURE, 824 "PQconnectdbParams: unknown failure"); 825 } 826 if (PQstatus(conn) != CONNECTION_OK) { 827 /* 828 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW 829 */ 830 errx(EXIT_FAILURE, "PQconnectdbParams: %s", 831 PQerrorMessage(conn)); 832 } 833 DPRINTF("protocol version %d\n", PQprotocolVersion(conn)); 834 puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ); 835 xc = emalloc(sizeof(*xc)); 836 xc->conn = conn; 837 xc->blocker = NULL; 838 xc->owner = NULL; 839 xc->in_trans = false; 840 xc->id = xcid++; 841 assert(xc->id < 32); 842 PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc); 843 TAILQ_INSERT_HEAD(&xclist, xc, list); 844 xc2 = begin(pu); 845 assert(xc2 == xc); 846 c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE); 847 error = simplecmd(xc, c); 848 assert(error == 0); 849 freecmd(c); 850 c = createcmd("SET SESSION CHARACTERISTICS AS " 851 "TRANSACTION ISOLATION LEVEL REPEATABLE READ", 852 CMD_NOPREPARE); 853 error = simplecmd(xc, c); 854 assert(error == 0); 855 freecmd(c); 856 c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE); 857 error = simplecmd(xc, c); 858 assert(error == 0); 859 freecmd(c); 860 if (!pgfs_dosync) { 861 c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF", 862 CMD_NOPREPARE); 863 error = simplecmd(xc, c); 864 assert(error == 0); 865 freecmd(c); 866 } 867 if (debug) { 868 struct fetchstatus s; 869 static const Oid types[] = { INT8OID, }; 870 uint64_t pid; 871 872 c = createcmd("SELECT pg_backend_pid()::int8;", 873 CMD_NOPREPARE); 874 error = sendcmd(xc, c); 875 assert(error == 0); 876 fetchinit(&s, xc); 877 error = FETCHNEXT(&s, types, &pid); 878 fetchdone(&s); 879 assert(error == 0); 880 DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid); 881 } 882 error = commit(xc); 883 assert(error == 0); 884 assert(xc->owner == NULL); 885 } 886 /* 887 * XXX cleanup unlinked files here? what to do when the filesystem 888 * is shared? 889 */ 890 return 0; 891} 892 893struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq); 894struct puffs_cc *flusher = NULL; 895 896int 897flush_xacts(struct puffs_usermount *pu) 898{ 899 struct puffs_cc *cc = puffs_cc_getcc(pu); 900 struct Xconn *xc; 901 static struct cmd *c; 902 uint64_t dummy; 903 int error; 904 905 /* 906 * flush all previously issued asynchronous transactions. 907 * 908 * XXX 909 * unfortunately it seems that there is no clean way to tell 910 * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's 911 * too expensive and overkill for our purpose. 912 * besides, PostgreSQL has an optimization to skip XLOG flushing 913 * for transactions which didn't produce WAL records. 914 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2) 915 * it means that an empty transaction ("BEGIN; COMMIT;"), which 916 * doesn't produce any WAL records, doesn't flush the XLOG even if 917 * synchronous_commit=on. we issues a dummy setval() to avoid the 918 * optimization. 919 * on the other hand, we try to avoid creating unnecessary WAL activity 920 * by serializing flushing and checking XLOG locations. 921 */ 922 923 assert(!pgfs_dosync); 924 if (flusher != NULL) { /* serialize flushers */ 925 DPRINTF("%p flush in progress %p\n", cc, flusher); 926 waiton(&flushwaitq, cc); 927 assert(flusher == NULL); 928 } 929 DPRINTF("%p start flushing\n", cc); 930 flusher = cc; 931retry: 932 xc = begin(pu); 933 CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE " 934 "pg_current_xlog_insert_location() <> pg_current_xlog_location()"); 935 error = sendcmd(xc, c); 936 if (error != 0) { 937 goto got_error; 938 } 939 error = simplefetch(xc, INT8OID, &dummy); 940 assert(error != 0 || dummy == 1); 941 if (error == ENOENT) { 942 /* 943 * there seems to be nothing to flush. 944 */ 945 DPRINTF("%p no sync\n", cc); 946 error = 0; 947 } 948 if (error != 0) { 949 goto got_error; 950 } 951 error = commit_sync(xc); 952 if (error != 0) { 953 goto got_error; 954 } 955 goto done; 956got_error: 957 rollback(xc); 958 if (error == EAGAIN) { 959 goto retry; 960 } 961done: 962 assert(flusher == cc); 963 flusher = NULL; 964 wakeup_one(&flushwaitq); 965 DPRINTF("%p end flushing error=%d\n", cc, error); 966 return error; 967} 968