buffindexed jumbo patch

Sang-yong Suh sysuh at kigam.re.kr
Mon Jan 13 06:19:40 UTC 2003



On Wed, Jan 08, 2003 at 07:25:48PM +0900, Katsuhiro Kondou wrote:
> 
> In article <20021223011506.GA20736 at cdp.kigam.re.kr>,
> 	Sang-yong Suh <sysuh at kigam.re.kr> wrote;
> 
> } 1. buffindexed.c which is an update to the Kondou's original
> }    version 1.40.2.8 2002/01/17, which can be found on
> }    inn-STABLE-20021027
> 
> Considering the patch will be incorporated in 2.5, could
> you write it against CURRENT?  There are some differences
> between STABLE and CURRENT.  I modified some to improve
> performance in CURRENT.

I have finally finished the patch against inn-CURRENT-20030107.
While working on it, I found a typo in the new ovlock() and fixed it.
I have also revised ovbuffinit_disks() to check whether the disk and
shared memory copies are in sync.

And finally, here is the patch.
--
Sang-yong Suh

diff -uNr inn-CURRENT-20030107/storage/buffindexed/buffindexed.c buffindexed/buffindexed.c
--- inn-CURRENT-20030107/storage/buffindexed/buffindexed.c	Mon Nov 18 07:04:35 2002
+++ buffindexed/buffindexed.c	Sun Jan 12 22:57:56 2003
@@ -3,6 +3,28 @@
 **  Overview buffer and index method.
 */
 
+/*
+** Buffindexed using shared memory on ovbuff by Sang-yong Suh
+**
+** During the recent discussions in inn-workers, Alex Kiernan found
+** that INN's LockRange() does not work for mmapped files.  This explains
+** long-standing buffindexed bugs such as "could not MMAP...".
+**
+** This version fixes the locking problem by using shared memory: the
+** bitfield of each buffer file is loaded into shared memory and shared
+** by all programs, such as innd, expireover, makehistory and overchan.
+** Access to it is serialized with a System V semaphore set.
+*/
+
+/*
+ * Yes. I know that it violates INN coding style. However, this allows
+ * me to compile this new version without reconfiguring INN.
+ * If all goes well, shmem.c should go to $INN/lib, and shmem.h should
+ * go to $INN/include.
+ */
+#include "shmem.h"
+#include "shmem.c"
+
 #include "config.h"
 #include "clibrary.h"
 #include "portable/mmap.h"
@@ -28,6 +50,15 @@
 #include "buffindexed.h"
 
 #define	OVBUFF_MAGIC	"ovbuff"
+#define OVBUFF_VERSION	2
+
+/*
+** Because ovbuff bitfields are residing in memory, we don't have to
+** do file write for each update.  Instead we'll do it at every
+** OVBUFF_SYNC_COUNT updates.
+*/
+#define OVBUFF_SYNC_COUNT	(innconf->icdsynccount * 10 + 1)
+/* #define OVBUFF_SYNC_COUNT	1 */
 
 /* ovbuff header */
 #define	OVBUFFMASIZ	8
@@ -37,9 +68,28 @@
 
 #define	OVMAXCYCBUFFNAME	8
 
-#define	OV_HDR_PAGESIZE	16384
+/*
+** The preferred values for the shared memory version of buffindexed:
+**
+** OV_HDR_PAGESIZE	4096
+** OV_BEFOREBITF	256
+**
+** These are the default; they are NOT compatible with existing buffers
+** (define KEEP_COMPATIBILTY to keep the old 16384/8192 values).
+** The benefit is saving (8192 - 256) bytes per buffindexed file and,
+** depending on the round-off of the buffer size, possibly 16384 more.
+** BTW, I am not sure whether 4096 is right for OSes other than i386 Linux.
+*/
+ 
+#ifndef  KEEP_COMPATIBILTY
+#define OV_HDR_PAGESIZE 4096
+#define OV_BEFOREBITF   256
+#else
+#define OV_HDR_PAGESIZE 16384
+#define OV_BEFOREBITF   (1 * OV_BLOCKSIZE)
+#endif
+
 #define	OV_BLOCKSIZE	8192
-#define	OV_BEFOREBITF	(1 * OV_BLOCKSIZE)
 #define	OV_FUDGE	1024
 
 /* ovblock pointer */
@@ -58,13 +108,18 @@
   char	useda[OVBUFFLASIZ];	/* ASCII version of used */
   char	freea[OVBUFFLASIZ];	/* ASCII version of free */
   char	updateda[OVBUFFLASIZ];	/* ASCII version of updated */
