/*
 * Copyright (c) 2014-2015 University of Utah and the Flux Group.
 * 
 * {{{EMULAB-LICENSE
 * 
 * This file is part of the Emulab network testbed software.
 * 
 * This file is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at
 * your option) any later version.
 * 
 * This file is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this file.  If not, see <http://www.gnu.org/licenses/>.
 * 
 * }}}
 */

/*
 * Chunk-oriented IO routines.
 *
 * Since chunks are independently compressed, we can manipulate them
 * independently.
 *
 * TODO:
 *  - In _read, read the entire chunk at once and optionally return a
 *    pointer to the header struct. Alternatively, at least have a 1M
 *    buffer, read incrementally, and keep a high water mark of what we
 *    have read so far.
 *
 *  - Add a _reopen call when seeking backward in the same chunk. Still
 *    have to reset the zlib state, but don't have to reread the compressed
 *    data.
 *
 *  - In _create, return a pointer to where the header should go so caller
 *    can fill it in. Option to _flush to say whether to write the header
 *    out or not.
 *
 *  - Page-align the buffer.
 */

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <zlib.h>
#include <sys/stat.h>

#include "libndz.h"

58 59 60 61 62 63 64 65 66 67 68 69 70
/* Uncomment to enable the tracing fprintf calls guarded by CHUNK_DEBUG. */
//#define CHUNK_DEBUG

/*
 * Currently we use the "classic" imagezip algorithm for filling chunks.
 *
 * 1. Before we even try compressing data, we make sure there is at least
 *    (request_size + 1000) bytes available, where request_size is the
 *    UNCOMPRESSED size.
 * 2. After a compression is done, there needs to be at least 8K left, or
 *    we call it a day.
 *
 * CHUNKSLOP is the extra headroom (in bytes) from rule 1; it is subtracted
 * from the remaining buffer space in ndz_chunk_left and ndz_chunk_append.
 * CHUNKTHRESH is the minimum remaining space (in bytes) from rule 2; when
 * less than this is left after an append, the chunk is marked as full.
 */
#define CHUNKSLOP	1000
#define CHUNKTHRESH	8192
71 72 73 74 75 76

struct ndz_chunk {
    struct ndz_file *ndz;
    ndz_chunkno_t chunkno;
    off_t foff;
    z_stream z;
77 78 79 80 81 82 83 84
    int clevel;
    /* buffer stuff */
    char *cbufbase;	/* allocated memory */
    blockhdr_t *cbufhdr;/* (aligned) chunk header location */
    char *cbufdata;	/* (aligned) chunk data location */
    int cbufsize;	/* size of data portion */
    int cbufoff;	/* high-water mark of valid data */
    int cbufleft;	/* write: space left to fill */
85 86
};

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
/*
 * Allocate the buffer backing a chunk and initialize the buffer fields.
 *
 * One page more than CHUNKSIZE is allocated so that the header pointer can
 * be rounded up to a multiple of the page size. The data area starts
 * DEFAULTREGIONSIZE bytes after the header and covers the rest of the chunk.
 *
 * Returns 0 on success, -1 if the allocation fails (cbufbase is then NULL).
 */
int
getchunkbuffer(struct ndz_chunk *chunk)
{
    int pagesz = getpagesize();
    uintptr_t aligned;

    chunk->cbufbase = malloc(CHUNKSIZE + pagesz);
    if (chunk->cbufbase == NULL)
	return -1;

    /* round the raw allocation address up to the next page multiple */
    aligned = (uintptr_t)chunk->cbufbase + pagesz - 1;
    aligned &= ~(pagesz - 1);

    chunk->cbufhdr = (blockhdr_t *)aligned;
    chunk->cbufdata = (char *)(aligned + DEFAULTREGIONSIZE);

    /* nothing buffered yet; the whole data area is available */
    chunk->cbufsize = CHUNKSIZE - DEFAULTREGIONSIZE;
    chunk->cbufleft = chunk->cbufsize;
    chunk->cbufoff = 0;

    return 0;
}

