**[QUERY] Managing event consumption from EventHub in batch mode with multiple consumers #47654**
Thank you for your feedback. Tagging and routing to the team member best able to assist.
@davidedg87: Thanks for reaching out, and we regret that you're experiencing difficulties. Event Hubs has an at-least-once delivery guarantee; your application must be tolerant of duplicates and idempotent in its processing.

**General information**

When you "kill" a processor - whether by a graceful stop or by terminating the process - any partitions that it owned will be claimed by other processors in the group, and processing will restart from the last recorded checkpoint. When a partition moves between processors, there is potential for 1-2 batches of overlap in which the new owner has taken control, but the old owner is not yet aware and still has a batch held in memory that is being dispatched for processing. When the old owner's load balancing loop runs (every 30 seconds by default), or when it attempts to read the next batch from the partition, it recognizes the new owner and updates its state to reflect the change. During this period of overlap, there will be two processor instances processing data from the same partition, and either the old or the new owner may write a checkpoint during this time.

**Checkpointing guidance**
Offsets are opaque data, and they do not change in a predictable pattern from one event to another. You cannot safely reason that a higher offset is later in the stream than a lower one; it is quite possible that the opposite is true. Sequence numbers, on the other hand, do change in a predictable pattern, and it is safe to infer a relationship between them. When choosing which event in a batch to checkpoint, select by sequence number.
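For illustration, a minimal sketch of that selection; the names `processedEvents`, `partition`, and the `UpdateCheckpointAsync` call shape are assumptions, not the exact API surface:

```csharp
// Hedged sketch: checkpoint using the event with the highest sequence number,
// never the highest offset.  `processedEvents` is assumed to hold the events
// from the current batch that were fully processed.
EventData lastProcessed = processedEvents
    .OrderByDescending(evt => evt.SequenceNumber)
    .First();

// Use whichever UpdateCheckpointAsync overload your custom processor exposes;
// the parameters shown here are illustrative.
await UpdateCheckpointAsync(partition.PartitionId, lastProcessed, cancellationToken);
```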
**Snippet analysis**

The Event Hubs clients prioritize data integrity above all else; they will always resend events unless your application has explicitly created a checkpoint. There is no client scenario where data loss takes place. Unless your application is creating checkpoints for events that it has not yet processed or has not fully processed, the stream will always rewind for ownership changes or recovery, and you would see duplicates.

In the snippet that you shared, your error handling seems problematic when processing events. On L119, you're catching all exceptions as you process the payload of an event. If any exception takes place, you ignore it and move on. You don't account for this later in the logic, and you will create a checkpoint for the batch (L139) and a cache entry for every event in the batch - including those with errors (L148-152). As a result, you will skip any events that triggered an exception and will not see an entry in your database for them. Likewise, when an event does not have a payload, you'll explicitly skip over that event (L106) and still write the checkpoint and cache entry; those events will also not appear in your database. (A sketch of one checkpoint-safe way to handle failures appears at the end of this comment.) I'm going to assume that your publisher is explicitly setting the unique property that you're using for duplicate detection.

**Next steps**

There's not much else that I can offer with the available context. If you'd like to collect a +/- 5-minute slice of Azure SDK logs around the behavior that you're asking about, we'd be happy to take a look and offer thoughts. You'll want to capture logs at the "Verbose" level and filter to the ids discussed in this section of the Troubleshooting Guide. Discussion of capturing logs can be found in this sample and in the article Logging with the Azure SDK for .NET.

I'm going to mark this as addressed. If you'd like us to take a look at the logs, please unresolve once logs are available.
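As referenced above, a minimal sketch of checkpoint-safe error handling; the helpers `ProcessSingleEventAsync`, `LogProcessingFailure`, and `UpdateCheckpointAsync` are hypothetical placeholders, not the SDK surface. Only events that were fully processed are eligible for checkpointing, so a failure never advances the checkpoint past an unprocessed event:

```csharp
// Hedged sketch, not the attached implementation.
var processedEvents = new List<EventData>();

foreach (EventData eventData in events)
{
    try
    {
        await ProcessSingleEventAsync(eventData);
        processedEvents.Add(eventData);
    }
    catch (Exception ex)
    {
        // Stop at the first failure and do not checkpoint past it; the failed
        // event and everything after it will be redelivered on the next read.
        LogProcessingFailure(eventData, ex);
        break;
    }
}

if (processedEvents.Count > 0)
{
    // processedEvents preserves batch order, so the last element holds the
    // highest sequence number that was fully processed.
    await UpdateCheckpointAsync(processedEvents[^1]);
}
```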
Hi @davidedg87. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.
/unresolve
Hi @jsquire, first of all, thank you for the time dedicated to my issue.
Hi @davidedg87. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario, please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
Hi @davidedg87, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!
Hi @jsquire. Attached, you'll find a ZIP file containing the logs collected over approximately 7 minutes at the verbose level, filtered to the event IDs you indicated. During the test, I used a maximum parallelism of 4 consumers for 4 partitions, and I repeatedly killed running consumers to restart them. You'll find several .txt files because each time a consumer restarted, a new file was generated. In this version, I corrected the handling of the event selection for checkpointing, ensuring it selects the one with the maximum sequenceNumber instead of the maximum offset. Could you take a look? Thanks!
Hi @davidedg87. I've taken a look over the logs, and I don't see anything unusual or unexpected in the patterns there. One thing that I did notice is that the logs are missing some events that we would be interested in - such as how the reader was positioned (checkpoint or default) from ETW Id: 105. As a result, we don't know what position was chosen or why. Can you confirm the version of the package that you're using? I'm seeing that some formatting fixes are missing, which seems to indicate that we're looking at 5.11.3 or earlier. I'd also be interested in seeing your logging code.

**Log analysis**

Since the discussion was centered on overlapping ownership as instances stop/die, that's what I focused on. I saw nothing in the logs that was out of place for the scenario; log patterns were normal and expected. The ownership changes and transitions followed expected norms. Changes were generally clean with no contention, other than one partition that seemed to have two rapid transitions. I don't see anything that indicates a problem.

If you have an example of where you think an event was skipped during this run, can you provide the partition and approximate time? We do have ETW 129, which tells us explicitly which sequence number range was dispatched for processing, but summarization and gap analysis are difficult because things are broken into disparate files in plain-text format. Having an idea of where to look will save me from having to write something to parse out the ranges over time, which I'd prefer not to do unless the data points to there being a client issue. (A rough sketch of that kind of gap analysis appears after the file list below.)

**Log observations**

- filtered_verbose_logs_20250103_101428
- filtered_verbose_logs_20250103_101236
- filtered_verbose_logs_20250103_101111
- filtered_verbose_logs_20250103_101015
- filtered_verbose_logs_20250103_100851
- filtered_verbose_logs_20250103_100844
- filtered_verbose_logs_20250103_100832
- filtered_verbose_logs_20250103_100822
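For illustration only, a rough sketch of the gap analysis mentioned above. The log line shape and the regex are assumptions about the captured text, not the actual ETW 129 message format, and would need to be adjusted to match the real files:

```csharp
// Hedged sketch: scan captured log lines for dispatched sequence number
// ranges and report gaps per partition.
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;

var rangesByPartition = new Dictionary<string, List<(long Start, long End)>>();

// Assumption: each relevant line mentions the partition id in quotes followed
// by the start and end sequence numbers of the dispatched range.
var pattern = new Regex(@"partition '(?<p>[^']+)'\D+(?<s>\d+)\D+(?<e>\d+)");

foreach (string file in Directory.GetFiles(@"C:\temp", "filtered_verbose_logs_*.txt"))
{
    foreach (string line in File.ReadLines(file))
    {
        Match match = pattern.Match(line);
        if (!match.Success) continue;

        string partition = match.Groups["p"].Value;
        if (!rangesByPartition.TryGetValue(partition, out var list))
            rangesByPartition[partition] = list = new List<(long, long)>();

        list.Add((long.Parse(match.Groups["s"].Value), long.Parse(match.Groups["e"].Value)));
    }
}

foreach (var (partition, ranges) in rangesByPartition)
{
    ranges.Sort();
    for (int i = 1; i < ranges.Count; i++)
    {
        // A gap means a sequence range was never dispatched to the handler.
        if (ranges[i].Start > ranges[i - 1].End + 1)
            Console.WriteLine($"Partition {partition}: gap between {ranges[i - 1].End} and {ranges[i].Start}");
    }
}
```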
Hi @jsquire, thanks for the answer. This is the snippet of code related to logging:

```csharp
using System.Diagnostics.Tracing;  // EventLevel
using Azure.Core.Diagnostics;      // AzureEventSourceListener

// Path of the log file
string timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");
string logFilePath = @$"C:\temp\filtered_verbose_logs_{timestamp}.txt";
// Event ids to be filtered
var filteredEventIds = new HashSet<int>
{
3, 4, 5, 6, 7, 8, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45,
76, 79, 83, 88, 89, 90, 103, 104, 123, 124, 125, 126, 127, 129
};
//Writer configuration
using StreamWriter fileWriter = new StreamWriter(logFilePath, append: true)
{
AutoFlush = true
};
//Definition of the listener
using AzureEventSourceListener customListener = new AzureEventSourceListener(
(args, message) =>
{
if (args.EventSource.Name.StartsWith("Azure-Messaging-EventHubs")
&& filteredEventIds.Contains(args.EventId))
{
string logEntry = $"[{DateTime.Now}] [Verbose] [Event ID: {args.EventId}] {message}";
// Write the log entry to the file
fileWriter.WriteLine(logEntry);
}
},
    EventLevel.Verbose);
```

Regarding the management, I would like to ask if it is correct to handle batch processing this way:

```csharp
public async Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, Func<Task> checkpointAsync)
{
try
{
// Step 1: Filter already processed events
// Check the cache and database to exclude events that have already been processed.
var eventsToProcess = await FilterEventsAsync(events);
// Step 2: Process the events
// Apply the business logic for each event in the filtered list.
await ProcessEventsAsync(eventsToProcess);
// Step 3: Save the processed events state
// Persist the processed events in both the in-memory cache and the database.
await SaveProcessedEventsAsync(eventsToProcess);
// Step 4: Save the checkpoint
// Commit the checkpoint to mark the batch as successfully processed.
await SaveCheckpointAsync(checkpointAsync);
}
catch (Exception ex)
{
// Handle any errors during batch processing
// Log or perform appropriate actions for the encountered exception.
HandleBatchError(ex);
}
}
```

**Summary of the Steps**

1. Filter already processed events: check the cache and database to exclude events that have already been processed.
2. Process the events: apply the business logic for each event in the filtered list.
3. Save the processed events state: persist the processed events in both the in-memory cache and the database.
4. Save the checkpoint: commit the checkpoint to mark the batch as successfully processed.
Thanks in advance for the answer.
Thanks, @davidedg87. That answers a couple of things:
There are a couple of open questions still:
As mentioned, there's nothing in the logs for this run that indicates a problem or abnormal behavior. If there's no example of "skipped" events, then there's no analysis left to do. If we do have an example to look at, I'd like to confirm that the processor dispatched those events to your handler.
Hi @jsquire. Answers to your questions:
Thanks
@jsquire Can you please also give me feedback on the handling I've done inside OnProcessingEventBatchAsync, so I can understand whether I've done it in the correct way? Thanks
**Library name and version**
Azure.Messaging.EventHubs.Processor 5.11.5
**Query/Question**
In my usage scenario, I need to consume events from an EventHub with 4 partitions.
The goal is to process the events in batch mode to optimize the underlying business logic by aggregating queries to handle 'batches' of events rather than individual events.
To achieve this, I created a CustomEventProcessor by extending the EventProcessor class because the EventProcessorClient does not allow processing multiple events at once.
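For context, a custom processor of that kind has roughly the following shape. This is a sketch only: the constructor and the abstract storage/checkpoint overrides required by `EventProcessor<TPartition>` are omitted, and the handler bodies are illustrative rather than the attached implementation:

```csharp
// Hedged sketch of a custom processor built on EventProcessor<TPartition>
// from Azure.Messaging.EventHubs.Primitives; only the two processing
// handlers are shown.
public class CustomEventProcessor : EventProcessor<EventProcessorPartition>
{
    // Constructor and abstract storage/checkpoint overrides omitted for brevity.

    protected override Task OnProcessingEventBatchAsync(
        IEnumerable<EventData> events,
        EventProcessorPartition partition,
        CancellationToken cancellationToken)
    {
        // Process the batch, then checkpoint only what was fully processed.
        foreach (EventData eventData in events)
        {
            // ... business logic ...
        }

        return Task.CompletedTask;
    }

    protected override Task OnProcessingErrorAsync(
        Exception exception,
        EventProcessorPartition partition,
        string operationDescription,
        CancellationToken cancellationToken)
    {
        // Surface errors; never let exceptions escape the handler unobserved.
        return Task.CompletedTask;
    }
}
```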
The issues I am currently facing are as follows:
1. When I kill a consumer and restart it, there is a moment when it seems that multiple consumers read the same data from the same partition. This forces me to perform a check to verify whether the event has already been processed. [Currently, this check is done in the database by verifying a unique property of the message.] (A rough sketch of this check appears after this list.) Is this approach correct, or am I managing it incorrectly?
2. When the situation described in point 1 occurs, I skip that event and process only the events that have not been processed. At the end, I update the checkpoint using UpdateCheckpointAsync for the [non-skipped] event with the highest offset. This approach seems to work, but occasionally I have noticed that some events are "lost," as if a checkpoint for a subsequent offset was performed, causing some events not to be correctly processed.
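As referenced in point 1, the duplicate check looks roughly like this. The helper `EventAlreadyProcessedAsync` and the use of `MessageId` as the unique property are assumptions for illustration, not the actual implementation:

```csharp
// Hedged sketch: treat the event's unique property as an idempotency key and
// skip events already recorded in the database or cache.
var eventsToProcess = new List<EventData>();

foreach (EventData eventData in batch)
{
    // Assumption: the publisher sets MessageId as the unique property.
    string uniqueKey = eventData.MessageId;

    if (!await EventAlreadyProcessedAsync(uniqueKey))
    {
        eventsToProcess.Add(eventData);
    }
}
```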
This is the code of the consumer:
EventBatchProcessor.txt
The goal is to create a robust consumer that can scale with multiple replicas (always at most equal to the number of partitions) and correctly manage checkpoints and duplicate control.
Thanks in advance for your help.
**Environment**
No response