zstd::stream::write::Decoder cannot decode valid input #316

Open
inganault opened this issue Jan 4, 2025 · 0 comments
inganault commented Jan 4, 2025

zstd::stream::write::Decoder only decodes small inputs correctly. Most larger inputs either fail with "Data corruption detected" or produce incomplete output.

I suspect this is because stream::zio::writer was written with encoding in mind and there are some unexpected differences between the streaming compression and decompression APIs, but I haven't looked deeper into it.

Reproduction

use std::io::Write;

use rand::Rng as _;
use rand_xoshiro::rand_core::SeedableRng as _;
use rand_xoshiro::Xoshiro256PlusPlus;
use zstd::stream::raw::Operation as _;

#[test]
fn stream_decode() {
    // Prepare test vector
    let mut rng = Xoshiro256PlusPlus::seed_from_u64(0);
    let content: Vec<u8> = (0..1024000).map(|_| rng.gen_range(0..20)).collect();
    println!("Original size  : {}", content.len());
    let content_compressed = zstd::encode_all(&content[..], 6).unwrap();
    println!("Compressed size: {}", content_compressed.len());

    // Decompress
    let mut content_decompressed = Vec::new();
    let mut decompressor = zstd::stream::write::Decoder::new(&mut content_decompressed).unwrap();
    //let mut decompressor = ZstdDecompressor::new(&mut content_decompressed).unwrap();
    for chunk in content_compressed.chunks(4096) {
        decompressor.write(chunk).unwrap();
    }
    decompressor.flush().unwrap();
    println!("Decompressed   : {}", content_decompressed.len());
    assert!(&content == &content_decompressed);
}

This results in an error on both v0.13.2 and the latest main branch (bfe1e34):

thread 'stream_decode' panicked at src/main.rs:24:39:
called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "Data corruption detected" }

In contrast, my minimal implementation, based on https://github.com/facebook/zstd/blob/cf5e53f618c87c7be88c6f5537c1fb70a45f2c09/examples/streaming_decompression.c, decodes the same input correctly, so the problem seems to be located in the zio wrapper.

struct ZstdDecompressor<'a, W: Write> {
    decoder: zstd::stream::raw::Decoder<'a>,
    out_buf: Vec<u8>,
    writer: W,
}

impl<'a, W: Write> ZstdDecompressor<'a, W> {
    fn new(writer: W) -> std::io::Result<Self> {
        Ok(Self {
            decoder: zstd::stream::raw::Decoder::new()?,
            out_buf: vec![0; 32 * 1024 * 1024],
            writer,
        })
    }
}

impl<'a, W: Write> Write for ZstdDecompressor<'a, W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut src = zstd::zstd_safe::InBuffer::around(buf);
        loop {
            let dst_pos;
            {
                let mut dst = zstd::zstd_safe::OutBuffer::around(&mut self.out_buf);
                self.decoder.run(&mut src, &mut dst)?;
                dst_pos = dst.pos();
            }
            // write_all: a plain `write` may accept only part of the slice.
            self.writer.write_all(&self.out_buf[..dst_pos])?;
            if src.pos == buf.len() {
                break;
            }
        }
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }
}
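
A side note on the chunk-feeding loop in the reproduction: `std::io::Write::write` is allowed to consume fewer bytes than it is given and reports the consumed count in its return value, whereas `write_all` keeps calling `write` until the whole slice is accepted. Whether this plays any role in the failure above is not established here. The std-only sketch below only illustrates the difference, using a hypothetical `Stingy` writer (not part of zstd) that accepts at most `limit` bytes per call.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `limit` bytes per `write` call,
/// standing in for any writer that may consume its input only partially.
struct Stingy {
    limit: usize,
    received: Vec<u8>,
}

impl Write for Stingy {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.limit);
        self.received.extend_from_slice(&buf[..n]);
        Ok(n) // the caller is responsible for the remaining bytes
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Feeds `data` in `chunk`-sized pieces and ignores the count returned by `write`.
fn feed_with_write(data: &[u8], chunk: usize, limit: usize) -> io::Result<Vec<u8>> {
    let mut w = Stingy { limit, received: Vec::new() };
    for c in data.chunks(chunk) {
        let _ = w.write(c)?; // bytes beyond `limit` are silently dropped
    }
    Ok(w.received)
}

/// Same loop, but `write_all` retries until each piece is fully accepted.
fn feed_with_write_all(data: &[u8], chunk: usize, limit: usize) -> io::Result<Vec<u8>> {
    let mut w = Stingy { limit, received: Vec::new() };
    for c in data.chunks(chunk) {
        w.write_all(c)?;
    }
    Ok(w.received)
}

fn main() -> io::Result<()> {
    let data: Vec<u8> = (0..100u8).collect();
    let partial = feed_with_write(&data, 16, 10)?;
    let full = feed_with_write_all(&data, 16, 10)?;
    println!("write     delivered {} of {} bytes", partial.len(), data.len());
    println!("write_all delivered {} of {} bytes", full.len(), data.len());
    Ok(())
}
```

Re-running the reproduction with `write_all` in place of `write` would be one way to rule this out.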