107 108 109 110 111 112
/*
 * Open chunk number `chunkno' of `ndz' for sequential reading with
 * ndz_chunk_read.
 *
 * Allocates the chunk object and its buffer and initializes a zlib inflate
 * stream. The read position starts at the chunk's data, i.e. just past the
 * DEFAULTREGIONSIZE header region.
 *
 * Returns the chunk object, or NULL if `ndz' is NULL, memory cannot be
 * allocated or zlib cannot be initialized. Release the object with
 * ndz_chunk_close.
 */
ndz_chunk_t
ndz_chunk_open(struct ndz_file *ndz, ndz_chunkno_t chunkno)
{
    struct ndz_chunk *chunk;

    if (ndz == NULL)
	return NULL;

    chunk = malloc(sizeof *chunk);
    if (chunk == NULL)
	return NULL;

#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_open called\n", ndz->fname);
#endif
    if (getchunkbuffer(chunk)) {
	free(chunk);
	return NULL;
    }

    chunk->ndz = ndz;
    chunk->chunkno = chunkno;
    chunk->z.zalloc = Z_NULL;
    chunk->z.zfree = Z_NULL;
    chunk->z.opaque = Z_NULL;
    chunk->z.next_in = Z_NULL;
    chunk->z.avail_in = 0;
    chunk->z.next_out = Z_NULL;
    if (inflateInit(&chunk->z) != Z_OK) {
	/* the buffer from getchunkbuffer must be released as well */
	free(chunk->cbufbase);
	free(chunk);
	return NULL;
    }
    chunk->foff = (off_t)chunkno * ndz->chunksize + DEFAULTREGIONSIZE;

    /*
     * XXX currently we use ndzfile::ndz_readchunkheader for read access to the
     * chunk header. It is better for applications that don't need access to the
     * decompressed data, since it won't drag in libz as use of functions in this
     * file would.
     */
    chunk->cbufhdr = NULL;

    return (ndz_chunk_t)chunk;
}

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
/*
 * Restart decompression of a read chunk from its beginning.
 *
 * The zlib inflate state is reset and its input is pointed back at the
 * start of the chunk data buffer, covering the cbufoff bytes of compressed
 * data that have already been read from the file.
 *
 * Returns 0 on success, -1 if the chunk is NULL, has no buffer, or the
 * zlib state cannot be reset.
 */
int
ndz_chunk_rewind(ndz_chunk_t chobj)
{
    struct ndz_chunk *ck = (struct ndz_chunk *)chobj;

    if (ck != NULL && ck->cbufbase != NULL) {
	if (inflateReset(&ck->z) == Z_OK) {
	    ck->z.next_in = (Bytef *)ck->cbufdata;
	    ck->z.avail_in = ck->cbufoff;
	    return 0;
	}
	fprintf(stderr, "chunk_rewind: could not reset zlib state\n");
    }

    return -1;
}

165 166 167 168 169 170 171
/*
 * Release a chunk object: ends its zlib inflate stream and frees the
 * chunk buffer and the object itself. A NULL argument is ignored.
 */
void
ndz_chunk_close(ndz_chunk_t chobj)
{
    struct ndz_chunk *chunk = (struct ndz_chunk *)chobj;
    if (chunk == NULL)
	return;

#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_close called\n", chunk->ndz->fname);
#endif
    inflateEnd(&chunk->z);

    /* free(NULL) is a no-op, so no guard is needed */
    free(chunk->cbufbase);
    free(chunk);
}

/*
 * Return the chunk number recorded in the chunk object, or ~0 when the
 * object is NULL.
 */
ndz_chunkno_t
ndz_chunk_chunkno(ndz_chunk_t chobj)
{
    struct ndz_chunk *ck = (struct ndz_chunk *)chobj;

    if (ck != NULL)
	return ck->chunkno;

    return ~0;
}

192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
/*
 * Return the chunk's header pointer, or NULL when the object is NULL.
 *
 * The stored pointer is itself NULL for chunks obtained from
 * ndz_chunk_open, which clears it; for chunks from ndz_chunk_create it
 * refers to the header region of the chunk buffer.
 */
