Commit dd7a05ee authored by Mike Hibler's avatar Mike Hibler
Browse files

Commit of USENIX driven improvements:

1. The big one is from Leigh: multithreading the unzipper so that
   decompression overlaps with disk IO.  Also added the '-n' "no threads"
   option for comparison purposes.

2. Eh...guess that's it for imagezip/unzip.  Oh wait, mike fixed a comment!
parent e0127c8c
......@@ -13,8 +13,14 @@ WITH_NTFS = @WINSUPPORT@
include $(OBJDIR)/Makeconf
PTHREADCFLAGS = -D_THREAD_SAFE \
-I/usr/local/include/pthread/linuxthreads
PTHREADLIBS = -L/usr/local/lib -llthread -llgcc_r
CFLAGS = -O2 -g -static
LIBS = -lz
LIBS = -lz $(PTHREADLIBS)
UNZIPCFLAGS = $(CFLAGS) $(PTHREADCFLAGS) -Wall
UNZIPLIBS = $(LIBS) $(PTHREADLIBS)
# with NTFS
ifeq ($(WITH_NTFS),1)
......@@ -35,10 +41,13 @@ imagezip: $(NTFSDIR) imagezip.o version.o
$(CC) $(CFLAGS) imagezip.o version.o $(LIBS) -o imagezip
imageunzip: imageunzip.o version.o
$(CC) $(CFLAGS) imageunzip.o version.o $(LIBS) -o imageunzip
$(CC) $(CFLAGS) imageunzip.o version.o $(UNZIPLIBS) -o imageunzip
imageunzip.o: imageunzip.c
$(CC) -c $(UNZIPCFLAGS) -o imageunzip.o $<
frisbee.o: imageunzip.c
$(CC) -c $(CFLAGS) -DFRISBEE -o frisbee.o $<
$(CC) -c $(UNZIPCFLAGS) -DFRISBEE -o frisbee.o $<
ntfs:
@$(MAKE) -C ntfs all
......@@ -49,8 +58,9 @@ version.c: imagezip.c imageunzip.c
install: $(INSTALL_BINDIR)/imagezip $(INSTALL_BINDIR)/imageunzip
client-install:
$(INSTALL_PROGRAM) imagezip /usr/local/bin/imagezip
$(INSTALL_PROGRAM) imageunzip /usr/local/bin/imageunzip
$(INSTALL) -m 755 -o root -g wheel -d $(DESTDIR)/usr/local/bin
$(INSTALL_PROGRAM) imagezip $(DESTDIR)/usr/local/bin/imagezip
$(INSTALL_PROGRAM) imageunzip $(DESTDIR)/usr/local/bin/imageunzip
clean:
@if [ -d ntfs ]; then \
......
......@@ -23,7 +23,12 @@
#include <sys/types.h>
#include <sys/time.h>
#include <sys/disklabel.h>
#include <pthread.h>
#include "imagehdr.h"
#include "queue.h"
long long totaledata = 0;
long long totalrdata = 0;
/*
* In slice mode, we read the DOS MBR to find out where the slice is on
......@@ -48,41 +53,120 @@ static long long outputmaxsize = 0; /* Sanity check */
#define SECSIZE 512
#define BSIZE (32 * 1024)
#define OUTSIZE (3 * BSIZE)
char inbuf[BSIZE], outbuf[OUTSIZE + SECSIZE], zeros[BSIZE];
#define OUTSIZE (8 * BSIZE)
char outbuf[OUTSIZE + SECSIZE], zeros[BSIZE];
static int infd, outfd;
static int debug = 0;
static int outfd;
static int doseek = 0;
static int dofill = 0;
static unsigned fillpat= 0;
static int debug = 0;
static int nothreads = 0;
static pthread_t child_pid;
static int rdycount;
#ifndef FRISBEE
static int infd;
static int version= 0;
static unsigned fillpat= 0;
static int dots = 1;
static int dotcol;
static long long total = 0;
static char chunkbuf[SUBBLOCKSIZE];
static struct timeval stamp;
#endif
static void threadinit(void);
static void threadwait(void);
static void threadquit(void);
int readmbr(int slice);
int inflate_subblock(char *);
void writezeros(off_t offset, size_t zcount);
void *DiskWriter(void *arg);
int inflate_subblock(char *);
void writezeros(off_t zcount);
static int writeinprogress; /* XXX */
#ifdef linux
#define devlseek lseek
#define devwrite write
#else
static inline off_t devlseek(int fd, off_t off, int whence)
{
off_t noff;
assert((off & (SECSIZE-1)) == 0);
noff = lseek(fd, off, whence);
assert(noff == (off_t)-1 || (noff & (SECSIZE-1)) == 0);
return noff;
}
/*
* Some stats
*/
unsigned long decompidles;
unsigned long writeridles;
/*
* A queue of ready to write data blocks.
*/
typedef struct {
queue_chain_t chain;
off_t offset;
size_t size;
int zero;
} readyhdr_t;
typedef struct {
readyhdr_t header;
unsigned char buf[OUTSIZE + SECSIZE - sizeof(readyhdr_t)];
} readyblock_t;
static queue_head_t readyqueue;
static readyblock_t *freelist;
#define READYQSIZE 256
static pthread_mutex_t freelist_mutex, readyqueue_mutex;
static pthread_cond_t freelist_condvar, readyqueue_condvar;
static inline int devwrite(int fd, const void *buf, size_t size)
static void
dowrite_request(off_t offset, int cc, void *buf)
{
assert((size & (SECSIZE-1)) == 0);
assert(outputmaxsize == 0 || (total + size) <= outputmaxsize);
return write(fd, buf, size);
readyhdr_t *hdr;
if (!buf) {
/*
* Null buf means its a request to zero.
*/
if (nothreads) {
writezeros(offset, cc);
return;
}
if ((hdr = (readyhdr_t *) malloc(sizeof(readyhdr_t))) == NULL){
fprintf(stderr, "Out of memory\n");
exit(1);
}
hdr->zero = 1;
hdr->offset = offset;
hdr->size = cc;
}
else {
readyblock_t *rdyblk;
if (nothreads) {
int count = pwrite(outfd, buf, cc, offset);
if (count != cc) {
printf("Short write!\n");
exit(1);
}
return;
}
/*
* Try to allocate a block. Wait if none available.
*/
pthread_mutex_lock(&freelist_mutex);
if (! freelist) {
decompidles++;
do {
pthread_cond_wait(&freelist_condvar,
&freelist_mutex);
} while (! freelist);
}
rdyblk = freelist;
freelist = (void *) rdyblk->header.chain.next;
pthread_mutex_unlock(&freelist_mutex);
rdyblk->header.offset = offset;
rdyblk->header.size = cc;
memcpy(rdyblk->buf, buf, cc);
hdr = (readyhdr_t *) rdyblk;
}
pthread_mutex_lock(&readyqueue_mutex);
queue_enter(&readyqueue, hdr, readyhdr_t *, chain);
pthread_mutex_unlock(&readyqueue_mutex);
pthread_cond_signal(&readyqueue_condvar);
}
static inline int devread(int fd, void *buf, size_t size)
......@@ -90,8 +174,8 @@ static inline int devread(int fd, void *buf, size_t size)
assert((size & (SECSIZE-1)) == 0);
return read(fd, buf, size);
}
#endif
#ifndef FRISBEE
static void
usage(void)
{
......@@ -104,24 +188,28 @@ usage(void)
" -p pattern Write 32 bit pattern to free blocks.\n"
" NOTE: Use -z/-p to avoid seeking.\n"
" -o Output 'dots' indicating progress\n"
" -n Single threaded (slow) mode\n"
" -d Turn on progressive levels of debugging\n");
exit(1);
}
#ifndef FRISBEE
int
main(int argc, char **argv)
{
int i, ch, slice = 0;
struct timeval stamp, estamp;
extern char build_info[];
struct timeval estamp;
while ((ch = getopt(argc, argv, "vdhs:zp:o")) != -1)
while ((ch = getopt(argc, argv, "vdhs:zp:on")) != -1)
switch(ch) {
case 'd':
debug++;
break;
case 'n':
nothreads++;
break;
case 'v':
version++;
break;
......@@ -202,7 +290,8 @@ main(int argc, char **argv)
exit(1);
}
}
threadinit();
gettimeofday(&stamp, 0);
while (1) {
......@@ -211,7 +300,7 @@ main(int argc, char **argv)
/*
* Decompress one subblock at a time. We read the entire
* chunk and had it off. Since we might be reading from
* chunk and hand it off. Since we might be reading from
* stdin, we have to make sure we get the entire amount.
*/
while (count) {
......@@ -231,31 +320,44 @@ main(int argc, char **argv)
}
done:
close(infd);
/* This causes the output queue to drain */
threadquit();
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
if (dots) {
while (dotcol++ <= 64)
while (dotcol++ <= 60)
printf(" ");
printf("%14qd\n", total);
printf("%4ld %13qd\n", estamp.tv_sec, totaledata);
}
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
printf("Done in %ld seconds!\n", estamp.tv_sec);
else {
printf("Done in %ld seconds!\n", estamp.tv_sec);
}
fprintf(stderr, "%lu %lu %d\n", decompidles, writeridles, rdycount);
return 0;
}
#else
/*
* When compiled for frisbee, act as a library.
*/
ImageUnzipInit(char *filename, int slice, int dbg)
int
ImageUnzipInit(char *filename, int slice, int dbg, int zero, int goslow)
{
if (outfd >= 0)
close(outfd);
if ((outfd = open(filename, O_RDWR|O_CREAT|O_TRUNC, 0666)) < 0) {
perror("opening output file");
exit(1);
}
doseek = 1;
debug = dbg;
if (zero)
dofill = doseek = 1;
else
doseek = 1;
debug = dbg;
nothreads = goslow;
if (slice) {
off_t minseek;
......@@ -271,19 +373,112 @@ ImageUnzipInit(char *filename, int slice, int dbg)
exit(1);
}
}
threadinit();
return 0;
}
void
ImageUnzipFlush(void)
{
threadwait();
}
int
ImageUnzipQuit(void)
{
threadquit();
fprintf(stderr, "Wrote %qd bytes (%qd actual)\n",
totaledata, totalrdata);
fprintf(stderr, "%lu %lu %d\n", decompidles, writeridles, rdycount);
return 0;
}
#endif
static void *readyqueuemem;
static void
threadinit(void)
{
int i;
readyblock_t *ptr;
static int called;
if (nothreads)
return;
decompidles = writeridles = 0;
/*
* Allocate blocks for the ready queue.
*/
queue_init(&readyqueue);
if ((ptr = (readyblock_t *) malloc(sizeof(readyblock_t) * READYQSIZE))
== NULL) {
fprintf(stderr, "Out of memory!\n");
exit(1);
}
readyqueuemem = ptr;
for (i = 0; i < READYQSIZE; i++, ptr++) {
ptr->header.zero = 0;
ptr->header.chain.next = (void *) freelist;
freelist = ptr;
}
if (!called) {
called = 1;
pthread_mutex_init(&freelist_mutex, 0);
pthread_cond_init(&freelist_condvar, 0);
pthread_mutex_init(&readyqueue_mutex, 0);
pthread_cond_init(&readyqueue_condvar, 0);
}
if (pthread_create(&child_pid, NULL, DiskWriter, (void *)0)) {
fprintf(stderr, "Failed to create pthread!\n");
exit(1);
}
}
static void
threadwait(void)
{
int done;
if (nothreads)
return;
while (1) {
pthread_mutex_lock(&readyqueue_mutex);
done = (queue_empty(&readyqueue) && !writeinprogress);
pthread_mutex_unlock(&readyqueue_mutex);
if (done)
return;
usleep(300000);
}
}
static void
threadquit(void)
{
void *ignored;
if (nothreads)
return;
threadwait();
pthread_cancel(child_pid);
pthread_join(child_pid, &ignored);
free(readyqueuemem);
freelist = 0;
}
int
inflate_subblock(char *chunkbufp)
{
int cc, ccres, err, count, ibsize = 0, ibleft = 0;
int cc, err, count, ibsize = 0, ibleft = 0;
z_stream d_stream; /* inflation stream */
char *bp;
struct blockhdr *blockhdr;
struct region *curregion;
off_t offset, size;
char *buf = inbuf;
int chunkbytes = SUBBLOCKSIZE;
d_stream.zalloc = (alloc_func)0;
......@@ -319,24 +514,6 @@ inflate_subblock(char *chunkbufp)
curregion++;
blockhdr->regioncount--;
/*
* Set the output pointer to the beginning of the region.
*/
if (doseek) {
if (devlseek(outfd,
offset + (((off_t) outputminsec) * SECSIZE),
SEEK_SET) < 0) {
perror("Skipping to start of output region");
exit(1);
}
total += offset - total;
}
else {
assert(offset >= total);
if (offset > total)
writezeros(offset - total);
}
if (debug == 1)
fprintf(stderr, "Decompressing: %14qd --> ", offset);
......@@ -344,18 +521,13 @@ inflate_subblock(char *chunkbufp)
/*
* Read just up to the end of compressed data.
*/
if (blockhdr->size >= sizeof(inbuf))
count = sizeof(inbuf);
else
count = blockhdr->size;
memcpy(buf, chunkbufp, count);
chunkbufp += count;
chunkbytes -= count;
assert(chunkbytes >= 0);
count = blockhdr->size;
blockhdr->size -= count;
d_stream.next_in = buf;
d_stream.next_in = chunkbufp;
d_stream.avail_in = count;
chunkbufp += count;
chunkbytes -= count;
assert(chunkbytes >= 0);
inflate_again:
/*
* Must operate on multiples of the sector size!
......@@ -368,7 +540,7 @@ inflate_subblock(char *chunkbufp)
err = inflate(&d_stream, Z_SYNC_FLUSH);
if (err != Z_OK && err != Z_STREAM_END) {
fprintf(stderr, "inflate failed, err=%ld\n", err);
fprintf(stderr, "inflate failed, err=%d\n", err);
exit(1);
}
ibsize = (OUTSIZE - d_stream.avail_out) + ibleft;
......@@ -378,41 +550,36 @@ inflate_subblock(char *chunkbufp)
while (count) {
/*
* Write data only as far as the end of the current
* region.
* Move data into the output block only as far as
* the end of the current region. Since outbuf is
* same size as rdyblk->buf, its guaranteed to fit.
*/
if (count < size)
cc = count;
else
cc = size;
/*
* Put it on the output queue.
*/
dowrite_request(offset, cc, bp);
if (debug == 2) {
fprintf(stderr,
"%12qd %8d %8d %12qd %10qd %8d %5d %8d"
"\n",
offset, cc, count, total, size, ibsize,
ibleft, d_stream.avail_in);
offset, cc, count, totaledata, size,
ibsize, ibleft, d_stream.avail_in);
}
if ((ccres = devwrite(outfd, bp, cc)) != cc) {
if (ccres < 0) {
perror("Writing uncompressed data");
}
fprintf(stderr, "inflate failed\n");
exit(1);
}
cc = ccres;
count -= cc;
bp += cc;
size -= cc;
offset += cc;
total += cc;
totaledata += cc;
assert(count >= 0);
assert(size >= 0);
#ifndef FRISBEE
assert(total == offset);
#endif
/*
* Hit the end of the region. Need to figure out
* where the next one starts. We write a block of
......@@ -421,7 +588,7 @@ inflate_subblock(char *chunkbufp)
* not writing to stdout.
*/
if (! size) {
off_t newoffset;
off_t newoffset;
/*
* No more regions. Must be done.
......@@ -434,15 +601,15 @@ inflate_subblock(char *chunkbufp)
assert(size);
curregion++;
blockhdr->regioncount--;
#ifdef FRISBEE
writezeros(newoffset - offset);
offset = newoffset;
#else
if (dofill) {
assert((newoffset-offset) > 0);
dowrite_request(offset,
newoffset-offset,
NULL);
} else
totaledata += (newoffset - offset);
offset = newoffset;
assert(offset >= total);
if (offset > total)
writezeros(offset - total);
#endif
}
}
if (d_stream.avail_in)
......@@ -460,14 +627,19 @@ inflate_subblock(char *chunkbufp)
#ifndef FRISBEE
if (debug == 1) {
fprintf(stderr, "%14qd\n", total);
fprintf(stderr, "%14qd\n", offset);
}
else if (dots) {
struct timeval estamp;
gettimeofday(&estamp, 0);
estamp.tv_sec -= stamp.tv_sec;
printf(".");
fflush(stdout);
if (dotcol++ > 63) {
if (dotcol++ > 59) {
dotcol = 0;
printf("%14qd\n", total);
printf("%4ld %13qd\n", estamp.tv_sec, totaledata);
}
}
#endif
......@@ -476,37 +648,34 @@ inflate_subblock(char *chunkbufp)
}
void
writezeros(off_t zcount)
writezeros(off_t offset, size_t zcount)
{
int zcc;
off_t offset;
if (doseek) {
if ((offset = devlseek(outfd, zcount, SEEK_CUR)) < 0) {
perror("Skipping ahead");
exit(1);
}
total += zcount;
#ifndef FRISBEE
assert(offset == total + (((long long)outputminsec)*SECSIZE));
#endif
if (doseek)
return;
assert((offset & (SECSIZE-1)) == 0);
if (lseek(outfd, offset, SEEK_SET) < 0) {
perror("lseek to write zeros");
exit(1);
}
while (zcount) {
if (zcount <= BSIZE)
zcc = (int) zcount;
else
zcc = BSIZE;
if ((zcc = devwrite(outfd, zeros, zcc)) != zcc) {
if ((zcc = write(outfd, zeros, zcc)) != zcc) {
if (zcc < 0) {
perror("Writing Zeros");