proposal: Export codec and Encode/Decode trait #150

Open
Xuanwo opened this issue May 20, 2022 · 8 comments

Comments

@Xuanwo

Xuanwo commented May 20, 2022

In Databend we use our own runtime, which:

  • Runs async tasks (for example, IO) on an async runtime (tokio here).
  • Runs sync tasks (CPU-bound work such as decompression) on a sync runtime.

So we want to control the underlying behavior of async-compression:

  • Make IO happen on the async runtime by polling the futures
  • Make decoding happen on the sync runtime

Thus, we need direct access to the Encode/Decode traits in codec so that we can build our own XzDecoder.
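
A minimal sketch of the split described above, using only the standard library: the calling thread stands in for the async IO side and a second thread for the sync (CPU-bound) side. `RleDecoder`, `decode_split`, and `demo_split` are hypothetical names, and the run-length format is a toy stand-in for a real codec such as xz; none of this is async-compression's API.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc;
use std::thread;

/// Toy stand-in for a real codec: expands (count, byte) pairs.
/// A count seen at the end of one chunk is carried into the next chunk.
#[derive(Default)]
struct RleDecoder {
    pending_count: Option<u8>,
}

impl RleDecoder {
    fn decode(&mut self, input: &[u8], output: &mut Vec<u8>) {
        for &b in input {
            match self.pending_count.take() {
                None => self.pending_count = Some(b),
                Some(count) => output.extend(std::iter::repeat(b).take(count as usize)),
            }
        }
    }
}

/// Reads `source` in `chunk_size` pieces on the calling thread (the "IO side")
/// and decodes each piece on a separate thread (the "CPU side").
fn decode_split<R: Read>(mut source: R, chunk_size: usize) -> std::io::Result<Vec<u8>> {
    // Bounded channel, so the IO side cannot run arbitrarily far ahead.
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(4);

    let cpu = thread::spawn(move || {
        let mut decoder = RleDecoder::default();
        let mut out = Vec::new();
        for chunk in rx {
            decoder.decode(&chunk, &mut out);
        }
        out
    });

    let mut buf = vec![0u8; chunk_size];
    loop {
        let n = source.read(&mut buf)?;
        if n == 0 || tx.send(buf[..n].to_vec()).is_err() {
            break;
        }
    }
    drop(tx); // closing the channel lets the decoder thread finish
    Ok(cpu.join().expect("decoder thread panicked"))
}

/// Feeds three (count, byte) pairs through the pipeline one byte at a time.
fn demo_split() -> Vec<u8> {
    let compressed = vec![3u8, b'a', 2, b'b', 1, b'c'];
    decode_split(Cursor::new(compressed), 1).expect("in-memory read cannot fail")
}
```

The decoder owns its state and only ever sees byte slices, which is why the IO side can live on a different runtime; that is the property the proposal wants from the codec types.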

Does this idea make sense to you? I'm willing to start a PR for it.


This would also address #141.

@Nemo157
Member

Nemo157 commented May 20, 2022

I am (slowly) working on a big refactor that will expose a lot of the internal building blocks, including the codec stuff. I'm hoping to have enough time to actually get something public in the next few weeks.

@Xuanwo
Author

Xuanwo commented May 20, 2022

That's cool! If there is anything I can help with, just ping me back.

@Nemo157
Member

Nemo157 commented May 23, 2022

@Xuanwo I was thinking about your use case a bit more, and it seems like you would need the codec traits to be async, so you can spawn the synchronous work into your sync-runtime and return a handle to get the result?

@Xuanwo
Author

Xuanwo commented May 23, 2022

so you can spawn the synchronous work into your sync-runtime and return a handle to get the result?

Yep, similar. In databend, we have our own processors like the following:

https://github.com/datafuselabs/databend/blob/2f9e425b55e2e28761c5a9b1e3fb11131fd7fa8e/query/src/pipelines/new/processors/processor.rs#L25-L49

pub enum Event {
    NeedData,
    NeedConsume,
    Sync,
    Async,
    Finished,
}


// The design is inspired by ClickHouse processors
#[async_trait::async_trait]
pub trait Processor: Send {
    fn name(&self) -> &'static str;


    fn event(&mut self) -> Result<Event>;


    // Synchronous work.
    fn process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented process."))
    }


    // Asynchronous work.
    async fn async_process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented async_process."))
    }
}

Every task emits a new event, and the processor decides whether to execute it in a blocking way or an async way.
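
To illustrate the event-driven dispatch described above, here is a simplified, std-only sketch. It is not Databend's executor: the trait is cut down to three events, `io_step` is a synchronous stand-in for `async_process`, and `Decompress`, `drive`, and `demo_events` are hypothetical names. The "decode" is just ASCII upper-casing.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Sync,     // CPU-bound work is ready to run
    Async,    // IO is needed before more work can run
    Finished, // nothing left to do
}

trait Processor {
    fn event(&mut self) -> Event;
    fn process(&mut self); // synchronous (CPU-bound) step
    fn io_step(&mut self); // stand-in for the async IO step
}

struct Decompress {
    source: VecDeque<Vec<u8>>, // chunks the "IO" step will deliver
    pending: Option<Vec<u8>>,  // a chunk that was read but not yet decoded
    output: Vec<u8>,
}

impl Processor for Decompress {
    fn event(&mut self) -> Event {
        if self.pending.is_some() {
            Event::Sync
        } else if !self.source.is_empty() {
            Event::Async
        } else {
            Event::Finished
        }
    }

    fn io_step(&mut self) {
        self.pending = self.source.pop_front();
    }

    fn process(&mut self) {
        if let Some(chunk) = self.pending.take() {
            // Toy "decode": upper-case the bytes.
            self.output.extend(chunk.iter().map(|b| b.to_ascii_uppercase()));
        }
    }
}

/// Polls `event()` and dispatches each step; a real executor would send
/// `Sync` work to a blocking pool and `Async` work to an async runtime.
fn drive<P: Processor>(p: &mut P) -> Vec<Event> {
    let mut trace = Vec::new();
    loop {
        let ev = p.event();
        trace.push(ev);
        match ev {
            Event::Async => p.io_step(),
            Event::Sync => p.process(),
            Event::Finished => return trace,
        }
    }
}

fn demo_events() -> (Vec<Event>, Vec<u8>) {
    let mut p = Decompress {
        source: VecDeque::from(vec![b"ab".to_vec(), b"c".to_vec()]),
        pending: None,
        output: Vec::new(),
    };
    let trace = drive(&mut p);
    (trace, p.output)
}
```

With two input chunks, the trace alternates between `Async` and `Sync` before ending in `Finished`, which mirrors the read-then-decode cycle in the plan.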

In our case (decompression), we have an async IO task and a sync decompress task. My current plan is to:

  • Implement a decoder wrapper that holds an AsyncRead and an async-compression::codec::XzDecoder
  • Emit an async IO task to read a buffer from the reader
  • Emit a decompress task that has XzDecoder decode this buffer (via PartialBuffer)
  • Parse the decompressed data into csv/parquet/json...

In conclusion, based on the current codebase, we need the following (better designs or ideas are always welcome):

  • codec::Decode trait (no changes needed so far)
  • codec::XxxDecoder structs (we will call decode directly)

@Xuanwo
Author

Xuanwo commented May 26, 2022

Hi @Nemo157, I have a demo here.

Code
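// Imports the demo relies on. This assumes a fork of async-compression that
// exports `codec` and `util::PartialBuffer`; the exact paths are an assumption.
use std::io::Result;
use std::pin::Pin;

use async_compression::codec::Decode;
use async_compression::util::PartialBuffer;
use futures::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use log::debug;

#[derive(Debug)]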
enum State {
    Reading,
    Decoding,
    Finishing,
    Done,
}

#[derive(Debug)]
struct Reader<R: AsyncBufRead + Unpin, D: Decode> {
    reader: R,
    decoder: D,
    multiple_members: bool,
}

impl<R: AsyncBufRead + Unpin, D: Decode> Reader<R, D> {
    pub fn new(reader: R, decoder: D) -> Self {
        Self {
            reader,
            decoder,
            multiple_members: false,
        }
    }

    pub async fn fill_buf(&mut self) -> Result<&[u8]> {
        self.reader.fill_buf().await
    }

    pub fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result<(State, usize)> {
        // If input is empty, inner reader must reach EOF, return directly.
        if input.is_empty() {
            debug!("input is empty, return directly");
            // Avoid attempting to reinitialise the decoder if the reader
            // has returned EOF.
            self.multiple_members = false;
            return Ok((State::Finishing, 0));
        }

        let mut input = PartialBuffer::new(input);
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.decode(&mut input, &mut output)?;
        let len = input.written().len();
        debug!("advance reader with amt {}", len);
        Pin::new(&mut self.reader).consume(len);

        if done {
            Ok((State::Finishing, output.written().len()))
        } else {
            Ok((State::Reading, output.written().len()))
        }
    }

    pub fn finish(&mut self, output: &mut [u8]) -> Result<(State, usize)> {
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.finish(&mut output)?;
        if done {
            if self.multiple_members {
                self.decoder.reinit()?;
                Ok((State::Reading, output.written().len()))
            } else {
                Ok((State::Done, output.written().len()))
            }
        } else {
            Ok((State::Finishing, output.written().len()))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_compression::codec::{Decode, ZlibDecoder as ZlibCodec};
    use bytes::BufMut;
    use flate2::write::ZlibEncoder;
    use flate2::Compression;
    use futures::io::Cursor;
    use log::debug;
    use rand::prelude::*;
    use std::io;
    use std::io::Result;
    use std::io::Write;

    #[tokio::test]
    // The data is zlib-encoded (ZlibEncoder / ZlibCodec), so name the test accordingly.
    async fn decode_zlib() -> Result<()> {
        env_logger::init();

        let mut rng = ThreadRng::default();
        let mut content = vec![0; 16 * 1024 * 1024];
        rng.fill_bytes(&mut content);
        debug!("raw_content size: {}", content.len());

        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
        e.write_all(&content)?;
        let compressed_content = e.finish()?;
        debug!("compressed_content size: {}", compressed_content.len());

        let br = BufReader::with_capacity(64 * 1024, Cursor::new(compressed_content));
        let mut cr = Reader::new(br, ZlibCodec::new());

        let mut result = vec![0; 16 * 1024 * 1024];
        let mut cnt = 0;
        let mut state = State::Reading;
        let mut buf = Vec::new();
        loop {
            let (_, output) = result.split_at_mut(cnt);

            match state {
                State::Reading => {
                    debug!("start reading");
                    buf = cr.fill_buf().await?.to_vec();
                    debug!("read data: {}", buf.len());
                    state = State::Decoding
                }
                State::Decoding => {
                    debug!("start decoding from buf {} to output {}", buf.len(), cnt);
                    let (decode_state, written) = cr.decode(&buf, output)?;
                    debug!("decoded from buf {} as output {}", buf.len(), written);
                    state = decode_state;
                    cnt += written;
                },
                State::Finishing => {
                    debug!("start finishing to output {}", cnt);
                    let (finish_state, written) = cr.finish(output)?;
                    debug!("finished from buf {} as output {}", buf.len(), written);
                    state = finish_state;
                    cnt += written;
                }
                State::Done => {
                    debug!("done");
                    break;
                }
            }
        }

        assert_eq!(result, content);

        Ok(())
    }
}

The lesson learned from this demo: based on the current design, the following would need to be exported (for the decoder only):

  • Decode trait
  • every algorithm under codec
  • PartialBuffer (I don't know if we can replace it with tokio::io::ReadBuf)

The real change: split IO from decoding and let the user drive both.
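
For readers unfamiliar with the buffer type, here is a simplified sketch of the idea behind `PartialBuffer`: a byte buffer paired with an index that separates the processed part from the rest. `PartialBuf`, `copy_step`, and `demo_partial` are hypothetical names written for this illustration and are not async-compression's implementation; the "decoder" merely copies bytes.

```rust
/// A buffer plus a cursor: bytes before `index` have been processed
/// (consumed from an input, or filled in an output).
struct PartialBuf<B> {
    buffer: B,
    index: usize,
}

impl<B: AsRef<[u8]>> PartialBuf<B> {
    fn new(buffer: B) -> Self {
        Self { buffer, index: 0 }
    }

    fn written(&self) -> &[u8] {
        &self.buffer.as_ref()[..self.index]
    }

    fn unwritten(&self) -> &[u8] {
        &self.buffer.as_ref()[self.index..]
    }

    fn advance(&mut self, amount: usize) {
        self.index += amount;
    }
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> PartialBuf<B> {
    fn unwritten_mut(&mut self) -> &mut [u8] {
        &mut self.buffer.as_mut()[self.index..]
    }
}

/// Toy "decode" step: copies as much input as fits into the output and
/// returns true once the input is fully consumed.
fn copy_step(input: &mut PartialBuf<&[u8]>, output: &mut PartialBuf<&mut [u8]>) -> bool {
    let n = input.unwritten().len().min(output.unwritten().len());
    output.unwritten_mut()[..n].copy_from_slice(&input.unwritten()[..n]);
    input.advance(n);
    output.advance(n);
    input.unwritten().is_empty()
}

/// Pushes 11 bytes through a 4-byte output buffer and counts the calls.
fn demo_partial() -> (Vec<u8>, usize) {
    let src = b"hello world";
    let mut dst = [0u8; 4];
    let mut input = PartialBuf::new(&src[..]);
    let mut collected = Vec::new();
    let mut calls = 0;
    loop {
        let mut output = PartialBuf::new(&mut dst[..]);
        let done = copy_step(&mut input, &mut output);
        collected.extend_from_slice(output.written());
        calls += 1;
        if done {
            break;
        }
    }
    (collected, calls)
}
```

After each step, `input.written().len()` tells the caller how many bytes to `consume` from the reader, which is how the demo above uses it.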

@Nemo157
Member

Nemo157 commented May 26, 2022

PartialBuffer (I don't know if we can replace it by tokio::io::ReadBuf)

Yeah, I was thinking ReadBuf would hopefully be a replacement for it, at least for the output side; I have a very old test branch using it. I'm just waiting on it stabilizing in std before doing anything more with it.

@Xuanwo
Author

Xuanwo commented May 26, 2022

Here is my progress, please take a look:

async-compression side: https://github.com/Xuanwo/async-compression/tree/public_api
my use-case: apache/opendal#289

Geal added a commit to apollographql/router that referenced this issue Apr 27, 2023
We replace tower-http's `CompressionLayer` with a custom stream transformation. This is necessary because tower-http uses async-compression, which buffers data until the end of the stream before writing it, to achieve better compression. This is incompatible with the multipart protocol for `@defer`, which requires chunks to be sent as soon as possible. So we need to compress them independently.

This extracts parts of the codec module of async-compression, which so far is not public, and makes a streaming wrapper above it that flushes the compressed data on every response in the stream.

This is expected to be temporary, as we have in flight PRs for async-compression:
- Nullus157/async-compression#155
- Nullus157/async-compression#178

With Nullus157/async-compression#150 we might be able to at least remove the vendored code