Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STOMP Connection Closes Due to Missing Heartbeats Despite Frontend Showing Ping-Pongs #34009

Open
abhishek0499 opened this issue Dec 3, 2024 · 2 comments
Labels
in: messaging Issues in messaging modules (jms, messaging) status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on

Comments

@abhishek0499
Copy link

I’m experiencing a connection issue with WebSocket STOMP in Spring Boot using ActiveMQ Artemis. Despite proper heartbeat configurations, the broker closes the connection with the following error:
AMQ229014: Did not receive data from /192.0.2.1:46748 within the 20000ms connection TTL.

Setup

  • Frontend: SockJS and Stomp.js, showing regular ping/pong exchanges in the browser console.
  • Backend: Spring Boot using StompBrokerRelayMessageHandler with custom ReactorNettyTcpClient for broker connections. Relevant configuration:
public void configureMessageBroker(MessageBrokerRegistry config) {
    int sendInterval = 10000; // Send interval in milliseconds
    int receiveInterval = (int) (sendInterval * heartBeatReceiveScale);
    config.setApplicationDestinationPrefixes("/app");
    config.enableStompBrokerRelay("/topic", "/queue")
            .setUserDestinationBroadcast("/topic/random")
            .setUserRegistryBroadcast("/topic/simp-user-registry")
            .setTcpClient(createTcpClient())
            .setSystemLogin(username)
            .setSystemPasscode(password)
            .setClientLogin(username)
            .setClientPasscode(password)
            .setSystemHeartbeatSendInterval(sendInterval) // Set heartbeat send interval
            .setSystemHeartbeatReceiveInterval(receiveInterval);
}

Logs confirm a CONNECTED frame with heart-beat=[10000, 10000].

Observations

  • Frontend pings/pongs appear consistent.
  • Backend logs indicate heartbeats are sent and received.
  • Connection closes after the TTL timeout (20 seconds).

Questions

  • How can I verify heartbeats are properly received by the broker?
  • Are additional Spring Boot or Artemis configurations required to prevent disconnections?

Steps to Reproduce

  • Configure WebSocket STOMP with a broker using the above setup.
  • Send/receive heartbeats with the specified intervals.
  • Observe connection closure in logs despite consistent heartbeats.

Additional Context
Backend logs:
DEBUG StompBrokerRelayMessageHandler : Received CONNECTED heart-beat=[10000, 10000] session=itwv0lto DEBUG StompBrokerRelayMessageHandler : Forwarding SEND /topic/simp-user-registry session=_system_

@v-perfilev
Copy link
Contributor

Hi @abhishek0499,

I've been playing with ActiveMQ-Artemis as relay broker but haven't been able to reproduce the error you described. It seems you might be using a custom configuration for Artemis because the default connection TTL is 60000ms and not 20000ms. Unfortunately you didn't share how you created your custom TcpClient.

This is my configuration:

  1. docker-compose for ActiveMQ-Artemis:
version: '3.8'
services:
  broker:
    image: apache/activemq-artemis:2.39.0
    ports:
      - "61613:61613"
      - "61616:61616"
      - "8161:8161"
  1. WebSocketConfig on Spring Boot 3.4.1:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  private final String host = "127.0.0.1";
  private final int port = 61613;
  private final String username = "artemis";
  private final String password = "artemis";
  private final int sendInterval = 10000;
  private final int receiveInterval = 10000;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry
        .addEndpoint("/ws")
        .setAllowedOrigins("*")
        .withSockJS();
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config
        .setApplicationDestinationPrefixes("/app")
        .enableStompBrokerRelay("/topic", "/queue")
        .setUserDestinationBroadcast("/topic/random")
        .setUserRegistryBroadcast("/topic/simp-user-registry")
        .setTcpClient(createCustomTcpClient())
        .setSystemLogin(username)
        .setSystemPasscode(password)
        .setClientLogin(username)
        .setClientPasscode(password)
        .setSystemHeartbeatSendInterval(sendInterval)
        .setSystemHeartbeatReceiveInterval(receiveInterval);
  }

  private TcpOperations<byte[]> createCustomTcpClient() {
    TcpClient tcpClient = TcpClient
        .create()
        .host(host)
        .port(port);
    return new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
  }
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    implementation 'io.projectreactor.netty:reactor-netty:1.1.21'
}

JS Client

