1/*
2 * Copyright (c) 2010 Apple Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_HEADER_START@
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1.  Redistributions of source code must retain the above copyright
11 *     notice, this list of conditions and the following disclaimer.
12 * 2.  Redistributions in binary form must reproduce the above copyright
13 *     notice, this list of conditions and the following disclaimer in the
14 *     documentation and/or other materials provided with the distribution.
15 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
16 *     contributors may be used to endorse or promote products derived from
17 *     this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 * Portions of this software have been released under the following terms:
31 *
32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
35 *
36 * To anyone who acknowledges that this file is provided "AS IS"
37 * without any express or implied warranty:
38 * permission to use, copy, modify, and distribute this file for any
39 * purpose is hereby granted without fee, provided that the above
40 * copyright notices and this notice appears in all source code copies,
41 * and that none of the names of Open Software Foundation, Inc., Hewlett-
42 * Packard Company or Digital Equipment Corporation be used
43 * in advertising or publicity pertaining to distribution of the software
44 * without specific, written prior permission.  Neither Open Software
45 * Foundation, Inc., Hewlett-Packard Company nor Digital
46 * Equipment Corporation makes any representations about the suitability
47 * of this software for any purpose.
48 *
49 * Copyright (c) 2007, Novell, Inc. All rights reserved.
50 * Redistribution and use in source and binary forms, with or without
51 * modification, are permitted provided that the following conditions
52 * are met:
53 *
54 * 1.  Redistributions of source code must retain the above copyright
55 *     notice, this list of conditions and the following disclaimer.
56 * 2.  Redistributions in binary form must reproduce the above copyright
57 *     notice, this list of conditions and the following disclaimer in the
58 *     documentation and/or other materials provided with the distribution.
59 * 3.  Neither the name of Novell Inc. nor the names of its contributors
 *     may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
66 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
73 *
74 * @APPLE_LICENSE_HEADER_END@
75 */
76
77/*
78**
79**
80**  NAME:
81**
82**      dsm.c
83**
84**  FACILITY:
85**
86**      Data Storage Manager (DSM)
87**
88**  ABSTRACT:
89**
90**  The DSM implements a heap storage interface wherein records are strongly
91**  associated with records in a file, such that they can be stably stored
92**  upon modification.  The basic paradigm is that the client ALLOCATEs a
93**  block of some requested size, modifies it in memory, and WRITEs it;
94**  successful completion of the WRITE implies that the record has been stably
95**  stored in the file.  DSM makes this assumption about the underlying file
96**  system and operations thereon: there is a basic page size which is well
97**  known (compile time constant), and file operations operate on pages as
98**  units.  In particular, a write operation which is entirely within a page
99**  (i.e. a write of a page or a subset of a page) will succeed or fail
100**  atomically, with no intermediate state.  We use this to stably write
101**  new blocks, and free existing ones.  There is no atomic update operation;
102**  the caller must establish protocol for doing updates of existing records.
103**
104**  The memory model is simple.  The heap file begins with a header page
105**  containing version information etc. (including the size of the stably
106**  initialized portion of the file).  The rest of the file is some integral
107**  number of pages long, with records laid out in sequence.  Each record
108**  begins with a preheader (to us it's a header, to the client application
109**  an invisible preheader) which contains the offset of the record in the
110**  file (so we can easily update that record), the size of the record not
111**  including the preheader, a 'link' field used to string the free list
112**  in memory, a 'free' indication, and a 'magic cookie' used to flag validly
113**  allocated records in memory.  The portion of the record after the preheader
114**  is what we give the client; note that there may be slop at the end of
115**  the record beyond the amount of space requested, for alignment.  The
116**  preheader and the data portion of the record are both aligned to an 8-byte
117**  boundary.  The first 64 bytes of each record, including the preheader,
118**  fit within a page, so the preheader (16 bytes currently) and the first
119**  48 bytes of the user data can be atomically updated.
120**
121**  When initially created, no data pages are allocated to the file (just
122**  the header is filled in).  When the file needs to grow (because no suitable
123**  free block is available) it is extended by an integral number of pages,
**  at least large enough to accommodate the client request, and at least
125**  a minimum size.  A chunk of memory of the growth size is allocated at
126**  the same time to act as the virtual mirror of this new section of the
127**  file.  As the file grows over time a list of such chunks is formed.  When
128**  a preexisting file is opened a single chunk representing the actual length
129**  of the file is created, and the entire file read into it.  Within memory
130**  a free list is constructed linking all free blocks; this list is
131**  reconstructed at open time by traversing the entire (virtual) file.
132**
133**  A given data store can be specified to be volatile at open/create time
134**  by giving a null or empty filename, in which case there will be no backing
135**  file.  This allows PL/I-style storage areas, where an arbitrary number
136**  of records can be allocated in memory in a data store and freed with
137**  a single operation of closing the data store handle, whereas each record
138**  would have to be kept track of and freed individually with conventional
139**  malloc.
140**
141**
142*/
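/* For orientation only: a sketch of the record preheader described above.
 * The authoritative declaration is block_t in dsm_p.h; the field names below
 * (loc, size, link, isfree, cookie, data) are the ones this file uses, but
 * the exact types, ordering and padding shown here are assumptions.
 *
 *     struct example_block {
 *         unsigned long         loc;     // file offset of this record
 *         unsigned long         size;    // user-data size, preheader excluded
 *         struct example_block *link;    // in-memory free-list linkage
 *         boolean               isfree;  // set while free or detached
 *         unsigned long         cookie;  // HDR_COOKIE when validly allocated
 *         double                data;    // start of 8-byte-aligned user data
 *     };
 *
 * Whatever the real layout, the preheader occupies 16 bytes and the first 64
 * bytes of a record never cross a page boundary, so the preheader plus the
 * first 48 bytes of user data can always be rewritten atomically.
 */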
143#if HAVE_CONFIG_H
144#include <config.h>
145#endif
146
147#include <dce/dce.h>
148#include <string.h>
149#include <stdlib.h>
150#include <unistd.h>
151#include <stdio.h>
152#include <fcntl.h>
153
154#define _I_AM_DSM_C_
155#include "dsm_p.h"  /* private include file */
156
157/** verify_dsh
158
159    Make sure this thing looks like a good data store handle.  It shouldn't
160    be null, and should have the proper magic cookie.
161*/
162
163#define verify_dsh(dsh) { \
164    if ((dsh) == NULL) SIGNAL(dsm_err_invalid_handle);  \
165    if ((unsigned32)(dsh)->cookie != DSM_COOKIE) SIGNAL(dsm_err_invalid_handle);  \
166  }
167
168/** dsm_create
169
170    Create a new data store.  This involves creating a new file based on
171    the user-provided name, initializing its header, and allocating a data
172    store record to represent it.  As created the file is empty other than
173    the header, and thus will have to grow on the first dsm_allocate()
174    call.  If the filename is null/empty no backing file will exist for this
175    datastore.
176*/
177
178public void dsm_create(fname,new_dsh,st)
179unsigned char  *fname;      /* filename (null-terminated) */
180dsm_handle_t   *new_dsh;    /* (output) resulting data store handle */
181error_status_t *st;         /* (output) status */
182{
183    int                 fd = -1;            /* file descriptor of created file */
184    dsm_handle          dsh;                /* allocated data store handle */
185    file_hdr_t          fhdr;               /* file header (first page) */
186
187    CLEANUP {      /* upon errors */
188        if (fd != -1) close(fd);            /* if file is open close it */
189        return;                             /* return with error status */
190    }
191
192    if (fname != NULL && ustrlen(fname) != 0) {
193        fd = create_file(fname);                /* try to create the file */
194        if (fd == -1) SIGNAL(dsm_err_create_failed);  /* error if fails */
195        dsm__lock_file(fd, st);         /* lock the file for unique access */
196        PROP_BAD_ST;
197
198        fhdr.version = dsm_version;            /* file has current version */
199        fhdr.pages = 0;                         /* no currently allocated data pages (only hdr) */
200        if (dcethread_write(fd,&fhdr,PAGE_SIZE) != PAGE_SIZE) SIGNAL(dsm_err_file_io_error); /* write hdr */
201    }
202
203    dsh = NEW(dsm_db_t);                    /* allocate data store handle */
204    if (dsh == NULL) SIGNAL(dsm_err_no_memory);    /* make sure we got it */
205
206    dsh->fd = fd;                           /* remember fd */
207    if (fname == NULL) {
208        dsh->fname = (char *) malloc(1);
209        dsh->fname[0] = 0;
210    }
211    else {
212        dsh->fname = (char *) malloc(ustrlen(fname)+1);  /* and filename */
213        ustrlcpy(dsh->fname, fname, ustrlen(fname)+1);
214    }
215    dsh->freelist = NULL;                   /* no free memory */
216    dsh->map = NULL;                        /* therefore no file mapping */
217    dsh->pages = 0;                         /* and no data pages in the file */
218    dsh->cookie = DSM_COOKIE;               /* magic cookie */
219    dsh->coalesced = false;                 /* no coalesce yet */
220    dsh->pending = 0;                       /* no allocated/unwritten blocks */
221    cache_clear(dsh);                       /* nothing in cache */
222
223    *new_dsh = (dsm_handle_t)dsh;           /* fill in user's dsh */
224
225    CLEAR_ST;
226
227} /* dsm_create */
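/* A minimal usage sketch of the create/allocate/write/close cycle described
 * above.  Illustrative only: my_record_t and the file name are hypothetical
 * and error handling is abbreviated.
 *
 *     dsm_handle_t   dsh;
 *     error_status_t st;
 *     my_record_t   *rec;
 *
 *     dsm_create((unsigned char *)"example.dsm", &dsh, &st);
 *     dsm_allocate(dsh, sizeof(my_record_t), (void **)&rec, &st);
 *     // ... fill in *rec in memory ...
 *     dsm_write(dsh, rec, &st);              // record is now stably stored
 *     dsm_close(&dsh, &st);
 */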
228
229/** dsm_open
230
231    Open a preexisting data store.  This involves opening the user-named
232    file, reading its header to determine its version stamp and data size,
233    then allocating a data store record for the file.  We then allocate
234    an initial file map (one entry) mapping the entire file (pages
235    1..pagecount) to a single buffer, which we allocate and read the file
236    into.  Finally we use the build_freelist() operation to traverse the file
237    constructing the free list.  If the file name is null/empty there is no
238    backing store.
239*/
240
241public void dsm_open(fname,new_dsh,st)
242unsigned char  *fname;      /* filename (null-terminated) */
243dsm_handle_t   *new_dsh;    /* (output) resulting data store handle */
244error_status_t *st;         /* (output) status */
245{
246    int                 fd = -1;        /* file descriptor of opened file */
247    dsm_handle          dsh = NULL;     /* allocated data store handle */
248    file_hdr_t          fhdr;           /* file header (first page) */
249
250    CLEANUP {  /* upon errors */
251        if (fd != -1) close(fd);        /* if file is open close it */
252        if (dsh) {                      /* free allocated memory */
253            if (dsh->map) {             /* which is nested */
254                if (dsh->map->ptr) free(dsh->map->ptr);
255                free(dsh->map);
256            }
257            free(dsh->fname);
258            free(dsh);
259        }
260        return;                         /* return with error status */
261    }
262
263    if (fname != NULL && ustrlen(fname) != 0) {
264        fd = open((char *)fname,O_RDWR);        /* try to open the file */
265        if (fd == -1) SIGNAL(dsm_err_open_failed);  /* signal error if fails */
266        dsm__lock_file(fd, st);         /* lock the file for exclusive access */
267        PROP_BAD_ST;
268
269        if (dcethread_read(fd,&fhdr,PAGE_SIZE) != PAGE_SIZE) SIGNAL(dsm_err_file_io_error); /* read hdr */
270
271        if (fhdr.version != dsm_version) SIGNAL(dsm_err_version); /* check version */
272    }
273
274    dsh = NEW(dsm_db_t);            /* allocate data store handle */
275    if (dsh == NULL) SIGNAL(dsm_err_no_memory);    /* make sure we got it */
276
    dsh->fd = fd;                   /* remember fd */
    /* as in dsm_create, a null/empty fname means a volatile (file-less) store */
    dsh->fname = (char *) malloc((fname ? ustrlen(fname) : 0) + 1);
    if (fname != NULL) ustrlcpy(dsh->fname, fname, ustrlen(fname)+1); /* and filename */
    else dsh->fname[0] = 0;
280    dsh->freelist = NULL;           /* no free list yet */
281    dsh->map = NULL;                /* no memory mapping yet */
282    dsh->pages = (fd < 0) ? 0 : fhdr.pages; /* number of valid pages */
283    dsh->cookie = DSM_COOKIE;       /* magic cookie */
284    dsh->coalesced = false;         /* haven't coalesced yet */
285    dsh->pending = 0;               /* no allocated/unwritten blocks */
286    cache_clear(dsh);               /* nothing in cache */
287
288    if (dsh->pages > 0) {           /* if any valid pages */
289        dsh->map = NEW(file_map_t);                                 /* get initial file map */
290        dsh->map->link = NULL;                                      /* it's the last one */
        dsh->map->loc = PAGE_SIZE;                                  /* it starts at the second page */
292        dsh->map->size = dsh->pages*PAGE_SIZE;                      /* it's this many bytes long */
293        dsh->map->ptr = (block_t *)malloc(dsh->map->size);          /* allocate space for it */
294        if (dsh->map->ptr == NULL) SIGNAL(dsm_err_no_memory);  /* did we get it? */
295
        /* note that the file pointer is already positioned just past the
           header, left there by the header read above.  */
298        if ((unsigned32)dcethread_read(dsh->fd,dsh->map->ptr,dsh->map->size) != dsh->map->size) { /* read in the file */
299            SIGNAL(dsm_err_file_io_error); /* gripe if that fails */
300        }
301
302        build_freelist(dsh);        /* reconstruct free list */
303    }
304
305    *new_dsh = (dsm_handle_t)dsh;   /* fill in user's dsh */
306
307    CLEAR_ST;
308} /* dsm_open */
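/* build_freelist() is defined elsewhere in the DSM; conceptually it walks each
 * mapped chunk block by block and re-links the free blocks, along these lines
 * (a sketch of the idea, not the actual implementation):
 *
 *     file_map_t *map;
 *     block_t    *b;
 *
 *     for (map = dsh->map; map != NULL; map = map->link) {
 *         for (b = map->ptr;
 *              (unsigned long)((char *)b - (char *)map->ptr) < map->size;
 *              b = (block_t *)((char *)b + b->size + PREHEADER)) {
 *             if (b->isfree) free_block(dsh, b);   // re-insert, sorted by size
 *         }
 *     }
 */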
309
310/** dsm_close
311
312    Close an open data store, freeing allocated resources.  We attempt
313    to avoid potential dangling pointers by taking the dsh by reference
314    and clearing it, as well as clearing its cookie before freeing it.
315*/
316
317public void dsm_close(dsx,st)
318dsm_handle_t  *dsx;    /* (update) data store handle */
319error_status_t *st;     /* (output) status */
320{
321    dsm_handle *dsh = (dsm_handle *)dsx;
322
323    CLEANUP {  /* upon errors */
324        return;                         /* and just return with fault status */
325    }
326
327    verify_dsh(*dsh);                   /* reality check */
328
329    if ((*dsh)->fd >= 0) {              /* if there's a file */
330        close((*dsh)->fd);              /* close it */
331    }
332    free_map((*dsh)->map);              /* free the memory map and all mapped memory */
333    (*dsh)->cookie = 0;                 /* clear cookie (in case of alias pointers) */
334    free((*dsh)->fname);
335    free(*dsh);                         /* free the data store record */
336
337    *dsh = NULL;                        /* invalidate the handle */
338    CLEAR_ST;
339
340} /* dsm_close */
341
342/** dsm_allocate
343
344    Allocate a block of at least the requested size from the storage pool
345    associated with a dsm file.  First we try to locate such a block from
346    the free pool; if that fails we try a coalesce pass (merge adjacent
347    free blocks into single larger ones); if that fails we grow the file
348    (if that fails it signals an error).  If the block we find is
349    significantly larger than the one requested, we split it up by making
    a new free block from everything but the requested amount (rounded up
351    to an 8-byte boundary).  The header of the allocated block is marked
352    with a "magic cookie" to let us do at least minimal validity checking
353    on incoming pointers.
354
355    Per datastore we maintain a count of "pending" records -- records that
356    are free in the datastore, but the application holds pointers to them,
357    as a result of dsm_allocate, or dsm_detach.  We don't want to
358    coalesce the database while these are held because presumably the
359    intent is to dsm_write[_hdr] (or at least dsm_free) them later;
360    coalescing could move the block, rendering it invalid.
361*/
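/* A worked example of the arithmetic below (PREHEADER is 16 bytes per the
 * header comment; the actual MINBLOCK and UNIT values are defined in dsm_p.h
 * and not restated here): a request for 20 bytes is rounded up to
 * usr_size = 24.  If the free block found has size 200, then 200 - 24 = 176
 * is compared against MINBLOCK; if it qualifies (and the page test passes),
 * a new free block of 200 - 16 - 24 = 160 bytes is carved from the tail and
 * the allocated block's size is trimmed to 24.
 */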
362
363public void dsm_allocate(dsx,usr_size,ptr,st)
364dsm_handle_t   dsx;        /* data store handle */
365unsigned32     usr_size;   /* requested minimum size of user data area */
366void *         *ptr;        /* (output) allocated storage (user data) */
367error_status_t *st;         /* (output) status */
368{
369    dsm_handle dsh = (dsm_handle)dsx;
370    block_t            *p;      /* new block */
371    block_t            *fp;     /* constructed block made from tail of large free block */
372
373    CLEANUP {  /* upon errors */
374        return;                         /* and just return with fault status */
375    }
376
377    verify_dsh(dsh);                    /* reality check */
378
379    usr_size = ROUND_UP(usr_size,8);            /* round size up to 8-byte alignment */
380
381    p = get_free_block(dsh,usr_size);           /* get suitably-sized block from free list */
382
383    if (p == NULL) {                            /* didn't find one? */
384        coalesce(dsh, st);                      /* coalesce adjacent free blocks */
385        PROP_BAD_ST;
386        p = get_free_block(dsh,usr_size);       /* try again */
387        if (p == NULL) {                        /* still out of luck? grow the file */
388            p = grow_file(dsh, usr_size, st);   /* grow the file by at least usr_size */
389            PROP_BAD_ST;
390        }
391    }
392
393    if ((p->size - usr_size) >= MINBLOCK) {     /* is there enough left to split the block? */
394        fp = (block_t *)(((char *)&p->data)+usr_size); /* point past user data portion */
395        fp->loc = p->loc+PREHEADER+usr_size;    /* file location */
396        if ((ROUND_UP(fp->loc, PAGE_SIZE) - fp->loc) >= UNIT) {  /* if at least unit left in page */
397            fp->size = p->size-PREHEADER-usr_size;  /* compute its size */
398            make_free(dsh,fp,st);                   /* make free in memory and on disk */
399            p->size = usr_size;                     /* this block gets smaller */
400            write_header(dsh, p, st);               /* done */
401            PROP_BAD_ST;
402        }
403    }
404
405    p->cookie = HDR_COOKIE;                     /* set magic cookie value */
406    if (dsh->fd < 0) p->isfree = false;         /* in volatile DB it's nonfree */
407    else dsh->pending += 1;                     /* otherwise remember pending block */
408    *ptr = (void * )&p->data;                   /* fill in data pointer */
409    CLEAR_ST;
410
411} /* dsm_allocate */
412
413/** dsm_free
414
415    Frees a block previously allocated by dsm_allocate.
416*/
417
418public void dsm_free(dsx,ptr,st)
419dsm_handle_t   dsx;    /* data store handle */
420void *          ptr;    /* user data pointer (to user portion of a block) */
421error_status_t *st;     /* (output) status */
422{
423    dsm_handle dsh = (dsm_handle)dsx;
424    block_t            *p;      /* pointer to actual block preheader */
425
426    CLEANUP {  /* upon errors */
427        return;                         /* and just return with fault status */
428    }
429
430    verify_dsh(dsh);                    /* reality check */
431
432    p = block_from_ptr(ptr, st);            /* get block pointer */
433    PROP_BAD_ST;
434
435    /* if this block is already free (assume it's been allocated but
436       not written), decrement the pending count.
437    */
438    if (p->isfree) {                    /* if it was already free */
439        assert(dsh->pending > 0);       /* it should have been pending */
440        dsh->pending -= 1;              /* but no longer */
441    }
442
443    make_free(dsh,p,st);                 /* free the block stably, add to free list */
444
445} /* dsm_free */
446
447/** dsm_detach
448
449    We don't support an atomic update operation (replacing an existing record
450    with a new copy).  The safest approach would be to create a new copy
451    (presumably flagged so as to distinguish it in case of a crash) and then
452    free the old one (finally unflagging the new one).  An update can also be
453    considered a delete and an add, and the application must either not care
454    if only the delete succeeds, or have some way of recovering (replaying a
455    log etc.).  For such applications we provide a 'detach' operation.  Detach
456    is similar to free; the effect is that if a crash occurs between the detach
457    and write phases of an update, the block in question is effectively freed,
458    i.e. upon restarting the record being updated will have been deleted.
459    dsm_detach of a free/detached record is a no-op.
460*/
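/* The update protocol this enables, in sketch form (rec and its count field
 * are hypothetical; error handling omitted):
 *
 *     dsm_detach(dsh, rec, &st);     // record is now "free" on disk
 *     rec->count += 1;               // modify the in-memory copy
 *     dsm_write(dsh, rec, &st);      // re-commit; the free flag is cleared last
 *
 * A crash between the detach and the write leaves the record deleted rather
 * than half-updated, which is exactly the semantics described above.
 */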
461
462public void dsm_detach(dsx,ptr,st)
463dsm_handle_t   dsx;    /* data store handle */
464void *          ptr;    /* user data pointer (to user portion of a block) */
error_status_t *st;     /* (output) status */
466{
467    dsm_handle dsh = (dsm_handle)dsx;
468    block_t            *p;      /* pointer to actual block preheader */
469
470    CLEANUP {  /* upon errors */
471        return;                         /* and just return with fault status */
472    }
473
474    verify_dsh(dsh);                    /* reality check */
475
476    p = block_from_ptr(ptr, st);            /* get block pointer */
477    PROP_BAD_ST;
478
479    if (!p->isfree) {                   /* unless already free */
480        p->isfree = true;               /* mark as free */
481        write_header(dsh, p, st);       /* make it stable */
482        PROP_BAD_ST;
483        dsh->pending += 1;              /* presumably detaching to write */
484    }
485    CLEAR_ST;
486
487} /* dsm_detach */
488
489/** dsm_write
490
491    Writes the contents of the given block to the data store file.  The
492    block should be 'free' in the sense of having not yet been written
493    (via dsm_write) since being allocated by dsm_allocate or 'detached'
494    by dsm_detach.  This restriction strictly need only apply to blocks
495    that span page boundaries, due to the two-phased nature of the write
496    operation: first the body of the record is written and committed, then
497    the header is modified to clear the 'free' setting.  Up until this
498    last atomic write the block is still free in the file, so that a crash
499    at any time won't yield an inconsistent or partially-updated record.
500*/
501
502public void dsm_write(dsx,ptr,st)
503dsm_handle_t    dsx;    /* data store handle */
504void *          ptr;    /* user record pointer */
505error_status_t *st;     /* (output) status */
506{
507    dsm_handle dsh = (dsm_handle)dsx;
508    block_t            *p;          /* the actual record */
509    unsigned long       sp,ep;      /* base of start page, end page */
510
511    CLEANUP {  /* upon errors */
512        return;                         /* and just return with fault status */
513    }
514
515    verify_dsh(dsh);                    /* reality check */
516
517    p = block_from_ptr(ptr, st);            /* get actual record pointer */
518    PROP_BAD_ST;
519
    /* allocated records are not marked "free" in a volatile datastore,
       so this check also rejects dsm_write on a volatile datastore.
    */
523    if (!p->isfree) SIGNAL(dsm_err_duplicate_write); /* must be free */
524
525    if (dsh->fd >= 0) { /* only if there's a file */
526        sp = ROUND_DOWN(p->loc,PAGE_SIZE);          /* file location of start of containing page */
527        ep = ROUND_DOWN(p->loc+p->size+PREHEADER-1,PAGE_SIZE);    /* and of start of end page */
528
529        if (ep == sp) {                             /* if all on one page */
530            p->isfree = false;                        /* we'll write the whole thing at once */
531            write_block(dsh,p,p->size+PREHEADER, st);   /* write the whole block */
532            PROP_BAD_ST;
533        }
534        else {
535            write_block(dsh,p,p->size+PREHEADER, st);   /* write the block with header still free */
536            PROP_BAD_ST;
537            p->isfree = false;                          /* clear free flag */
538            write_header(dsh, p, st);                   /* update header */
539            PROP_BAD_ST;
540        }
541    }
542
543    assert(dsh->pending > 0);   /* this should have been pending */
544    dsh->pending -= 1;  /* one less pending record */
545    CLEAR_ST;
546
547} /* dsm_write */
548
549/** dsm_write_hdr
550
551    Many applications want to support records too long to avoid spanning
552    page boundaries, but have shorter header information that they may
553    want to update atomically, such as "deleted" or "new" flags.  DSM
554    guarantees that the first part of each record up to dsm_hdr_size [48]
555    bytes can be written atomically, and so updated in place without having
556    to worry about detaching.  dsm_write_hdr is like dsm_write except
557    that an additional parameter specifies the number of bytes to write
558    -- a typical call probably being:
559
        dsm_write_hdr(my_dsh,my_rec,sizeof(my_rec->hdr),&status);
561
562    Note - we'll count this as a pending record if it's free, but the
563    caller is in charge, in such cases, of ensuring that this write is
564    sufficient to stably reflect the record.
565*/
566
567public void dsm_write_hdr(dsx,ptr,size,st)
568dsm_handle_t    dsx;    /* data store handle */
569void *          ptr;    /* user pointer */
570unsigned32      size;   /* length of user header */
571error_status_t  *st;     /* status */
572{
573    dsm_handle dsh = (dsm_handle)dsx;
574    block_t            *p;              /* the actual record */
575
576    CLEANUP {  /* upon errors */
577        return;                         /* and just return with fault status */
578    }
579
580    verify_dsh(dsh);                    /* reality check */
581
582    p = block_from_ptr(ptr, st);            /* get actual record pointer */
583    PROP_BAD_ST;
584
585    if (size > dsm_hdr_size) SIGNAL(dsm_err_header_too_long); /* check size */
586
587    if (p->isfree) {                    /* if it was free */
588        assert(dsh->pending > 0);       /* it should have been pending */
589        dsh->pending -= 1;              /* but no longer */
590    }                                   /* now it's not free: */
591    p->isfree = false;                  /* we'll write the whole thing at once */
592    write_block(dsh, p, size+PREHEADER, st);  /* write the preheader and user header */
593    PROP_BAD_ST;
594
595    CLEAR_ST;
596} /* dsm_write_hdr */
597
598/** dsm_get_info
599    dsm_set_info
600
601    A small part of the file header (256 bytes) is reserved for client-defined
602    file-wide header information (version information and the like).  These
603    routines provide access to that storage.  Note that we don't keep a copy
604    of the file header around, and so do I/O for every access, it being
605    assumed that these operations are infrequent.  Note also that set_info
606    does a read/alter/rewrite to preserve the DSM header information.
607
608    Just give file_io_error on volatile data stores.
609*/
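/* Typical use, as a sketch.  my_file_info_t and MY_CURRENT_VERSION are
 * hypothetical client-side names; the structure must fit in the INFOSZ
 * (256-byte) info area, and dsh/st are as in the earlier sketches.
 *
 *     my_file_info_t info;
 *
 *     dsm_get_info(dsh, &info, sizeof info, &st);
 *     if (st == status_ok && info.version != MY_CURRENT_VERSION) {
 *         // ... convert or reject the file ...
 *     }
 *     info.version = MY_CURRENT_VERSION;
 *     dsm_set_info(dsh, &info, sizeof info, &st);
 */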
610
611public void dsm_get_info(dsx,info,size,st)
612dsm_handle_t   dsx;    /* data store handle */
613void *         info;   /* pointer to user info buffer */
614unsigned32     size;   /* size of user info buffer */
615error_status_t *st;    /* status */
616{
617    dsm_handle dsh = (dsm_handle)dsx;
618    file_hdr_t  hdr;    /* data store file header */
619
620    CLEANUP {  /* upon errors */
621        return;                         /* and just return with fault status */
622    }
623
624    verify_dsh(dsh);                    /* reality check */
625
626    if (size > INFOSZ) SIGNAL(dsm_err_info_too_long);  /* check size */
627
628    /* reset to beginning of file and read in first page (file header).
629    */
630    if (dsh->fd < 0
631    ||  lseek(dsh->fd,0,L_SET) == -1
632    ||  dcethread_read(dsh->fd,&hdr,PAGE_SIZE) != PAGE_SIZE) SIGNAL(dsm_err_file_io_error);
633
634    memcpy(info,hdr.info,size);          /* copy user info to caller's buffer */
635
636    CLEAR_ST;
637} /* dsm_get_info */
638
639public void dsm_set_info(dsx,info,size,st)
640dsm_handle_t   dsx;    /* data store handle */
641void *         info;   /* pointer to user info buffer */
642unsigned32     size;   /* size of user info buffer */
643error_status_t *st;     /* status */
644{
645    dsm_handle dsh = (dsm_handle)dsx;
646    file_hdr_t  hdr;    /* data store file header */
647
648    CLEANUP {  /* upon errors */
649        return;                         /* and just return with fault status */
650    }
651
652    verify_dsh(dsh);                    /* reality check */
653
654    if (size > INFOSZ) SIGNAL(dsm_err_info_too_long);  /* check size */
655
656    /* reset to beginning of file and read in first page (file header).
657    */
658    if (lseek(dsh->fd,0,L_SET) == -1
659    ||  dcethread_read(dsh->fd,&hdr,PAGE_SIZE) != PAGE_SIZE) SIGNAL(dsm_err_file_io_error);
660
661    memcpy(hdr.info,info,size);          /* copy caller's info into header */
662
663    /*
664     * seek back to the beginning, write out file header, synch the file.
665     */
666    if (lseek(dsh->fd,0,L_SET) == -1
667    ||  dcethread_write(dsh->fd,&hdr,PAGE_SIZE) != PAGE_SIZE
668    ||  dsm__flush_file(dsh->fd) != status_ok ) SIGNAL(dsm_err_file_io_error);
669
670    CLEAR_ST;
671} /* dsm_set_info */
672
673/** dsm_read
674
675    Successively returns each non-free block in the data store.  We are
676    passed (by reference) a marker which we interpret as a file offset
677    in the data store.  The marker points to the last block examined; a
    special invalid offset, as set by dsm_marker_reset, yields the first
    non-free block; otherwise the next non-free block after the one identified
    by the marker is returned.  The marker is updated, and a pointer
681    to the record filled in.  A status of dsm_err_no_more_entries will
682    be returned when no more valid entries exist (i.e. on the call *after*
683    the last valid entry is seen).  Thus the complete set of allocated
684    records can be traversed by calling dsm_marker_reset and then
685    repeatedly calling dsm_read with that marker as long as the status
686    is status_ok.
687
688    A single-tasked process making consecutive calls to dsm_read without
689    other dsm calls will see all allocated records.  However in general it
690    is possible for a marker to become stale, as could happen during a
691    coalesce operation.  We maintain a small cache of valid marker values,
692    cleared upon coalescing; if a given marker value is not cached, then
    we scan the data store from some earlier known valid block looking
694    for the first allocated block at a higher offset than the nominal
695    marker value (at worst we might have to scan the entire data store;
696    the problem is that with variable-length records it's hard to find an
697    anchor point).
    *** heuristics can be applied: scanning the free list
699    can generate valid record starts, as can scanning the memory map (chunk
700    list), as can scanning the valid marker cache.
701*/
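/* The traversal idiom described above, as a sketch (dsh is an open handle,
 * error handling abbreviated); dsm_reclaim() below uses essentially this loop:
 *
 *     dsm_marker_t    mkr;
 *     void           *rec;
 *     error_status_t  st;
 *
 *     dsm_marker_reset(&mkr);
 *     for (;;) {
 *         dsm_read(dsh, &mkr, &rec, &st);
 *         if (st == dsm_err_no_more_entries) break;   // saw the last record
 *         if (st != status_ok) break;                 // or report the error
 *         // ... examine the record at rec ...
 *     }
 */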
702
703public void dsm_read(dsx,marker,ptr,st)
704dsm_handle_t   dsx;    /* data store handle */
705dsm_marker_t  *marker; /* marker (file offset of previous record) */
706void *         *ptr;    /* (output) next record */
707error_status_t *st;     /* (output) status */
708{
709    dsm_handle dsh = (dsm_handle)dsx;
710    block_t            *p = NULL;       /* a block */
711
712    CLEANUP {  /* upon errors */
713        return;                         /* and just return with fault status */
714    }
715
716    verify_dsh(dsh);                    /* reality check */
717
718    if (*marker == MAGIC_MARKER) {      /* special request for first block */
719        if (dsh->map) {                 /* must be mapped memory */
720            p = dsh->map->ptr;          /* use first record */
721            if (p->isfree) p = get_next_block(dsh,p); /* get the next one if that's free */
722        }
723    }
724    else {
725        /* If this marker is in the cache, we can use the block cached with it.
726           Otherwise we have to back up to some known block at an earlier
727           location and step forward until we reach the marker location (or
728           a later block).
729        */
730        p = cache_lookup(dsh,*marker);  /* see if marker loc is in cache */
731
732        if (!p) {                       /* if not in the cache */
733            p = get_earlier_block(dsh,*marker); /* find some known earlier block */
734        }
735        while (p && (p->loc <= *marker)) p = get_next_block(dsh,p); /* get block after mkr */
736    }
737
738    if (!p) {                           /* if at end of file (no next record) */
739        *ptr = NULL;                    /* flag EOF */
740        SIGNAL(dsm_err_no_more_entries);   /* special status */
741    }
742    else {
743        *ptr = (void * )&p->data;       /* return user pointer */
744        *marker = p->loc;               /* update marker */
745        cache_add(dsh,p,p->loc);        /* add to the cache */
746    }
747
748    CLEAR_ST;
749} /* dsm_read */
750
751/** dsm_locate
752
753    Returns a pointer to a dsm block, given a marker identifying that block, as
754    returned by dsm_read (the marker returned by a dsm_read call corresponds
755    to the record returned by that call), or by dsm_inq_marker.  The marker is
756    simply the record's offset in the file.  We locate the record by methods
757    similar to dsm_read, but the record must be located exactly.
758*/
759
760public void dsm_locate(dsx,marker,ptr,st)
761dsm_handle_t   dsx;    /* data store handle */
762dsm_marker_t   marker; /* marker (file offset of record) */
763void *         *ptr;    /* (output) record */
764error_status_t *st;     /* (output) status */
765{
766    dsm_handle dsh = (dsm_handle)dsx;
767    block_t            *p;              /* a block */
768
769    CLEANUP {  /* upon errors */
770        return;                         /* and just return with fault status */
771    }
772
773    verify_dsh(dsh);                    /* reality check */
774
    /* a reset marker (as set by dsm_marker_reset) is not a valid input here
    */
777    if (marker == MAGIC_MARKER) SIGNAL(dsm_err_invalid_marker);
778
779    /* If this marker is in the cache, we can use the block cached with it.
780       Otherwise we have to back up to some known block at an earlier
781       location and step forward until we reach the marker location (or
782       a later block).
783    */
784    p = cache_lookup(dsh,marker);  /* see if marker loc is in cache */
785
786    if (!p) {                       /* if not in the cache */
787        p = get_earlier_block(dsh,marker);  /* find some known earlier block */
788        while (p && p->loc < marker) p = get_next_block(dsh,p); /* find mkr */
789    }
790
791    if (!p || p->loc != marker)  {      /* didn't find it? (EOF or wrong loc) */
792        *ptr = NULL;                    /* don't leave anything dangling */
793        SIGNAL(dsm_err_invalid_marker); /* gripe */
794    }
795    else {
796        *ptr = (void * )&p->data;       /* return user pointer */
797        cache_add(dsh,p,p->loc);        /* add to the cache */
798    }
799
800    CLEAR_ST;
801} /* dsm_locate */
802
803/** dsm_marker_reset
804
805    Reset the given dsm_marker_t such that it will cause the first
806    block to be returned from the next dsm_read.
807*/
808
809public void dsm_marker_reset(marker)
810dsm_marker_t  *marker;
811{
812    *marker = MAGIC_MARKER;     /* there; that was easy */
813} /* dsm_marker_reset */
814
815/** dsm_inq_marker
816
817    Returns a marker identifying a given dsm block.  This marker can later
818    be used to redevelop a pointer to the block, with dsm_locate, even if
819    the data store is closed in between.  Naturally such a marker becomes
820    invalid if the block in question is freed.
821*/
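/* Sketch of re-finding a record across a close/reopen (fname, dsh and rec are
 * the caller's variables; error handling omitted):
 *
 *     dsm_marker_t mkr;
 *
 *     dsm_inq_marker(dsh, rec, &mkr, &st);   // remember where the record lives
 *     dsm_close(&dsh, &st);
 *     // ... later ...
 *     dsm_open(fname, &dsh, &st);
 *     dsm_locate(dsh, mkr, &rec, &st);       // a pointer to the same record
 *
 * The marker is just the record's file offset, which is why it survives a
 * close/open but not the freeing of the record.
 */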
822
823public void dsm_inq_marker(dsx,ptr,mkr,st)
824dsm_handle_t   dsx;    /* data store handle */
825void *          ptr;    /* user record pointer */
826dsm_marker_t  *mkr;    /* (output) marker thereto */
827error_status_t *st;     /* (output) status */
828{
829    dsm_handle dsh = (dsm_handle)dsx;
830    block_t            *p;          /* the actual record */
831
832    CLEANUP {  /* upon errors */
833        return;                         /* and just return with fault status */
834    }
835
836    verify_dsh(dsh);                    /* reality check */
837
838    p = block_from_ptr(ptr, st);            /* get actual record pointer */
839    PROP_BAD_ST;
840
841    *mkr = p->loc;                      /* fill in marker */
842
843    CLEAR_ST;
844} /* dsm_inq_marker */
845
846/** dsm_inq_size
847
848    Returns (in the "size" parameter) the size in bytes allocated to the
849    given dsm block.  This may be larger than the amount requested when
850    the block was allocated.
851*/
852
853public void dsm_inq_size(dsx,ptr,len,st)
854dsm_handle_t   dsx;    /* data store handle */
855void *         ptr;    /* user record pointer */
856unsigned32     *len;    /* (output) its length */
857error_status_t *st;     /* (output) status */
858{
859    dsm_handle dsh = (dsm_handle)dsx;
860    block_t            *p;          /* the actual record */
861
862    CLEANUP {  /* upon errors */
863        return;                         /* and just return with fault status */
864    }
865
866    verify_dsh(dsh);                    /* reality check */
867
868    p = block_from_ptr(ptr, st);            /* get actual record pointer */
869    PROP_BAD_ST;
870
871    *len = p->size;                     /* fill in length */
872
873    CLEAR_ST;
874} /* dsm_inq_size */
875
876/** dsm_get_stats
877
878    Returns information about a data store.
879*/
880
881public void dsm_get_stats(dsx,stats,st)
882dsm_handle_t   dsx;    /* data store handle */
883dsm_stats_t   *stats;  /* (output) statistics record */
884error_status_t *st;     /* (output) status */
885{
886    dsm_handle dsh = (dsm_handle)dsx;
887    block_t            *p;              /* freelist traverser */
888
889    CLEANUP {  /* upon errors */
890        return;                         /* and just return with fault status */
891    }
892
893    verify_dsh(dsh);                    /* reality check */
894
895    stats->size = dsh->pages*PAGE_SIZE;  /* size in bytes */
896
897    /* determine total size of free list, size of largest block
898    */
    stats->free_space = stats->largest_free = 0;
900    for (p = dsh->freelist; p != NULL; p = p->link) {   /* traverse free list */
901        stats->free_space += p->size + PREHEADER;       /* accumulate size */
902        stats->largest_free = p->size;  /* this is the largest as they're sorted */
903    }
904
905    CLEAR_ST;
906} /* dsm_get_stats */
907
908/** dsm_reclaim
909
910    Reclaim free storage in a named datastore by making a new copy of it.
911*/
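/* Typical call, as a sketch (the three file names are the caller's choice and
 * the 10 percent threshold is only an example):
 *
 *     if (dsm_reclaim(&dsh, (unsigned char *)"my.db",
 *                     (unsigned char *)"my.db.new",
 *                     (unsigned char *)"my.db.bak", 10, &st)) {
 *         // the datastore was compacted; if dsh was open it has been reopened
 *     }
 */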
912
913public boolean dsm_reclaim(dsx,name,tmpname,bakname,pct,st)
914dsm_handle_t   *dsx;        /* open data store handle */
915unsigned char  *name;       /* data store name */
916unsigned char  *tmpname;    /* build new copy in this temp */
917unsigned char  *bakname;    /* name old (backup) version this */
918unsigned32     pct;        /* percent freespace acceptable */
919error_status_t *st;         /* (output) status */
920{
921    dsm_handle_t        curh = *dsx;    /* current data store handle */
922    dsm_handle_t        newh=NULL;      /* new data store handle */
923    dsm_stats_t         stats;          /* usage statistics */
924    unsigned char       info[INFOSZ];   /* hdr info */
925    dsm_marker_t        mkr;            /* traversing marker */
926    void               *curp,*newp;     /* rec from curh, rec in newh */
927    unsigned32          size;           /* size of record */
928    error_status_t      st2;
929
930    if (curh) {
931        /*  does handle point to same data store that is being
932            reclaimed?
933        */
934        if (ustrcmp(name, ((dsm_handle)curh)->fname) != 0) {
935            *st = dsm_err_invalid_handle;
936            return false;
937        }
938    }
939    else {
940        /*  data store needs to be opened
941        */
942        dsm_open(name, &curh, st);
943        if (BAD_ST) return false;
944    }
945
946    dsm_get_stats(curh,&stats,st);      /* get statistics */
947    if (BAD_ST) goto EXIT;
948
949    if ((stats.size == 0) ||
950        (stats.free_space <= (GROW_PAGES*PAGE_SIZE)) ||
951        ((100*stats.free_space)/stats.size <= pct)) { /* ignore if within pct */
952        goto EXIT;                      /* return (with that status) */
953    }
954
955    dsm_create(tmpname,&newh,st);       /* create the temp/new file */
956    if (BAD_ST) goto EXIT;
957
958    dsm_get_info(curh,info,INFOSZ,st);
959    if (BAD_ST) goto EXIT;
960    dsm_set_info(newh,info,INFOSZ,st);
961    if (BAD_ST) goto EXIT;
962
963    dsm_marker_reset(&mkr);             /* start at the beginning */
964
965    for (;;) {                          /* traverse the datastore */
966        dsm_read(curh,&mkr,&curp,st);   /* get next record */
967        if ((*st) == dsm_err_no_more_entries) break;  /* break at end */
968        else if (BAD_ST) goto EXIT;
969
970        dsm_inq_size(curh,curp,&size,st);   /* get record size */
971        if (BAD_ST) goto EXIT;
972
973        dsm_allocate(newh,size,&newp,st);   /* allocate same size in new file */
974        if (BAD_ST) goto EXIT;
975
976        memcpy(newp,curp,size);          /* copy the record */
977
978        dsm_write(newh,newp,st);        /* and write it */
979        if (BAD_ST) goto EXIT;
980    }
981
982    /* close datastores
983    */
984    dsm_close(&newh,st);
985    if (BAD_ST) goto EXIT;
986
987    dsm_close(&curh,st);
988    if (BAD_ST) goto EXIT;
989
990    if (rename((char *)name,(char *)bakname) != 0) { /* rename foo, foo.bak */
991        (*st) = dsm_err_file_io_error;
992        goto EXIT;
993    }
994
995    if (rename((char *)tmpname,(char *)name) != 0) { /* rename foo.new foo */
996        (*st) = dsm_err_file_io_error;
997        goto EXIT;
998    }
999
1000    if ((*dsx) != NULL) {
1001        dsm_open(name,dsx,st);          /* open reclaimed datastore */
1002        if (BAD_ST) goto EXIT;
1003    }
1004
1005    return true;
1006
1007EXIT:   /* problem
1008           clean up stuff dsm_reclaim has opened/created
1009           exit (with status st)
1010        */
1011    if ((curh) && ((*dsx) == NULL )) {
1012        dsm_close(&curh,&st2);
1013    }
1014    else {
1015        *dsx = curh;
1016    }
1017
1018    if (newh) {
1019        dsm_close(&newh,&st2);
1020    }
1021
1022    remove((char *) tmpname);
1023    return false;
1024} /* dsm_reclaim */
1025
1026/** dsm_print_map
1027
1028    For debugging purposes, this traverses the mapped file printing a
1029    summary of each block (and doing some error checks on alignment etc.).
1030*/
1031
1032public void dsm_print_map(
1033	dsm_handle_t   dsx ATTRIBUTE_UNUSED,
1034	error_status_t *st)
1035{
1036#ifndef DSM_DEBUG
1037    printf("dsm_print_map unavailable.\n");
1038    CLEAR_ST;
1039#else
1040    dsm_handle dsh = (dsm_handle)dsx;
1041    file_map_t         *map;        /* for traversing chunk list */
1042    block_t            *this,*next; /* current block, next block */
1043
1044    CLEANUP {  /* upon errors */
1045        return;                         /* and just return with fault status */
1046    }
1047
1048    verify_dsh(dsh);                    /* reality check */
1049
1050    printf("DSM map; %d initialized pages of %d bytes; %d allocations pending\n",
1051           dsh->pages, PAGE_SIZE, dsh->pending);
1052
1053    for (map = dsh->map; map != NULL; map = map->link) {    /* for each file chunk */
1054        printf("---- %d byte chunk at %d\n",map->size,map->loc);
1055
1056        this = map->ptr;    /* first block in chunk */
1057
1058        for (;;) {  /* until explicit break at end of chunk */
            printf("    %p %08lx: (page %lu offset %lu) %lu bytes ",
                   (void *)this,(unsigned long)this->loc,(unsigned long)(this->loc/PAGE_SIZE),
                   (unsigned long)MOD(this->loc,PAGE_SIZE),(unsigned long)this->size);
1061
1062            if (MOD(this->size,8) != 0) printf(" *** NOT 8-BYTE ALIGNED *** ");
1063            if (this->isfree) printf("(free)");
1064            else if (this->cookie != HDR_COOKIE) printf(" *** INVALID COOKIE *** ");
1065
1066            printf("\n");
1067
1068            next = (block_t *) (((char *)this)+this->size+PREHEADER);   /* find next block */
1069
1070            if ((char *)next-(char *)map->ptr >= map->size) break;      /* is this the last block? */
1071            else this = next;   /* if more, iterate with next block */
1072        }
1073    }
1074    CLEAR_ST;
1075
1076#endif
1077} /* dsm_print_map */
1078
1079/** get_free_block
1080
1081    Scan the free list seeking a block of at least some required amount,
1082    returning a pointer to it, after removing it from the free list.  We
1083    return the first match that's at least the requested size; thus the
1084    order of the free list as maintained by the free routine (q.v.) is
1085    significant.  Returns NULL if no large enough block exists.
1086*/
1087
1088private block_t *get_free_block(dsh,min_size)
1089dsm_handle     dsh;        /* data store handle */
1090unsigned long   min_size;   /* requested size */
1091{
1092    block_t    *p,*pp;  /* list traversers */
1093
1094    pp = NULL;
1095    for (p = dsh->freelist; p != NULL; p = p->link) {   /* traverse free list */
1096        if (p->size >= min_size) {          /* is this one big enough? take it */
1097            if (pp) pp->link = p->link;     /* if we've seen a previous node link there */
1098            else dsh->freelist = p->link;   /* else it's the first */
1099            p->link = NULL;                 /* cover our tracks to be safe */
1100            break;                          /* done */
1101        }
1102        pp = p;     /* backpointer */
1103    }
1104
1105    return p;       /* back with our catch (NULL if no match found) */
1106
1107} /* get_free_block */
1108
1109/** grow_file
1110
1111    Grow the given data store file by at least some required amount,
1112    allocating equivalent memory space for it, returning a pointer to a
1113    block of at least the required amount.  Currently the entire new growth
1114    is treated as a single free block and a pointer to it is returned.
1115    Note that the block returned is nominally 'free' but is not on the
1116    free list.  Note also that if the data store is volatile (fd == -1)
1117    there's not actually any file backing the new storage.
1118*/
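/* Example of the growth computation below, with illustrative numbers only
 * (the real PAGE_SIZE and GROW_PAGES values come from dsm_p.h): assuming a
 * 1024-byte page and a 64-page default, a 20000-byte request rounds up to
 * 20 pages, which is then raised to the 64-page default; the file and its
 * memory mirror grow by 65536 bytes and the whole extension becomes a single
 * free block of 65536 - PREHEADER bytes.
 */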
1119
1120private block_t *grow_file(dsh, min_size, st)
1121dsm_handle     dsh;        /* handle to file to grow */
1122unsigned long   min_size;   /* size of requested block (minimum grow amount) in bytes */
1123error_status_t *st;
1124{
1125    unsigned long       grow_pages,grow_bytes;  /* number of pages, bytes to grow */
1126    block_t            *p = NULL;               /* new header */
1127    file_map_t         *map = NULL;             /* file map entry */
1128    off_t                flen;                   /* file length/offset of new chunk */
1129
1130    CLEANUP {
1131        if (p != NULL) free(p);                 /* free allocated memory if any */
1132        if (map != NULL) free(map);
1133        return NULL;                                 /* return w/ bad status */
1134    }
1135
    grow_pages = ROUND_UP(min_size+PREHEADER,PAGE_SIZE)/PAGE_SIZE;  /* minimum pages to grow (block plus its preheader) */
1137    grow_pages = MAX(grow_pages,GROW_PAGES);    /* use larger of that and default */
1138
1139    grow_bytes = grow_pages*PAGE_SIZE;      /* what's that in dollars? */
1140
1141    p = (block_t *)malloc(grow_bytes);      /* allocate the memory chunk */
1142    if (p == NULL) SIGNAL(dsm_err_no_memory);
1143
1144    map = NEW(file_map_t);                  /* get new file map entry */
1145    if (map == NULL) SIGNAL(dsm_err_no_memory);
1146
1147    flen = (dsh->pages+1)*PAGE_SIZE;        /* compute file length/offset of new chunk */
1148
1149    if (dsh->fd >= 0) {                     /* if a file, set to that position */
1150        flen = lseek(dsh->fd, flen, L_SET); /* get length of file/offset of new chunk */
1151        if (flen < 0) SIGNAL(dsm_err_file_io_error);
1152    }
1153
1154    /* XXX file_map_t should be updated to contain an off_t so that we can
1155     * always handle large files. However I'm scared of breaking code, and
1156     * in practice I don't have a need for very large registration databases.
1157     *	    -- jpeach
1158     */
1159    assert(flen < ULONG_MAX);
1160
1161    map->link = NULL;                       /* this'll be the last link */
1162    map->loc = p->loc = (unsigned long)flen;/* location of chunk and first block */
1163    map->size = grow_bytes;                 /* size in bytes */
1164    map->ptr = p;                           /* where the memory copy is */
1165    p->size = grow_bytes-PREHEADER;         /* first/only block is everything but one preheader */
1166    p->isfree = true;                       /* this is a free block */
1167
1168    if (dsh->map == NULL) dsh->map = map;   /* if this is first map entry fill in */
1169    else {                                  /* else append to existing list */
1170        file_map_t  *m;                     /* declare list traverser */
1171        for (m = dsh->map; m->link != NULL; m = m->link) {};    /* find last link */
1172        m->link = map;                      /* append new one */
1173    }
1174
1175    /*  Now update the file.  We need to write out the newly allocated chunk (so there will be
1176        something in the file), then update the "pages" count in the file header.  Only that
1177        many pages "really" exist upon re-opening.
1178    */
1179
1180    if (dsh->fd >= 0) { /* only if there's a file, */
1181        if ((unsigned32)dcethread_write(dsh->fd,p,grow_bytes) != grow_bytes) SIGNAL(dsm_err_file_io_error);
1182
1183        dsh->pages += grow_pages;           /* update our page count */
1184        update_file_header(dsh, st);        /* update the file header to include new page count */
1185    }
1186    else dsh->pages += grow_pages;          /* maintain page count even if no fd */
1187
1188    CLEAR_ST;
1189    return p;                               /* return the newly allocated block */
1190} /* grow_file */
1191
1192/** write_header
1193
1194    Writes the given block header to the disk file, an atomic operation
1195    as far as the file is concerned.
1196*/
1197
1198private void write_header(dsh, p, st)
1199dsm_handle     dsh;    /* data store handle */
1200block_t        *p;      /* block in question */
1201error_status_t *st;
1202{
1203    CLEAR_ST;
1204
1205    if (dsh->fd < 0) return;    /* noop if no file */
1206
1207    if (lseek(dsh->fd,p->loc,L_SET) < 0             /* point at the record on disk */
1208    ||  dcethread_write(dsh->fd,p,PREHEADER) != PREHEADER     /* write the header */
1209    ||  fsync(dsh->fd) < 0                          /* commit that */
1210    ) (*st) = dsm_err_file_io_error;              /* error if any fail */
1211} /* write_header */
1212
1213/** write_block
1214
1215    Writes the given block contents including preheader to the disk file,
1216    an operation which may span multiple pages and is not necessarily
1217    atomic.  The caller should check this and not attempt a page-spanning
1218    write unless the block is free.
1219*/
1220
1221private void write_block(dsh, p, size, st)
1222dsm_handle     dsh;    /* data store handle */
1223block_t        *p;      /* block in question */
1224unsigned long   size;   /* how much to write */
1225error_status_t *st;
1226{
1227    CLEAR_ST;
1228
1229    if (dsh->fd < 0) return;    /* noop if no file */
1230
1231    if (lseek(dsh->fd,p->loc,L_SET) < 0     /* point at the record on disk */
    ||  (unsigned32)dcethread_write(dsh->fd,p,size) != size       /* write the block */
1233    ||  fsync(dsh->fd) < 0                  /* commit that */
1234    ) (*st) = dsm_err_file_io_error;      /* error if any fail */
1235} /* write_block */
1236
1237/** update_file_header
1238
1239    Updates (Read/Alter/Rewrite) the header page of the data store file
1240    to reflect volatile reality.  In particular the size of the data store
1241    is determined by the page count field in the header, not the length
1242    of the file.  We could keep a copy of the header around and avoid the
1243    first read, but currently this doesn't happen much (only when the file
1244    grows).
1245*/
1246
1247private void update_file_header(dsh, st)
1248dsm_handle   dsh;
1249error_status_t *st;
1250{
1251    file_hdr_t  hdr;    /* data store file header */
1252
1253    CLEANUP {
1254        return;
1255    }
1256
1257    if (dsh->fd < 0) return;    /* noop if no file */
1258
1259    /* reset to beginning of file and read in first page (file header).
1260    */
1261    if (lseek(dsh->fd,0,L_SET) == -1
1262    ||  dcethread_read(dsh->fd,&hdr,PAGE_SIZE) != PAGE_SIZE) SIGNAL(dsm_err_file_io_error);
1263
1264    hdr.pages = dsh->pages; /* update page count */
1265
1266    /* seek back to the beginning, write out file header, synch the file.
1267    */
1268    if (lseek(dsh->fd,0,L_SET) == -1
1269    ||  dcethread_write(dsh->fd,&hdr,PAGE_SIZE) != PAGE_SIZE
1270    ||  fsync(dsh->fd) == -1) SIGNAL(dsm_err_file_io_error);
1271
1272    CLEAR_ST;
1273
1274} /* update_file_header */
1275
1276/** create_file
1277
1278    Create a new file of a given name.  Essentially x = create_file(foo)
1279    has the semantics of x = open(foo,O_CREAT|O_RDWR,0666), but on older
1280    Apollo systems (pre-SR10) we need to force the created object to
1281    'unstruct' not 'uasc' -- the latter includes an invisible stream header
1282    that throws off our page-alignment assumptions thereby invalidating
1283    stable storage.
1284*/
1285
1286private int create_file(fname)
1287unsigned char  *fname;
1288{
1289#ifdef pre_sr10
1290#   include <apollo/type_uids.h>
1291#   include <apollo/ios.h>
1292
1293    short               namelen;        /* length of filename */
1294    uid_t              unstruct_uid;   /* uid of object type to create: unstruct */
1295    ios_create_mode_t  cmode;          /* create mode (treatment of preexisting object) */
1296    ios_open_options_t oopts;          /* options pertaining to implicit open of created obj */
1297    ios_id_t           id;             /* returned stream id (== unix fd) */
1298    error_status_t      status;         /* status of create operation */
1299
1300    namelen = ustrlen(fname);        /* compute name length for pascal interface */
1301    unstruct_uid = unstruct_uid;   /* create as unstruct (raw bytes) */
    cmode = ios_recreate_mode;     /* delete any existing object */
    oopts = 0;                      /* no particular open options */

    ios_create(fname,namelen,unstruct_uid,cmode,oopts,&id,&status);    /* do it */
    if (status.all != status_ok) return -1;    /* check status */
    else return (int)id;                        /* else stream id (== fd) of created obj */
#else
    return open((char *) fname, O_CREAT | O_RDWR, 0666);
#endif
} /* create_file */
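
/*  Usage sketch (illustrative only, not part of the original source).  On
    the platforms still built today this is just the open() in the #else
    branch, so the result is checked like any other file descriptor.  Here
    "fname" is a hypothetical name and dsm_err_file_io_error simply stands
    in for whatever status the real caller raises; a CLEANUP handler is
    assumed to be in place for SIGNAL:

        int fd;

        if ((fd = create_file(fname)) < 0) SIGNAL(dsm_err_file_io_error);
        dsh->fd = fd;
*/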

/** make_free

    Mark a block free and insert it into the free list.
*/

private void make_free(dsh, p, st)
dsm_handle     dsh;    /* data store handle */
block_t        *p;      /* block to free */
error_status_t *st;
{
    p->isfree = true;           /* flag the block free in the header */
    p->cookie = 0;              /* clear cookie -- not a valid user record */
    write_header(dsh, p, st);   /* write that */
    free_block(dsh,p);          /* insert in free list */
} /* make_free */

/** free_block

    Insert a free block into the free list.  The free list is currently
    a singly linked list sorted in ascending order of size.  Since the
    allocator returns the first match it sees, this means it will always
    allocate the smallest suitable block.  Experimentation is in order,
    but the rationale here is that allocating a too-large block requires
    i/o to stably fragment the block and again to coalesce adjacent free
    blocks later, while allocation of a suitably small block requires no
    i/o.  The cost of stably changing the disk is such that the overhead
    of a) keeping the list sorted and b) potentially having to link
    through many small blocks to find a big one is deemed acceptable.
    *** experiment.
*/

private void free_block(dsh,ptr)
dsm_handle     dsh;    /* data store handle */
block_t        *ptr;    /* block to free */
{
    block_t *p,*pp;     /* list traversers */

    pp = NULL;                          /* haven't seen any blocks yet */
    for (p = dsh->freelist; p != NULL; p = p->link) {   /* look at each block on free list */
        if (ptr->size <= p->size) break; /* if new block is no larger than this one, insert here */
        pp = p;
    }

    if (pp) pp->link = ptr;             /* link predecessor to the new block */
    else dsh->freelist = ptr;           /* inserting at beginning of list, replace freelist */

    ptr->link = p;                      /* followed by larger block or NULL at EOL */
} /* free_block */
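
/*  Worked example (illustrative only, not part of the original source).
    Suppose the free list currently holds blocks of sizes 16, 64 and 256, in
    that order.  Freeing a block of size 100 walks past 16 and 64, stops at
    256 (the first block at least as large) and is linked in just before it:

        freelist -> 16 -> 64 -> 100 -> 256 -> NULL

    A later request for, say, 50 bytes is then satisfied by the 64-byte
    block -- the smallest block that fits -- because the allocator scans
    this same list front to back and takes the first match, as described in
    the comment above.
*/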

/** free_map

    Given a pointer to a memory map list, frees all associated memory (both
    mapped memory and the map records themselves) in that list.
*/

private void free_map(m)
file_map_t *m;
{
    if (m == NULL) return;          /* all done for an empty list */
    if (m->link) free_map(m->link); /* first free the tail of the list if any */

    free(m->ptr);                   /* free mapped memory */
    free(m);                        /* free the map record */
} /* free_map */
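
/*  Equivalent iterative form (illustrative only, not part of the original
    source).  The recursion above costs one stack frame per mapped chunk,
    which should be harmless, but the same cleanup can be written without
    recursion:

        file_map_t *next;

        while (m != NULL) {
            next = m->link;
            free(m->ptr);
            free(m);
            m = next;
        }
*/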

/** coalesce

    Traverses the file in order merging adjacent free blocks into single larger
    free blocks.  Rebuilds the free list in the process.  Two adjacent blocks
    are merged by simply incrementing the size of the first one to subsume the
    second.  Note that coalescing does not occur across allocation chunks; each
    chunk is separately coalesced.
*/

private void coalesce(dsh, st)
dsm_handle      dsh;  /* data store handle */
error_status_t *st;
{
    file_map_t *map;        /* for traversing chunk list */
    block_t    *this,*next; /* current block, next block */

    CLEANUP {
        return;
    }
    CLEAR_ST;

    /* if there are any pending allocations (allocated records that have not
       yet been committed with a write or free), then we should not coalesce,
       as we could destroy those pending allocations.
    */
    if (dsh->pending > 0) return;

    dsh->freelist = NULL;   /* clear existing freelist */
    cache_clear(dsh);       /* clear dsm_read cache */
    dsh->coalesced = true;  /* can't play cache tricks anymore */

    for (map = dsh->map; map != NULL; map = map->link) {    /* for each file chunk */

        /*  The idea is this: consider this block and the next one.  If
            they're both free, join them by increasing this block's size,
            and iterate with a new larger 'this', looking at the new
            'next'.  If they're not both free, iterate with 'next' as the
            current block, after adding 'this' to the free list (it's as
            big as it's going to get).
        */
        this = map->ptr;                                /* start with 1st block in chunk */
        for (;;) {                                      /* until explicit break at end of chunk */
            next = (block_t *) (((char *)this)+this->size+PREHEADER);   /* find next block */
            if ((unsigned)((char *)next-(char *)map->ptr) >= map->size) {   /* is this the last block? */
                if (this->isfree) free_block(dsh,this); /* add it to free list if free */
                break;                                  /* finished with this chunk */
            }
            if (this->isfree && next->isfree) {         /* if this and next are both free */
                this->size += next->size + PREHEADER;   /* join the two */
                write_header(dsh, this, st);            /* stably */
                PROP_BAD_ST;
                next->link = NULL;                      /* avoid dangling pointers */
            }
            else {                                      /* else move on */
                if (this->isfree) free_block(dsh,this); /* if this is free, add to free list */
                this = next;                            /* iterate looking at next */
            }
        }
    }
    CLEAR_ST;
} /* coalesce */
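
/*  Worked example of the merge arithmetic (illustrative only, not part of
    the original source).  Within a chunk, blocks sit end to end as

        [ PREHEADER | size bytes of data ][ PREHEADER | size bytes of data ] ...

    so if "this" has size 100 and the adjacent "next" has size 40, the merge
    sets this->size to 100 + 40 + PREHEADER: the merged block absorbs the
    neighbour's data and its preheader, and the traversal step
    (char *)this + this->size + PREHEADER again lands exactly on the block
    that used to follow "next".
*/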

/** build_freelist

    Called after opening a data store file, this traverses the file building
    the free list.
*/

private void build_freelist(dsh)
dsm_handle   dsh;  /* data store handle */
{
    file_map_t *map;        /* for traversing chunk list */
    block_t    *this,*next; /* current block, next block */

    dsh->freelist = NULL;   /* clear existing freelist */

    for (map = dsh->map; map != NULL; map = map->link) {    /* for each file chunk */

        this = map->ptr;                                /* start with 1st block in chunk */
        for (;;) {                                      /* until explicit break at end of chunk */
            if (this->isfree) free_block(dsh,this); /* if this is free, add to free list */

            next = (block_t *) (((char *)this)+this->size+PREHEADER);   /* find next block */
            if ((unsigned)((char *)next-(char *)map->ptr) >= map->size) break;   /* is this the last block? */

            this = next;                            /* iterate looking at next */
        }
    }
} /* build_freelist */

/** get_next_block

    Returns (a pointer to) the next non-free block after the one given, or NULL
    if there are none.
*/

private block_t *get_next_block(dsh,this)
dsm_handle     dsh;
block_t        *this;
{
    file_map_t     *map;        /* for traversing chunk list */
    block_t        *next;       /* next block */
    unsigned long   next_loc;   /* file location of next block */
    unsigned long   offset;     /* offset within a chunk */

    next_loc = this->loc+PREHEADER+this->size;              /* where next block is in the file */

    for (map = dsh->map; map != NULL; map = map->link) {    /* for each file chunk */
        while (map->loc+map->size > next_loc) {             /* does next_loc fall within this chunk? */
            offset = next_loc-map->loc;                     /* compute position within chunk */
            next = (block_t *) (((char *)map->ptr)+offset); /* develop pointer */
            if (!next->isfree) return next;                 /* if it's in use, that's the one */
            next_loc += PREHEADER+next->size;               /* compute next block after this */
        }
    }

    return NULL;    /* ran off the end without finding it */

} /* get_next_block */
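
/*  Sketch of the location-to-pointer mapping used above (illustrative only,
    not part of the original source).  A file location "loc" (a hypothetical
    variable) that falls within a chunk maps to memory as chunk base plus
    offset; the loop above performs the same computation for each candidate
    block location:

        file_map_t *map;
        block_t    *b = NULL;

        for (map = dsh->map; map != NULL; map = map->link) {
            if (loc >= map->loc && loc < map->loc + map->size) {
                b = (block_t *) ((char *)map->ptr + (loc - map->loc));
                break;
            }
        }
*/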

/** block_from_ptr

    Given a pointer from a user (i.e. what ought to be a pointer to the
    user data in a block), return a pointer to the block it's in, after
    basic validation.  Caller should have a cleanup handler in place to
    catch possible memory faults as we attempt the dereference.
*/

private block_t *block_from_ptr(ptr, st)
void *  ptr;    /* user pointer */
error_status_t *st;
{
    block_t *p; /* the block preheader */

    CLEANUP {
        return NULL;
    }

    if (ptr == NULL) SIGNAL(dsm_err_invalid_pointer);  /* basic sanity check */

    p = (block_t *)(((char *)ptr)-PREHEADER);  /* calculate block pointer */

    if (p->cookie != HDR_COOKIE) SIGNAL(dsm_err_invalid_pointer);  /* check magic cookie */

    CLEAR_ST;
    return p;           /* got it */

} /* block_from_ptr */
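
/*  Illustrative round trip (not part of the original source).  For an
    in-use block p, the user-visible pointer is the data area just past the
    preheader, and block_from_ptr() undoes exactly that offset before
    verifying the cookie, so the two conversions invert each other:

        void    *user = (void *) ((char *)p + PREHEADER);
        block_t *back = block_from_ptr(user, st);

    On success "back" equals p and *st is clear; a pointer that was never
    produced this way should fail the HDR_COOKIE check (or fault, which the
    caller's cleanup handler is there to catch).
*/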

/** get_earlier_block

    Given a file location (marker), return a pointer to a valid block whose
    location is earlier, so that after a cache miss we can traverse valid
    blocks forward looking for the one requested.  There are numerous
    heuristics we could use to find the best (i.e. closest to the marker)
    block.  For now we just return the first block.

private block_t *get_earlier_block(dsh,marker)
dsm_handle     dsh;
dsm_marker_t   marker ATTRIBUTE_UNUSED;
{
    if (dsh->map) return dsh->map->ptr; /* if any allocated memory use 1st block */
    else return NULL;                   /* otherwise, no go: NULL */
} /* get_earlier_block */

/*  cache handling

    We maintain, per database, a cache of location/pointer values for use
    by dsm_read.  The cache is cleared initially and upon coalesce(),
    and the location/pointer pair returned by successful dsm_read is
    entered in the cache.  The size of the cache depends on the expected
    pattern of dsm_read calls.

    Currently the cache is trivial: one deep.  This assumes that dsm_read
    will be called sequentially to read the database in a single thread,
    there won't be multiple tasks simultaneously reading the same database,
    and markers to older records won't be saved.  If any of these occur
    then the cost of dsm_read will go up, as many records must be scanned
    (depending on get_earlier_block()) for each cache miss.

    We have a special case for lookups in a recently opened data store - one
    that's still in a single memory chunk and which has not been coalesced;
    in this case the address can be computed from the chunk address and the
    file offset.  (This could be extended beyond a single chunk, but since we
    always coalesce before allocating a new chunk there is nothing to gain.)
    Once a coalesce has happened the block boundaries may have changed, and
    stale pointers or markers could point somewhere other than the beginning
    of a block.
*/

/** cache_clear

    Clear all entries in the cache to invalid.
*/

private void cache_clear(dsh)
dsm_handle     dsh;
{
    dsh->cache.loc = MAGIC_MARKER;
} /* cache_clear */

/** cache_add

    Add an entry to the cache.
*/

private void cache_add(dsh,p,mkr)
dsm_handle     dsh;
block_t        *p;
dsm_marker_t   mkr;
{
    dsh->cache.loc = mkr;
    dsh->cache.p = p;
} /* cache_add */

/** cache_lookup

    Look up a given location (marker) in the cache; if found, return the
    corresponding block pointer, otherwise return NULL.
*/

private block_t *cache_lookup(dsh,mkr)
dsm_handle     dsh;
dsm_marker_t   mkr;
{
    if (dsh->cache.loc == mkr) return dsh->cache.p; /* in the cache? */
    else if (!dsh->coalesced && dsh->map != NULL && dsh->map->link == NULL) {
        long offset;

        offset = mkr - dsh->map->loc;   /* bytes into chunk */
        return (block_t *)((char *)dsh->map->ptr + offset);
    }
    else return NULL;
} /* cache_lookup */

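/*  Sketch of how the three cache routines above fit together on the
    dsm_read path (illustrative only, not part of the original source; the
    real dsm_read also validates what it finds and handles error cases
    omitted here).  "marker" stands for the caller's dsm_marker_t:

        block_t *p;

        if ((p = cache_lookup(dsh, marker)) == NULL) {
            p = get_earlier_block(dsh, marker);
            while (p != NULL && p->loc != marker)
                p = get_next_block(dsh, p);
        }
        if (p != NULL) cache_add(dsh, p, marker);
*/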