blockhdr_t *
ndz_chunk_header(ndz_chunk_t chobj)
{
    struct ndz_chunk *ck = (struct ndz_chunk *)chobj;

    return (ck != NULL) ? ck->cbufhdr : NULL;
}

/*
 * Return the high-water mark of valid data in the chunk's data buffer
 * (cbufoff), or -1 when the object is NULL.
 */
ssize_t
ndz_chunk_datasize(ndz_chunk_t chobj)
{
    struct ndz_chunk *ck = (struct ndz_chunk *)chobj;

    if (ck != NULL)
	return ck->cbufoff;

    return -1;
}

212 213 214 215 216 217 218 219 220 221 222 223 224
/*
 * Sequentially read data from a chunk til there is no more to be read.
 *
 * Decompresses up to `bytes' bytes into `buf'. Compressed input is pulled
 * from the file with ndz_read into the chunk buffer whenever zlib has
 * consumed everything read so far.
 *
 * Returns the number of uncompressed bytes stored in `buf'; this is less
 * than `bytes' only when the end of the compressed stream was reached.
 * Returns -1 if the chunk is NULL or inflate fails. If ndz_read returns
 * zero or a negative value, that value is returned as is, even if part of
 * the request had already been decompressed during this call.
 */
ssize_t
ndz_chunk_read(ndz_chunk_t chobj, void *buf, size_t bytes)
{
    int rv;
    ssize_t cc;

    struct ndz_chunk *chunk = (struct ndz_chunk *)chobj;
    if (chunk == NULL)
	return -1;

#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_read called\n", chunk->ndz->fname);
#endif

    chunk->z.next_out = (Bytef *)buf;
    chunk->z.avail_out = bytes;
    while (chunk->z.avail_out > 0) {
	/* read more compressed data from file if necessary */
	if (chunk->z.avail_in == 0) {
	    cc = ndz_read(chunk->ndz, chunk->cbufdata + chunk->cbufoff,
			  chunk->cbufsize - chunk->cbufoff, chunk->foff);
#ifdef CHUNK_DEBUG
	    fprintf(stderr, "chunk_read: reading %d bytes at %lu returns %ld\n",
		    chunk->cbufsize - chunk->cbufoff,
		    (unsigned long)chunk->foff, (long)cc);
#endif
	    if (cc <= 0)
		return cc;

	    /* new input is appended after what is already buffered */
	    chunk->z.next_in = (Bytef *)(chunk->cbufdata + chunk->cbufoff);
	    chunk->z.avail_in = cc;
	    chunk->cbufoff += cc;
	    chunk->foff += cc;
	}
	assert(chunk->z.next_in != Z_NULL);
	assert(chunk->z.avail_in > 0);

	rv = inflate(&chunk->z, Z_SYNC_FLUSH);

	if (rv == Z_STREAM_END) {
#ifdef CHUNK_DEBUG
	    fprintf(stderr, "chunk_read hit STREAM_END at foff=%lu, avail_out=%u\n",
		    (unsigned long)chunk->foff,
		    (unsigned int)chunk->z.avail_out);
#endif
	    break;
	}

	if (rv != Z_OK) {
	    fprintf(stderr, "%s: inflate failed, rv=%d\n",
		    chunk->ndz->fname, rv);
	    return -1;
	}
    }

    return (bytes - chunk->z.avail_out);
}

Mike Hibler's avatar
Mike Hibler committed
271 272 273 274 275
/*
 * XXX for now the write functions are distinct from the read functions
 * til I decide if it is worthwhile to combine.
 */