import SockJS from "sockjs-client";
import * as Stomp from "@stomp/stompjs";

const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.Stomp.over(socket);

stompClient.connect({}, (frame) => {
  stompClient.subscribe('/topic/random', (message) => {
    console.log('Received message: ' + message.body);
  });
}, (error) => {
  console.error('Error: ', error);
});

So... check your TCP client - maybe you've set a very short connection timeout. Or maybe the traffic is interrupted at the network level in your environment. You can display extended logs from your client by setting wiretap in your client:

TcpClient tcpClient = TcpClient.create()
    .host(host)
    .port(port)
    .wiretap("reactor.netty", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);

and in your applicaton.yaml:

logging:
  level:
    reactor.netty: DEBUG

So you'll be able to check if heartbeat messages are sent correctly for both the Spring app and the JS client (ports for two will differ):

2025-01-04T17:17:32.916+01:00 DEBUG 13665 --- [actor-tcp-nio-3] reactor.netty                            : [bc92da25, L:/127.0.0.1:64607 - R:/127.0.0.1:61613] READ: 1B 
2025-01-04T17:17:32.921+01:00 DEBUG 13665 --- [actor-tcp-nio-3] reactor.netty                            : [bc92da25, L:/127.0.0.1:64607 - R:/127.0.0.1:61613] WRITE: 1B

Cheers,
Vladimir Perfilev

@bclozel bclozel added the status: waiting-for-feedback We need additional information before we can continue label Jan 4, 2025
@abhishek0499
Copy link
Author

Thank you for the response. I've tested the suggested approach with a single broker host-port configuration, and it works as expected. However, my use case involves multiple broker host-ports (e.g., tcp://host1:61613,tcp://host2:61613,tcp://host2:61613).

I implemented a custom TcpClient using a Supplier<SocketAddress> that:

  1. Attempts to connect to each broker
  2. Maintains a list of valid broker addresses
  3. Randomly selects one broker from the valid addresses

But when using multiple brokers, I'm still encountering the same TTL error as in my original issue.

I've attached my current implementation for reference. Could you provide guidance on how to properly handle multiple broker connections while avoiding the TTL errors?

private String[] brokerAddresses = "tcp://host1:61613,tcp://host2:61613,tcp://host2:61613"

private static SocketAddress selectRandomBroker(List<SocketAddress> validAddresses) {
        if (validAddresses.isEmpty()) {
            throw new ArtemisException("No valid Artemis broker addresses found.");
        }
        Random random = new Random();
        int randomIndex = random.nextInt(validAddresses.size());
        return validAddresses.get(randomIndex);
    }
 private TcpOperations<byte[]> createCustomTcpClient() {
        Supplier<SocketAddress> addressSupplier = new Supplier<>() {
            @Override
            public SocketAddress get() {
                if (brokerAddresses == null || brokerAddresses.length == 0) {
                    throw new ArtemisException("No Artemis broker URLs found. Please set 'artemis.broker.urls' property.");
                }
                List<SocketAddress> validAddresses = new ArrayList<>();
                for (String address : brokerAddresses) {
                    URI uri;
                    try {
                        uri = new URI(address);
                        LOGGER.info("Trying to connect to Artemis broker with URL {}", uri);
                        ConnectionFactory factory = new ActiveMQConnectionFactory(uri.toString(), username, password);
                        Connection connection = factory.createConnection();
                        connection.close();  // Close temporary connection
                        validAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
                    } catch (JMSException e) {
                        LOGGER.error("Connection failed on: {}. Reason: {}", address, e.getMessage());
                        LOGGER.info("Retrying connection in 5 seconds");
                        try {
                            Thread.sleep(5 * 1000);
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    } catch (URISyntaxException e) {
                        throw new ArtemisException(e.getMessage());
                    }
                }
                if (validAddresses.isEmpty()) {
                    throw new ArtemisException("No valid Artemis broker addresses found.");
                }
                return selectRandomBroker(validAddresses);
            }
        };

        TcpClient tcpClient = TcpClient
                .create()
                .remoteAddress(addressSupplier) // Use the address supplier for random selection
                .wiretap(true); // Enable wiretap for monitoring TCP traffic

        return new
``` ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
    }
    

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jan 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: messaging Issues in messaging modules (jms, messaging) status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on
Projects
None yet
Development

No branches or pull requests

4 participants