diff --git a/Makefile b/Makefile index a9421aa..49c899b 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,68 @@ pigzn: pigzn.o zopfli/deflate.o zopfli/blocksplitter.o zopfli/tree.o zopfli/lz77 pigzn.o: pigz.c $(CC) $(CFLAGS) -DDEBUG -DNOTHREAD -g -c -o pigzn.o pigz.c -test: pigz +# +# set up pattern rules for tests +# + +LONG_NAME = VeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongNameVeryLongName + +TESTFILES = $(addprefix .testfile-, empty pigz.c largefile $(LONG_NAME) ) + +.testfile-empty: + cat /dev/null > $@ + +.testfile-pigz.c: pigz.c + cp pigz.c $@ + +.testfile-largefile: pigz.c + number=1 ; while [[ $$number -le 100 ]] ; do cat pigz.c >> $@ ; ((number = number + 1)) ; done + +.testfile-$(LONG_NAME): .testfile-largefile + cp $< $@ + +TEST_OPTIONS = .gz .b32.gz .1.gz .B.gz .B1.gz .gz2.gz .B-gz2.gz + +.testfile-%.gz : .testfile-% pigz + ./pigz -kf $< && touch $@ + +.testfile-%.b32.gz : .testfile-% pigz + ./pigz -kfb 32 $< && mv $<.gz $@ && touch $@ + +.testfile-%.1.gz : .testfile-% pigz + ./pigz -kfp 1 $< && mv $<.gz $@ && touch $@ + +.testfile-%.B.gz : .testfile-% pigz + ./pigz -kfB $< && mv $<.gz $@ && touch $@ + +.testfile-%.B1.gz : .testfile-% pigz + ./pigz -kfBp 1 $< && mv $<.gz $@ && touch $@ + +.testfile-%.gz2.gz : .testfile-% pigz + ./pigz -kf $< && ./pigz -kf $<.gz && mv $<.gz.gz $@ && touch $@ + +.testfile-%.B-gz2.gz : .testfile-% pigz + ./pigz -kfB $< && ./pigz -kfB $<.gz && mv $<.gz.gz $@ && touch $@ + +.test% : %.gz pigz + ./pigz -t $< + ./pigz -tp 1 $< + gzip -t $< + ./pigz -dc $< > $@.out && diff -q $@.out $* + ./pigz -dcp 1 $< > $@.out && diff -q $@.out $* + gzip -dc $< > $@.out && diff -q $@.out $* + +TESTFILES_GZ = $(foreach option, $(TEST_OPTIONS), $(addsuffix $(option), $(TESTFILES)) ) + +testfiles : $(TESTFILES) $(TESTFILES_GZ) + +TESTS = $(addprefix .test, $(TESTFILES_GZ)) + +.SECONDARY : $(TESTFILES) $(TESTFILES_GZ) $(TESTS) + +moretests : pigz $(TESTS) + +test: pigz moretests ./pigz -kf pigz.c ; ./pigz -t pigz.c.gz ./pigz -kfb 32 pigz.c ; ./pigz -t pigz.c.gz ./pigz -kfp 1 pigz.c ; ./pigz -t pigz.c.gz @@ -62,7 +123,15 @@ test: pigz echo 'compress -f < pigz.c | ./unpigz | cmp - pigz.c' ;\ compress -f < pigz.c | ./unpigz | cmp - pigz.c ;\ fi - @rm -f pigz.c.gz pigz.c.zz pigz.c.zip + ./pigz -cp 1 < /dev/null | ./pigz -t - + ./pigz -c < /dev/null | ./pigz -t - + ./pigz -kfB -p 1 pigz.c \ + && ./pigz -d -c -p 1 pigz.c.gz | diff -q - pigz.c \ + && ./pigz -d -c -p 2 pigz.c.gz | diff -q - pigz.c + ./pigz -kfB pigz.c \ + && ./pigz -d -c -p 1 pigz.c.gz | diff -q - pigz.c \ + && ./pigz -d -c -p 2 pigz.c.gz | diff -q - pigz.c + @rm -f pigz.c.gz pigz.c.zz pigz.c.zip .test* tests: dev test ./pigzn -kf pigz.c ; ./pigz -t pigz.c.gz @@ -74,4 +143,4 @@ pigz.pdf: pigz.1 groff -mandoc -f H -T ps pigz.1 | ps2pdf - pigz.pdf clean: - @rm -f *.o zopfli/*.o pigz unpigz pigzn pigzt pigz.c.gz pigz.c.zz pigz.c.zip + @rm -f *.o zopfli/*.o pigz unpigz pigzn pigzt pigz.c.gz pigz.c.zz pigz.c.zip .test* diff --git a/pigz.c b/pigz.c index df25cf3..fbedeab 100644 --- a/pigz.c +++ b/pigz.c @@ -294,6 +294,15 @@ the first set of compressions are being performed. The number of output buffers is not directly limited, but is indirectly limited by the release of input buffers to about the same number. + + Added support for the BGZF Blocked GNU Zip Format extension to gzip that enables + parallel decompression as well as random access to the uncompressed contents + of a gzip archive. The BGZF specification is described in the SAM format + specification: http://samtools.github.io/hts-specs/SAMv1.pdf + Where the uncompressed data is compressed in blocks of up to 65280 bytes with a + gzip header and trailer. Each block can be decompressed independently and + if the file is indexed by block boundaries, randomly accessed. + */ /* use large file functions if available */ @@ -447,8 +456,25 @@ #define INBUFS(p) (((p)<<1)+3) #define OUTPOOL(s) ((s)+((s)>>4)+DICT) +/* BGZF constants */ +#define BGZF_MAX_BLOCK_SIZE 0x10000 +#define BGZF_BLOCK_SIZE 0x0ff00 // make sure compressed_bytes(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE +#define BGZF_HEADER_SIZE 18 +#define BGZF_FOOTER_SIZE 8 +/* BGZF/GZIP header (speciallized from RFC 1952; little endian): + +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + | 31|139| 8| 4| 0| 0|255|XLEN=6 | 66| 67| 2|BLK_LEN| + +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + ... compressed_data[ BLK_LEN - XLEN - 19 ] + +---+---+---+---+---+---+---+---+ + | CRC32 | ISIZE | + +---+---+---+---+---+---+---+---+ + */ +/* bgzf_eof is the last block signifying EOF */ +local const uint8_t bgzf_eof[28] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0"; + /* input buffer size */ -#define BUF 32768U +#define BUF BGZF_MAX_BLOCK_SIZE /* globals (modified by main thread only when it's the only thread) */ local struct { @@ -463,6 +489,8 @@ local struct { int keep; /* true to prevent deletion of input file */ int force; /* true to overwrite, compress links, cat */ int form; /* gzip = 0, zlib = 1, zip = 2 or 3 */ + int bgzf; /* true to include BGZF BlockCompress compliance */ + int bgzf_bsize; /* the remainder size of the next block to decompress */ unsigned char magic1; /* first byte of possible header when decoding */ int recurse; /* true to dive down into directory structure */ char *sufx; /* suffix to use (".gz" or user supplied) */ @@ -864,11 +892,12 @@ local unsigned long time2dos(time_t t) #define PUT4M(a,b) (*(a)=(b)>>24,(a)[1]=(b)>>16,(a)[2]=(b)>>8,(a)[3]=(b)) /* write a gzip, zlib, or zip header using the information in the globals */ -local unsigned long put_header(void) +/* compressedLen is ignored unless this is a BGZF formated gzip */ +local unsigned long put_header(long compressedLen) { unsigned long len; unsigned char head[30]; - + unsigned int bsize; /* for bgzf */ if (g.form > 1) { /* zip */ /* write local header */ PUT4L(head, 0x04034b50UL); /* local header signature */ @@ -918,12 +947,59 @@ local unsigned long put_header(void) PUT4L(head + 4, g.mtime); head[8] = g.level >= 9 ? 2 : (g.level == 1 ? 4 : 0); head[9] = 3; /* unix */ - writen(g.outd, head, 10); - len = 10; - if (g.name != NULL) + + if (g.bgzf) { + assert(compressedLen > 0); + + head[3] |= 4; /* include FEXTRA */ + PUT2L(head+10, 6); /* 6 byte XLEN */ + head[12] = 'B'; /* BC tag */ + head[13] = 'C'; + PUT2L(head+14, 2); /* 2 byte BSIZE field */ + + /* calculate BGZF bsize. Account for FNAME field, but FCOMMENT and FHCRC are never included */ + bsize = BGZF_HEADER_SIZE + (g.name == NULL ? 0 : strlen(g.name) + 1) + compressedLen + BGZF_FOOTER_SIZE - 1; + + if (bsize >= BGZF_MAX_BLOCK_SIZE - 1) { + assert(g.name != NULL); + /* Including name field will exceed the capacity of the BGZF block. + * write name now, then empty deflate stream then trailer followed by a new header + */ + bsize = BGZF_HEADER_SIZE + (g.name == NULL ? 0 : strlen(g.name) + 1) + 2 + BGZF_FOOTER_SIZE - 1; + PUT2L(head+16, (uint16_t) (bsize)); + writen(g.outd, head, BGZF_HEADER_SIZE); + writen(g.outd, (unsigned char *)g.name, strlen(g.name) + 1); + g.name = NULL; /* do not include name again */ + /* empty deflate Z_FINISH-ed deflate stream (2 bytes 3, 0 */ + head[16] = 3; + head[17] = 0; + /* 0 check and 0 uncompressed length */ + head[18] = head[19] = head[20] = head[21] = 0; /* trailer check */ + head[22] = head[23] = head[24] = head[25] = 0; /* trailer ulen */ + writen(g.outd, head+16, 2+4+4); + return put_header(compressedLen); + } + + assert( bsize > BGZF_HEADER_SIZE + BGZF_FOOTER_SIZE && bsize < BGZF_MAX_BLOCK_SIZE ); + assert( (head[3] & 2) == 0 && (head[3] & 16) == 0); + + /* write 2-byte BSIZE field */ + PUT2L(head+16, (uint16_t) (bsize)); + + len = 18; + writen(g.outd, head, len); + + } else { + writen(g.outd, head, 10); + len = 10; + } + if (g.name != NULL) { writen(g.outd, (unsigned char *)g.name, strlen(g.name) + 1); - if (g.name != NULL) len += strlen(g.name) + 1; + if (g.bgzf) { + g.name = NULL; /* do not write this name again for BGZF */ + } + } } return len; } @@ -1006,6 +1082,25 @@ local void put_trailer(unsigned long ulen, unsigned long clen, /* compute check value depending on format */ #define CHECK(a,b,c) (g.form == 1 ? adler32(a,b,c) : crc32(a,b,c)) +local void put_bgzf_trailer_and_header(unsigned long *ulen, unsigned long *clen, + unsigned long *check, unsigned long *head) +{ + assert( g.bgzf ); + if (*head > 0) { + /* finish this block */ + put_trailer(*ulen, *clen, *check, *head); + } // else this is the first block header... + + /* write new header for new block */ + *head = put_header(*clen); + + assert( *clen + *head + BGZF_FOOTER_SIZE <= BGZF_MAX_BLOCK_SIZE ); + + /* reset counting */ + *ulen = *clen = 0; + *check = CHECK(0L, Z_NULL, 0); +} + #ifndef NOTHREAD /* -- threaded portions of pigz -- */ @@ -1186,17 +1281,34 @@ local struct space *get_space(struct pool *pool) pool->limit--; pool->made++; release(pool->have); + space = NULL; space = MALLOC(sizeof(struct space)); - if (space == NULL) - bail("not enough memory", ""); - space->use = new_lock(1); /* initially one user */ - space->buf = MALLOC(pool->size); - if (space->buf == NULL) - bail("not enough memory", ""); - space->size = pool->size; - space->len = 0; - space->pool = pool; /* remember the pool this belongs to */ - return space; + if (space != NULL) { + space->buf = NULL; + space->buf = MALLOC(pool->size); + if (space->buf != NULL) { + space->use = new_lock(1); /* initially one user */ + space->size = pool->size; + space->len = 0; + space->pool = pool; /* remember the pool this belongs to */ + return space; + } + } + assert( space == NULL || space->buf == NULL ); + if (pool->limit < 0 && pool->made > 1) { + possess(pool->have); + if (space != NULL) { + FREE(space); + } + pool->limit = 0; /* do not make any more buffers */ + pool->made--; + complain("Running out of memory with %d buffers of %d bytes, reducing buffer count\n", pool->made, pool->size); + twist(pool->have, TO, 0); + /* try again this time waiting for an existing buffer */ + return get_space(pool); + } + bail("not enough memory", ""); + return NULL; } /* compute next size up by multiplying by about 2**(1/3) and round to the next @@ -1361,6 +1473,8 @@ local void finish_jobs(void) possess(compress_have); job.seq = -1; job.next = NULL; + job.in = NULL; + job.out = NULL; compress_head = &job; compress_tail = &(job.next); twist(compress_have, BY, +1); /* will wake them all up */ @@ -1407,6 +1521,136 @@ local void deflate_engine(z_stream *strm, struct space *out, int flush) assert(strm->avail_in == 0); } +local void uncompress_thread(void *dummy) +{ + int ret, i; + struct job *job; /* job pulled and working on */ + struct job *here, **prior; /* pointers for inserting in write list */ + struct space *out; /* pointer to output buffer */ + unsigned long check, incheck; /* check value of uncompressed output */ + unsigned long len, inlen; /* length of uncompressed output */ + z_stream strm; /* inflate stream */ + + (void)dummy; + + // prepare strm for inflate + strm.zalloc = ZALLOC; + strm.zfree = ZFREE; + strm.opaque = OPAQUE; + + ret = inflateInit2(&strm, -15); + if (ret != Z_OK) { + bail("not enough memory", ""); + } + + /* keep looking for work */ + for (;;) { + + job = NULL; + /* acquire and allocate output buffer */ + out = get_space(&out_pool); + while( out->size < BGZF_MAX_BLOCK_SIZE) { + grow_space(out); + } + out->len = 0; + + /* get a job (like I tell my son) */ + possess(compress_have); + wait_for(compress_have, NOT_TO_BE, 0); + job = compress_head; + assert(job != NULL); + if (job->seq == -1) { + job = NULL; + break; + } + compress_head = job->next; + if (job->next == NULL) { + compress_tail = &compress_head; + } + twist(compress_have, BY, -1); + + /* get output buffer space */ + assert(job->out == NULL); + job->out = out; + + assert(job->in != NULL && job->in->len > BGZF_FOOTER_SIZE); + + /* decompress, compute lengths and check value */ + strm.total_in = strm.total_out = 0; + + strm.next_in = job->in->buf; + strm.avail_in = job->in->len; + + strm.next_out = out->buf; + strm.avail_out = out->size; + + ret = inflate(&strm, Z_FINISH); + if (ret != Z_STREAM_END) { + bail("corrupted input -- invalid BGZF deflate data: ", g.inf); + } + + if (strm.avail_in != BGZF_FOOTER_SIZE) { + bail("corrupted input -- invalid byte count in BGZF block", g.inf); + } + + if (inflateReset(&strm) != Z_OK) { + bail("corrupted input -- inflateReset failed in BGZF block", g.inf); + } + + /* compute and record uncompressed data length */ + len = strm.next_out - out->buf; + out->len = len; + incheck = 0; + for(i=0; i<4;i++) { + incheck += (unsigned) *(strm.next_in++) << (8*i); + } + inlen = 0; + for(i=0; i<4; i++) { + inlen += (unsigned) *(strm.next_in++) << (8*i); + } + strm.avail_in -= 8; + assert(strm.avail_in == 0); + + /* compute uncompressed data check */ + check = CHECK(0L, Z_NULL, 0); + check = CHECK(check, out->buf, out->len); + + /* check gzip trailer */ + if (check != incheck || len != inlen) { + complain("Calculated %lu check and %lu length, but got %lu and %lu in the footer\n", check, len, incheck, inlen); + bail("corrupted input checksum or length in BGZF block", g.inf); + } + + if (g.decode == 1) { + + /* insert write job in list in sorted order, alert write thread */ + possess(write_first); + prior = &write_head; + while ((here = *prior) != NULL) { + if (here->seq > job->seq) + break; + prior = &(here->next); + } + job->next = here; + *prior = job; + twist(write_first, TO, write_head->seq); + + } else { + drop_space(job->in); + drop_space(job->out); + FREE(job); + } + + } + if (inflateEnd(&strm) != Z_OK) { + bail("corrupted input -- inflateEnd failed in BGZF block", g.inf); + } + + /* found job with seq == -1 -- free deflate memory and return to join */ + release(compress_have); + drop_space(out); +} + /* get the next compression job from the head of the list, compress and compute the check value on the input, and put a job in the write list with the results -- keep looking for more jobs, returning when a job is found with a @@ -1519,9 +1763,9 @@ local void compress_thread(void *dummy) /* run the last piece through deflate -- end on a byte boundary, using a sync marker if necessary, or finish the - deflate stream if this is the last block */ + deflate stream if this is the last block or a BGZF compliant gzip */ strm.avail_in = (unsigned)len; - if (left || job->more) { + if (g.bgzf == 0 && (left || job->more)) { #if ZLIB_VERNUM >= 0x1260 deflate_engine(&strm, job->out, Z_BLOCK); @@ -1630,8 +1874,9 @@ local void compress_thread(void *dummy) } /* collect the write jobs off of the list in sequence order and write out the - compressed data until the last chunk is written -- also write the header and - trailer and combine the individual check values of the input buffers */ + output data until the last chunk is written. + if decoding, also write the header and trailer and combine the individual + check values of the input buffers */ local void write_thread(void *dummy) { long seq; /* next sequence number looking for */ @@ -1647,8 +1892,14 @@ local void write_thread(void *dummy) /* build and write header */ Trace(("-- write thread running")); - head = put_header(); - + if (!g.decode) { + if (g.bgzf) { + // BGZF will write header just in time. + head = 0; + } else { + head = put_header(0); + } + } /* process output of compress threads until end of input */ ulen = clen = 0; check = CHECK(0L, Z_NULL, 0); @@ -1665,32 +1916,50 @@ local void write_thread(void *dummy) more = job->more; len = job->in->len; drop_space(job->in); - ulen += (unsigned long)len; + clen += (unsigned long)(job->out->len); + if (g.bgzf && !g.decode) { + /* special logic for BGZF compliant gzip... finish the block and start a new one */ + put_bgzf_trailer_and_header(&ulen, &clen, &check, &head); + } + + ulen += (unsigned long)len; + /* write the compressed data and drop the output buffer */ Trace(("-- writing #%ld", seq)); writen(g.outd, job->out->buf, job->out->len); drop_space(job->out); Trace(("-- wrote #%ld%s", seq, more ? "" : " (last)")); - /* wait for check calculation to complete, then combine, once - the compress thread is done with the input, release it */ - possess(job->calc); - wait_for(job->calc, TO_BE, 1); - release(job->calc); - check = COMB(check, job->check, len); - - /* free the job */ - free_lock(job->calc); + if (!g.decode) { + /* wait for check calculation to complete, then combine, once + the compress thread is done with the input, release it */ + possess(job->calc); + wait_for(job->calc, TO_BE, 1); + release(job->calc); + check = COMB(check, job->check, len); + + /* free the job */ + free_lock(job->calc); + } FREE(job); /* get the next buffer in sequence */ seq++; } while (more); - /* write trailer */ - put_trailer(ulen, clen, check, head); + if (!g.decode) { + /* write trailer */ + if (head == 0) { + head = put_header(0); + } + put_trailer(ulen, clen, check, head); + if (g.bgzf) { + /* write the final trailing EOF block */ + writen(g.outd, (unsigned char*) bgzf_eof, 28); + } + } /* verify no more jobs, prepare for next use */ possess(compress_have); @@ -1917,9 +2186,19 @@ local void parallel_compress(void) do { \ strm->avail_out = out_size; \ strm->next_out = out; \ + tmp_ulen = strm->avail_in; \ + check = CHECK(check, strm->next_in, strm->avail_in); \ (void)deflate(strm, flush); \ - writen(g.outd, out, out_size - strm->avail_out); \ clen += out_size - strm->avail_out; \ + if (g.bgzf) { \ + tmp_check = check; \ + put_bgzf_trailer_and_header(&last_ulen, &clen, &last_check, &head); \ + ulen = last_ulen; \ + check = last_check; \ + last_ulen = tmp_ulen; \ + last_check = tmp_check; \ + } \ + writen(g.outd, out, out_size - strm->avail_out); \ } while (strm->avail_out == 0); \ assert(strm->avail_in == 0); \ } while (0) @@ -1942,6 +2221,9 @@ local void single_compress(int reset) unsigned long ulen; /* total uncompressed size (overflow ok) */ unsigned long clen; /* total compressed size (overflow ok) */ unsigned long check; /* check value of uncompressed data */ + unsigned long last_ulen; /* previous block uncompressed size */ + unsigned long last_check; /* previous block checksum */ + unsigned long tmp_check, tmp_ulen; static unsigned out_size; /* size of output buffer */ static unsigned char *in, *next, *out; /* reused i/o buffers */ static z_stream *strm = NULL; /* reused deflate structure */ @@ -1976,7 +2258,12 @@ local void single_compress(int reset) } /* write header */ - head = put_header(); + if (g.bgzf) { + // BGZF will write header just in time + head = 0; + } else { + head = put_header(0); + } /* set compression level in case it changed */ if (g.level <= 9) { @@ -1993,6 +2280,8 @@ local void single_compress(int reset) clen = 0; have = 0; check = CHECK(0L, Z_NULL, 0); + last_check = check; + last_ulen = 0; hash = RSYNCHIT; do { /* get data to compress, see if there is any more input */ @@ -2055,7 +2344,7 @@ local void single_compress(int reset) } while (hash != RSYNCHIT); got -= left; } - + /* clear history for --independent option */ fresh = 0; if (!g.setdict) { @@ -2074,16 +2363,16 @@ local void single_compress(int reset) /* compress MAXP2-size chunks in case unsigned type is small */ while (got > MAXP2) { strm->avail_in = MAXP2; - check = CHECK(check, strm->next_in, strm->avail_in); DEFLATE_WRITE(Z_NO_FLUSH); got -= MAXP2; } - /* compress the remainder, emit a block, finish if end of input */ + /* compress the remainder, emit a block, finish if end of input + or if this is a BGZF compliant gzip + */ strm->avail_in = (unsigned)got; got = left; - check = CHECK(check, strm->next_in, strm->avail_in); - if (more || got) { + if (g.bgzf == 0 && (more || got)) { #if ZLIB_VERNUM >= 0x1260 int bits; @@ -2159,7 +2448,17 @@ local void single_compress(int reset) } while (more || got); /* write trailer */ - put_trailer(ulen, clen, check, head); + if (head == 0) { + head = put_header(0); + } + + if (g.bgzf) { + /* write the final BGZF EOF block */ + put_trailer(last_ulen, clen, last_check, head); + writen(g.outd, (unsigned char*) bgzf_eof, 28); + } else { + put_trailer(ulen, clen, check, head); + } } /* --- decompression --- */ @@ -2197,7 +2496,7 @@ local size_t load(void) g.in_left = 0; return 0; } - + #ifndef NOTHREAD /* if first time in or procs == 1, read a buffer to have something to return, otherwise wait for the previous read job to complete */ @@ -2208,7 +2507,6 @@ local size_t load(void) g.load_state = new_lock(1); g.load_thread = launch(load_read, NULL); } - /* wait for the previously requested read to complete */ possess(g.load_state); wait_for(g.load_state, TO_BE, 0); @@ -2378,7 +2676,10 @@ local int get_header(int save) unsigned magic; /* magic header */ int method; /* compression method */ int flags; /* header flags */ + int count; /* number of bytes read */ unsigned fname, extra; /* name and extra field lengths */ + char c1, c2; /* for parsing extra field type */ + unsigned field_len; /* for parsing extra field */ unsigned tmp2; /* for macro */ unsigned long tmp4; /* for macro */ @@ -2474,16 +2775,38 @@ local int get_header(int save) else SKIP(4); - /* skip extra field and OS */ + /* skip extra flags and OS */ SKIP(2); - /* skip extra field, if present */ + if (g.in_eof) + return -3; + + /* parse extra field, if present */ + g.bgzf = g.bgzf_bsize = extra = 0; /* assume it is NOT BGZF */ if (flags & 4) { extra = GET2(); if (g.in_eof) return -3; - SKIP(extra); + // parse extra field(s) look for BC && set g.bgzf_bsize, if present + count = extra; + g.bgzf_bsize = 0; + while (count > 0) { + c1 = GET(); c2 = GET(); field_len = GET2(); + if (g.in_eof) + return -3; + if (c1 == 'B' && c2 == 'C' && field_len == 2) { + /* read the BGZF block size value */ + g.bgzf = 1; + g.bgzf_bsize = GET2(); + } else { + SKIP(field_len); + } + if (g.in_eof) + return -3; + count -= 4 + field_len; + } } + count = extra + 12; /* read file name, if present, into allocated memory */ if ((flags & 8) && save) { @@ -2509,23 +2832,40 @@ local int get_header(int save) have += copy; g.in_left -= copy; g.in_next += copy; + count+=copy; } while (end == NULL); } - else if (flags & 8) - while (GET() != 0) + else if (flags & 8) { + while (GET() != 0) { if (g.in_eof) return -3; + count++; + } + } /* skip comment */ if (flags & 16) - while (GET() != 0) + while (GET() != 0) { if (g.in_eof) return -3; + count++; + } /* skip header crc */ - if (flags & 2) + if (flags & 2) { SKIP(2); + count += 2; + } + + if (g.in_eof) + return -3; + if (g.bgzf && g.bgzf_bsize) { + // subtract header bytes already read, but include trailer bytes at the end... + assert(g.bgzf_bsize > count + BGZF_FOOTER_SIZE - 1); + assert(count >= BGZF_HEADER_SIZE); + g.bgzf_bsize = g.bgzf_bsize - count + 1; // add 1 (see BGZF specification) + } /* return gzip compression method */ g.form = 0; return method; @@ -2915,37 +3255,143 @@ local int outb(void *desc, unsigned char *buf, unsigned len) read and check the gzip, zlib, or zip trailer */ local void infchk(void) { - int ret, cont, was; - unsigned long check, len; + int ret, cont, was, count; + unsigned long check, len, seq, block_len; z_stream strm; unsigned tmp2; unsigned long tmp4; off_t clen; + struct job *job; + struct space *input; cont = 0; + assert(g.decode); + +#ifndef NOTHREAD + if (g.form == 0 && g.bgzf && g.procs > 1) { + /* This is a BGZF gzip file, decompress all blocks in parallel until the form changes */ + setup_jobs(); + if (g.decode == 1) { + /* start write thread */ + writeth = launch(write_thread, NULL); + } + + seq = 0; + job = NULL; + do { + + /* create a new job */ + job = MALLOC(sizeof(struct job)); + if (job == NULL) { + bail("not enough memory", ""); + } + job->calc = NULL; // no calc lock in decode + job->lens = NULL; + job->next = NULL; + + /* update input spaces */ + block_len = g.bgzf_bsize; + input = get_space(&in_pool); + + while (input->size < block_len) { + grow_space(input); + } + assert(input->len == 0); + job->in = input; + job->out = NULL; + job->more = 1; + job->seq = seq; + Trace(("-- read #%ld%s", "")); + if (++seq < 1) { + bail("input too long: ", g.inf); + } + + /* read block_len from g.in_next, modify g.in_left */ + /* instead of reading directly: job->in->len += readn(g.ind, job->in->buf, block_len); */ + while (block_len) { + count = block_len > g.in_left ? g.in_left : block_len; + memcpy(input->buf + input->len, g.in_next, count); + g.in_next += count; + g.in_left -= count; + input->len += count; + block_len -= count; + if (g.in_left == 0) { + load(); + if (g.in_left == 0) { + break; } + } + } + + if (input->len != (size_t) g.bgzf_bsize) { + bail("incomplete BGZF gzip -- reached eof", ""); + } + + /* preparation of job is complete */ + + /* start another uncompress thread if needed */ + if ((unsigned long) cthreads < seq && cthreads < g.procs) { + (void)launch(uncompress_thread, NULL); + cthreads++; + } + + was = g.form; + + ret = get_header(0); + if (ret != 8) { + job->more = 0; + } + + /* put job at end of compress list, let all the compressors know */ + possess(compress_have); + job->next = NULL; + *compress_tail = job; + compress_tail = &(job->next); + twist(compress_have, BY, +1); + + } while (was == 0 && ret == 8 && g.form == 0 && g.bgzf && g.bgzf_bsize); + + if (g.decode == 1) { + /* wait for the write thread to complete (we leave the compress threads out + there and waiting in case there is another stream to compress) */ + join(writeth); + writeth = NULL; + } else { + /* wait for last block to be processed */ + possess(compress_have); + wait_for(compress_have, TO_BE, 0); + release(compress_have); + } + + if ( ret != 8 ) { + return; + } + } +#endif + do { /* header already read -- set up for decompression */ g.in_tot = g.in_left; /* track compressed data length */ g.out_tot = 0; g.out_check = CHECK(0L, Z_NULL, 0); + strm.zalloc = ZALLOC; strm.zfree = ZFREE; strm.opaque = OPAQUE; ret = inflateBackInit(&strm, 15, out_buf); if (ret != Z_OK) bail("not enough memory", ""); - + /* decompress, compute lengths and check value */ strm.avail_in = g.in_left; strm.next_in = g.in_next; ret = inflateBack(&strm, inb, NULL, outb, NULL); if (ret != Z_STREAM_END) - bail("corrupted input -- invalid deflate data: ", g.inf); + bail("corrupted input -- invalid deflate data: ", g.inf); g.in_left = strm.avail_in; g.in_next = strm.next_in; inflateBackEnd(&strm); outb(NULL, NULL, 0); /* finish off final write and check */ - + /* compute compressed data length */ clen = g.in_tot - g.in_left; @@ -3617,6 +4063,7 @@ local char *helptext[] = { " -F --first Do iterations first, before block split for -11", " -h, --help Display a help screen and quit", " -i, --independent Compress blocks independently for damage recovery", +" -B, --bgzf Create BGZF compliant gzip. (overrides -b and -i)", " -I, --iterations n Number of iterations for -11 optimization", " -k, --keep Do not delete original file after processing", " -K, --zip Compress to PKWare zip (.zip) single entry format", @@ -3713,6 +4160,8 @@ local void defaults(void) g.force = 0; /* don't overwrite, don't compress links */ g.recurse = 0; /* don't go into directories */ g.form = 0; /* use gzip format */ + g.bgzf = 0; /* do not create BGZF gzip */ + g.bgzf_bsize = 0; /* size of bgzf block if this is a BGZF decompress */ } /* long options conversion to short options */ @@ -3725,7 +4174,7 @@ local char *longopts[][2] = { {"processes", "p"}, {"quiet", "q"}, {"recursive", "r"}, {"rsyncable", "R"}, {"silent", "q"}, {"stdout", "c"}, {"suffix", "S"}, {"test", "t"}, {"to-stdout", "c"}, {"uncompress", "d"}, {"verbose", "v"}, - {"version", "V"}, {"zip", "K"}, {"zlib", "z"}}; + {"version", "V"}, {"zip", "K"}, {"zlib", "z"}, {"bgzf", "B"}}; #define NLOPTS (sizeof(longopts) / (sizeof(char *) << 1)) /* either new buffer size, new compression level, or new number of processes -- @@ -3844,6 +4293,7 @@ local int option(char *arg) case 'f': g.force = 1; break; case 'h': help(); break; case 'i': g.setdict = 0; break; + case 'B': g.bgzf = 1; break; case 'k': g.keep = 1; break; case 'l': g.list = 1; break; case 'n': g.headis &= ~1; break; @@ -4007,6 +4457,18 @@ int main(int argc, char **argv) if (done == 1 && g.pipeout && !g.decode && !g.list && g.form > 1) complain("warning: output will be concatenated zip files -- " "will not be able to extract"); + if (g.bgzf) { + if (g.decode) { + /* decompressing. Ignore this hint as BGZF autodetected from gzip header, + and use g.bgzf_bsize as next block size when reading */ + g.bgzf = 0; + g.block = BGZF_MAX_BLOCK_SIZE; + } else { + /* compressing. Override settings on block size and force independent checksums */ + g.block = BGZF_BLOCK_SIZE; + g.setdict = 0; + } + } process(strcmp(argv[n], "-") ? argv[n] : NULL); done++; }