+  /*
+   * The following parts will be synced to the bitfield
+   */
+  int          version;         /* magic version number */
+  unsigned int freeblk;         /* next free block number */
+  unsigned int usedblk;         /* number of used blocks */
 } OVBUFFHEAD;
 
 /* ovbuff info */
 typedef struct _OVBUFF {
   unsigned int		index;			/* ovbuff index */
   char			path[OVBUFFPASIZ];	/* Path to file */
-  int			magicver;		/* Magic version number */
   int			fd;			/* file descriptor for this
 						   ovbuff */
   off_t 		len;			/* Length of writable area, in
@@ -80,10 +135,10 @@
 						   header */
   void *		bitfield;		/* Bitfield for ovbuff block in
 						   use */
-  bool			needflush;		/* true if OVBUFFHEAD is needed
-						   to be flushed */
+  int			dirty;			/* OVBUFFHEAD dirty count */
   struct _OVBUFF	*next;			/* next ovbuff */
   int			nextchunk;		/* next chunk */
+  smcd_t                *smc;                   /* shared mem control data */
 #ifdef OV_DEBUG
   struct ov_trace_array	*trace;
 #endif /* OV_DEBUG */
@@ -253,6 +308,12 @@
 static int	Gibcount;
 
 #ifdef MMAP_MISSES_WRITES
+#define PWRITE(fd, buf, nbyte, offset)  mmapwrite(fd, buf, nbyte, offset)
+#else
+#define PWRITE(fd, buf, nbyte, offset)  pwrite(fd, buf, nbyte, offset)
+#endif
+
+#ifdef MMAP_MISSES_WRITES
 /* With HP/UX, you definitely do not want to mix mmap-accesses of
    a file with read()s and write()s of the same file */
 static off_t mmapwrite(int fd, void *buf, off_t nbyte, off_t offset) {
@@ -338,7 +399,7 @@
   ovbuff->len = len;
   ovbuff->fd = -1;
   ovbuff->next = (OVBUFF *)NULL;
-  ovbuff->needflush = FALSE;
+  ovbuff->dirty = 0;
   ovbuff->bitfield = NULL;
   ovbuff->nextchunk = 1;
 
@@ -483,51 +544,80 @@
 }
 
 static void ovreadhead(OVBUFF *ovbuff) {
-  OVBUFFHEAD	rpx;
-  char		buff[OVBUFFLASIZ+1];
-
-  memcpy(&rpx, ovbuff->bitfield, sizeof(OVBUFFHEAD));
-  strncpy((char *)buff, rpx.useda, OVBUFFLASIZ);
-  buff[OVBUFFLASIZ] = '\0';
-  ovbuff->usedblk = (unsigned int)hex2offt((char *)buff);
-  strncpy((char *)buff, rpx.freea, OVBUFFLASIZ);
-  buff[OVBUFFLASIZ] = '\0';
-  ovbuff->freeblk = (unsigned int)hex2offt((char *)buff);
-  return;
+    OVBUFFHEAD *x = (OVBUFFHEAD *)ovbuff->bitfield;	/* header sits at the start of the shared bitfield */
+    ovbuff->freeblk = x->freeblk;	/* binary copies, refreshed by ovflushhead() */
+    ovbuff->usedblk = x->usedblk;
+    return;
 }
 
 static void ovflushhead(OVBUFF *ovbuff) {
-  OVBUFFHEAD	rpx;
+    OVBUFFHEAD	rpx;
 
-  if (!ovbuff->needflush)
+    /* Fewer than OVBUFF_SYNC_COUNT updates: refresh only the binary counters in shared memory, no disk write */
+    if (ovbuff->dirty < OVBUFF_SYNC_COUNT) {
+        OVBUFFHEAD *x = (OVBUFFHEAD *)ovbuff->bitfield;
+        x->freeblk = ovbuff->freeblk;
+        x->usedblk = ovbuff->usedblk;
+        return;
+    }
+    /* Threshold reached: rebuild the whole header (ASCII and binary fields) */
+    memset(&rpx, 0, sizeof(OVBUFFHEAD));
+    ovbuff->updated = time(NULL);
+    strncpy(rpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC));
+    strncpy(rpx.path, ovbuff->path, OVBUFFPASIZ);
+    /* Don't use sprintf() directly ... the terminating '\0' causes grief */
+    strncpy(rpx.indexa, offt2hex(ovbuff->index, TRUE), OVBUFFLASIZ);
+    strncpy(rpx.lena, offt2hex(ovbuff->len, TRUE), OVBUFFLASIZ);
+    strncpy(rpx.totala, offt2hex(ovbuff->totalblk, TRUE), OVBUFFLASIZ);
+    strncpy(rpx.useda, offt2hex(ovbuff->usedblk, TRUE), OVBUFFLASIZ);
+    strncpy(rpx.freea, offt2hex(ovbuff->freeblk, TRUE), OVBUFFLASIZ);
+    strncpy(rpx.updateda, offt2hex(ovbuff->updated, TRUE), OVBUFFLASIZ);
+    rpx.version = OVBUFF_VERSION;
+    rpx.freeblk = ovbuff->freeblk;
+    rpx.usedblk = ovbuff->usedblk;
+    memcpy(ovbuff->bitfield, &rpx, sizeof(OVBUFFHEAD));
+    /* write header plus bitfield (ovbuff->base bytes) back to file offset 0 */
+    if( pwrite(ovbuff->fd, ovbuff->bitfield, ovbuff->base, 0) != ovbuff->base )
+        syslog(L_ERROR, "%s: ovflushhead: cant flush on %s: %m",
+                     LocalLogName, ovbuff->path);
+    ovbuff->dirty = 0;
     return;
-  memset(&rpx, 0, sizeof(OVBUFFHEAD));
-  ovbuff->updated = time(NULL);
-  strncpy(rpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC));
-  strncpy(rpx.path, ovbuff->path, OVBUFFPASIZ);
-  /* Don't use sprintf() directly ... the terminating '\0' causes grief */
-  strncpy(rpx.indexa, offt2hex(ovbuff->index, TRUE), OVBUFFLASIZ);
-  strncpy(rpx.lena, offt2hex(ovbuff->len, TRUE), OVBUFFLASIZ);
-  strncpy(rpx.totala, offt2hex(ovbuff->totalblk, TRUE), OVBUFFLASIZ);
-  strncpy(rpx.useda, offt2hex(ovbuff->usedblk, TRUE), OVBUFFLASIZ);
-  strncpy(rpx.freea, offt2hex(ovbuff->freeblk, TRUE), OVBUFFLASIZ);
-  strncpy(rpx.updateda, offt2hex(ovbuff->updated, TRUE), OVBUFFLASIZ);
-  memcpy(ovbuff->bitfield, &rpx, sizeof(OVBUFFHEAD));
-  mmap_flush(ovbuff->bitfield, ovbuff->base);
-  ovbuff->needflush = FALSE;
-  return;
 }
 
 static bool ovlock(OVBUFF *ovbuff, enum inn_locktype type) {
-  return inn_lock_range(ovbuff->fd, type, true, 0, sizeof(OVBUFFHEAD));
+    int    ret = 0;		/* unlocking while holding nothing succeeds */
+    smcd_t *smc = ovbuff->smc;
+    if( type == INN_LOCK_WRITE ) {
+        ret = smcGetExclusiveLock( smc );
+        smc->locktype = (ret == 0) ? (int)INN_LOCK_WRITE : (int)INN_LOCK_UNLOCK;
+    } else if( type == INN_LOCK_READ ) {
+        ret = smcGetSharedLock( smc );
+        smc->locktype = (ret == 0) ? (int)INN_LOCK_READ : (int)INN_LOCK_UNLOCK;
+    } else {
+        /* release only what we hold: a blind IPC_NOWAIT decrement would drop another process's lock */
+        if( smc->locktype == (int)INN_LOCK_WRITE ) ret = smcReleaseExclusiveLock( smc );
+        else if( smc->locktype == (int)INN_LOCK_READ ) ret = smcReleaseSharedLock( smc );
+        smc->locktype = (int)INN_LOCK_UNLOCK;
+    }
+    return (ret == 0);
 }
 
 static bool ovbuffinit_disks(void) {
   OVBUFF	*ovbuff = ovbufftab;
   char		buf[64];
-  OVBUFFHEAD	*rpx;
+  OVBUFFHEAD	*rpx, dpx;
   int		i, fd;
   off_t         tmpo;
+  smcd_t        *smc;
+  static bool   atexit_registered = FALSE;
+
+  /*
+   * Register the exit callback to sync the bitfield to the disk
+   */
+  if( !atexit_registered ) {
+      atexit( buffindexed_close );
+      atexit_registered = TRUE;
+  }
 
   /*
   ** Discover the state of our ovbuffs.  If any of them are in icky shape,
@@ -543,24 +633,41 @@
 	ovbuff->fd = fd;
       }
     }
-    if ((ovbuff->bitfield =
-	 mmap(NULL, ovbuff->base, ovbuffmode & OV_WRITE ? (PROT_READ | PROT_WRITE) : PROT_READ,
-	      MAP_SHARED, ovbuff->fd, (off_t) 0)) == MAP_FAILED) {
-      syslog(L_ERROR,
-	       "%s: ovinitdisks: mmap for %s offset %d len %lu failed: %m",
-	       LocalLogName, ovbuff->path, 0, ovbuff->base);
-      return FALSE;
+
+    /* get shared memory buffer */
+    smc = smcGetShmemBuffer( ovbuff->path, ovbuff->base );
+    if ( !smc ) {
+        /* No shared memory exists, create one. */
+        smc = smcCreateShmemBuffer( ovbuff->path, ovbuff->base );
+        if ( !smc ) {
+            syslog(L_ERROR,
+                "%s: ovinitdisks: cant create shmem for %s len %d: %m",
+                 LocalLogName, ovbuff->path, ovbuff->base);
+            return FALSE;
+        }
     }
+
+    ovbuff->smc = smc;
+    ovbuff->bitfield = smc->addr;
     rpx = (OVBUFFHEAD *)ovbuff->bitfield;
-    ovlock(ovbuff, INN_LOCK_WRITE);
-    if (strncmp(rpx->magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC)) == 0) {
-	ovbuff->magicver = 1;
-	if (strncmp(rpx->path, ovbuff->path, OVBUFFPASIZ) != 0) {
-	  syslog(L_ERROR, "%s: Path mismatch: read %s for buffindexed %s",
-		   LocalLogName, rpx->path, ovbuff->path);
-	  ovbuff->needflush = TRUE;
-	}
-	strncpy(buf, rpx->indexa, OVBUFFLASIZ);
+
+    /* lock the buffer */
+    ovlock(ovbuff, ovbuffmode & OV_WRITE ? INN_LOCK_WRITE : INN_LOCK_READ);
+
+    if (pread(ovbuff->fd, &dpx, sizeof(OVBUFFHEAD), 0) < 0) {
+        syslog(L_ERROR, "%s: cant read from %s, %m",
+            LocalLogName, ovbuff->path);
+        ovlock(ovbuff, INN_LOCK_UNLOCK);
+        return FALSE;
+    }
+
+    /*
+     * check validity of the disk data
+     */
+    if (strncmp(dpx.magic, OVBUFF_MAGIC, strlen(OVBUFF_MAGIC)) == 0 &&
+	strncmp(dpx.path, ovbuff->path, OVBUFFPASIZ) == 0 )
+    {
+	strncpy(buf, dpx.indexa, OVBUFFLASIZ);
 	buf[OVBUFFLASIZ] = '\0';
 	i = hex2offt(buf);
 	if (i != ovbuff->index) {
@@ -569,7 +676,7 @@
 	    ovlock(ovbuff, INN_LOCK_UNLOCK);
 	    return FALSE;
 	}
-	strncpy(buf, rpx->lena, OVBUFFLASIZ);
+	strncpy(buf, dpx.lena, OVBUFFLASIZ);
 	buf[OVBUFFLASIZ] = '\0';
 	tmpo = hex2offt(buf);
 	if (tmpo != ovbuff->len) {
@@ -578,18 +685,52 @@
 	    ovlock(ovbuff, INN_LOCK_UNLOCK);
 	    return FALSE;
 	}
-	strncpy(buf, rpx->totala, OVBUFFLASIZ);
+
+        /*
+         * compare shared memory with disk data.
+         */
+        if (strncmp(dpx.magic, rpx->magic, strlen(OVBUFF_MAGIC)) != 0 ||
+	    strncmp(dpx.path, rpx->path, OVBUFFPASIZ) != 0 ||
+	    strncmp(dpx.indexa, rpx->indexa, OVBUFFLASIZ) != 0 ||
+	    strncmp(dpx.lena, rpx->lena, OVBUFFLASIZ) != 0 )
+        {
+            /*
+             * Load shared memory with disk data.
+             */
+            if (pread(ovbuff->fd, rpx, ovbuff->base, 0) < 0) {
+                syslog(L_ERROR, "%s: cant read from %s, %m",
+                    LocalLogName, ovbuff->path);
+                ovlock(ovbuff, INN_LOCK_UNLOCK);
+                return FALSE;
+            }
+        }
+	strncpy(buf, dpx.totala, OVBUFFLASIZ);
 	buf[OVBUFFLASIZ] = '\0';
 	ovbuff->totalblk = hex2offt(buf);
-	strncpy(buf, rpx->useda, OVBUFFLASIZ);
-	buf[OVBUFFLASIZ] = '\0';
-	ovbuff->usedblk = hex2offt(buf);
-	strncpy(buf, rpx->freea, OVBUFFLASIZ);
-	buf[OVBUFFLASIZ] = '\0';
-	ovbuff->freeblk = hex2offt(buf);
-	ovflushhead(ovbuff);
+
+        if ( rpx->version == 0 ) {
+            /* no binary data available. use character data */
+	    strncpy(buf, rpx->useda, OVBUFFLASIZ);
+	    buf[OVBUFFLASIZ] = '\0';
+    	    ovbuff->usedblk = hex2offt(buf);
+	    strncpy(buf, rpx->freea, OVBUFFLASIZ);
+	    buf[OVBUFFLASIZ] = '\0';
+	    ovbuff->freeblk = hex2offt(buf);
+        } else {
+            /* use binary data. The first reason is the speed.
+               and the second reason is the other partner is not
+               synced.
+            */
+            ovbuff->usedblk = rpx->usedblk;
+            ovbuff->freeblk = rpx->freeblk;
+        }
 	Needunlink = FALSE;
     } else {
+        /*
+         * Initialize the contents of the shared memory
+         */
+	memset( rpx, 0, ovbuff->base );
+
 	ovbuff->totalblk = (ovbuff->len - ovbuff->base)/OV_BLOCKSIZE;
 	if (ovbuff->totalblk < 1) {
 	  syslog(L_ERROR, "%s: too small length '%lu' for buffindexed %s",
@@ -597,11 +738,10 @@
 	  ovlock(ovbuff, INN_LOCK_UNLOCK);
 	  return FALSE;
 	}
-	ovbuff->magicver = 1;
 	ovbuff->usedblk = 0;
 	ovbuff->freeblk = 0;
 	ovbuff->updated = 0;
-	ovbuff->needflush = TRUE;
+	ovbuff->dirty   = OVBUFF_SYNC_COUNT + 1;
 	syslog(L_NOTICE,
 		"%s: No magic cookie found for buffindexed %d, initializing",
 		LocalLogName, ovbuff->index);
@@ -712,6 +852,7 @@
   static OVBUFF	*ovbuffnext = NULL;
   OVBUFF	*ovbuff;
   OV		ov;
+  bool          done = FALSE;
 #ifdef OV_DEBUG
   int		recno;
   struct ov_trace_array *trace;
@@ -719,6 +860,13 @@
 
   if (ovbuffnext == NULL)
     ovbuffnext = ovbufftab;
+
+  /*
+   * We will try to recover broken overview possibly due to unsync.
+   * The recovering is inactive for OV_DEBUG mode.
+   */
+
+retry:
   for (ovbuff = ovbuffnext ; ovbuff != (OVBUFF *)NULL ; ovbuff = ovbuff->next) {
     ovlock(ovbuff, INN_LOCK_WRITE);
     ovreadhead(ovbuff);
@@ -771,17 +919,33 @@
   trace->ov_trace[trace->cur].gloc.recno = recno;
   trace->ov_trace[trace->cur].occupied = time(NULL);
 #endif /* OV_DEBUG */
+
   ov.index = ovbuff->index;
   ov.blocknum = ovbuff->freeblk;
+ 
+#ifndef OV_DEBUG
+  if (ovusedblock(ovbuff, ovbuff->freeblk, FALSE, TRUE)) {
+      syslog(L_NOTICE, "%s: fixing invalid free block(%d, %d).",
+             LocalLogName, ovbuff->index, ovbuff->freeblk);
+  } else
+#endif /* OV_DEBUG: no recovery, always done, so the retry below can't spin */
+      done = TRUE;
+
+  /* mark it as allocated */
   ovusedblock(ovbuff, ov.blocknum, TRUE, TRUE);
+
   ovnextblock(ovbuff);
   ovbuff->usedblk++;
-  ovbuff->needflush = TRUE;
+  ovbuff->dirty++;
   ovflushhead(ovbuff);
   ovlock(ovbuff, INN_LOCK_UNLOCK);
   ovbuffnext = ovbuff->next;
   if (ovbuffnext == NULL)
     ovbuffnext = ovbufftab;
+
+  if( !done )
+    goto retry;
+
   return ov;
 }
 
@@ -825,12 +989,20 @@
   trace->ov_trace[trace->cur].gloc.recno = recno;
   trace->cur++;
 #endif /* OV_DEBUG */
+
+#ifndef OV_DEBUG
+  if (!ovusedblock(ovbuff, ov.blocknum, FALSE, FALSE)) {
+    syslog(L_NOTICE, "%s: trying to free block(%d, %d), but already freed.",
+           LocalLogName, ov.index, ov.blocknum);
+  }
+#endif
+
   ovusedblock(ovbuff, ov.blocknum, TRUE, FALSE);
   ovreadhead(ovbuff);
   if (ovbuff->freeblk == ovbuff->totalblk)
     ovbuff->freeblk = ov.blocknum;
   ovbuff->usedblk--;
-  ovbuff->needflush = TRUE;
+  ovbuff->dirty++;
   ovflushhead(ovbuff);
   ovlock(ovbuff, INN_LOCK_UNLOCK);
   return;
@@ -1188,11 +1360,7 @@
   ovindexhead.next = ovnull;
   ovindexhead.low = 0;
   ovindexhead.high = 0;
-#ifdef MMAP_MISSES_WRITES
-  if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
-  if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+  if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ov.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
     syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
     return TRUE;
   }
@@ -1201,20 +1369,19 @@
   } else {
     if ((ovbuff = getovbuff(ge->curindex)) == NULL)
       return FALSE;
-#ifdef OV_DEBUG
     if (!ovusedblock(ovbuff, ge->curindex.blocknum, FALSE, FALSE)) {
-      syslog(L_FATAL, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+      syslog(L_ERROR, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+#ifdef OV_DEBUG
       abort();
-    }
+#else	/* OV_DEBUG */
+      /* fix it */
+      ovusedblock(ovbuff, ge->curindex.blocknum, TRUE, TRUE);
 #endif /* OV_DEBUG */
+    }
     ovindexhead.next = ov;
     ovindexhead.low = ge->curlow;
     ovindexhead.high = ge->curhigh;
-#ifdef MMAP_MISSES_WRITES
-    if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
-    if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+    if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
       syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
       return FALSE;
     }
@@ -1282,18 +1449,18 @@
     ge->curdata = ov;
     ge->curoffset = 0;
   }
-#ifdef OV_DEBUG
   if (!ovusedblock(ovbuff, ge->curdata.blocknum, FALSE, FALSE)) {
-    syslog(L_FATAL, "%s: block(%d, %d) not occupied", LocalLogName, ovbuff->index, ge->curdata.blocknum);
+    syslog(L_ERROR, "%s: block(%d, %d) not occupied", LocalLogName, ovbuff->index, ge->curdata.blocknum);
+#ifdef OV_DEBUG
     buffindexed_close();
     abort();
-  }
+#else  /* OV_DEBUG */
+    /* fix it */
+    ovusedblock(ovbuff, ge->curdata.blocknum, TRUE, TRUE);
 #endif /* OV_DEBUG */
-#ifdef MMAP_MISSES_WRITES
-  if (mmapwrite(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
-#else
-  if (pwrite(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
-#endif /* MMAP_MISSES_WRITES */
+  }
+
+  if (PWRITE(ovbuff->fd, data, len, ovbuff->base + ge->curdata.blocknum * OV_BLOCKSIZE + ge->curoffset) != len) {
     syslog(L_ERROR, "%s: could not append overview record index '%d', blocknum '%d': %m", LocalLogName, ge->curdata.index, ge->curdata.blocknum);
     return FALSE;
   }
@@ -1319,18 +1486,17 @@
   }
   if ((ovbuff = getovbuff(ge->curindex)) == NULL)
     return FALSE;
-#ifdef OV_DEBUG
   if (!ovusedblock(ovbuff, ge->curindex.blocknum, FALSE, FALSE)) {
-    syslog(L_FATAL, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+    syslog(L_ERROR, "%s: block(%d, %d) not occupied (index)", LocalLogName, ovbuff->index, ge->curindex.blocknum);
+#ifdef OV_DEBUG
     buffindexed_close();
     abort();
-  }
+#else  /* OV_DEBUG */
+    /* fix this */
+    ovusedblock(ovbuff, ge->curindex.blocknum, TRUE, TRUE);
 #endif /* OV_DEBUG */
-#ifdef MMAP_MISSES_WRITES
-  if (mmapwrite(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
-#else
-  if (pwrite(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
-#endif /* MMAP_MISSES_WRITES */
+  }
+  if (PWRITE(ovbuff->fd, &ie, sizeof(ie), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE + sizeof(OVINDEXHEAD) + sizeof(ie) * ge->curindexoffset) != sizeof(ie)) {
     syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
     return TRUE;
   }
@@ -1346,11 +1512,7 @@
     ovindexhead.next = ovnull;
     ovindexhead.low = ge->curlow;
     ovindexhead.high = ge->curhigh;
-#ifdef MMAP_MISSES_WRITES
-    if (mmapwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#else
-    if (pwrite(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
-#endif /* MMAP_MISSES_WRITES */
+    if (PWRITE(ovbuff->fd, &ovindexhead, sizeof(OVINDEXHEAD), ovbuff->base + ge->curindex.blocknum * OV_BLOCKSIZE) != sizeof(OVINDEXHEAD)) {
       syslog(L_ERROR, "%s: could not write index record index '%d', blocknum '%d': %m", LocalLogName, ge->curindex.index, ge->curindex.blocknum);
       return TRUE;
     }
@@ -1945,8 +2107,10 @@
   GROUPlock(gloc, INN_LOCK_WRITE);
   ge = &GROUPentries[gloc.recno];
   if (ge->count == 0) {
+    if (ge->low < ge->high)
+      ge->low = ge->high;
     if (lo)
-      *lo = ge->low;
+      *lo = ge->low + 1;
     ge->expired = time(NULL);
     GROUPlock(gloc, INN_LOCK_UNLOCK);
     return TRUE;
@@ -1996,10 +2160,12 @@
     newge.low = newge.high;
   *ge = newge;
   if (lo) {
-    if (ge->count == 0)
+    if (ge->count == 0) {
+      if (ge->low < ge->high)
+          ge->low = ge->high;
       /* lomark should be himark + 1, if no article for the group */
       *lo = ge->low + 1;
-    else
+    } else
       *lo = ge->low;
   }
   ovclosesearch(handle, TRUE);
@@ -2148,7 +2314,14 @@
       return;
     }
   }
+
+  /* sync the bit field */
+  ovbuff = ovbufftab;
   for (; ovbuff != (OVBUFF *)NULL; ovbuff = ovbuffnext) {
+    if (ovbuff->dirty) {
+      ovbuff->dirty = OVBUFF_SYNC_COUNT + 1;
+      ovflushhead( ovbuff );
+    }
     ovbuffnext = ovbuff->next;
     DISPOSE(ovbuff);
   }
diff -uNr inn-CURRENT-20030107/storage/buffindexed/shmem.c buffindexed/shmem.c
--- inn-CURRENT-20030107/storage/buffindexed/shmem.c	Thu Jan  1 09:00:00 1970
+++ buffindexed/shmem.c	Thu Jan  9 11:02:38 2003
@@ -0,0 +1,377 @@
+/*
+** $Id$
+*/
+
+/* shared memory control utility */
+
+#include <stdlib.h>
+#include <string.h>	/* memset() */
+#include <sys/types.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <assert.h>
+#include <signal.h>
+#include <syslog.h>
+#include <errno.h>
+#include "shmem.h"
+
+#ifndef MAP_FAILED
+  #define MAP_FAILED ((caddr_t)-1)
+#endif
+
+#ifndef	L_NOTICE
+#define L_NOTICE	LOG_NOTICE
+#endif
+
+#ifndef	L_ERROR
+#define L_ERROR		LOG_ERR
+#endif
+
+#ifndef	L_FATAL
+#define L_FATAL		LOG_CRIT
+#endif
+
+static int smcGetSemaphore(const char *name)
+{
+    /* look up an already existing set: nsems 0 and no IPC_CREAT */
+    int id = semget(ftok( (char *)name, 0 ), 0, S_IRWXU|S_IRWXG|S_IRWXO);
+
+    if (id >= 0)
+        return id;
+    syslog( L_ERROR, "semget failed to get semaphore for %s: %m", name );
+    return id;
+}
+
+static int smcCreateSemaphore(const char *name)
+{
+    key_t kt = ftok( (char *)name, 0 );
+    int   id = semget(kt, 2, IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO);	/* sem 0: writer flag, sem 1: reader count */
+    /* NOTE(review): POSIX leaves new semaphore values unspecified; the lock code relies on 0 (as Linux does) */
+    if (id < 0) {
+        if (errno == EACCES || errno == EINVAL) {
+            /* a set with wrong permissions or fewer than 2 sems exists: remove it */
+            id = semget(kt, 0, S_IRWXU|S_IRWXG|S_IRWXO);
+            if (id < 0) {
+                /* couldn't even retrieve it. */
+                syslog( L_ERROR, "cant get semaphore using %s", name );
+                return id;
+            }
+            /* try to remove it; failing to do so is fatal */
+#ifdef SEMCTL_NEEDS_UNION	/* NOTE(review): buffindexed.c includes this file before config.h; confirm this macro is visible here */
+            {
+                union semun semArg;
+                semArg.val = 1;		/* ignored by IPC_RMID */
+                if (semctl(id, 0, IPC_RMID, semArg) < 0) {
+                    syslog( L_FATAL, "cant remove semaphore %s", name );
+                    exit(1);
+                }
+            }
+#else
+            if (semctl(id, 0, IPC_RMID, NULL) < 0) {
+                syslog( L_FATAL, "cant remove semaphore %s", name );
+                exit(1);
+            }
+#endif
+            /* and retry creating a fresh 2-semaphore set */
+            id = semget(kt, 2, IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO);
+        }
+    }
+    if (id < 0) {
+        syslog( L_ERROR, "cant create semaphore using %s", name );
+    }
+    return id;
+}
+
+int smcGetExclusiveLock(smcd_t *this)
+{
+    struct sembuf sops[3] = {
+        {0, 0, SEM_UNDO},    /* wait for exclusive lock. */
+        {0, 1, SEM_UNDO},    /* lock */
+        {1, 0, SEM_UNDO}     /* wait for shared lock */
+    };
+    int rc;
+    /* All three ops apply atomically.  Retry for as long as a signal
+       (e.g. our SIGHUP) interrupts the wait, not just once. */
+    while ((rc = semop(this->semap, sops, 3)) < 0 && errno == EINTR)
+        continue;
+    if (rc < 0) {
+        syslog( L_ERROR, "semop failed to getExclusiveLock: %m" );
+        return(-1);
+    }
+    return(0);
+}
+
+int smcGetSharedLock(smcd_t *this)
+{
+    struct sembuf sops[2] = {
+        {0, 0, SEM_UNDO},    /* wait for exclusive lock. */
+        {1, 1, SEM_UNDO}     /* increase access count */
+    };
+    int rc;
+    /* Both ops apply atomically.  Retry for as long as a signal
+       (e.g. our SIGHUP) interrupts the wait, not just once. */
+    while ((rc = semop(this->semap, sops, 2)) < 0 && errno == EINTR)
+        continue;
+    if (rc < 0) {
+        syslog( L_ERROR, "semop failed to getSharedLock: %m" );
+        return(-1);
+    }
+    return(0);
+}
+
+int smcReleaseSharedLock(smcd_t *this)
+{
+    /* drop one reader from sem 1; IPC_NOWAIT so this never blocks */
+    struct sembuf op = { 1, -1, SEM_UNDO|IPC_NOWAIT };
+
+    if (semop(this->semap, &op, 1) >= 0)
+        return(0);
+
+    syslog( L_ERROR, "semop failed to release shared lock: %m" );
+    return(-1);
+}
+
+int smcReleaseExclusiveLock(smcd_t *this)
+{
+    /* clear the writer flag in sem 0; IPC_NOWAIT so this never blocks */
+    struct sembuf op = { 0, -1, SEM_UNDO|IPC_NOWAIT };
+
+    if (semop(this->semap, &op, 1) >= 0)
+        return(0);
+
+    syslog( L_ERROR, "semop failed to release exclusive lock: %m" );
+    return(-1);
+}
+
+/*
+** Attach to an existing shared memory buffer and its semaphore set.
+*/
+smcd_t* smcGetShmemBuffer(const char *name, int size)
+{
+    int     shmid, semap;
+    smcd_t  *this;
+    caddr_t addr;
+    key_t   fk = ftok( (char *)name, 0 );
+
+    /* look up an existing segment of at least size bytes (no IPC_CREAT) */
+    shmid = shmget(fk, size, S_IRWXU|S_IRGRP|S_IROTH);
+    if (shmid < 0) {
+        /* this is normal */
+        return NULL;
+    }
+
+    /* attach to shared memory buffer */
+    if ((addr = (caddr_t)shmat(shmid,0,0)) == MAP_FAILED) {
+        syslog( L_ERROR, "cant attach shared memory" );
+        if (shmctl(shmid, IPC_RMID, 0) < 0) {
+            syslog( L_ERROR, "cant remove shared memory" );
+        }
+        return NULL;
+    }
+
+    /* Get control semaphore */
+    if ((semap = smcGetSemaphore(name)) < 0) {
+        syslog( L_ERROR, "failed to get semaphore for key %s", name );
+        if (shmdt(addr) < 0)
+            syslog( L_ERROR, "cant detatch shared memory" );
+        if (shmctl(shmid, IPC_RMID, 0) < 0)
+            syslog( L_ERROR, "cant remove shared memory" );
+        return NULL;
+    }
+
+    /* Out of memory is fatal: returning NULL would make the caller
+       create, and so clear, a segment other processes may be using. */
+    if ((this = malloc( sizeof(smcd_t) )) == NULL) {
+        syslog( L_FATAL, "cant allocate shmem control data: %m" );
+        exit(1);
+    }
+    this->addr = addr;
+    this->size = size;
+    this->shmid = shmid;
+    this->semap = semap;
+    /* no syslog on success: it would make the news log file huge */
+    return this;
+}
+
+/*
+** Create, attach and zero a shared memory buffer plus its semaphore set.
+*/
+smcd_t* smcCreateShmemBuffer(const char *name, int size)
+{
+    int     shmid, semap;
+    smcd_t  *this;
+    caddr_t addr;
+    key_t   fk = ftok( (char *)name, 0 );
+
+    /* create shared memory buffer */
+    shmid = shmget(fk, size, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH);
+    if (shmid < 0) {
+        /* try to get existing segment */
+        shmid = shmget(fk, 4, S_IRWXU|S_IRGRP|S_IROTH);
+        if (shmid >= 0) {
+            syslog( L_ERROR, "shmem segment already exists name %s", name );
+            /* try to delete old segment */
+            if (shmctl(shmid, IPC_RMID, 0) < 0) {
+                syslog( L_ERROR, "cant delete old memory segment" );
+                return NULL;
+            }
+            syslog( L_NOTICE, "recreating another shmem segment." );
+            shmid = shmget(fk, size, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH);
+        }
+    }
+    if (shmid < 0) {
+        syslog( L_ERROR, "cant create shared memory segment" );
+        return NULL;
+    }
+
+    /* attach to shared memory buffer */
+    if ((addr = (caddr_t)shmat(shmid,0,0)) == MAP_FAILED) {
+        syslog( L_ERROR, "cant attach shared memory" );
+        if (shmctl(shmid, IPC_RMID, 0) < 0) {
+            syslog( L_ERROR, "cant remove shared memory" );
+        }
+        return NULL;
+    }
+    memset( addr, 0, size );	/* clear the data */
+
+    /* Create control semaphore */
+    if ((semap = smcCreateSemaphore(name)) < 0) {
+        syslog( L_ERROR, "failed to create semaphore for %s", name );
+        if (shmdt(addr) < 0)
+            syslog( L_ERROR, "cant detatch shared memory" );
+        if (shmctl(shmid, IPC_RMID, 0) < 0)
+            syslog( L_ERROR, "cant remove shared memory" );
+        return NULL;
+    }
+    if ((this = malloc( sizeof(smcd_t) )) == NULL) {
+        syslog( L_ERROR, "cant allocate shmem control data: %m" );
+        (void)shmdt(addr);
+        return NULL;
+    }
+    this->addr = addr;
+    this->size = size;
+    this->shmid = shmid;
+    this->semap = semap;
+    syslog( L_NOTICE, "created shmid %d semap %d addr %p size %d",
+        shmid, semap, (void *)addr, size );
+    return this;
+}
+
+void smcClose( smcd_t *this )
+{
+    struct shmid_ds buf;
+    /* Detach; remove the segment if nobody is attached; free this (semaphores are kept). */
+    if (this->addr != MAP_FAILED) {
+        /* detach shared memory segment */
+        if (shmdt(this->addr) < 0) {
+            syslog( L_ERROR, "cant detach shared memory segment: %m" );
+        }
+        this->addr = MAP_FAILED;
+    }
+
+    /* delete shm if no one has attached it */
+    if ( shmctl(this->shmid, IPC_STAT, &buf) < 0) {
+        syslog( L_ERROR, "cant stat shmid %d: %m", this->shmid );
+    } else if ( buf.shm_nattch == 0 ) {
+        if (shmctl(this->shmid, IPC_RMID, 0) < 0) {
+            syslog( L_ERROR, "cant delete shmid %d", this->shmid );
+        } else {
+            syslog( L_NOTICE, "shmid %d deleted", this->shmid );
+        }
+    }
+    free( this );
+}
+
+#ifdef	_TEST_
+#include <stdio.h>	/* printf() */
+#include <unistd.h>	/* write() */
+#include <sys/file.h>	/* flock(), LOCK_EX, LOCK_UN */
+/* Attach to (or create) a TESTSIZE shared buffer keyed on testfile, then
+   MAXCOUNT times: take the exclusive lock, add 1 to every int in it and
+   append the buffer to testfile (which creat() truncates at startup).
+   Finally print the smallest and largest int found in the buffer. */
+static const char* testfile = "testfile";
+#define TESTSIZE	( 1024 * 1024 )
+#define MAXCOUNT	100
+
+static smcd_t *this;
+static void myexit( void )
+{
+    if( this ) {
+        smcClose( this );
+    }
+}
+
+int main( int argc, char** argv )
+{
+    struct stat st;
+    int fd, i, k;
+    int *x;
+    int len, xmin, xmax;
+    struct flock fl;
+
+    atexit( myexit );
+    openlog( "shmemtest", LOG_PID, LOG_DAEMON );
+
+    /* open the testfile */
+    fd = creat(testfile, 0660);
+    if( fd < 0 ) {
+        printf( "cant open %s", testfile );
+        exit(1);
+    }
+
+    /* lock the file */
+    if( flock( fd, LOCK_EX ) < 0 ) {
+        printf( "cant get flock" );
+        exit(1);
+    }
+
+    /* try to get shared memory buffer */
+    this = smcGetShmemBuffer(testfile, TESTSIZE);
+    if( !this ) {
+        /* because there's no shared memory, create one. */
+        this = smcCreateShmemBuffer(testfile, TESTSIZE);
+        if( !this ) {
+            printf( "cant create shmem buffer" );
+            exit(1);
+        }
+    }
+
+    /* unlock the file */
+    if( flock( fd, LOCK_UN ) < 0 ) {
+        printf( "cant unflock %s", testfile );
+        exit(1);
+    }
+
+    x = (int *)this->addr;
+    len = this->size / sizeof(int);
+    for( k=0; k<MAXCOUNT; k++ ) {
+        if( smcGetExclusiveLock(this) < 0 ) {
+            printf( "cant get exclusive lock" );
+            exit(1);
+        }
+        for( i=0; i<len; i++)
+            x[i] += 1;
+        if( write(fd, this->addr, this->size) != this->size ) {
+            printf( "cant write" );
+            exit(1);
+        }
+        if( smcReleaseExclusiveLock( this ) ) {
+            printf( "cant release exclusive lock" );
+            exit(1);
+        }
+    }
+    /* print the minimum and maximum */
+    xmin = xmax = x[0];
+    for( i=1; i<len; i++ ) {
+        if( x[i] < xmin ) xmin = x[i];       
+        if( x[i] > xmax ) xmax = x[i];       
+    }
+    printf( "min %d max %d\n", xmin, xmax );
+
+    return(0);
+}
+#endif	/* _TEST_ */
diff -uNr inn-CURRENT-20030107/storage/buffindexed/shmem.h buffindexed/shmem.h
--- inn-CURRENT-20030107/storage/buffindexed/shmem.h	Thu Jan  1 09:00:00 1970
+++ buffindexed/shmem.h	Thu Jan  9 11:02:38 2003
@@ -0,0 +1,26 @@
+/*
+** Shared memory control utility: a System V shm segment plus semaphores.
+*/
+#ifndef SHMEM_H
+#define SHMEM_H
+#include <sys/types.h>
+
+/* Per-process handle for one attached shared memory buffer. */
+typedef struct {
+    caddr_t addr;	/* where the segment is attached in this process */
+    size_t  size;	/* segment size in bytes */
+    int     shmid;	/* shmget() segment id */
+    int     semap;	/* semget() id: sem 0 writer flag, sem 1 reader count */
+    int     locktype;	/* set by the caller; shmem.c never reads it */
+} smcd_t;
+
+/* Buffer life cycle; Get/Create return NULL when no buffer is obtained. */
+smcd_t* smcGetShmemBuffer(const char *name, int mapSize);
+smcd_t* smcCreateShmemBuffer(const char *name, int mapSize);
+void smcClose( smcd_t *this );
+/* Locking; each returns 0 on success and -1 on error. */
+int smcGetExclusiveLock(smcd_t *this);
+int smcReleaseExclusiveLock(smcd_t *this);
+int smcGetSharedLock(smcd_t *this);
+int smcReleaseSharedLock(smcd_t *this);
+#endif	/* SHMEM_H */


More information about the inn-patches mailing list