Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples/rust: add echo-stream-nats-{client,server} #217

Merged
merged 1 commit into from
Aug 13, 2024

Conversation

jvatic
Copy link
Contributor

@jvatic jvatic commented Aug 10, 2024

This is essentially a copy of the hello-nats-{client,server} example, but with a bi-directional stream (server echos the client) that came out of me wanting to know if this functionality works.

There are a few things I'd like to note:

  1. The streams in the WIT definition are stream<u64> but the bindings are for Vec<u64>.
  2. The output of echo on the client is a tuple with a stream and a future, where the future has to be polled before the stream will work. This feels clunky; could the future be implemented as part of the stream?
  3. wit_bindgen::generate! doesn't implement stream (or future) yet, so I'm not sure if implementing this as a component example is feasible?

I'd like to get this working with a component example, so I'd be happy to help out with any of the above.

@jvatic jvatic marked this pull request as ready for review August 10, 2024 12:26
@jvatic jvatic requested a review from rvolosatovs as a code owner August 10, 2024 12:26
Copy link
Member

@rvolosatovs rvolosatovs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks!

@rvolosatovs
Copy link
Member

rebased and fixed-up a small merge conflict in Cargo.toml

@rvolosatovs rvolosatovs added this pull request to the merge queue Aug 13, 2024
@rvolosatovs
Copy link
Member

1. The streams in the WIT definition are stream<u64> but the bindings are for Vec<u64>.

indeed, that's because wRPC does not handle chunking and it's expected that implementations would handle that instead, I believe https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.ready_chunks should be able to be used out-of-the-box here on sender side

2. The output of echo on the client is a tuple with a stream and a future, where the future has to be polled before the stream will work. This feels clunky; could the future be implemented as part of the stream?

The intention here is to be able to poll both the stream and the I/O future concurrently for most use cases providing for clean separation of data and I/O error handling. I think we need to have both separate for lower-level use cases, however I certainly agree that providing a wrapper, which could take these two and return a single future/stream would be ideal.

3. wit_bindgen::generate! doesn't implement stream (or future) yet, so I'm not sure if implementing this as a component example is feasible?

That's a bit tricky, async support in component model is currently WIP (latest draft at https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md, refs WebAssembly/component-model#363). I believe there is a set of patches for Wasmtime, which would allow using async in components, but I have not explored that myself yet. I also don't think we should depend on an unreleased version of Wasmtime in wRPC (we could, however, add an additional runtime using one)

Other than that, I'm thinking to work on a wRPC stream resource and a small reflection API to go along with it, which would allow this functionality to be used by any component. Btw, I'm currently tackling #185 , which is tangentially related.

Would you perhaps be interested in working on (beginnings of) a reflection API? Basically define all WIT kinds in WIT and perhaps have some way to "construct" a record type at runtime?

Merged via the queue into bytecodealliance:main with commit f834a8e Aug 13, 2024
25 checks passed
@rvolosatovs
Copy link
Member

Small follow-up, I suggest to take a look at the slightly reworked stream usage at

let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);
// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);
let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None);
let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
)?;
to see examples of batching and I/O future handling

@jvatic
Copy link
Contributor Author

jvatic commented Aug 15, 2024

Small follow-up, I suggest to take a look at the slightly reworked stream usage at

let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);
// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);
let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None);
let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
)?;

to see examples of batching and I/O future handling

Ah, yes, this certainly makes sense, thanks!

  1. wit_bindgen::generate! doesn't implement stream (or future) yet, so I'm not sure if implementing this as a component example is feasible?

That's a bit tricky, async support in component model is currently WIP (latest draft at https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md, refs WebAssembly/component-model#363). I believe there is a set of patches for Wasmtime, which would allow using async in components, but I have not explored that myself yet. I also don't think we should depend on an unreleased version of Wasmtime in wRPC (we could, however, add an additional runtime using one)

Other than that, I'm thinking to work on a wRPC stream resource and a small reflection API to go along with it, which would allow this functionality to be used by any component. Btw, I'm currently tackling #185 , which is tangentially related.

Would you perhaps be interested in working on (beginnings of) a reflection API? Basically define all WIT kinds in WIT and perhaps have some way to "construct" a record type at runtime?

Thanks! I'm new to the WIT ecosystem, and while I'm not entirely sure where to get started on such a reflect API, I'd be interested in working on that given a bit more context. Is the idea to provide non-blocking/blocking APIs within a sync component for working with a stream similar to wasi::io streams that could evolve for async usage?

And would it be possible currently to use wasi::io streams in place of stream<u8> with wRPC? (If so I'm thinking that could be a viable alternative for my use-case for now.)

@jvatic jvatic deleted the stream-example branch August 15, 2024 18:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants