Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement socket options struct and builder for socket with options #116

Open
Alexei-Kornienko opened this issue Jan 1, 2021 · 17 comments

Comments

@Alexei-Kornienko
Copy link
Collaborator

Currently socket doesn't provide any means to customize it's behaviour. Some options are required by ZMQ spec (for example high water mark for message buffers) other would be nice to have (for example number of retries on reconnect before returning an error)

@xpepermint
Copy link

Similar: #131

@CorinJG
Copy link
Contributor

CorinJG commented Mar 5, 2021

Was about to create new issue for ability to specify socket's identity - right now PeerIdentities are by uuid only. But I guess this falls under the builder issue.

@Alexei-Kornienko
Copy link
Collaborator Author

Yeah. I'm pretty sure it is. I'll try to work on it on a weekend if I have time..

Alexei-Kornienko added a commit that referenced this issue Mar 6, 2021
This is related to #116.

Currently it's only enables user to set custom peer-identity if needed
and passes options all around. Later we can add other options to this
struct.

Some minor changes to ReadyCommand properties were required cause
ZMQ_IDENTITY could be any byte vec and not guaranteed to be valid String
@Alexei-Kornienko
Copy link
Collaborator Author

I've added a prototype implementation.. It looks a bit bulky in terms of small changes here and there however I don't know a better way of doing it right now..

@CorinJG
Copy link
Contributor

CorinJG commented Mar 7, 2021

Looks pretty functional to me, can perform userspace routing now - Thanks!

@Alexei-Kornienko
Copy link
Collaborator Author

@CorinJG cool. Merged it to master. Let me know if there are other minor changes that might help you. I don't have much time to work on library but I'm trying to help people with specific requests

@CorinJG
Copy link
Contributor

CorinJG commented Mar 7, 2021

That's very kind of you, thanks a lot:)

@CorinJG
Copy link
Contributor

CorinJG commented Mar 9, 2021

I could be wrong but I can't see that TCP Nagle is disabled anywhere in the lib (eg. simply at socket creation via set_nodelay) - tokio::net::TcpStream sockets enable Nagle by default. It should be disabled in the ZMQ protocol as the batching is performed on the ZMQ level so Nagle can only hurt throughput. If it is disabled and I just couldn't see it, sorry!

@Alexei-Kornienko
Copy link
Collaborator Author

@CorinJG it seems you are right. However I think this should be tracked as a separate issue.

@CorinJG
Copy link
Contributor

CorinJG commented Mar 9, 2021

@Alexei-Kornienko Yes that's sensible, looks like it's part of the ZeroMQ protocol so shouldn't be a socket option.

@domenicquirl
Copy link

@Alexei-Kornienko as #133 added only the peer identity, no other options are implemented at the moment? We are interested in replacing our dependency on the from-source FFI version (which we currently need to vendor for cross-compilation), but would need send/receive timeouts (ZMQ_SNDTIMEO, ZMQ_RCVTIMEO) to be supported. Is there a way to achieve this already through zmq.rs or would they need to be added (and implemented) here?

@poyea
Copy link
Collaborator

poyea commented Apr 29, 2022

@domenicquirl I'm afraid they're not supported - like several other options.

@domenicquirl
Copy link

@poyea do you have an estimate of how much effort it would be to add timeouts / where they would need to be handled (besides adding the option itself)? Could I help with a limited part of my work-time without being familiar with the codebase?

@poyea
Copy link
Collaborator

poyea commented Apr 30, 2022

It's hard to tell, but I'd say maybe a medium-sized change for ZMQ_{SND,RCV}TIMEO. I can imagine, at that Socket level, we need a clock. Then, we handle failures separately according to the option. Re-visiting https://github.com/zeromq/libzmq/blob/master/src/socket_base.cpp#L1378 may also help.

@domenicquirl
Copy link

Following up on this after looking through the stack over the week-end. I am still unsure where the correct place to put a timeout would be here, in particular due to the buffering that happens as part of Framed{Read,Write}. If there is buffer space available, poll_ready will always be Ready, independently of the state of the socket. Only if the buffer is full would poll_ready attempt to write it out. Here, as in poll_flush, we would then go down to the runtime and things depend - e.g., tokio offers things like readable and try_read (and similar for writing) for UnixSocket, whereas async_std only implements Async{Read,Write} (looking at the IPC case here). start_send, meanwhile, always just encodes into the buffer.

All of this somewhat makes sense for an async context, but since all Pending responses translate to BufferFull (because, well, they only happen when the buffer is full), any messages here would be lost? Hence my remaining confusion as to how to add a sensible timeout given that for poll_ready there either is a buffer space and we don't block but also don't actually send yet, or there is no space and if we can't flush the message is dropped.

For receiving, the SUB socket only awaits its queue. Could a timeout be added for this operation, i.e. is the queue cancel-safe? That would already help out a lot - the main problem for us is still that we cannot have the task that receives updates block unboundedly. A try_send/try_recv would be a different angle to solve that problem, but probably leads to the same design considerations.

@eh3an
Copy link

eh3an commented Jan 2, 2025

is there a way to set ZMQ_{SND,RCV}TIMEO at the time? I saw the example here, but not sure what to use for connect/send/receive timeout.

@Alexei-Kornienko
Copy link
Collaborator Author

is there a way to set ZMQ_{SND,RCV}TIMEO at the time? I saw the example here, but not sure what to use for connect/send/receive timeout.

basically you don't need a dedicated timeout option to use it. Simplest solution would be to use builtin tokio select macro and sleep. Some simplified example could look something like:

#[tokio::main]
async fn main() {
    let data = "Hello, world!";
    let timeout_duration = Duration::from_secs(1);

    let timeout = time::sleep(timeout_duration);
    tokio::pin!(timeout); // Pin the timeout future so it can be used in select!

    tokio::select! {
        result = send(data) => {
            match result {
                Ok(_) => println!("Send operation completed successfully."),
                Err(e) => println!("Send operation failed: {}", e),
            }
        }
        _ = &mut timeout => {
            println!("Send operation timed out after {:?}.", timeout_duration);
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants