buffindexed jumbo patch
Sang-yong Suh
sysuh at kigam.re.kr
Mon Jan 13 06:19:40 UTC 2003
On Wed, Jan 08, 2003 at 07:25:48PM +0900, Katsuhiro Kondou wrote:
>
> In article <20021223011506.GA20736 at cdp.kigam.re.kr>,
> Sang-yong Suh <sysuh at kigam.re.kr> wrote;
>
> } 1. buffindexed.c which is an update to the Kondou's original
> } version 1.40.2.8 2002/01/17, which can be found on
> } inn-STABLE-20021027
>
> Considering the patch will be incorporated in 2.5, could
> you write it against CURRENT? There are some differences
> between STABLE and CURRENT. I modified some to improve
> performance in CURRENT.
I have finally finished the patch against inn-CURRENT-20030107.
While working on it, I found a typo in the new ovlock() and fixed it.
I have also revised ovbuffinit_disks() to check whether the data on disk
and the data in shared memory are in sync.
And finally, here is the patch.
--
Sang-yong Suh
diff -uNr inn-CURRENT-20030107/storage/buffindexed/buffindexed.c buffindexed/buffindexed.c
--- inn-CURRENT-20030107/storage/buffindexed/buffindexed.c Mon Nov 18 07:04:35 2002
+++ buffindexed/buffindexed.c Sun Jan 12 22:57:56 2003
@@ -3,6 +3,28 @@
** Overview buffer and index method.
*/
+/*
+** Buffindexed using shared memory on ovbuff by Sang-yong Suh
+**
+** During the recent discussions in inn-workers, Alex Kiernan found
+** that INN LockRange() is not working for MMAPed file. This explains
+** why buffindexed has long outstanding bugs such as "could not MMAP...".
+**
+** This version corrects the file locking error by using shared memory.
+** The bitfield of each buffer file is loaded into memory, and is shared
+** by all programs such as innd, expireover, makehistory, and overchan.
+** The locking problem is handled by semaphore.
+*/
+
+/*
+ * Yes. I know that it violates INN coding style. However, this allows
+ * me to compile this new version without reconfiguring INN.
+ * If all goes well, shmem.c should go to $INN/lib, and shmem.h should
+ * go to $INN/include.
+ */
+#include "shmem.h"
+#include "shmem.c"
+
#include "config.h"
#include "clibrary.h"
#include "portable/mmap.h"
@@ -28,6 +50,15 @@
#include "buffindexed.h"
#define OVBUFF_MAGIC "ovbuff"
+#define OVBUFF_VERSION 2 /* stored in OVBUFFHEAD.version; 0 means no binary counts */
+
+/*
+** Because the ovbuff bitfields live in shared memory, the header does not
+** have to be written to the file on every update; ovflushhead() writes it
+** to disk only once every OVBUFF_SYNC_COUNT updates.
+*/
+#define OVBUFF_SYNC_COUNT (innconf->icdsynccount * 10 + 1)
+/* To write the header on every update instead: #define OVBUFF_SYNC_COUNT 1 */
/* ovbuff header */
#define OVBUFFMASIZ 8
@@ -37,9 +68,28 @@
#define OVMAXCYCBUFFNAME 8
-#define OV_HDR_PAGESIZE 16384
+/*
+** Preferred header layout for the shared memory version of buffindexed:
+**
+** OV_HDR_PAGESIZE 4096
+** OV_BEFOREBITF 256
+**
+** This layout is NOT compatible with buffers written by older versions;
+** define KEEP_COMPATIBILITY (the old misspelling KEEP_COMPATIBILTY also
+** works) to keep the old layout. The new one saves (8192 - 256) bytes per
+** file, plus another 16384 bytes depending on how the buffer size rounds.
+** The value 4096 may not be appropriate on OSes other than i386 Linux.
+*/
+
+#if !defined(KEEP_COMPATIBILITY) && !defined(KEEP_COMPATIBILTY)
+#define OV_HDR_PAGESIZE 4096
+#define OV_BEFOREBITF 256
+#else
+#define OV_HDR_PAGESIZE 16384
+#define OV_BEFOREBITF (1 * OV_BLOCKSIZE)
+#endif
+
#define OV_BLOCKSIZE 8192
-#define OV_BEFOREBITF (1 * OV_BLOCKSIZE)
#define OV_FUDGE 1024
/* ovblock pointer */
@@ -58,13 +108,18 @@
char useda[OVBUFFLASIZ]; /* ASCII version of used */
char freea[OVBUFFLASIZ]; /* ASCII version of free */
char updateda[OVBUFFLASIZ]; /* ASCII version of updated */
+ /*
+ * The following parts will be synced to the bitfield
+ */
+ int version; /* magic version number */
+ unsigned int freeblk; /* next free block number */
+ unsigned int usedblk; /* number of used blocks */
} OVBUFFHEAD;
/* ovbuff info */
typedef struct _OVBUFF {
unsigned int index; /* ovbuff index */
char path[OVBUFFPASIZ]; /* Path to file */
- int magicver; /* Magic version number */
int fd; /* file descriptor for this
ovbuff */
off_t len; /* Length of writable area, in
@@ -80,10 +135,10 @@
header */
void * bitfield; /* Bitfield for ovbuff block in
use */
- bool needflush; /* true if OVBUFFHEAD is needed
- to be flushed */
+ int dirty; /* OVBUFFHEAD dirty count */
struct _OVBUFF *next; /* next ovbuff */
int nextchunk; /* next chunk */
+ smcd_t *smc; /* shared mem control data */
#ifdef OV_DEBUG
struct ov_trace_array *trace;
#endif /* OV_DEBUG */
@@ -253,6 +308,12 @@
static int Gibcount;
#ifdef MMAP_MISSES_WRITES
+#define PWRITE(fd, buf, nbyte, offset) mmapwrite(fd, buf, nbyte, offset)
+#else
+#define PWRITE(fd, buf, nbyte, offset) pwrite(fd, buf, nbyte, offset)
+#endif
+
+#ifdef MMAP_MISSES_WRITES
/* With HP/UX, you definitely do not want to mix mmap-accesses of
a file with read()s and write()s of the same file */
static off_t mmapwrite(int fd, void *buf, off_t nbyte, off_t offset) {
@@ -338,7 +399,7 @@
ovbuff->len = len;
ovbuff->fd = -1;
ovbuff->next = (OVBUFF *)NULL;
- ovbuff->needflush = FALSE;
+ ovbuff->dirty = 0;
ovbuff->bitfield = NULL;
ovbuff->nextchunk = 1;
@@ -483,51 +544,80 @@
}
static void ovreadhead(OVBUFF *ovbuff) {
- OVBUFFHEAD rpx;
- char buff[OVBUFFLASIZ+1];
-
- memcpy(&rpx, ovbuff->bitfield, sizeof(OVBUFFHEAD));
- strncpy((char *)buff, rpx.useda, OVBUFFLASIZ);
- buff[OVBUFFLASIZ] = '\0';
- ovbuff->usedblk = (unsigned int)hex2offt((char *)buff);
- strncpy((char *)buff, rpx.freea, OVBUFFLASIZ);
- buff[OVBUFFLASIZ] = '\0';
- ovbuff->freeblk = (unsigned int)hex2offt((char *)buff);
- return;
+ const OVBUFFHEAD *hdr = (const OVBUFFHEAD *)ovbuff->bitfield; /* header copy in shared memory */
+ ovbuff->usedblk = hdr->usedblk;
+ ovbuff->freeblk = hdr->freeblk;
+ return;
}
static void ovflushhead(OVBUFF *ovbuff) {
- OVBUFFHEAD rpx;
+ OVBUFFHEAD rpx;
- if (!ovbuff->needflush)
+ /* Below the sync threshold, only the binary counts in shared memory are updated. */
+ if (ovbuff->dirty < OVBUFF_SYNC_COUNT) {
+ OVBUFFHEAD *x = (OVBUFFHEAD *)ovbuff->bitfield;
+ x->freeblk = ovbuff->freeblk;
+ x->usedblk = ovbuff->usedblk;
+ return;
+ }
+ /* Threshold reached: rebuild the full header, then write header + bitfield to disk. */
+ memset(&rpx, 0, sizeof(OVBUFFHEAD));
+ ovbuff->updated = time(NULL);
+ strncpy(rpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC));
+ strncpy(rpx.path, ovbuff->path, OVBUFFPASIZ);
+ /* Don't use sprintf() directly ... the terminating '\0' causes grief */
+ strncpy(rpx.indexa, offt2hex(ovbuff->index, TRUE), OVBUFFLASIZ);
+ strncpy(rpx.lena, offt2hex(ovbuff->len, TRUE), OVBUFFLASIZ);
+ strncpy(rpx.totala, offt2hex(ovbuff->totalblk, TRUE), OVBUFFLASIZ);
+ strncpy(rpx.useda, offt2hex(ovbuff->usedblk, TRUE), OVBUFFLASIZ);
+ strncpy(rpx.freea, offt2hex(ovbuff->freeblk, TRUE), OVBUFFLASIZ);
+ strncpy(rpx.updateda, offt2hex(ovbuff->updated, TRUE), OVBUFFLASIZ);
+ rpx.version = OVBUFF_VERSION;
+ rpx.freeblk = ovbuff->freeblk;
+ rpx.usedblk = ovbuff->usedblk;
+ memcpy(ovbuff->bitfield, &rpx, sizeof(OVBUFFHEAD));
+ /* PWRITE, like the other data writes in this file, honours MMAP_MISSES_WRITES */
+ if (PWRITE(ovbuff->fd, ovbuff->bitfield, ovbuff->base, 0) != ovbuff->base)
+ syslog(L_ERROR, "%s: ovflushhead: cant flush on %s: %m",
+ LocalLogName, ovbuff->path);
+ ovbuff->dirty = 0;
return;
- memset(&rpx, 0, sizeof(OVBUFFHEAD));
- ovbuff->updated = time(NULL);
- strncpy(rpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC));
- strncpy(rpx.path, ovbuff->path, OVBUFFPASIZ);
- /* Don't use sprintf() directly ... the terminating '\0' causes grief */
- strncpy(rpx.indexa, offt2hex(ovbuff->index, TRUE), OVBUFFLASIZ);
- strncpy(rpx.lena, offt2hex(ovbuff->len, TRUE), OVBUFFLASIZ);
- strncpy(rpx.totala, offt2hex(ovbuff->totalblk, TRUE), OVBUFFLASIZ);
- strncpy(rpx.useda, offt2hex(ovbuff->usedblk, TRUE), OVBUFFLASIZ);
- strncpy(rpx.freea, offt2hex(ovbuff->freeblk, TRUE), OVBUFFLASIZ);
- strncpy(rpx.updateda, offt2hex(ovbuff->updated, TRUE), OVBUFFLASIZ);
- memcpy(ovbuff->bitfield, &rpx, sizeof(OVBUFFHEAD));
- mmap_flush(ovbuff->bitfield, ovbuff->base);
- ovbuff->needflush = FALSE;
- return;
}
static bool ovlock(OVBUFF *ovbuff, enum inn_locktype type) {
- return inn_lock_range(ovbuff->fd, type, true, 0, sizeof(OVBUFFHEAD));
+ int ret;
+ smcd_t *smc = ovbuff->smc;
+ /* Record the lock type only if it was acquired, so a failed lock is never "released". */
+ if( type == INN_LOCK_WRITE ) {
+ ret = smcGetExclusiveLock( smc );
+ smc->locktype = ret == 0 ? (int)INN_LOCK_WRITE : (int)INN_LOCK_UNLOCK;
+ } else if( type == INN_LOCK_READ ) {
+ ret = smcGetSharedLock( smc );
+ smc->locktype = ret == 0 ? (int)INN_LOCK_READ : (int)INN_LOCK_UNLOCK;
+ } else if( smc->locktype == (int)INN_LOCK_WRITE ) {
+ ret = smcReleaseExclusiveLock( smc );
+ } else {
+ ret = smc->locktype == (int)INN_LOCK_READ ? smcReleaseSharedLock( smc ) : -1;
+ }
+ return (ret == 0);
}
static bool ovbuffinit_disks(void) {
OVBUFF *ovbuff = ovbufftab;
char buf[64];
- OVBUFFHEAD *rpx;
+ OVBUFFHEAD *rpx, dpx;
int i, fd;
off_t tmpo;
+ smcd_t *smc;
+ static bool atexit_registered = FALSE;
+
+ /*
+ * Register the exit callback to sync the bitfield to the disk
+ */
+ if( !atexit_registered ) {
+ atexit( buffindexed_close );
+ atexit_registered = TRUE;
+ }
/*
** Discover the state of our ovbuffs. If any of them are in icky shape,
@@ -543,24 +633,41 @@
ovbuff->fd = fd;
}
}
- if ((ovbuff->bitfield =
- mmap(NULL, ovbuff->base, ovbuffmode & OV_WRITE ? (PROT_READ | PROT_WRITE) : PROT_READ,
- MAP_SHARED, ovbuff->fd, (off_t) 0)) == MAP_FAILED) {
- syslog(L_ERROR,
- "%s: ovinitdisks: mmap for %s offset %d len %lu failed: %m",
- LocalLogName, ovbuff->path, 0, ovbuff->base);
- return FALSE;
+
+ /* get shared memory buffer */
+ smc = smcGetShmemBuffer( ovbuff->path, ovbuff->base );
+ if ( !smc ) {
+ /* No shared memory exists, create one. */
+ smc = smcCreateShmemBuffer( ovbuff->path, ovbuff->base );
+ if ( !smc ) {
+ syslog(L_ERROR,
+ "%s: ovinitdisks: cant create shmem for %s len %d: %m",
+ LocalLogName, ovbuff->path, ovbuff->base);
+ return FALSE;
+ }
}
+
+ ovbuff->smc = smc;
+ ovbuff->bitfield = smc->addr;
rpx = (OVBUFFHEAD *)ovbuff->bitfield;
- ovlock(ovbuff, INN_LOCK_WRITE);
- if (strncmp(rpx->magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC)) == 0) {
- ovbuff->magicver = 1;
- if (strncmp(rpx->path, ovbuff->path, OVBUFFPASIZ) != 0) {
- syslog(L_ERROR, "%s: Path mismatch: read %s for buffindexed %s",
- LocalLogName, rpx->path, ovbuff->path);
- ovbuff->needflush = TRUE;
- }
- strncpy(buf, rpx->indexa, OVBUFFLASIZ);
+
+ /* lock the buffer */
+ ovlock(ovbuff, ovbuffmode & OV_WRITE ? INN_LOCK_WRITE : INN_LOCK_READ);
+
+ if (pread(ovbuff->fd, &dpx, sizeof(OVBUFFHEAD), 0) < 0) {
+ syslog(L_ERROR, "%s: cant read from %s, %m",
+ LocalLogName, ovbuff->path);
+ ovlock(ovbuff, INN_LOCK_UNLOCK);
+ return FALSE;
+ }
+
+ /*
+ * check validity of the disk data
+ */
+ if (strncmp(dpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC)) == 0 &&
+ strncmp(dpx.path, ovbuff->path, OVBUFFPASIZ) == 0 )
+ {
+ strncpy(buf, dpx.indexa, OVBUFFLASIZ);
buf[OVBUFFLASIZ] = '\0';
i = hex2offt(buf);
if (i != ovbuff->index) {
@@ -569,7 +676,7 @@
ovlock(ovbuff, INN_LOCK_UNLOCK);
return FALSE;
}
- strncpy(buf, rpx->lena, OVBUFFLASIZ);
+ strncpy(buf, dpx.lena, OVBUFFLASIZ);
buf[OVBUFFLASIZ] = '\0';
tmpo = hex2offt(buf);
if (tmpo != ovbuff->len) {
@@ -578,18 +685,52 @@
ovlock(ovbuff, INN_LOCK_UNLOCK);
return FALSE;
}
- strncpy(buf, rpx->totala, OVBUFFLASIZ);
+
+ /*
+ * compare shared memory with disk data.
+ */
+ if (strncmp(dpx.magic, rpx->magic, strlen(OVBUFF_MAGIC)) != 0 ||
+ strncmp(dpx.path, rpx->path, OVBUFFPASIZ) != 0 ||
+ strncmp(dpx.indexa, rpx->indexa, OVBUFFLASIZ) != 0 ||
+ strncmp(dpx.lena, rpx->lena, OVBUFFLASIZ) != 0 )
+ {
+ /*
+ * Load shared memory with disk data.
+ */
+ if (pread(ovbuff->fd, rpx, ovbuff->base, 0) < 0) {
+ syslog(L_ERROR, "%s: cant read from %s, %m",
+ LocalLogName, ovbuff->path);
+ ovlock(ovbuff, INN_LOCK_UNLOCK);
+ return FALSE;
+ }
+ }
+ strncpy(buf, dpx.totala, OVBUFFLASIZ);
buf[OVBUFFLASIZ] = '\0';
ovbuff->totalblk = hex2offt(buf);
- strncpy(buf, rpx->useda, OVBUFFLASIZ);
- buf[OVBUFFLASIZ] = '\0';
- ovbuff->usedblk = hex2offt(buf);
- strncpy(buf, rpx->freea, OVBUFFLASIZ);
- buf[OVBUFFLASIZ] = '\0';
- ovbuff->freeblk = hex2offt(buf);
- ovflushhead(ovbuff);
+
+ if ( rpx->version == 0 ) {
+ /* no binary data available. use character data */
+ strncpy(buf, rpx->useda, OVBUFFLASIZ);
+ buf[OVBUFFLASIZ] = '\0';
+ ovbuff->usedblk = hex2offt(buf);
+ strncpy(buf, rpx->freea, OVBUFFLASIZ);
+ buf[OVBUFFLASIZ] = '\0';
+ ovbuff->freeblk = hex2offt(buf);
+ } else {
+ /* use binary data. The first reason is the speed.
+ and the second reason is the other partner is not
+ synced.
+ */
+ ovbuff->usedblk = rpx->usedblk;
+ ovbuff->freeblk = rpx->freeblk;
+ }
Needunlink = FALSE;
} else {
+ /*
+ * Initialize the contents of the shared memory
+ */
+ memset( rpx, 0, ovbuff->base );
+
ovbuff->totalblk = (ovbuff->len - ovbuff->base)/OV_BLOCKSIZE;
if (ovbuff->totalblk < 1) {
syslog(L_ERROR, "%s: too small length '%lu' for buffindexed %s",
@@ -597,11 +738,10 @@
ovlock(ovbuff, INN_LOCK_UNLOCK);
return FALSE;
}
- ovbuff->magicver = 1;
ovbuff->usedblk = 0;
ovbuff->freeblk = 0;
ovbuff->updated = 0;
- ovbuff->needflush = TRUE;
+ ovbuff->dirty = OVBUFF_SYNC_COUNT + 1;
syslog(L_NOTICE,
"%s: No magic cookie found for buffindexed %d, initializing",
LocalLogName, ovbuff->index);
@@ -712,6 +852,7 @@
static OVBUFF *ovbuffnext = NULL;
OVBUFF *ovbuff;
OV ov;
+ bool done = FALSE;
#ifdef OV_DEBUG
int recno;
struct ov_trace_array *trace;
@@ -719,6 +860,13 @@
if (ovbuffnext == NULL)
ovbuffnext = ovbufftab;
+
+ /*
+ * We will try to recover broken overview possibly due to unsync.
+ * The recovering is inactive for OV_DEBUG mode.
+ */
+
+retry:
for (ovbuff = ovbuffnext ; ovbuff != (OVBUFF *)NULL ; ovbuff = ovbuff->next) {
ovlock(ovbuff, INN_LOCK_WRITE);
ovreadhead(ovbuff);
@@ -771,17 +919,33 @@
trace->ov_trace[trace->cur].gloc.recno = recno;
trace->ov_trace[trace->cur].occupied = time(NULL);
#endif /* OV_DEBUG */
+
ov.index = ovbuff->index;
ov.blocknum = ovbuff->freeblk;
+
+#ifndef OV_DEBUG
+ if (ovusedblock(ovbuff, ovbuff->freeblk, FALSE, TRUE)) {
+ syslog(L_NOTICE, "%s: fixing invalid free block(%d, %d).",
+ LocalLogName, ovbuff->index, ovbuff->freeblk);
+ } else
+ done = TRUE;
+#endif /* OV_DEBUG */
+
+ /* mark it as allocated */
ovusedblock(ovbuff, ov.blocknum, TRUE, TRUE);
+
ovnextblock(ovbuff);
ovbuff->usedblk++;
- ovbuff->needflush = TRUE;
+ ovbuff->dirty++;
ovflushhead(ovbuff);
ovlock(ovbuff, INN_LOCK_UNLOCK);
ovbuffnext = ovbuff->next;
if (ovbuffnext == NULL)
ovbuffnext = ovbufftab;
+
+ if( !done )
+ goto retry;
+
return ov;
}
@@ -825,12 +989,20 @@
trace->ov_trace[trace->cur].gloc.recno = recno;
trace->cur++;
#endif /* OV_DEBUG */
+
+#ifndef OV_DEBUG
+ if (!ovusedblock(ovbuff, ov.blocknum, FALSE, FALSE)) {
+ syslog(L_NOTICE, "%s: trying to free block(%d, %d), but already freed.",
+ LocalLogName, ov.index, ov.blocknum);
+ }
+#endif
+
ovusedblock(ovbuff, ov.blocknum, TRUE, FALSE);
ovreadhead(ovbuff);
if (ovbuff->freeblk == ovbuff->totalblk)
ovbuff->freeblk = ov.blocknum;
ovbuff->usedblk--;
- ovbuff->needflush = TRUE;
+ ovbuff->dirty++;
ovflushhead(ovbuff);
ovlock(ovbuff, INN_LOCK_UNLOCK);
return;
@@ -1188,11 +1360,7 @@
ovindexhead.next = ovnull;
ovindexhead.low = 0;
ovindexhead.high = 0;
-#ifdef MMAP_MISSES_WRITES
- if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
- if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+ if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
return TRUE;
}
@@ -1201,20 +1369,19 @@
} else {
if ((ovbuff = getovbuff(ge->curindex)) == NULL)
return FALSE;
-#ifdef OV_DEBUG
if (!ovusedblock(ovbuff, ge->curindex.blocknum, FALSE, FALSE)) {
- syslog(L_FATAL, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+ syslog(L_ERROR, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+#ifdef OV_DEBUG
abort();
- }
+#else /* OV_DEBUG */
+ /* fix it */
+ ovusedblock(ovbuff, ge->curindex.blocknum, TRUE, TRUE);
#endif /* OV_DEBUG */
+ }
ovindexhead.next = ov;
ovindexhead.low = ge->curlow;
ovindexhead.high = ge->curhigh;
-#ifdef MMAP_MISSES_WRITES
- if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
- if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+ if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
return FALSE;
}
@@ -1282,18 +1449,18 @@
ge->curdata = ov;
ge->curoffset = 0;
}
-#ifdef OV_DEBUG
if (!ovusedblock(ovbuff, ge->curdata.blocknum, FALSE, FALSE)) {
- syslog(L_FATAL, "%s: block(%d, %d) not occupied", LocalLogName, ovbuff->index, ge->curdata.blocknum);
+ syslog(L_ERROR, "%s: block(%d, %d) not occupied", LocalLogName, ovbuff->index, ge->curdata.blocknum);
+#ifdef OV_DEBUG
buffindexed_close();
abort();
- }
+#else /* OV_DEBUG */
+ /* fix it */
+ ovusedblock(ovbuff, ge->curdata.blocknum, TRUE, TRUE);
#endif /* OV_DEBUG */
-#ifdef MMAP_MISSES_WRITES
- if (mmapwrite(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
-#else
- if (pwrite(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
-#endif /* MMAP_MISSES_WRITES */
+ }
+
+ if (PWRITE(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
syslog(L_ERROR, "%s: could not append overview record index '%d', blocknum '%d': %m", LocalLogName, ge->curdata.index, ge->curdata.blocknum);
return FALSE;
}
@@ -1319,18 +1486,17 @@
}
if ((ovbuff = getovbuff(ge->curindex)) == NULL)
return FALSE;
-#ifdef OV_DEBUG
if (!ovusedblock(ovbuff, ge->curindex.blocknum, FALSE, FALSE)) {
- syslog(L_FATAL, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+ syslog(L_ERROR, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+#ifdef OV_DEBUG
buffindexed_close();
abort();
- }
+#else /* OV_DEBUG */
+ /* fix this */
+ ovusedblock(ovbuff, ge->curindex.blocknum, TRUE, TRUE);
#endif /* OV_DEBUG */
-#ifdef MMAP_MISSES_WRITES
- if (mmapwrite(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
-#else
- if (pwrite(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
-#endif /* MMAP_MISSES_WRITES */
+ }
+ if (PWRITE(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
return TRUE;
}
@@ -1346,11 +1512,7 @@
ovindexhead.next = ovnull;
ovindexhead.low = ge->curlow;
ovindexhead.high = ge->curhigh;
-#ifdef MMAP_MISSES_WRITES
- if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
- if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+ if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
return TRUE;
}
@@ -1945,8 +2107,10 @@
GROUPlock(gloc, INN_LOCK_WRITE);
ge = &GROUPentries[gloc.recno];
if (ge->count == 0) {
+ if (ge->low < ge->high)
+ ge->low = ge->high;
if (lo)
- *lo = ge->low;
+ *lo = ge->low + 1;
ge->expired = time(NULL);
GROUPlock(gloc, INN_LOCK_UNLOCK);
return TRUE;
@@ -1996,10 +2160,12 @@
newge.low = newge.high;
*ge = newge;
if (lo) {
- if (ge->count == 0)
+ if (ge->count == 0) {
+ if (ge->low < ge->high)
+ ge->low = ge->high;
/* lomark should be himark + 1, if no article for the group */
*lo = ge->low + 1;
- else
+ } else
*lo = ge->low;
}
ovclosesearch(handle, TRUE);
@@ -2148,7 +2314,14 @@
return;
}
}
+
+ /* sync the bit field */
+ ovbuff = ovbufftab;
for (; ovbuff != (OVBUFF *)NULL; ovbuff = ovbuffnext) {
+ if (ovbuff->dirty) {
+ ovbuff->dirty = OVBUFF_SYNC_COUNT + 1;
+ ovflushhead( ovbuff );
+ }
ovbuffnext = ovbuff->next;
DISPOSE(ovbuff);
}
diff -uNr inn-CURRENT-20030107/storage/buffindexed/shmem.c buffindexed/shmem.c
--- inn-CURRENT-20030107/storage/buffindexed/shmem.c Thu Jan 1 09:00:00 1970
+++ buffindexed/shmem.c Thu Jan 9 11:02:38 2003
@@ -0,0 +1,377 @@
+/*
+** $Id$
+*/
+
+/* shared memory control utility */
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <assert.h>
+#include <signal.h>
+#include <syslog.h>
+#include <errno.h>
+
+#include "shmem.h"
+
+#ifndef MAP_FAILED
+ #define MAP_FAILED ((caddr_t)-1)
+#endif
+
+#ifndef L_NOTICE
+#define L_NOTICE LOG_NOTICE
+#endif
+
+#ifndef L_ERROR
+#define L_ERROR LOG_ERR
+#endif
+
+#ifndef L_FATAL
+#define L_FATAL LOG_CRIT
+#endif
+
+static int smcGetSemaphore(const char *name)
+{
+ /* Look up (never create) the semaphore set keyed on name. */
+ int semid = semget(ftok( (char *)name, 0 ), 0, S_IRWXU|S_IRWXG|S_IRWXO);
+
+ if (semid < 0)
+ syslog( L_ERROR, "semget failed to get semaphore for %s: %m", name );
+
+ return semid;
+}
+
+static int smcCreateSemaphore(const char *name)
+{
+ key_t kt = ftok( (char *)name, 0 );
+ int id = semget(kt, 2, IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO);
+ /* Create (or reuse) a set of 2 semaphores: 0 = writer flag, 1 = reader count. Returns the id, or < 0. */
+ if (id < 0) {
+ if (errno == EACCES || errno == EINVAL) {
+ /* An incompatible set exists (no permission, or fewer than 2 semaphores): remove it. */
+ id = semget(kt, 0, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (id < 0) {
+ /* couldn't even look up the existing set */
+ syslog( L_ERROR, "cant get semaphore using %s", name );
+ return id;
+ }
+ /* try to remove it; failure is fatal and exits the process */
+#ifdef SEMCTL_NEEDS_UNION /* NOTE(review): buffindexed.c includes this file before config.h; confirm this macro is visible here */
+ {
+ union semun semArg;
+ semArg.val = 1;
+ if (semctl(id, 0, IPC_RMID, semArg) < 0) {
+ syslog( L_FATAL, "cant remove semaphore %s", name );
+ exit(1);
+ }
+ }
+#else
+ if (semctl(id, 0, IPC_RMID, NULL) < 0) {
+ syslog( L_FATAL, "cant remove semaphore %s", name );
+ exit(1);
+ }
+#endif
+ /* and retry creating it */
+ id = semget(kt, 2, IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO);
+ }
+ }
+ if (id < 0) {
+ syslog( L_ERROR, "cant create semaphore using %s", name );
+ }
+ return id; /* NOTE(review): new sets are never SETVAL-initialised; this assumes the OS starts them at 0 */
+}
+
+int smcGetExclusiveLock(smcd_t *this)
+{
+ struct sembuf sops[3] = {
+ {0, 0, SEM_UNDO}, /* wait until there is no writer (sem 0 == 0) */
+ {0, 1, SEM_UNDO}, /* set the writer flag */
+ {1, 0, SEM_UNDO} /* wait until there are no readers (sem 1 == 0) */
+ };
+ int rc;
+
+ /* Take the lock atomically; retry while a signal (e.g. SIGHUP) interrupts semop() */
+ while ((rc = semop(this->semap, sops, 3)) < 0 && errno == EINTR)
+ continue;
+ if (rc < 0) {
+ syslog( L_ERROR, "semop failed to getExclusiveLock: %m" );
+ return(-1);
+ }
+ return(0);
+}
+
+int smcGetSharedLock(smcd_t *this)
+{
+ struct sembuf sops[2] = {
+ {0, 0, SEM_UNDO}, /* wait until there is no writer (sem 0 == 0) */
+ {1, 1, SEM_UNDO} /* increment the reader count */
+ };
+ int rc;
+
+ /* Take the lock atomically; retry while a signal (e.g. SIGHUP) interrupts semop() */
+ while ((rc = semop(this->semap, sops, 2)) < 0 && errno == EINTR)
+ continue;
+ if (rc < 0) {
+ syslog( L_ERROR, "semop failed to getSharedLock: %m" );
+ return(-1);
+ }
+ return(0);
+}
+
+int smcReleaseSharedLock(smcd_t *this)
+{
+ /* Decrement the reader count (semaphore 1) without blocking. */
+ struct sembuf op = { 1, -1, SEM_UNDO|IPC_NOWAIT };
+
+ if (semop(this->semap, &op, 1) == 0)
+ return(0);
+
+ syslog( L_ERROR, "semop failed to release shared lock: %m" );
+ return(-1);
+}
+
+int smcReleaseExclusiveLock(smcd_t *this)
+{
+ /* Clear the writer flag (semaphore 0) without blocking. */
+ struct sembuf op = { 0, -1, SEM_UNDO|IPC_NOWAIT };
+
+ if (semop(this->semap, &op, 1) == 0)
+ return(0);
+
+ syslog( L_ERROR, "semop failed to release exclusive lock: %m" );
+ return(-1);
+}
+
+/*
+** Attach to an existing shared memory buffer keyed on name; NULL if none exists or on error.
+*/
+smcd_t* smcGetShmemBuffer(const char *name, int size)
+{
+ int shmid, semap;
+ smcd_t *this;
+ caddr_t addr;
+ key_t fk = ftok( (char *)name, 0 );
+
+ /* look up an existing segment only (no IPC_CREAT) */
+ shmid = shmget(fk, size, S_IRWXU|S_IRGRP|S_IROTH);
+ if (shmid < 0) {
+ /* this is normal when no segment of at least this size exists yet */
+ return NULL;
+ }
+
+ /* attach to shared memory buffer */
+ if ((addr = (caddr_t)shmat(shmid,0,0)) == MAP_FAILED) {
+ syslog( L_ERROR, "cant attach shared memory" );
+ if (shmctl(shmid, IPC_RMID, 0) < 0) {
+ syslog( L_ERROR, "cant remove shared memory" );
+ }
+ return NULL;
+ }
+
+ /* Get control semaphore */
+ if ((semap = smcGetSemaphore(name)) < 0) {
+ syslog( L_ERROR, "failed to get semaphore for key %s", name );
+ if (shmdt(addr) < 0)
+ syslog( L_ERROR, "cant detatch shared memory" );
+ if (shmctl(shmid, IPC_RMID, 0) < 0)
+ syslog( L_ERROR, "cant remove shared memory" );
+ return NULL;
+ }
+
+ this = malloc( sizeof(smcd_t) );
+ if (this == NULL) {
+ syslog( L_ERROR, "cant allocate shmem control data for %s", name );
+ shmdt(addr);
+ return NULL;
+ }
+ this->addr = addr;
+ this->size = size;
+ this->shmid = shmid;
+ this->semap = semap;
+ this->locktype = 0; /* no lock recorded yet; the lock user sets it */
+ return this;
+}
+
+/*
+** Create (or recreate) a zero-filled shared memory buffer keyed on name; NULL on error.
+*/
+smcd_t* smcCreateShmemBuffer(const char *name, int size)
+{
+ int shmid, semap;
+ smcd_t *this;
+ caddr_t addr;
+ key_t fk = ftok( (char *)name, 0 );
+ /* create shared memory buffer */
+ shmid = shmget(fk, size, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH);
+ if (shmid < 0) {
+ /* try to get existing segment */
+ shmid = shmget(fk, 4, S_IRWXU|S_IRGRP|S_IROTH);
+ if (shmid >= 0) {
+ syslog( L_ERROR, "shmem segment already exists name %s", name );
+ /* try to delete old segment */
+ if (shmctl(shmid, IPC_RMID, 0) < 0) {
+ syslog( L_ERROR, "cant delete old memory segment" );
+ return NULL;
+ }
+ syslog( L_NOTICE, "recreating another shmem segment." );
+ shmid = shmget(fk, size, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH);
+ }
+ }
+ if (shmid < 0) {
+ syslog( L_ERROR, "cant create shared memory segment" );
+ return NULL;
+ }
+ /* attach to shared memory buffer */
+ if ((addr = (caddr_t)shmat(shmid,0,0)) == MAP_FAILED) {
+ syslog( L_ERROR, "cant attach shared memory" );
+ if (shmctl(shmid, IPC_RMID, 0) < 0) {
+ syslog( L_ERROR, "cant remove shared memory" );
+ }
+ return NULL;
+ }
+ /* clear the data (NOTE(review): IPC_CREAT without IPC_EXCL may return a segment another process just created; this wipes it) */
+ memset( addr, 0, size );
+ /* Create control semaphore */
+ if ((semap = smcCreateSemaphore(name)) < 0) {
+ syslog( L_ERROR, "failed to create semaphore for %s", name );
+ if (shmdt(addr) < 0)
+ syslog( L_ERROR, "cant detatch shared memory" );
+ if (shmctl(shmid, IPC_RMID, 0) < 0)
+ syslog( L_ERROR, "cant remove shared memory" );
+ return NULL;
+ }
+ this = malloc( sizeof(smcd_t) );
+ if (this == NULL) {
+ syslog( L_ERROR, "cant allocate shmem control data for %s", name );
+ shmdt(addr);
+ return NULL;
+ }
+ this->addr = addr;
+ this->size = size;
+ this->shmid = shmid;
+ this->semap = semap;
+ this->locktype = 0; /* no lock recorded yet; the lock user sets it */
+ syslog( L_NOTICE, "created shmid %d semap %d addr %p size %d",
+ shmid, semap, (void *)addr, size );
+ return this;
+}
+
+void smcClose( smcd_t *this )
+{
+ struct shmid_ds buf;
+ /* Detach, remove the segment if no process is attached any more, then free this. */
+ if (this->addr != MAP_FAILED) {
+ /* detach shared memory segment */
+ if (shmdt(this->addr) < 0) {
+ syslog( L_ERROR, "cant detach shared memory segment: %m" );
+ }
+ this->addr = MAP_FAILED;
+ }
+
+ /* delete shm if no one has attached it (NOTE: racy against a concurrent attach) */
+ if ( shmctl(this->shmid, IPC_STAT, &buf) < 0) {
+ syslog( L_ERROR, "cant stat shmid %d: %m", this->shmid );
+ } else if ( buf.shm_nattch == 0 ) {
+ if (shmctl(this->shmid, IPC_RMID, 0) < 0) {
+ syslog( L_ERROR, "cant delete shmid %d", this->shmid );
+ } else {
+ syslog( L_NOTICE, "shmid %d deleted", this->shmid );
+ }
+ }
+ free( this );
+}
+
+#ifdef _TEST_
+
+#include <stdio.h> /* printf */
+#include <unistd.h> /* pwrite */
+#include <sys/file.h> /* flock, LOCK_EX, LOCK_UN */
+/* Stress test: attach to (or create) a 1M shared buffer keyed on "testfile",
+ then MAXCOUNT times, under the exclusive lock, add 1 to every int in it
+ and write the whole buffer to offset 0 of the file. */
+static const char* testfile = "testfile";
+#define TESTSIZE ( 1024 * 1024 )
+#define MAXCOUNT 100
+
+static smcd_t *this;
+static void myexit( void )
+{
+ if( this ) {
+ smcClose( this );
+ }
+}
+
+int main( int argc, char** argv )
+{
+ int fd, i, k;
+ int *x;
+ int len, xmin, xmax;
+ (void)argc; /* unused */
+ (void)argv;
+
+ atexit( myexit );
+ openlog( "shmemtest", LOG_PID, LOG_DAEMON );
+
+ /* open the testfile */
+ fd = creat(testfile, 0660);
+ if( fd < 0 ) {
+ printf( "cant open %s", testfile );
+ exit(1);
+ }
+
+ /* lock the file */
+ if( flock( fd, LOCK_EX ) < 0 ) {
+ printf( "cant get flock" );
+ exit(1);
+ }
+
+ /* try to get shared memory buffer */
+ this = smcGetShmemBuffer(testfile, TESTSIZE);
+ if( !this ) {
+ /* because there's no shared memory, create one. */
+ this = smcCreateShmemBuffer(testfile, TESTSIZE);
+ if( !this ) {
+ printf( "cant create shmem buffer" );
+ exit(1);
+ }
+ }
+
+ /* unlock the file */
+ if( flock( fd, LOCK_UN ) < 0 ) {
+ printf( "cant unflock %s", testfile );
+ exit(1);
+ }
+
+ x = (int *)this->addr;
+ len = this->size / sizeof(int);
+ for( k=0; k<MAXCOUNT; k++ ) {
+ if( smcGetExclusiveLock(this) < 0 ) {
+ printf( "cant get exclusive lock" );
+ exit(1);
+ }
+ for( i=0; i<len; i++)
+ x[i] += 1;
+ if( pwrite(fd, this->addr, this->size, 0) != (ssize_t)this->size ) {
+ printf( "cant write" );
+ exit(1);
+ }
+ if( smcReleaseExclusiveLock( this ) ) {
+ printf( "cant release exclusive lock" );
+ exit(1);
+ }
+ }
+ /* write the minimum and maximum */
+ xmin = xmax = x[0];
+ for( i=1; i<len; i++ ) {
+ if( x[i] < xmin ) xmin = x[i];
+ if( x[i] > xmax ) xmax = x[i];
+ }
+ printf( "min %d max %d\n", xmin, xmax );
+
+ return(0);
+}
+#endif /* _TEST_ */
diff -uNr inn-CURRENT-20030107/storage/buffindexed/shmem.h buffindexed/shmem.h
--- inn-CURRENT-20030107/storage/buffindexed/shmem.h Thu Jan 1 09:00:00 1970
+++ buffindexed/shmem.h Thu Jan 9 11:02:38 2003
@@ -0,0 +1,26 @@
+/*
+** Shared memory control utility: a System V shared memory segment keyed
+** on a file name (via ftok), guarded by a reader/writer semaphore pair.
+*/
+#ifndef SHMEM_H
+#define SHMEM_H
+
+#include <sys/types.h>
+
+typedef struct {
+ caddr_t addr; /* attached shared memory address */
+ size_t size; /* size of the shared memory */
+ int shmid; /* shared memory segment id */
+ int semap; /* semaphore set id: sem 0 = writer flag, sem 1 = reader count */
+ int locktype; /* lock type recorded by the caller; not interpreted by shmem.c */
+} smcd_t;
+
+int smcGetExclusiveLock(smcd_t *this); /* 0 on success, -1 on failure */
+int smcGetSharedLock(smcd_t *this); /* 0 on success, -1 on failure */
+int smcReleaseSharedLock(smcd_t *this); /* 0 on success, -1 on failure */
+int smcReleaseExclusiveLock(smcd_t *this); /* 0 on success, -1 on failure */
+smcd_t* smcGetShmemBuffer(const char *name, int mapSize); /* attach existing; NULL if none or on error */
+smcd_t* smcCreateShmemBuffer(const char *name, int mapSize); /* create and zero-fill; NULL on error */
+void smcClose( smcd_t *this ); /* detach, remove if no longer attached, free this */
+
+#endif /* SHMEM_H */
More information about the inn-patches
mailing list