ndz_chunk_t
276
ndz_chunk_create(struct ndz_file *ndz, ndz_chunkno_t chunkno, int clevel)
Mike Hibler's avatar
Mike Hibler committed
277 278 279 280 281
{
    struct ndz_chunk *chunk = malloc(sizeof *chunk);
    if (chunk == NULL)
	return NULL;

282 283 284 285
#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_create called\n", chunk->ndz->fname);
#endif
    if (getchunkbuffer(chunk)) {
Mike Hibler's avatar
Mike Hibler committed
286 287 288
	free(chunk);
	return NULL;
    }
289
    memset(chunk->cbufhdr, 0, DEFAULTREGIONSIZE);
Mike Hibler's avatar
Mike Hibler committed
290

291
    chunk->clevel = clevel;
Mike Hibler's avatar
Mike Hibler committed
292 293 294 295 296
    chunk->ndz = ndz;
    chunk->chunkno = chunkno;
    chunk->z.zalloc = Z_NULL;
    chunk->z.zfree = Z_NULL;
    chunk->z.opaque = Z_NULL;
297
    if (deflateInit(&chunk->z, chunk->clevel) != Z_OK) {
Mike Hibler's avatar
Mike Hibler committed
298 299 300
	free(chunk);
	return NULL;
    }
301
    chunk->foff = (off_t)chunkno * ndz->chunksize + DEFAULTREGIONSIZE;
Mike Hibler's avatar
Mike Hibler committed
302 303 304 305

    return (ndz_chunk_t)chunk;
}

306 307
/*
 * Finish a chunk created with ndz_chunk_create: complete the compressed
 * stream, write the chunk to the file and release the chunk object.
 *
 * If `withheader' is non-zero the header region is written together with
 * the data (and, for headers whose magic is newer than COMPRESSED_V1, the
 * header's size field is set to the compressed data size first). Otherwise
 * only the data region is written. The unused tail of the data region is
 * zeroed before writing. Nothing is written when no data was produced and
 * no header is requested.
 *
 * Returns 0 on success, in which case the chunk object has been freed and
 * must not be used again. Returns -1 on failure (NULL chunk, deflate
 * error, chunk overflow or short write); the object is not freed then.
 */
int
ndz_chunk_flush(ndz_chunk_t chobj, int withheader)
{
    struct ndz_chunk *chunk = (struct ndz_chunk *)chobj;
    char *buf;
    size_t count;
    off_t off;
    ssize_t cc;
    int rv;

    /* validate before touching any field of the chunk */
    if (chunk == NULL)
	return -1;

    /* defaults describe a data-only write; adjusted below for a header */
    buf = chunk->cbufdata;
    count = chunk->cbufsize;
    off = chunk->foff;

#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_flush called\n", chunk->ndz->fname);
#endif

    /*
     * XXX there can be some compressed data left even though we use Z_SYNC_FLUSH.
     * So use Z_FINISH to clear the pipes.
     */

    /* XXX fix up cbufleft since we might have zeroed it in chunk_append */
    chunk->cbufleft = CHUNKSIZE - DEFAULTREGIONSIZE - chunk->cbufoff;
    assert(chunk->cbufleft > 0);

    chunk->z.next_in   = 0;
    chunk->z.avail_in  = 0;
    chunk->z.next_out = (Bytef *)(chunk->cbufdata + chunk->cbufoff);
    chunk->z.avail_out = chunk->cbufleft;

    rv = deflate(&chunk->z, Z_FINISH);
    if (rv != Z_STREAM_END) {
	fprintf(stderr, "chunk_flush: deflate(FINISH) failed, rv=%ld\n",
		(long)rv);
	return -1;
    }
    if (chunk->z.avail_out == 0) {
	fprintf(stderr,
		"chunk_flush: too much data for chunk; "
		"recompile with larger CHUNKSLOP and try again!\n");
	return -1;
    }
    chunk->cbufoff += (chunk->cbufleft - chunk->z.avail_out);
    chunk->cbufleft = chunk->z.avail_out;
#ifdef CHUNK_DEBUG
    fprintf(stderr, "%s: chunk_flush: %d bytes left in chunk\n",
	    chunk->ndz->fname, chunk->cbufleft);
#endif

    /*
     * XXX XXX horrible horrible hack alert!!!
     * Touch up the header if we are writing it out.
     */
    if (withheader && chunk->cbufhdr->magic > COMPRESSED_V1)
	chunk->cbufhdr->size = chunk->cbufoff;

    /* XXX if nothing has been appended, don't write anything */
    if (chunk->cbufoff > 0 || withheader) {
	/* params are setup by default for no header write, adjust */
	if (withheader) {
	    buf = (char *)chunk->cbufhdr;
	    count += DEFAULTREGIONSIZE;
	    off -= DEFAULTREGIONSIZE;
	}

	/* zero the remaining portion of the chunk data buffer */
	memset(chunk->cbufdata + chunk->cbufoff, 0, chunk->cbufleft);

	cc = ndz_write(chunk->ndz, buf, count, off);
	if (cc < 0 || (size_t)cc != count) {
	    fprintf(stderr,
		    "chunk_flush: failed to write entire chunk (%ld of %lu)\n",
		    (long)cc, (unsigned long)count);
	    return -1;
	}
    }

    if (deflateEnd(&chunk->z) != Z_OK) {
	fprintf(stderr, "chunk_flush: deflateEnd failed\n");
	return -1;
    }

    /* free(NULL) is a no-op, so no guard is needed */
    free(chunk->cbufbase);
    free(chunk);

    return 0;
}

