Skip to content

Commit

Permalink
split ConnectionTrait into a read and write half and allow listening …
Browse files Browse the repository at this point in the history
…on unuathenticated connections
  • Loading branch information
icewind1991 committed Oct 28, 2024
1 parent 6839fe1 commit d6c6e3a
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "steam-vent"
version = "0.3.0"
version = "0.4.0"
authors = ["Robin Appelman <[email protected]>"]
edition = "2021"
description = "Interact with the Steam network via rust"
Expand Down
33 changes: 14 additions & 19 deletions examples/auth_ticket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use steam_vent::auth::{
AuthConfirmationHandler, ConsoleAuthConfirmationHandler, DeviceConfirmationHandler,
FileGuardDataStore,
};
use steam_vent::{Connection, ConnectionTrait, ServerList};
use steam_vent::connection::{ConnectionListener, UnAuthenticatedConnection};
use steam_vent::ServerList;
use steam_vent_proto::steammessages_clientserver::CMsgClientGameConnectTokens;

#[tokio::main]
Expand All @@ -17,25 +18,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
let password = args.next().expect("no password");

let server_list = ServerList::discover().await?;
let connection = UnAuthenticatedConnection::connect(&server_list).await?;
// listen for messages before starting the authentication because steam can send the tickets before
// the login call returns
let mut tokens_messages = connection.on::<CMsgClientGameConnectTokens>();

let connection = Connection::login(
&server_list,
&account,
&password,
FileGuardDataStore::user_cache(),
ConsoleAuthConfirmationHandler::default().or(DeviceConfirmationHandler),
)
.await?;

let tokens_messages = connection.on::<CMsgClientGameConnectTokens>();

// also process the messages that were received before we registered our filter
let old_token_messages = connection
.take_unprocessed()
.into_iter()
.filter_map(|raw| raw.into_message::<CMsgClientGameConnectTokens>().ok())
.map(Ok);
let mut tokens_messages = futures_util::stream::iter(old_token_messages).chain(tokens_messages);
let _connection = connection
.login(
&account,
&password,
FileGuardDataStore::user_cache(),
ConsoleAuthConfirmationHandler::default().or(DeviceConfirmationHandler),
)
.await?;

