1/* $NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $ */ 2 3/*- 4 * Copyright (c)2010,2011 YAMAMOTO Takashi, 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29/* 30 * a file system server which stores the data in a PostgreSQL database. 31 */ 32 33/* 34 * we use large objects to store file contents. there are a few XXXs wrt it. 35 * 36 * - large objects don't obey the normal transaction semantics. 37 * 38 * - we use large object server-side functions directly (instead of via the 39 * libpq large object api) because: 40 * - we want to use asynchronous (in the sense of PQsendFoo) operations 41 * which is not available with the libpq large object api. 42 * - with the libpq large object api, there's no way to know details of 43 * an error because PGresult is freed in the library without saving 44 * PG_DIAG_SQLSTATE etc. 45 */ 46 47#include <sys/cdefs.h> 48#ifndef lint 49__RCSID("$NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $"); 50#endif /* not lint */ 51 52#include <assert.h> 53#include <err.h> 54#include <errno.h> 55#include <puffs.h> 56#include <inttypes.h> 57#include <stdarg.h> 58#include <stdbool.h> 59#include <stdio.h> 60#include <stdlib.h> 61#include <time.h> 62#include <util.h> 63 64#include <libpq-fe.h> 65#include <libpq/libpq-fs.h> /* INV_* */ 66 67#include "pgfs.h" 68#include "pgfs_db.h" 69#include "pgfs_debug.h" 70#include "pgfs_waitq.h" 71#include "pgfs_subs.h" 72 73const char * const vtype_table[] = { 74 [VREG] = "regular", 75 [VDIR] = "directory", 76 [VLNK] = "link", 77}; 78 79static unsigned int 80tovtype(const char *type) 81{ 82 unsigned int i; 83 84 for (i = 0; i < __arraycount(vtype_table); i++) { 85 if (vtype_table[i] == NULL) { 86 continue; 87 } 88 if (!strcmp(type, vtype_table[i])) { 89 return i; 90 } 91 } 92 assert(0); 93 return 0; 94} 95 96static const char * 97fromvtype(enum vtype vtype) 98{ 99 100 if (vtype < __arraycount(vtype_table)) { 101 assert(vtype_table[vtype] != NULL); 102 return vtype_table[vtype]; 103 } 104 return NULL; 105} 106 107/* 108 * fileid_lock stuff below is to keep ordering of operations for a file. 109 * it is a workaround for the lack of operation barriers in the puffs 110 * protocol. 111 * 112 * currently we do this locking only for SETATTR, GETATTR, and WRITE as 113 * they are known to be reorder-unsafe. they are sensitive to the file 114 * attributes, mainly the file size. note that as the kernel issues async 115 * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing 116 * the stale attributes. 117 * 118 * we are relying on waiton/wakeup being a FIFO. 119 */ 120 121struct fileid_lock_handle { 122 TAILQ_ENTRY(fileid_lock_handle) list; 123 fileid_t fileid; 124 struct puffs_cc *owner; /* diagnostic only */ 125 struct waitq waitq; 126}; 127 128TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list = 129 TAILQ_HEAD_INITIALIZER(fileid_lock_list); 130struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq); 131 132/* 133 * fileid_lock: serialize requests for the fileid. 134 * 135 * this function should be the first yieldable point in a puffs callback. 136 */ 137 138struct fileid_lock_handle * 139fileid_lock(fileid_t fileid, struct puffs_cc *cc) 140{ 141 struct fileid_lock_handle *lock; 142 143 TAILQ_FOREACH(lock, &fileid_lock_list, list) { 144 if (lock->fileid == fileid) { 145 DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc); 146 assert(lock->owner != cc); 147 waiton(&lock->waitq, cc); /* enter FIFO */ 148 assert(lock->owner == cc); 149 return lock; 150 } 151 } 152 lock = emalloc(sizeof(*lock)); 153 lock->fileid = fileid; 154 lock->owner = cc; 155 DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc); 156 waitq_init(&lock->waitq); 157 TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list); 158 return lock; 159} 160 161void 162fileid_unlock(struct fileid_lock_handle *lock) 163{ 164 165 DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid); 166 assert(lock != NULL); 167 assert(lock->owner != NULL); 168 /* 169 * perform direct-handoff to the first waiter. 170 * 171 * a handoff is essential to keep the order of requests. 172 */ 173 lock->owner = wakeup_one(&lock->waitq); 174 if (lock->owner != NULL) { 175 return; 176 } 177 /* 178 * no one is waiting this fileid. 179 */ 180 TAILQ_REMOVE(&fileid_lock_list, lock, list); 181 free(lock); 182} 183 184/* 185 * timespec_to_pgtimestamp: create a text representation of timestamp which 186 * can be recognized by the database server. 187 * 188 * it's caller's responsibility to free(3) the result. 189 */ 190 191int 192timespec_to_pgtimestamp(const struct timespec *tv, char **resultp) 193{ 194 /* 195 * XXX is there any smarter way? 196 */ 197 char buf1[1024]; 198 char buf2[1024]; 199 struct tm tm_store; 200 struct tm *tm; 201 202 tm = gmtime_r(&tv->tv_sec, &tm_store); 203 if (tm == NULL) { 204 assert(errno != 0); 205 return errno; 206 } 207 strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm); 208 snprintf(buf2, sizeof(buf2), "%s.%ju", buf1, 209 (uintmax_t)tv->tv_nsec / 1000); 210 *resultp = estrdup(buf2); 211 return 0; 212} 213 214int 215my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size) 216{ 217 static struct cmd *c; 218 int32_t ret; 219 int error; 220 221 CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID); 222 error = sendcmd(xc, c, fd, size); 223 if (error != 0) { 224 return error; 225 } 226 error = simplefetch(xc, INT4OID, &ret); 227 if (error != 0) { 228 if (error == EEXIST) { 229 /* 230 * probably the insertion of the new-sized page 231 * caused a duplicated key error. retry. 232 */ 233 DPRINTF("map EEXIST to EAGAIN\n"); 234 error = EAGAIN; 235 } 236 return error; 237 } 238 assert(ret == 0); 239 return 0; 240} 241 242int 243my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence, 244 int32_t *retp) 245{ 246 static struct cmd *c; 247 int32_t ret; 248 int error; 249 250 CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID); 251 error = sendcmd(xc, c, fd, offset, whence); 252 if (error != 0) { 253 return error; 254 } 255 error = simplefetch(xc, INT4OID, &ret); 256 if (error != 0) { 257 return error; 258 } 259 if (retp != NULL) { 260 *retp = ret; 261 } 262 return 0; 263} 264 265int 266my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size, 267 size_t *resultsizep) 268{ 269 static struct cmd *c; 270 size_t resultsize; 271 int error; 272 273 CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID); 274 error = sendcmdx(xc, 1, c, fd, (int32_t)size); 275 if (error != 0) { 276 return error; 277 } 278 error = simplefetch(xc, BYTEA, buf, &resultsize); 279 if (error != 0) { 280 return error; 281 } 282 *resultsizep = resultsize; 283 if (size != resultsize) { 284 DPRINTF("shortread? %zu != %zu\n", size, resultsize); 285 } 286 return 0; 287} 288 289int 290my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size, 291 size_t *resultsizep) 292{ 293 static struct cmd *c; 294 int32_t resultsize; 295 int error; 296 297 CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA); 298 error = sendcmd(xc, c, fd, buf, (int32_t)size); 299 if (error != 0) { 300 return error; 301 } 302 error = simplefetch(xc, INT4OID, &resultsize); 303 if (error != 0) { 304 if (error == EEXIST) { 305 /* 306 * probably the insertion of the new data page 307 * caused a duplicated key error. retry. 308 */ 309 DPRINTF("map EEXIST to EAGAIN\n"); 310 error = EAGAIN; 311 } 312 return error; 313 } 314 *resultsizep = resultsize; 315 if (size != (size_t)resultsize) { 316 DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize); 317 } 318 return 0; 319} 320 321int 322my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp) 323{ 324 static struct cmd *c; 325 int error; 326 327 CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID); 328 error = sendcmd(xc, c, loid, mode); 329 if (error != 0) { 330 return error; 331 } 332 return simplefetch(xc, INT4OID, fdp); 333} 334 335int 336my_lo_close(struct Xconn *xc, int32_t fd) 337{ 338 static struct cmd *c; 339 int32_t ret; 340 int error; 341 342 CREATECMD(c, "SELECT lo_close($1)", INT4OID); 343 error = sendcmd(xc, c, fd); 344 if (error != 0) { 345 return error; 346 } 347 error = simplefetch(xc, INT4OID, &ret); 348 if (error != 0) { 349 return error; 350 } 351 assert(ret == 0); 352 return 0; 353} 354 355static int 356lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp) 357{ 358 static struct cmd *c; 359 static const Oid types[] = { OIDOID, }; 360 struct fetchstatus s; 361 int error; 362 363 CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID); 364 error = sendcmd(xc, c, fileid); 365 if (error != 0) { 366 return error; 367 } 368 fetchinit(&s, xc); 369 error = FETCHNEXT(&s, types, idp); 370 fetchdone(&s); 371 DPRINTF("error %d\n", error); 372 return error; 373} 374 375int 376lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp) 377{ 378 Oid loid; 379 int fd; 380 int error; 381 382 error = lo_lookup_by_fileid(xc, fileid, &loid); 383 if (error != 0) { 384 return error; 385 } 386 error = my_lo_open(xc, loid, mode, &fd); 387 if (error != 0) { 388 return error; 389 } 390 *fdp = fd; 391 return 0; 392} 393 394static int 395getsize(struct Xconn *xc, fileid_t fileid, int *resultp) 396{ 397 int32_t size; 398 int fd; 399 int error; 400 401 error = lo_open_by_fileid(xc, fileid, INV_READ, &fd); 402 if (error != 0) { 403 return error; 404 } 405 error = my_lo_lseek(xc, fd, 0, SEEK_END, &size); 406 if (error != 0) { 407 return error; 408 } 409 error = my_lo_close(xc, fd); 410 if (error != 0) { 411 return error; 412 } 413 *resultp = size; 414 return 0; 415} 416 417#define GETATTR_TYPE 0x00000001 418#define GETATTR_NLINK 0x00000002 419#define GETATTR_SIZE 0x00000004 420#define GETATTR_MODE 0x00000008 421#define GETATTR_UID 0x00000010 422#define GETATTR_GID 0x00000020 423#define GETATTR_TIME 0x00000040 424#define GETATTR_ALL \ 425 (GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \ 426 GETATTR_UID|GETATTR_GID|GETATTR_TIME) 427 428int 429getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask) 430{ 431 char *type; 432 long long atime_s; 433 long long atime_us; 434 long long ctime_s; 435 long long ctime_us; 436 long long mtime_s; 437 long long mtime_us; 438 long long btime_s; 439 long long btime_us; 440 uint64_t mode; 441 long long uid; 442 long long gid; 443 long long nlink; 444 long long rev; 445 struct fetchstatus s; 446 int error; 447 448 if (mask == 0) { 449 return 0; 450 } 451 /* 452 * unless explicitly requested, avoid fetching timestamps as they 453 * are a little more expensive than other simple attributes. 454 */ 455 if ((mask & GETATTR_TIME) != 0) { 456 static struct cmd *c; 457 static const Oid types[] = { 458 TEXTOID, 459 INT8OID, 460 INT8OID, 461 INT8OID, 462 INT8OID, 463 INT8OID, 464 INT8OID, 465 INT8OID, 466 INT8OID, 467 INT8OID, 468 INT8OID, 469 INT8OID, 470 INT8OID, 471 INT8OID, 472 }; 473 474 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, " 475 "extract(epoch from date_trunc('second', atime))::int8, " 476 "extract(microseconds from atime)::int8, " 477 "extract(epoch from date_trunc('second', ctime))::int8, " 478 "extract(microseconds from ctime)::int8, " 479 "extract(epoch from date_trunc('second', mtime))::int8, " 480 "extract(microseconds from mtime)::int8, " 481 "extract(epoch from date_trunc('second', btime))::int8, " 482 "extract(microseconds from btime)::int8 " 483 "FROM file " 484 "WHERE fileid = $1", INT8OID); 485 error = sendcmd(xc, c, fileid); 486 if (error != 0) { 487 return error; 488 } 489 fetchinit(&s, xc); 490 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink, 491 &rev, 492 &atime_s, &atime_us, 493 &ctime_s, &ctime_us, 494 &mtime_s, &mtime_us, 495 &btime_s, &btime_us); 496 } else { 497 static struct cmd *c; 498 static const Oid types[] = { 499 TEXTOID, 500 INT8OID, 501 INT8OID, 502 INT8OID, 503 INT8OID, 504 INT8OID, 505 }; 506 507 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev " 508 "FROM file " 509 "WHERE fileid = $1", INT8OID); 510 error = sendcmd(xc, c, fileid); 511 if (error != 0) { 512 return error; 513 } 514 fetchinit(&s, xc); 515 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink, 516 &rev); 517 } 518 fetchdone(&s); 519 if (error != 0) { 520 return error; 521 } 522 memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */ 523 va->va_type = tovtype(type); 524 free(type); 525 va->va_mode = mode; 526 va->va_uid = uid; 527 va->va_gid = gid; 528 if (nlink > 0 && va->va_type == VDIR) { 529 nlink++; /* "." */ 530 } 531 va->va_nlink = nlink; 532 va->va_fileid = fileid; 533 va->va_atime.tv_sec = atime_s; 534 va->va_atime.tv_nsec = atime_us * 1000; 535 va->va_ctime.tv_sec = ctime_s; 536 va->va_ctime.tv_nsec = ctime_us * 1000; 537 va->va_mtime.tv_sec = mtime_s; 538 va->va_mtime.tv_nsec = mtime_us * 1000; 539 va->va_birthtime.tv_sec = btime_s; 540 va->va_birthtime.tv_nsec = btime_us * 1000; 541 va->va_blocksize = LOBLKSIZE; 542 va->va_gen = 1; 543 va->va_filerev = rev; 544 if ((mask & GETATTR_SIZE) != 0) { 545 int size; 546 547 size = 0; 548 if (va->va_type == VREG || va->va_type == VLNK) { 549 error = getsize(xc, fileid, &size); 550 if (error != 0) { 551 return error; 552 } 553 } else if (va->va_type == VDIR) { 554 size = 100; /* XXX */ 555 } 556 va->va_size = size; 557 } 558 /* 559 * XXX va_bytes: likely wrong due to toast compression. 560 * there's no cheap way to get the compressed size of LO. 561 */ 562 va->va_bytes = va->va_size; 563 va->va_flags = 0; 564 return 0; 565} 566 567int 568update_mctime(struct Xconn *xc, fileid_t fileid) 569{ 570 static struct cmd *c; 571 572 CREATECMD(c, 573 "UPDATE file " 574 "SET mtime = current_timestamp, ctime = current_timestamp, " 575 "rev = rev + 1 " 576 "WHERE fileid = $1", INT8OID); 577 return simplecmd(xc, c, fileid); 578} 579 580int 581update_atime(struct Xconn *xc, fileid_t fileid) 582{ 583 static struct cmd *c; 584 585 CREATECMD(c, 586 "UPDATE file SET atime = current_timestamp WHERE fileid = $1", 587 INT8OID); 588 return simplecmd(xc, c, fileid); 589} 590 591int 592update_mtime(struct Xconn *xc, fileid_t fileid) 593{ 594 static struct cmd *c; 595 596 CREATECMD(c, 597 "UPDATE file " 598 "SET mtime = current_timestamp, rev = rev + 1 " 599 "WHERE fileid = $1", INT8OID); 600 return simplecmd(xc, c, fileid); 601} 602 603int 604update_ctime(struct Xconn *xc, fileid_t fileid) 605{ 606 static struct cmd *c; 607 608 CREATECMD(c, 609 "UPDATE file SET ctime = current_timestamp WHERE fileid = $1", 610 INT8OID); 611 return simplecmd(xc, c, fileid); 612} 613 614int 615update_nlink(struct Xconn *xc, fileid_t fileid, int delta) 616{ 617 static struct cmd *c; 618 619 CREATECMD(c, 620 "UPDATE file " 621 "SET nlink = nlink + $1 " 622 "WHERE fileid = $2", 623 INT8OID, INT8OID); 624 return simplecmd(xc, c, (int64_t)delta, fileid); 625} 626 627int 628lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent) 629{ 630 static struct cmd *c; 631 static const Oid types[] = { INT8OID, }; 632 struct fetchstatus s; 633 int error; 634 635 CREATECMD(c, "SELECT parent_fileid FROM dirent " 636 "WHERE child_fileid = $1 LIMIT 1", INT8OID); 637 error = sendcmd(xc, c, fileid); 638 if (error != 0) { 639 return error; 640 } 641 fetchinit(&s, xc); 642 error = FETCHNEXT(&s, types, parent); 643 fetchdone(&s); 644 if (error != 0) { 645 return error; 646 } 647 return 0; 648} 649 650int 651mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, 652 fileid_t *idp) 653{ 654 static struct cmd *c; 655 const char *type; 656 int error; 657 658 type = fromvtype(vtype); 659 if (type == NULL) { 660 return EOPNOTSUPP; 661 } 662 CREATECMD(c, 663 "INSERT INTO file " 664 "(fileid, type, mode, uid, gid, nlink, rev, " 665 "atime, ctime, mtime, btime) " 666 "VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, " 667 "current_timestamp, " 668 "current_timestamp, " 669 "current_timestamp, " 670 "current_timestamp) " 671 "RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID); 672 error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid, 673 (uint64_t)gid); 674 if (error != 0) { 675 return error; 676 } 677 return simplefetch(xc, INT8OID, idp); 678} 679 680int 681linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child) 682{ 683 static struct cmd *c; 684 int error; 685 686 CREATECMD(c, 687 "INSERT INTO dirent " 688 "(parent_fileid, name, child_fileid) " 689 "VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID); 690 error = simplecmd(xc, c, parent, name, child); 691 if (error != 0) { 692 return error; 693 } 694 error = update_nlink(xc, child, 1); 695 if (error != 0) { 696 return error; 697 } 698 return update_mtime(xc, parent); 699} 700 701int 702unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child) 703{ 704 static struct cmd *c; 705 int error; 706 707 /* 708 * in addition to the primary key, we check child_fileid as well here 709 * to avoid removing an entry which was appeared after our VOP_LOOKUP. 710 */ 711 CREATECMD(c, 712 "DELETE FROM dirent " 713 "WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3", 714 INT8OID, TEXTOID, INT8OID); 715 error = simplecmd(xc, c, parent, name, child); 716 if (error != 0) { 717 return error; 718 } 719 error = update_nlink(xc, child, -1); 720 if (error != 0) { 721 return error; 722 } 723 error = update_mtime(xc, parent); 724 if (error != 0) { 725 return error; 726 } 727 return update_ctime(xc, child); 728} 729 730int 731mklinkfile(struct Xconn *xc, fileid_t parent, const char *name, 732 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp) 733{ 734 fileid_t fileid; 735 int error; 736 737 error = mkfile(xc, vtype, mode, uid, gid, &fileid); 738 if (error != 0) { 739 return error; 740 } 741 error = linkfile(xc, parent, name, fileid); 742 if (error != 0) { 743 return error; 744 } 745 if (idp != NULL) { 746 *idp = fileid; 747 } 748 return 0; 749} 750 751int 752mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name, 753 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp, 754 int *loidp) 755{ 756 static struct cmd *c; 757 fileid_t new_fileid; 758 int loid; 759 int error; 760 761 error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid, 762 &new_fileid); 763 if (error != 0) { 764 return error; 765 } 766 CREATECMD(c, 767 "INSERT INTO datafork (fileid, loid) " 768 "VALUES($1, lo_creat(-1)) " 769 "RETURNING loid", INT8OID); 770 error = sendcmd(xc, c, new_fileid); 771 if (error != 0) { 772 return error; 773 } 774 error = simplefetch(xc, OIDOID, &loid); 775 if (error != 0) { 776 return error; 777 } 778 if (fileidp != NULL) { 779 *fileidp = new_fileid; 780 } 781 if (loidp != NULL) { 782 *loidp = loid; 783 } 784 return 0; 785} 786 787int 788cleanupfile(struct Xconn *xc, fileid_t fileid, struct vattr *va) 789{ 790 static struct cmd *c; 791 792 /* 793 * XXX what to do when the filesystem is shared? 794 */ 795 796 if (va->va_type == VREG || va->va_type == VLNK) { 797 static struct cmd *c_datafork; 798 int32_t ret; 799 int error; 800 801 CREATECMD(c_datafork, 802 "WITH loids AS (DELETE FROM datafork WHERE fileid = $1 " 803 "RETURNING loid) SELECT lo_unlink(loid) FROM loids", 804 INT8OID); 805 error = sendcmd(xc, c_datafork, fileid); 806 if (error != 0) { 807 return error; 808 } 809 error = simplefetch(xc, INT4OID, &ret); 810 if (error != 0) { 811 return error; 812 } 813 if (ret != 1) { 814 return EIO; /* lo_unlink failed */ 815 } 816 } 817 CREATECMD(c, "DELETE FROM file WHERE fileid = $1", INT8OID); 818 return simplecmd(xc, c, fileid); 819} 820 821/* 822 * check_path: do locking and check to prevent a rename from creating loop. 823 * 824 * lock the dirents between child_fileid and the root directory. 825 * if gate_fileid is appeared in the path, return EINVAL. 826 * caller should ensure that child_fileid is of VDIR beforehand. 827 * 828 * we uses FOR SHARE row level locks as poor man's predicate locks. 829 * 830 * the following is an example to show why we need to lock the path. 831 * 832 * consider: 833 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6" 834 * and then 835 * thread 1 is doing "mv /a/b /1/2/3/4/5/6" 836 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f" 837 * 838 * a possible consequence: 839 * thread 1: check_path -> success 840 * thread 2: check_path -> success 841 * thread 1: modify directories -> block on row-level lock 842 * thread 2: modify directories -> block on row-level lock 843 * -> deadlock detected 844 * -> rollback and retry 845 * 846 * another possible consequence: 847 * thread 1: check_path -> success 848 * thread 1: modify directory entries -> success 849 * thread 2: check_path -> block on row-level lock 850 * thread 1: commit 851 * thread 2: acquire the lock and notices the row is updated 852 * -> serialization error 853 * -> rollback and retry 854 * 855 * XXX it might be better to use real serializable transactions, 856 * which will be available for PostgreSQL 9.1 857 */ 858 859int 860check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid) 861{ 862 static struct cmd *c; 863 fileid_t parent_fileid; 864 struct fetchstatus s; 865 int error; 866 867 CREATECMD(c, 868 "WITH RECURSIVE r AS " 869 "( " 870 "SELECT parent_fileid, cookie, child_fileid " 871 "FROM dirent " 872 "WHERE child_fileid = $1 " 873 "UNION ALL " 874 "SELECT d.parent_fileid, d.cookie, " 875 "d.child_fileid " 876 "FROM dirent AS d INNER JOIN r " 877 "ON d.child_fileid = r.parent_fileid " 878 ") " 879 "SELECT d.parent_fileid " 880 "FROM dirent d " 881 "JOIN r " 882 "ON d.cookie = r.cookie " 883 "FOR SHARE", INT8OID); 884 error = sendcmd(xc, c, child_fileid); 885 if (error != 0) { 886 return error; 887 } 888 fetchinit(&s, xc); 889 do { 890 static const Oid types[] = { INT8OID, }; 891 892 error = FETCHNEXT(&s, types, &parent_fileid); 893 if (error == ENOENT) { 894 fetchdone(&s); 895 return 0; 896 } 897 if (error != 0) { 898 fetchdone(&s); 899 return error; 900 } 901 } while (gate_fileid != parent_fileid); 902 fetchdone(&s); 903 return EINVAL; 904} 905 906int 907isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp) 908{ 909 int32_t dummy; 910 static struct cmd *c; 911 int error; 912 913 CREATECMD(c, 914 "SELECT 1 FROM dirent " 915 "WHERE parent_fileid = $1 LIMIT 1", INT8OID); 916 error = sendcmd(xc, c, fileid); 917 if (error != 0) { 918 return error; 919 } 920 error = simplefetch(xc, INT4OID, &dummy); 921 assert(error != 0 || dummy == 1); 922 if (error == ENOENT) { 923 *emptyp = true; 924 error = 0; 925 } else { 926 *emptyp = false; 927 } 928 return error; 929} 930