Consumer is stuck #162
Comments
It's strange that no log is written at all. Have you tried lowering the log level for the Silverback namespace? The message broker also has a log and you should be able to see the reason why it got disconnected. It's hard to help you further without a log or any other hint. What I can say is that it isn't a known issue and we would have noticed by now if this were a systematic bug. |
Thanks for the quick answer.
Example from one module configuration:
services.ConfigureSilverback()
    .AddEndpointsConfigurator(x =>
    {
        return new NotificationsEndpointsConfigurator(moduleOptions.BridgeOptions);
    })
    // Add Subscribers
    .AddScopedSubscriber<DrivingAssignedEventQueueReader>(filter)
    .AddScopedSubscriber<DrivingRequestConfirmedQueueReader>(filter)
    .AddScopedSubscriber<DrivingCanceledEventQueueReader>(filter)
    .AddScopedSubscriber<DrivingRequestCanceledEventQueueReader>(filter)
    .AddScopedSubscriber<SurveyCreatedEventQueueReader>(filter)
    .AddScopedSubscriber<DrivingStartingQueueReader>(filter)
    .AddScopedSubscriber<OffsetFailedEventQueueReader>(filter)
    .AddScopedSubscriber<NotifyPaymentEventQueueReader>(filter)
    ;
Note:
public class NotificationsEndpointsConfigurator : IEndpointsConfigurator
{
    private readonly BridgeOptions bridgeOptions;

    public NotificationsEndpointsConfigurator(BridgeOptions bridgeOptions)
    {
        this.bridgeOptions = bridgeOptions;
    }

    public void Configure(IEndpointsConfigurationBuilder builder)
    {
        Action<KafkaConsumerConfig> consumersConfig = config =>
        {
            config.BootstrapServers = bridgeOptions.BootstrapServers;
            config.GroupId = bridgeOptions.ConsumerGroupId;
            config.AutoOffsetReset = bridgeOptions.AutoOffsetReset;
        };
        Action<KafkaProducerConfig> producerConfig = config =>
        {
            config.BootstrapServers = bridgeOptions.BootstrapServers;
        };
        builder
            .AddKafkaEndpoints(
                endpoints => endpoints
                    .AddOutbound<AddNotification>(endpoint => endpoint.ProduceTo(nameof(AddNotification)).Configure(producerConfig).SerializeAsJsonUsingNewtonsoft())
                    .AddInbound<DrivingRequestEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<DrivingRequestConfirmedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestConfirmedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<DrivingAssignedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingAssignedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<DrivingCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<DrivingRequestCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<SurveyCreatedEvent>(endpoint => endpoint.ConsumeFrom(nameof(SurveyCreatedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<DrivingStartingEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingStartingEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<OffsetFailedEvent>(endpoint => endpoint.ConsumeFrom(nameof(OffsetFailedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    .AddInbound<NotifyPaymentEvent>(endpoint => endpoint.ConsumeFrom(nameof(NotifyPaymentEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
            );
    }
} |
Which logging levels do you recommend? I am looking at https://github.com/BEagle1984/silverback/blob/master/docs/concepts/logging.md
|
Just add this to your appsettings.json to get all possible logs (it's going to be verbose):
{
  "Logging": {
    "LogLevel": {
      "Silverback": "Trace"
    }
  }
}
The configuration is really basic and I don't see anything wrong. If the consumer disconnects, you should see the related info entries in the log (consumer disconnected, partitions revoked, etc.). When you say "after some time idle", do you mean that you don't produce any messages for a while and, when you start producing again, your consumers don't pull them? Did I get that right? |
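For readers who configure logging in code rather than in a settings file, the same `Silverback` log level can probably be expressed with the standard `Microsoft.Extensions.Logging` filter API. This is a minimal sketch, not something taken from the thread: it assumes the `Microsoft.Extensions.Logging.Console` package is referenced, and the category name `Silverback.Sample` is a hypothetical one used only to show the filter taking effect.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Assumption: Microsoft.Extensions.Logging.Console is referenced for AddConsole().
var services = new ServiceCollection();

services.AddLogging(logging =>
{
    logging.AddConsole();

    // Counterpart of "Silverback": "Trace" in the LogLevel section:
    // categories starting with "Silverback" are logged from Trace upward,
    // while all other categories keep the default minimum level.
    logging.AddFilter("Silverback", LogLevel.Trace);
});

using var provider = services.BuildServiceProvider();

// "Silverback.Sample" is a hypothetical category, used here only to exercise the filter.
var logger = provider.GetRequiredService<ILoggerFactory>().CreateLogger("Silverback.Sample");
logger.LogTrace("Trace logging is enabled for the Silverback namespace.");
```

In an ASP.NET Core application the same `AddFilter` call would normally go on the host's logging builder instead of a standalone `ServiceCollection`.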
Log level changed; we will check for errors. Yes, you are right. I also have another problem that may be related. Can the build mode (Debug|Release, ASPNETCORE_ENVIRONMENT, etc.) change the behavior? |
I have event A and B, consumer A and B, topic A and B. |
It should work exactly the same on the local machine and in kubernetes. Unless you implemented some Kafka-related health check that causes your pod to be stopped in that situation. The consumers/producers in Silverback are completely unrelated and I never observed the behavior you describe. It's very difficult to help you with the information I have. Are you able to provide a sample project that reproduces the issue on a local environment (Kafka started via docker compose)? Or at least the trace logs of your service where I can try to figure out what happened? |
I was no longer able to reproduce the problem. If it appears again, I will let you know. The only thing I did was remove the call to ConfigureSilverback in the application modules, but I'm not sure whether that had any effect. |
OK, keep me posted. No, calling |
After being idle for some time, the consumer stops reading messages from Kafka (without any errors or log entries). I need to restart the consumer to make it start reading again. Do you have any ideas?