Skip to content

Commit

Permalink
chore(examples): replace async-stream usage by explicit streams
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Aug 14, 2024
1 parent 862140d commit 6ea5b37
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 43 deletions.
25 changes: 1 addition & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ wrpc-cli = { workspace = true }
[workspace.dependencies]
anyhow = { version = "1", default-features = false }
async-nats = { package = "async-nats-wrpc", version = "0.35.1", default-features = false }
async-stream = { version = "0.3", default-features = false }
bitflags = { version = "2", default-features = false }
bytes = { version = "1", default-features = false }
clap = { version = "4", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/streams-nats-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ repository.workspace = true
[dependencies]
anyhow = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
Expand All @@ -24,6 +23,7 @@ clap = { workspace = true, features = [
] }
futures = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
Expand Down
37 changes: 21 additions & 16 deletions examples/rust/streams-nats-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use core::time::Duration;

use anyhow::Context as _;
use async_stream::stream;
use bytes::Bytes;
use clap::Parser;
use futures::StreamExt as _;
use tokio::time::sleep;
use tokio::{sync::mpsc, try_join};
use futures::{stream, StreamExt as _};
use tokio::sync::mpsc;
use tokio::{time, try_join};
use tokio_stream::wrappers::IntervalStream;
use tracing::debug;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::util::SubscriberInitExt as _;
Expand Down Expand Up @@ -50,18 +50,23 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed to connect to NATS.io")?;
for prefix in prefixes {
let numbers = Box::pin(stream! {
for i in 1..=10 {
yield vec![i];
sleep(Duration::from_secs(1)).await;
}
});
let bytes = Box::pin(stream! {
for i in 1..=10 {
yield Bytes::from(i.to_string());
sleep(Duration::from_secs(1)).await;
}
});
let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);

// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);

let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None);
let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes })
.await
Expand Down
1 change: 0 additions & 1 deletion examples/rust/streams-nats-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ repository.workspace = true
[dependencies]
anyhow = { workspace = true }
async-nats = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
Expand Down

0 comments on commit 6ea5b37

Please sign in to comment.