while let Some(Ok(tokens_message)) = tokens_messages.next().await {
println!("got {} token from message", tokens_message.tokens.len());
Expand Down
2 changes: 1 addition & 1 deletion examples/backpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use steam_vent::auth::{
AuthConfirmationHandler, ConsoleAuthConfirmationHandler, DeviceConfirmationHandler,
FileGuardDataStore,
};
use steam_vent::{Connection, ConnectionTrait, GameCoordinator, ServerList};
use steam_vent::{Connection, ConnectionSender, GameCoordinator, ServerList};
use steam_vent_proto::tf2::base_gcmessages::CSOEconItem;
use steam_vent_proto::tf2::gcsdk_gcmessages::{
CMsgSOCacheSubscribed, CMsgSOCacheSubscriptionRefresh,
Expand Down
3 changes: 2 additions & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use steam_vent::auth::{
AuthConfirmationHandler, ConsoleAuthConfirmationHandler, DeviceConfirmationHandler,
FileGuardDataStore,
};
use steam_vent::connection::ConnectionListener;
use steam_vent::proto::steammessages_friendmessages_steamclient::{
CFriendMessages_IncomingMessage_Notification, CFriendMessages_SendMessage_Request,
};
use steam_vent::{Connection, ConnectionTrait, ServerList};
use steam_vent::{Connection, ConnectionSender, ServerList};
use steam_vent_proto::enums::EPersonaStateFlag;
use steam_vent_proto::steammessages_clientserver_friends::CMsgClientChangeStatus;
use steamid_ng::SteamID;
Expand Down
2 changes: 1 addition & 1 deletion examples/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use steam_vent::auth::{
FileGuardDataStore,
};
use steam_vent::proto::steammessages_player_steamclient::CPlayer_GetOwnedGames_Request;
use steam_vent::{Connection, ConnectionTrait, ServerList};
use steam_vent::{Connection, ConnectionSender, ServerList};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down
2 changes: 1 addition & 1 deletion examples/product_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use steam_vent::proto::steammessages_clientserver_appinfo::{
cmsg_client_picsproduct_info_request, CMsgClientPICSProductInfoRequest,
CMsgClientPICSProductInfoResponse,
};
use steam_vent::{Connection, ConnectionTrait, ServerList};
use steam_vent::{Connection, ConnectionSender, ServerList};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down
2 changes: 1 addition & 1 deletion examples/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error;
use steam_vent::proto::steammessages_gameservers_steamclient::CGameServers_GetServerList_Request;
use steam_vent::{Connection, ConnectionTrait, ServerList};
use steam_vent::{Connection, ConnectionSender, ServerList};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down
10 changes: 8 additions & 2 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ pub(crate) trait ConnectionImpl: Sync + Debug {
) -> impl Future<Output = Result<()>> + Send;
}

pub trait ConnectionTrait: Debug {
/// A trait for listening for messages coming from steam
pub trait ConnectionListener {
fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static;

/// Wait for one message of a specific kind, also returning the header
Expand All @@ -156,7 +157,10 @@ pub trait ConnectionTrait: Debug {

/// Listen to messages of a specific kind
fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static;
}

/// A trait for sending messages to steam
pub trait ConnectionSender {
/// Send a rpc-request to steam, waiting for the matching rpc-response
fn service_method<Msg: ServiceMethodRequest>(
&self,
Expand Down Expand Up @@ -231,7 +235,7 @@ impl ConnectionImpl for Connection {
}
}

impl<C: ConnectionImpl> ConnectionTrait for C {
impl<C: ConnectionImpl> ConnectionListener for C {
fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
BroadcastStream::new(self.filter().on_notification(T::REQ_NAME))
.filter_map(|res| res.ok())
Expand Down Expand Up @@ -268,7 +272,9 @@ impl<C: ConnectionImpl> ConnectionTrait for C {
self.on_with_header::<T>()
.map(|res| res.map(|(_, msg)| msg))
}
}

impl<C: ConnectionImpl> ConnectionSender for C {
async fn service_method<Msg: ServiceMethodRequest>(&self, msg: Msg) -> Result<Msg::Response> {
let header = self.session().header(true);
let recv = self.filter().on_job_id(header.source_job_id);
Expand Down
51 changes: 48 additions & 3 deletions src/connection/unauthenticated.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use super::raw::RawConnection;
use super::Result;
use super::{ConnectionListener, Result};
use crate::auth::{begin_password_auth, AuthConfirmationHandler, GuardDataStore};
use crate::message::{ServiceMethodMessage, ServiceMethodResponseMessage};
use crate::net::RawNetMessage;
use crate::net::{NetMessageHeader, RawNetMessage};
use crate::service_method::ServiceMethodRequest;
use crate::session::{anonymous, login};
use crate::{Connection, ConnectionError, NetworkError, ServerList};
use crate::{Connection, ConnectionError, NetMessage, NetworkError, ServerList};
use futures_util::future::{select, Either};
use futures_util::FutureExt;
use futures_util::Stream;
use std::future::Future;
use std::pin::pin;
use steam_vent_proto::enums_clientserver::EMsg;
use steamid_ng::{AccountType, SteamID};
use tokio::time::timeout;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::{debug, error};

pub struct UnAuthenticatedConnection(RawConnection);
Expand Down Expand Up @@ -102,6 +107,46 @@ impl UnAuthenticatedConnection {
}
}

/// Listen for messages before starting authentication
impl ConnectionListener for UnAuthenticatedConnection {
fn on_notification<T: ServiceMethodRequest>(&self) -> impl Stream<Item = Result<T>> + 'static {
BroadcastStream::new(self.0.filter.on_notification(T::REQ_NAME))
.filter_map(|res| res.ok())
.map(|raw| raw.into_notification())
}

fn one_with_header<T: NetMessage + 'static>(
&self,
) -> impl Future<Output = Result<(NetMessageHeader, T)>> + 'static {
// async block instead of async fn, so we don't have to tie the lifetime of the returned future
// to the lifetime of &self
let fut = self.0.filter.one_kind(T::KIND);
async move {
let raw = fut.await.map_err(|_| NetworkError::EOF)?;
raw.into_header_and_message()
}
}

fn one<T: NetMessage + 'static>(&self) -> impl Future<Output = Result<T>> + 'static {
self.one_with_header::<T>()
.map(|res| res.map(|(_, msg)| msg))
}

fn on_with_header<T: NetMessage + 'static>(
&self,
) -> impl Stream<Item = Result<(NetMessageHeader, T)>> + 'static {
BroadcastStream::new(self.0.filter.on_kind(T::KIND)).map(|raw| {
let raw = raw.map_err(|_| NetworkError::EOF)?;
raw.into_header_and_message()
})
}

fn on<T: NetMessage + 'static>(&self) -> impl Stream<Item = Result<T>> + 'static {
self.on_with_header::<T>()
.map(|res| res.map(|(_, msg)| msg))
}
}

pub(crate) async fn service_method_un_authenticated<Msg: ServiceMethodRequest>(
connection: &RawConnection,
msg: Msg,
Expand Down
4 changes: 3 additions & 1 deletion src/game_coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::connection::{ConnectionImpl, ConnectionTrait, MessageFilter, MessageSender};
use crate::connection::{
ConnectionImpl, ConnectionListener, ConnectionSender, MessageFilter, MessageSender,
};
use crate::message::EncodableMessage;
use crate::net::{decode_kind, NetMessageHeader, RawNetMessage};
use crate::session::Session;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod transport;

pub use steam_vent_proto as proto;

pub use connection::{Connection, ConnectionTrait};
pub use connection::{Connection, ConnectionSender};
pub use eresult::EResult;
pub use game_coordinator::GameCoordinator;
pub use message::NetMessage;
Expand Down
2 changes: 1 addition & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::auth::{ConfirmationError, ConfirmationMethod};
use crate::connection::raw::RawConnection;
use crate::connection::{ConnectionImpl, ConnectionTrait};
use crate::connection::{ConnectionImpl, ConnectionSender};
use crate::eresult::EResult;
use crate::net::{JobId, NetMessageHeader, NetworkError};
use crate::proto::steammessages_base::CMsgIPAddress;
Expand Down

0 comments on commit d6c6e3a

Please sign in to comment.