proposal: Export codec and Encode/Decode trait #150
Comments
I am (slowly) working on a big refactor that will expose a lot of the internal building blocks, including the
That's cool! If there is anything I can help with, just ping me back.
@Xuanwo I was thinking about your use case a bit more, and it seems like you would need the codec traits to be async, so that you can spawn the synchronous work into your sync runtime and return a handle to get the result?
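As a rough illustration of the "spawn the synchronous work and return a handle" idea, here is a minimal std-only sketch. Everything in it is an assumption made for illustration: `BlockingDecode`, `RunLengthDecoder` and `spawn_decode` are hypothetical names, the toy run-length format stands in for a real codec, and a plain OS thread stands in for a dedicated sync runtime. It is not async-compression's actual `Decode` trait.

```rust
use std::io;
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified decoder interface (NOT async-compression's `Decode`).
pub trait BlockingDecode: Send + 'static {
    /// Decodes one complete input chunk and returns the decoded bytes.
    fn decode_all(&mut self, input: &[u8]) -> io::Result<Vec<u8>>;
}

/// Toy decoder for illustration: input is a sequence of (count, byte) pairs.
pub struct RunLengthDecoder;

impl BlockingDecode for RunLengthDecoder {
    fn decode_all(&mut self, input: &[u8]) -> io::Result<Vec<u8>> {
        if input.len() % 2 != 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "truncated run"));
        }
        let mut out = Vec::new();
        for pair in input.chunks_exact(2) {
            out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
        }
        Ok(out)
    }
}

/// Moves the decoder and the chunk onto a worker thread (a stand-in for a
/// sync runtime) and returns a handle. Joining the handle yields the decoder
/// back together with the result, so the caller can reuse it for the next chunk.
pub fn spawn_decode<D: BlockingDecode>(
    mut decoder: D,
    chunk: Vec<u8>,
) -> JoinHandle<(D, io::Result<Vec<u8>>)> {
    thread::spawn(move || {
        let result = decoder.decode_all(&chunk);
        (decoder, result)
    })
}
```

The caller would do its async IO first, hand the filled buffer to `spawn_decode`, and join (or, in a real runtime, await) the handle to collect the output.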
Yep, similar. In databend, we have our own processors like the following:

pub enum Event {
    NeedData,
    NeedConsume,
    Sync,
    Async,
    Finished,
}

// The design is inspired by ClickHouse processors
#[async_trait::async_trait]
pub trait Processor: Send {
    fn name(&self) -> &'static str;

    fn event(&mut self) -> Result<Event>;

    // Synchronous work.
    fn process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented process."))
    }

    // Asynchronous work.
    async fn async_process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented async_process."))
    }
}

Every task will emit a new event, and the processor will decide whether to execute it in a blocking or an async way. In our case (decompress), we have an async IO task and a sync decompress task. In my current plan, I will:
In conclusion, what we need is (based on current codebase, better design or ideas always welcome):
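To make the sync/async split in the processor model above more concrete, here is a small std-only sketch of a driver loop that dispatches on the emitted event. It is an assumption-heavy simplification: the `Event` enum is trimmed to three variants, `Decompress`, `run` and `block_on` are hypothetical helpers, the "decode" step just copies bytes, and both kinds of work run inline on one thread, whereas databend would route them to separate runtimes.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum Event { Sync, Async, Finished }

type BoxFuture<'a> = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;

/// Simplified version of the trait above, without the `async_trait` macro.
pub trait Processor: Send {
    fn event(&mut self) -> io::Result<Event>;
    fn process(&mut self) -> io::Result<()>;
    fn async_process(&mut self) -> BoxFuture<'_>;
}

/// Hypothetical processor: async step "reads" a chunk, sync step "decodes" it.
pub struct Decompress {
    source: VecDeque<Vec<u8>>,
    pending: Option<Vec<u8>>,
    pub output: Vec<u8>,
    pub trace: Vec<&'static str>,
}

impl Decompress {
    pub fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { source: chunks.into(), pending: None, output: Vec::new(), trace: Vec::new() }
    }
}

impl Processor for Decompress {
    fn event(&mut self) -> io::Result<Event> {
        Ok(if self.pending.is_some() {
            Event::Sync // a chunk is buffered: CPU-bound decode comes next
        } else if !self.source.is_empty() {
            Event::Async // nothing buffered: fetch more input
        } else {
            Event::Finished
        })
    }

    fn process(&mut self) -> io::Result<()> {
        // Placeholder for real decompression: copy the chunk through.
        if let Some(chunk) = self.pending.take() {
            self.output.extend_from_slice(&chunk);
            self.trace.push("sync");
        }
        Ok(())
    }

    fn async_process(&mut self) -> BoxFuture<'_> {
        Box::pin(async move {
            self.pending = self.source.pop_front();
            self.trace.push("async");
            Ok::<(), io::Error>(())
        })
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, only here to keep the sketch dependency-free.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

/// Drives a processor until it reports `Finished`.
pub fn run<P: Processor>(p: &mut P) -> io::Result<()> {
    loop {
        match p.event()? {
            Event::Sync => p.process()?,
            Event::Async => block_on(p.async_process())?,
            Event::Finished => return Ok(()),
        }
    }
}
```

The point of the sketch is only that IO and decoding are separate steps the scheduler can see, which is what requires the decode logic to be reachable without going through the crate's combined reader types.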
Hi, @Nemo157, I got a demo here.

// Imports inferred from the test module, which uses `futures::io::Cursor`
// and the fork's public `async_compression::codec` module.
use async_compression::codec::Decode;
use futures::io::{AsyncBufRead, AsyncBufReadExt};
use log::debug;
use std::io::Result;
use std::pin::Pin;
// `PartialBuffer` is async-compression's internal buffer helper; import it
// from wherever the fork exposes it.

#[derive(Debug)]
enum State {
    Reading,
    Decoding,
    Finishing,
    Done,
}

#[derive(Debug)]
struct Reader<R: AsyncBufRead + Unpin, D: Decode> {
    reader: R,
    decoder: D,
    multiple_members: bool,
}

impl<R: AsyncBufRead + Unpin, D: Decode> Reader<R, D> {
    pub fn new(reader: R, decoder: D) -> Self {
        Self {
            reader,
            decoder,
            multiple_members: false,
        }
    }

    // Async IO step: fetch the next buffered chunk from the inner reader.
    pub async fn fill_buf(&mut self) -> Result<&[u8]> {
        self.reader.fill_buf().await
    }

    // Sync CPU step: decode `input` into `output` and report the next state
    // together with the number of bytes written.
    pub fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result<(State, usize)> {
        // If input is empty, the inner reader must have reached EOF; return directly.
        if input.is_empty() {
            debug!("input is empty, return directly");
            // Avoid attempting to reinitialise the decoder if the reader
            // has returned EOF.
            self.multiple_members = false;
            return Ok((State::Finishing, 0));
        }

        let mut input = PartialBuffer::new(input);
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.decode(&mut input, &mut output)?;
        let len = input.written().len();
        debug!("advance reader with amt {}", len);
        Pin::new(&mut self.reader).consume(len);

        if done {
            Ok((State::Finishing, output.written().len()))
        } else {
            Ok((State::Reading, output.written().len()))
        }
    }

    // Sync CPU step: drain whatever the decoder still holds internally.
    pub fn finish(&mut self, output: &mut [u8]) -> Result<(State, usize)> {
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.finish(&mut output)?;

        if done {
            if self.multiple_members {
                self.decoder.reinit()?;
                Ok((State::Reading, output.written().len()))
            } else {
                Ok((State::Done, output.written().len()))
            }
        } else {
            Ok((State::Finishing, output.written().len()))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_compression::codec::{Decode, ZlibDecoder as ZlibCodec};
    use bytes::BufMut;
    use flate2::write::ZlibEncoder;
    use flate2::Compression;
    use futures::io::{BufReader, Cursor};
    use log::debug;
    use rand::prelude::*;
    use std::io;
    use std::io::Result;
    use std::io::Write;

    // The payload is zlib-compressed, so the test is named accordingly.
    #[tokio::test]
    async fn decode_zlib() -> Result<()> {
        env_logger::init();

        let mut rng = ThreadRng::default();
        let mut content = vec![0; 16 * 1024 * 1024];
        rng.fill_bytes(&mut content);
        debug!("raw_content size: {}", content.len());

        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
        e.write_all(&content)?;
        let compressed_content = e.finish()?;
        debug!("compressed_content size: {}", compressed_content.len());

        let br = BufReader::with_capacity(64 * 1024, Cursor::new(compressed_content));
        let mut cr = Reader::new(br, ZlibCodec::new());

        let mut result = vec![0; 16 * 1024 * 1024];
        let mut cnt = 0;
        let mut state = State::Reading;
        let mut buf = Vec::new();

        loop {
            let (_, output) = result.split_at_mut(cnt);

            match state {
                State::Reading => {
                    debug!("start reading");
                    buf = cr.fill_buf().await?.to_vec();
                    debug!("read data: {}", buf.len());
                    state = State::Decoding
                }
                // No `unsafe` is needed here: the block only calls safe code.
                State::Decoding => {
                    debug!("start decoding from buf {} to output {}", buf.len(), cnt);
                    let (decode_state, written) = cr.decode(&buf, output)?;
                    debug!("decoded from buf {} as output {}", buf.len(), written);
                    state = decode_state;
                    cnt += written;
                }
                State::Finishing => {
                    debug!("start finishing to output {}", cnt);
                    let (finish_state, written) = cr.finish(output)?;
                    debug!("finished from buf {} as output {}", buf.len(), written);
                    state = finish_state;
                    cnt += written;
                }
                State::Done => {
                    debug!("done");
                    break;
                }
            }
        }

        assert_eq!(result, content);
        Ok(())
    }
}

The lesson learnt from this demo: based on the current design, we will need to export the following things (only for the decoder):
The real changes: split IO from
Yeah, I was thinking
Here is my progress, please take a look: async_compress side: https://github.com/Xuanwo/async-compression/tree/public_api
We replace tower-http's `CompressionLayer` with a custom stream transformation. This is necessary because tower-http uses async-compression, which buffers data until the end of the stream before writing it, to achieve better compression. This is incompatible with the multipart protocol for `@defer`, which requires chunks to be sent as soon as possible, so we need to compress them independently.

This extracts parts of the codec module of async-compression, which so far is not public, and builds a streaming wrapper on top of it that flushes the compressed data on every response in the stream.

This is expected to be temporary, as we have in-flight PRs for async-compression:
- Nullus157/async-compression#155
- Nullus157/async-compression#178

With Nullus157/async-compression#150 we might be able to at least remove the vendored code.
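The "flush on every response in the stream" behaviour described above can be illustrated with a toy, std-only sketch. All names here are assumptions for illustration: `ChunkEncode`, `RunLength` and `encode_chunks` are hypothetical, and the run-length encoder merely stands in for a real codec that holds data back until it is flushed. It is not the vendored code and not async-compression's `Encode` trait.

```rust
/// Hypothetical, simplified encoder interface (NOT async-compression's `Encode`).
pub trait ChunkEncode {
    /// Consumes `input`, appending whatever output is ready to `output`.
    fn encode(&mut self, input: &[u8], output: &mut Vec<u8>);
    /// Forces out any data the encoder is still holding internally.
    fn flush(&mut self, output: &mut Vec<u8>);
}

/// Toy run-length encoder: emits (count, byte) pairs and keeps the current
/// run buffered until the byte changes or `flush` is called.
#[derive(Default)]
pub struct RunLength {
    run: Option<(u8, u8)>, // (count, byte)
}

impl ChunkEncode for RunLength {
    fn encode(&mut self, input: &[u8], output: &mut Vec<u8>) {
        for &b in input {
            match self.run {
                Some((n, cur)) if cur == b && n < u8::MAX => self.run = Some((n + 1, cur)),
                _ => {
                    self.flush(output);
                    self.run = Some((1, b));
                }
            }
        }
    }

    fn flush(&mut self, output: &mut Vec<u8>) {
        if let Some((n, b)) = self.run.take() {
            output.extend_from_slice(&[n, b]);
        }
    }
}

/// Encodes each chunk and flushes immediately afterwards, so every input
/// chunk produces a complete output chunk that can be sent right away.
pub fn encode_chunks<E: ChunkEncode>(encoder: &mut E, chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    chunks
        .iter()
        .map(|chunk| {
            let mut out = Vec::new();
            encoder.encode(chunk, &mut out);
            encoder.flush(&mut out);
            out
        })
        .collect()
}
```

Flushing per chunk means a run that spans two chunks is emitted as two separate pairs, which shows the trade-off the text describes: lower latency per chunk at the cost of some compression ratio.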
We are using our runtime in databend which will:
So we want to control the underlying behavior of `async-compression`: `decode` should happen on the sync runtime.

Thus, we have to directly access the `Encode`/`Decode` trait in `codec` and build our own `XzDecoder`.

Does this idea make sense to you? I'm willing to start a PR for it.
Also address: #141