/*
 * Returns the amount of uncompressed data that the chunk object will
 * accept in an ndz_chunk_append operation. Zero indicates that it is
 * time to flush.
 *
 * The value is the remaining buffer space less CHUNKSLOP, clamped at zero
 * so that a nearly full chunk (or one ndz_chunk_append has marked full by
 * zeroing cbufleft) is never reported as a negative size, which could be
 * confused with the -1 returned for a NULL chunk object.
 */
ssize_t
ndz_chunk_left(ndz_chunk_t chobj)
{
    ssize_t left;

    struct ndz_chunk *chunk = (struct ndz_chunk *)chobj;
    if (chunk == NULL)
	return -1;

    left = (ssize_t)chunk->cbufleft - CHUNKSLOP;
    return (left > 0) ? left : 0;
}

/*
 * Compress and append data to a chunk.
411
 *
Mike Hibler's avatar
Mike Hibler committed
412 413 414 415
 * Writes are all-or-nothing, returns zero if the full write cannot be done.
 * It is up to the caller to use ndz_chunk_left and resize accordingly.
 */
ssize_t
416
ndz_chunk_append(ndz_chunk_t chobj, void *buf, size_t bytes)
Mike Hibler's avatar
Mike Hibler committed
417
{
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
    int rv;

    struct ndz_chunk *chunk = (struct ndz_chunk *)chobj;
    if (chunk == NULL || chunk->ndz == NULL)
	return -1;

    if ((bytes % chunk->ndz->sectsize) != 0) {
	fprintf(stderr, "chunk_append: write not multiple of sector size\n");
	return -1;
    }

    if (bytes == 0 || bytes > chunk->cbufleft - CHUNKSLOP)
	return 0;

    chunk->z.next_in = (Bytef *)buf;
    chunk->z.avail_in = bytes;
    chunk->z.next_out = (Bytef *)(chunk->cbufdata + chunk->cbufoff);
    chunk->z.avail_out = chunk->cbufleft;
    rv = deflate(&chunk->z, Z_SYNC_FLUSH);
    if (rv != Z_OK) {
	fprintf(stderr, "chunk_append: deflate failed, rv=%d\n", rv);
	return -1;
    }
    if (chunk->z.avail_out == 0) {
	fprintf(stderr,
		"chunk_append: too much data for chunk; "
		"recompile with larger CHUNKSLOP and try again!\n");
	return -1;
    }
    assert(chunk->z.avail_in == 0);

    chunk->cbufoff += (chunk->cbufleft - chunk->z.avail_out);
    chunk->cbufleft = chunk->z.avail_out;
    if (chunk->cbufleft < CHUNKTHRESH)
	chunk->cbufleft = 0;

    return (bytes - chunk->z.avail_in);
Mike Hibler's avatar
Mike Hibler committed
455 456
}

457 458 459 460 461 462 463
/*
 * Local variables:
 * mode: C
 * c-set-style: "BSD"
 * c-basic-offset: 4
 * End:
 */