From 07286ad05fc2a60a485f542340aa04ceeaa3748c Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 8 Sep 2021 16:02:41 +0200 Subject: Implement lz4::{istream,ostream} --- libbutl/lz4-stream.cxx | 281 +++++++++++++++++++++++++++++++++++++++++++++++++ libbutl/lz4-stream.hxx | 280 ++++++++++++++++++++++++++++++++++++++++++++++++ libbutl/lz4.cxx | 136 ++++++++++++++++-------- libbutl/lz4.hxx | 19 ++-- tests/lz4/testscript | 9 +- 5 files changed, 675 insertions(+), 50 deletions(-) create mode 100644 libbutl/lz4-stream.cxx create mode 100644 libbutl/lz4-stream.hxx diff --git a/libbutl/lz4-stream.cxx b/libbutl/lz4-stream.cxx new file mode 100644 index 0000000..43b0250 --- /dev/null +++ b/libbutl/lz4-stream.cxx @@ -0,0 +1,281 @@ +// file : libbutl/lz4-stream.cxx -*- C++ -*- +// license : MIT; see accompanying LICENSE file + +#include + +#include // memcpy() +#include // invalid_argument + +#include // eof() + +using namespace std; + +namespace butl +{ + namespace lz4 + { + // istream + // + + // Read into the specified buffer returning the number of bytes read and + // the eof flag. + // + pair istreambuf:: + read (char* b, size_t c) + { + size_t n (0); + bool e (false); + + // @@ TODO: would it be faster to do a direct buffer copy if input + // stream is bufstreabuf-based (see sha*.cxx for code)? + do + { + e = eof (is_->read (b + n, c - n)); + n += static_cast (is_->gcount ()); + } + while (!e && n != c); + + return make_pair (n, e); + } + + optional istreambuf:: + open (std::istream& is, bool end) + { + assert (is.exceptions () == std::istream::badbit); + + is_ = &is; + end_ = end; + + // Read in the header and allocate the buffers. + // + // What if we hit EOF here? And could begin() return 0? Turns out the + // answer to both questions is yes: 0-byte content compresses to 15 + // bytes (with or without content size; 1-byte -- to 20/28 bytes). We + // can ignore EOF here since an attempt to read more will result in + // another EOF. 
And our load() is prepared to handle 0 hint. + // + // @@ We could end up leaving some of the input content from the header + // in the input buffer which the caller will have no way of using + // (e.g., in a stream of compressed contents). Doesn't look like + // there is much we can do (our streams don't support putback) other + // than document this limitation. + // + optional r; + + d_.hn = read (d_.hb, sizeof (d_.hb)).first; + h_ = d_.begin (&r); + + ib_.reset ((d_.ib = new char[d_.ic])); + ob_.reset ((d_.ob = new char[d_.oc])); + + // Copy over whatever is left in the header buffer. + // + memcpy (d_.ib, d_.hb, (d_.in = d_.hn)); + + setg (d_.ob, d_.ob, d_.ob); + return r; + } + + void istreambuf:: + close () + { + if (is_open ()) + { + is_ = nullptr; + } + } + + istreambuf::int_type istreambuf:: + underflow () + { + int_type r (traits_type::eof ()); + + if (is_open ()) + { + if (gptr () < egptr () || load ()) + r = traits_type::to_int_type (*gptr ()); + } + + return r; + } + + bool istreambuf:: + load () + { + // Note that the first call to this function may be with h_ == 0 (see + // open() for details). In which case we just need to verify there is + // no junk after the compressed content. + // + bool r; + + if (h_ == 0) + r = false; // EOF + else + { + // Note: next() may just buffer the data. + // + do + { + // Note that on the first call we may already have some data in the + // input buffer (leftover header data). + // + if (h_ > d_.in) + { + pair p (read (d_.ib + d_.in, h_ - d_.in)); + + d_.in += p.first; + + if (p.second && d_.in != h_) + throw invalid_argument ("incomplete compressed content"); + } + + h_ = d_.next (); // Clears d_.in. + + } while (d_.on == 0 && h_ != 0); + + setg (d_.ob, d_.ob, d_.ob + d_.on); + off_ += d_.on; + r = (d_.on != 0); + } + + // If we don't expect any more compressed content and we were asked to + // end the underlying input stream, then verify there is no more input. 
+ // + if (h_ == 0 && end_) + { + end_ = false; + + if (d_.in != 0 || + (!is_->eof () && + is_->good () && + is_->peek () != istream::traits_type::eof ())) + throw invalid_argument ("junk after compressed content"); + } + + return r; + } + + // ostream + // + + void ostreambuf:: + write (char* b, std::size_t n) + { + os_->write (b, static_cast (n)); + } + + void ostreambuf:: + open (std::ostream& os, + int level, + int block_id, + optional content_size) + { + assert (os.exceptions () == (std::ostream::badbit | + std::ostream::failbit)); + + os_ = &os; + + // Determine required buffer capacities. + // + c_.begin (level, block_id, content_size); + + ib_.reset ((c_.ib = new char[c_.ic])); + ob_.reset ((c_.ob = new char[c_.oc])); + + setp (c_.ib, c_.ib + c_.ic - 1); // Keep space for overflow's char. + end_ = false; + } + + void ostreambuf:: + close () + { + if (is_open ()) + { + if (!end_) + save (); + + os_ = nullptr; + } + } + + ostreambuf:: + ~ostreambuf () + { + close (); + } + + ostreambuf::int_type ostreambuf:: + overflow (int_type c) + { + int_type r (traits_type::eof ()); + + if (is_open () && c != traits_type::eof ()) + { + // Store last character in the space we reserved in open(). Note + // that pbump() doesn't do any checks. + // + *pptr () = traits_type::to_char_type (c); + pbump (1); + + save (); + r = c; + } + + return r; + } + + void ostreambuf:: + save () + { + c_.in = pptr () - pbase (); + off_ += c_.in; + + // We assume this is the end if the input buffer is not full. + // + end_ = (c_.in != c_.ic); + c_.next (end_); + + if (c_.on != 0) // next() may just buffer the data. + write (c_.ob, c_.on); + + setp (c_.ib, c_.ib + c_.ic - 1); + } + + streamsize ostreambuf:: + xsputn (const char_type* s, streamsize sn) + { + if (!is_open () || end_) + return 0; + + // To avoid further 'signed/unsigned comparison' compiler warnings. 
+ // + size_t n (static_cast (sn)); + + // The plan is to keep copying the data into the input buffer and + // calling save() (our compressor API currently has no way of avoiding + // the copy). + // + while (n != 0) + { + // Amount of free space in the buffer (including the extra byte + // we've reserved). + // + size_t an (epptr () - pptr () + 1); + + size_t m (n > an ? an : n); + memcpy (pptr (), s, m); + pbump (static_cast (m)); + + if (n < an) + break; // All fitted with at least 1 byte left. + + save (); + + s += m; + n -= m; + } + + return sn; + } + } +} diff --git a/libbutl/lz4-stream.hxx b/libbutl/lz4-stream.hxx new file mode 100644 index 0000000..611fe37 --- /dev/null +++ b/libbutl/lz4-stream.hxx @@ -0,0 +1,280 @@ +// file : libbutl/lz4-stream.hxx -*- C++ -*- +// license : MIT; see accompanying LICENSE file + +#pragma once + +#include // unique_ptr +#include // size_t +#include // uint64_t +#include // move() +#include +#include +#include + +#include +#include +#include + +#include + +namespace butl +{ + namespace lz4 + { + // istream + // + + class LIBBUTL_SYMEXPORT istreambuf: public bufstreambuf + { + public: + optional + open (std::istream&, bool end); + + bool + is_open () const {return is_ != nullptr;} + + void + close (); + + public: + using base = bufstreambuf; + + // basic_streambuf input interface. + // + public: + virtual int_type + underflow () override; + + // Direct access to the get area. Use with caution. + // + using base::gptr; + using base::egptr; + using base::gbump; + + // Return the (logical) position of the next byte to be read. + // + using base::tellg; + + private: + std::pair + read (char*, std::size_t); + + bool + load (); + + private: + std::istream* is_ = nullptr; + bool end_; + decompressor d_; + std::unique_ptr ib_; // Decompressor input buffer. + std::unique_ptr ob_; // Decompressor output buffer. + std::size_t h_; // Decompressor next chunk hint. 
+ }; + + // Typical usage: + // + // try + // { + // ifdstream ifs (..., ifdstream::badbit); + // lz4::istream izs (ifs, true /* end */); + // ... // Read from izs. + // } + // catch (const invalid_argument& e) + // { + // ... // Invalid compressed content, call e.what() for description. + // } + // catch (/* ifdstream exceptions */) + // { + // ... + // } + // + // See class decompressor for details on semantics and exceptions thrown. + // + // @@ TODO: get rid of badbit-only requirement. + // @@ TODO: re-opening support (will need decompressor reset). + // + class LIBBUTL_SYMEXPORT istream: public std::istream + { + public: + explicit + istream (iostate e = badbit | failbit) + : std::istream (&buf_) + { + assert (e & badbit); + exceptions (e); + } + + // The underlying input stream is expected to throw on badbit but not + // failbit. If end is true, then on reaching the end of compressed data + // verify there is no more input. + // + // Note that this implementation does not support handling streams of + // compressed contents (end is false) that may include individual + // contents that uncompress to 0 bytes (see istreambuf::open() + // implementation for details). + // + istream (std::istream& is, bool end, iostate e = badbit | failbit) + : istream (e) + { + open (is, end); + } + + // Return decompressed content size, if available. + // + optional + open (std::istream& is, bool end) + { + return buf_.open (is, end); + } + + bool + is_open () const + { + return buf_.is_open (); + } + + // Signal that no further uncompressed input will be read. 
+ // + void + close () + { + return buf_.close (); + } + + private: + istreambuf buf_; + }; + + // ostream + // + + class LIBBUTL_SYMEXPORT ostreambuf: public bufstreambuf + { + public: + void + open (std::ostream&, + int compression_level, + int block_size_id, + optional content_size); + + bool + is_open () const {return os_ != nullptr;} + + void + close (); + + virtual + ~ostreambuf () override; + + public: + using base = bufstreambuf; + + // basic_streambuf output interface. + // + // Note that syncing the input buffer before the end doesn't make much + // sense (it will just get buffered in the compressor). In fact, it can + // break our single-shot compression arrangement (for compatibility with + // the lz4 utility). Thus we inherit noop sync() from the base. + // + public: + virtual int_type + overflow (int_type) override; + + virtual std::streamsize + xsputn (const char_type*, std::streamsize) override; + + // Return the (logical) position of the next byte to be written. + // + using base::tellp; + + private: + void + write (char*, std::size_t); + + void + save (); + + private: + std::ostream* os_ = nullptr; + bool end_; + compressor c_; + std::unique_ptr ib_; // Compressor input buffer. + std::unique_ptr ob_; // Compressor output buffer. + }; + + // Typical usage: + // + // try + // { + // ofdstream ofs (...); + // lz4::ostream ozs (ofs, 9, 4 /* 64KB */, nullopt /* content_size */); + // + // ... // Write to ozs. + // + // ozs.close (); + // ofs.close (); + // } + // catch (/* ofdstream exceptions */) + // { + // ... + // } + // + // See class compressor for details on semantics and exceptions thrown. + // + // @@ TODO: re-opening support (will need compressor reset). + // + class LIBBUTL_SYMEXPORT ostream: public std::ostream + { + public: + explicit + ostream (iostate e = badbit | failbit) + : std::ostream (&buf_) + { + assert (e & badbit); + exceptions (e); + } + + // The underlying output stream is expected to throw on badbit or + // failbit. 
+ // + // See compress() for the description of the compression level, block + // size and content size arguments. + // + ostream (std::ostream& os, + int compression_level, + int block_size_id, + optional content_size, + iostate e = badbit | failbit) + : ostream (e) + { + open (os, compression_level, block_size_id, content_size); + } + + void + open (std::ostream& os, + int compression_level, + int block_size_id, + optional content_size) + { + buf_.open (os, compression_level, block_size_id, content_size); + } + + bool + is_open () const + { + return buf_.is_open (); + } + + // Signal that no further uncompressed output will be written. + // + void + close () + { + return buf_.close (); + } + + private: + ostreambuf buf_; + }; + } +} diff --git a/libbutl/lz4.cxx b/libbutl/lz4.cxx index a28ed78..9d15203 100644 --- a/libbutl/lz4.cxx +++ b/libbutl/lz4.cxx @@ -28,6 +28,10 @@ #include // eos() +#if 0 +#include +#endif + using namespace std; namespace butl @@ -212,12 +216,12 @@ namespace butl if (LZ4F_isError (on)) throw_exception (on); + in = 0; // All consumed. return; } else { - if (LZ4F_isError ( - LZ4F_createCompressionContext (&ctx, LZ4F_VERSION))) + if (LZ4F_isError (LZ4F_createCompressionContext (&ctx, LZ4F_VERSION))) throw bad_alloc (); ctx_ = ctx; @@ -239,11 +243,15 @@ namespace butl size_t n; - n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr); - if (LZ4F_isError (n)) - throw_exception (n); + if (in != 0) + { + n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr); + if (LZ4F_isError (n)) + throw_exception (n); - on += n; + in = 0; // All consumed. + on += n; + } // Write the end marker. 
// @@ -266,6 +274,21 @@ namespace butl int block_id, optional content_size) { +#if 0 + char buf[1024 * 3 + 7]; + ostream cos (os, level, block_id, content_size); + + for (bool e (false); !e; ) + { + e = eof (is.read (buf, sizeof (buf))); + cos.write (buf, is.gcount ()); + //for (streamsize i (0), n (is.gcount ()); i != n; ++i) + // cos.put (buf[i]); + } + + cos.close (); + return content_size ? *content_size : 0; +#else compressor c; // Input/output buffer guards. @@ -314,6 +337,7 @@ namespace butl } return ot; +#endif } // decompression @@ -323,8 +347,17 @@ namespace butl "LZ4 header size mismatch"); decompressor:: - decompressor () - : hn (0), in (0), on (0) + ~decompressor () + { + if (LZ4F_dctx* ctx = static_cast (ctx_)) + { + LZ4F_errorCode_t e (LZ4F_freeDecompressionContext (ctx)); + assert (!LZ4F_isError (e)); + } + } + + size_t decompressor:: + begin (optional* content_size) { LZ4F_dctx* ctx; @@ -332,20 +365,6 @@ namespace butl throw bad_alloc (); ctx_ = ctx; - } - - decompressor:: - ~decompressor () - { - LZ4F_errorCode_t e ( - LZ4F_freeDecompressionContext (static_cast (ctx_))); - assert (!LZ4F_isError (e)); - } - - size_t decompressor:: - begin () - { - LZ4F_dctx* ctx (static_cast (ctx_)); LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO; @@ -357,6 +376,14 @@ namespace butl if (LZ4F_isError (h)) throw_exception (h); + if (content_size != nullptr) + { + if (info.contentSize != 0) + *content_size = static_cast (info.contentSize); + else + *content_size = nullopt; + } + // Use the block size for the output buffer capacity and compressed // bound plus the header size for the input. The expectation is that // LZ4F_decompress() should never hint for more than that. @@ -391,6 +418,7 @@ namespace butl // We expect LZ4F_decompress() to consume what it asked for. // assert (e == in && h <= ic); + in = 0; // All consumed. 
+ return h; } @@ -398,13 +426,26 @@ namespace butl uint64_t decompress (ofdstream& os, ifdstream& is) { - decompressor d; - - // Input/output buffer guards. + // Write the specified number of bytes from the output buffer updating + // the total written. // - unique_ptr ibg; - unique_ptr obg; + uint64_t ot (0); + auto write = [&os, &ot] (char* b, size_t n) + { + os.write (b, static_cast (n)); + ot += n; + }; + +#if 0 + char buf[1024 * 3 + 7]; + istream dis (is, true, istream::badbit); + for (bool e (false); !e; ) + { + e = eof (dis.read (buf, sizeof (buf))); + write (buf, static_cast (dis.gcount ())); + } +#else // Read into the specified buffer returning the number of bytes read and // updating the eof flag. // @@ -422,20 +463,27 @@ namespace butl return n; }; - // Write the specified number of bytes from the output buffer updating - // the total written. + decompressor d; + + // Input/output buffer guards. // - uint64_t ot (0); - auto write = [&os, &ot] (char* b, size_t n) - { - os.write (b, static_cast (n)); - ot += n; - }; + unique_ptr ibg; + unique_ptr obg; size_t h; // Input hint. // First read in the header and allocate the buffers. // + // What if we hit EOF here? And could begin() return 0? Turns out the + // answer to both questions is yes: 0-byte content compresses to 15 + // bytes (with or without content size; 1-byte -- to 20/28 bytes). We + // can ignore EOF here since an attempt to read more will result in + // another EOF. And code below is prepared to handle 0 initial hint. + // + // @@ We could end up leaving some of the input content from the + // header in the input buffer which the caller will have no way + // of using/detecting. + // d.hn = read (d.hb, sizeof (d.hb)); h = d.begin (); @@ -453,20 +501,22 @@ namespace butl // Keep decompressing, writing, and reading chunks of compressed // content. // - for (;;) + while (h != 0) { h = d.next (); - write (d.ob, d.on); + if (d.on != 0) // next() may just buffer the data. 
+ write (d.ob, d.on); - if (h == 0) - break; - - if (eof) - throw invalid_argument ("incomplete compressed content"); + if (h != 0) + { + if (eof) + throw invalid_argument ("incomplete compressed content"); - d.in = read (d.ib, h); + d.in = read (d.ib, h); + } } +#endif return ot; } diff --git a/libbutl/lz4.hxx b/libbutl/lz4.hxx index 98175c1..cfe9967 100644 --- a/libbutl/lz4.hxx +++ b/libbutl/lz4.hxx @@ -30,8 +30,9 @@ namespace butl // The output and most likely the input streams must be in the binary // mode. // - // Valid values for the compressions level are between 1 (fastest) and - // 12 (best compression level). + // Valid values for the compression level are between 1 (fastest) and 12 + // (best compression level) though, practically, after 9 returns are + // diminished. // // Valid block sizes and their IDs: // @@ -40,6 +41,10 @@ namespace butl // 6: 1MB // 7: 4MB // + // Note that due to the underlying API limitations, 0 content size is + // treated as absent and it's therefore impossible to compress 0-byte + // content with content size. + // // This function produces compressed content identical to: // // lz4 -z - -B -BD [--content-size] @@ -120,8 +125,8 @@ namespace butl // This function may throw std::bad_alloc as well as exceptions thrown by // fdstream read/write functions. It may also throw std::invalid_argument // if the compressed content is invalid with what() returning the error - // description. The input stream is expected to throw on badbit (but not - // failbit). The output stream is expected to throw on badbit or failbit. + // description. The input stream is expected to throw on badbit but not + // failbit. The output stream is expected to throw on badbit or failbit. // // The input and most likely the output streams must be in the binary // mode. 
@@ -162,6 +167,8 @@ namespace butl // function sets the required input and output buffer capacities (ic, // oc) and the number of bytes left in the header buffer (hn) and // returns the number of bytes expected by the following call to next(). + // If content_size is not NULL, then it is set to the decompressed + // content size, if available. // // The caller normally allocates the input and output buffers, copies // remaining header buffer data over to the input buffer, and then fills @@ -169,7 +176,7 @@ namespace butl // call to next(). // std::size_t - begin (); + begin (optional* content_size = nullptr); // Then call next() to decompress the next chunk of input. This function // returns the number of bytes expected by the following call to next() @@ -188,7 +195,7 @@ namespace butl // Implementation details. // - decompressor (); + decompressor (): hn (0), in (0), on (0), ctx_ (nullptr) {} ~decompressor (); public: diff --git a/tests/lz4/testscript b/tests/lz4/testscript index 1806c94..0cd5cba 100644 --- a/tests/lz4/testscript +++ b/tests/lz4/testscript @@ -2,7 +2,8 @@ # license : MIT; see accompanying LICENSE file +touch zero -+echo 'The quick brown fox jumps over the lazy dog.' >=small ++cat <:'1' >=one ++cat <'The quick brown fox jumps over the lazy dog.' >=small +cat <=1kb The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox jumps over the lazy dog. The quick brown fox @@ -29,6 +30,12 @@ $* -c ../zero zero.lz4 &zero.lz4; $* -d zero.lz4 zero &zero; diff ../zero zero +: rt-one +: +$* -c ../one one.lz4 &one.lz4; +$* -d one.lz4 one &one; +diff ../one one + : rt-small : $* -c ../small small.lz4 &small.lz4; -- cgit v1.1