Feature: Support for transactions #109
Comments
This is for sure a cool topic. It's something I already had a look at in the past, but I never really tried to integrate it. Please note that the whole point of transactions is to implement exactly-once processing. This can already be achieved with Silverback by storing the offsets in the database and using the outbox table ( The producer transaction at the end of the day does two things: it ensures that all produced messages are either committed or discarded, and that the consumed offset is committed as part of the same transaction. As said, both goals are achieved by Silverback already, but using the database. That being said, using the native transactions is far more optimized and will lead to a much greater throughput. I think that a possible approach would be to rely on an ambient transaction.
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    await _publisher.PublishAsync(...);
    await _publisher.PublishAsync(...);
    await _publisher.PublishAsync(...);
    scope.Complete(); // without this call, disposing the scope rolls the transaction back
}
The tricky part here is that in Silverback we cannot assume that this piece of code will never be executed concurrently, and we therefore need to create a sort of pool of producers to handle each transaction. (I guess...) To include the consumer into the transaction it should be possible to add an option to the inbound endpoint, like Here we start with some assumptions. I think that in order to join multiple producers into the same transaction, all that's needed is to set the same It all looks very straightforward but it for sure isn't that easy and I'm missing a ton of details. |
I forgot to say that if we go down the ambient transaction route, we get the big bonus of having the database transactions being automatically enlisted into the same transaction. The drawback is maybe the API that is a bit weird if you are not using a database (with isolation levels etc., that don't have any meaning in this context). |
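To make the ambient transaction idea a bit more concrete, here is a minimal Java sketch of the mechanism (Java only because the thread already quotes Java code further down). Everything in it is hypothetical: `AmbientScope` and `SketchPublisher` are made-up names, the "topic" is just an in-memory list, and nothing here is Silverback's or .NET's actual API. It only shows how a publisher could detect a scope on the current thread and defer its writes until the scope is completed. Nesting and async flow are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical ambient scope: collects deferred actions and runs them only if completed.
final class AmbientScope implements AutoCloseable {
    private static final ThreadLocal<AmbientScope> CURRENT = new ThreadLocal<>();

    private final List<Runnable> deferred = new ArrayList<>();
    private boolean completed;

    AmbientScope() {
        CURRENT.set(this); // assumption: no nested scopes in this sketch
    }

    // Returns the scope active on the calling thread, or null if there is none.
    static AmbientScope current() {
        return CURRENT.get();
    }

    void enlist(Runnable action) {
        deferred.add(action);
    }

    void complete() {
        completed = true;
    }

    @Override
    public void close() {
        CURRENT.remove();
        if (completed) {
            deferred.forEach(Runnable::run); // "commit": release everything at once
        }
        // otherwise the deferred actions are simply dropped ("rollback")
    }
}

// Hypothetical publisher that joins the ambient scope when one is active.
final class SketchPublisher {
    private final List<String> topic;

    SketchPublisher(List<String> topic) {
        this.topic = topic;
    }

    void publish(String message) {
        AmbientScope scope = AmbientScope.current();
        if (scope != null) {
            scope.enlist(() -> topic.add(message));
        } else {
            topic.add(message); // no scope: publish immediately
        }
    }
}
```

Used with try-with-resources, messages published inside the block reach the list only after `complete()` has been called and the block has exited; leaving the block without calling `complete()` discards them.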
Our problem isn't the exactly-once part but the atomicity part. In our use case we have to guarantee that messages only become visible to the consumer as a batch (n messages in topic 1 and one message in topic 2). The outbox pattern addresses the problem of combining database transactions with a message delivery guarantee [1]. However, it does not enable you to be transactional across topics and partitions, i.e. the producer sends a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers. [2]
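The all-or-nothing visibility described above can be illustrated with a toy in-memory model. This is only a sketch under loud assumptions: `ToyBroker` is a hypothetical class, it handles a single transaction at a time, and it does not use any Kafka client API. It merely mimics what a consumer reading only committed data (in Kafka, `isolation.level=read_committed`) would observe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy broker: writes stay pending until the transaction commits.
final class ToyBroker {
    private final Map<String, List<String>> committed = new HashMap<>();
    private final Map<String, List<String>> pending = new HashMap<>();

    // Buffers a message for the given topic inside the current transaction.
    void send(String topic, String message) {
        pending.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    // Makes all pending messages visible, across all topics, in one step.
    void commit() {
        pending.forEach((topic, messages) ->
                committed.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(messages));
        pending.clear();
    }

    // Discards all pending messages; consumers never see them.
    void abort() {
        pending.clear();
    }

    // What a consumer that reads only committed data would see.
    List<String> read(String topic) {
        return new ArrayList<>(committed.getOrDefault(topic, new ArrayList<>()));
    }
}
```

With this model, sending n messages to one topic and one message to another leaves both topics empty from the consumer's point of view until `commit()` is called, and an `abort()` leaves them empty for good. The outbox pattern, by contrast, would let the two topics become visible at different moments.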
This looks very good IMHO. It also means that we don't have to break the abstraction (because other message brokers maybe don't support transactions) but still have granular control.
Currently there is one producer per endpoint, right? We might raise the question if this is the thing we should do anyway. E.g. the following is stated by the KafkaProducer Java doc: "The producer is thread safe and should generally be shared among all threads for best performance." How is this in .NET?
Very interesting. Do we have to think about nested TransactionScopes? [1] https://microservices.io/patterns/data/transactional-outbox.html (09.09.2020) |
This is definitely addressed by the outbox pattern. The messages are atomically written into the outbox table and that gives you the guarantee that they will eventually be written to the topic. The background service taking care of the outbox queue will just retry over and over until it succeeds.
It's the same in .NET. It is even suggested to use a single This could actually create some issues with the transactions, where the
They can definitely be nested, I don't know what it means for us. I think the [4] confluentinc/confluent-kafka-dotnet#868 |
It does guarantee that this will eventually happen, but not as a transaction (eventual consistency isn't the same as atomicity, even if the final result might be the same). We must not make messages in one topic visible to the consumer if the write into the other was not successful. In most cases it will be the same because the messages in the outbox queue can be processed, but not in every situation.
Hmm, yes, that's true.
Really? This sounds weird. I have to dig a bit into how this transaction stuff is implemented. |
Well, there are some edge cases where the messages from the outbox table cannot be published anymore, but you pretty much have to get rid of the topic (or at least the outbound configuration in Silverback) for that to happen. I don't see many other instances.
Yep, nice. They are doing pretty much what I described but it's worth having a closer look.
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
    if (txIdPrefix != null) {
        if (this.producerPerConsumerPartition) {
            return createTransactionalProducerForPartition(txIdPrefix);
        }
        else {
            return createTransactionalProducer(txIdPrefix);
        }
    }
    if (this.producerPerThread) {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        if (this.threadBoundProducerEpochs.get() == null) {
            this.threadBoundProducerEpochs.set(this.epoch.get());
        }
        if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
            closeThreadBoundProducer();
            tlProducer = null;
        }
        if (tlProducer == null) {
            tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                    this.physicalCloseTimeout, this.beanName);
            for (Listener<K, V> listener : this.listeners) {
                listener.producerAdded(tlProducer.clientId, tlProducer);
            }
            this.threadBoundProducers.set(tlProducer);
            this.threadBoundProducerEpochs.set(this.epoch.get());
        }
        return tlProducer;
    }
    synchronized (this) {
        if (this.producer == null) {
            this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                    this.physicalCloseTimeout, this.beanName);
            this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
        }
        return this.producer;
    }
}
protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
    BlockingQueue<CloseSafeProducer<K, V>> queue = getCache(txIdPrefix);
    Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
    Producer<K, V> cachedProducer = queue.poll();
    if (cachedProducer == null) {
        return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
    }
    else {
        return cachedProducer;
    }
}
protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
    String suffix = TransactionSupport.getTransactionIdSuffix();
    if (suffix == null) {
        return createTransactionalProducer(txIdPrefix);
    }
    else {
        synchronized (this.consumerProducers) {
            if (!this.consumerProducers.containsKey(suffix)) {
                CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
                        this::removeConsumerProducer);
                this.consumerProducers.put(suffix, newProducer);
                return newProducer;
            }
            else {
                return this.consumerProducers.get(suffix);
            }
        }
    }
}
If no transaction is involved, they use a single producer. Otherwise a producer is created for each transaction. Then there are some options to have a producer per thread (an option that we don't have in Silverback) and a producer per partition when transactions are involved (I guess because each partition is consumed separately in different threads, which is also not supported in Silverback...yet). |
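Stripped of the Spring specifics, the strategy in the excerpt above boils down to something like the following sketch. It is a simplified model, not the Spring Kafka implementation: `FakeProducer` and `ProducerPool` are hypothetical names, the producer is a stand-in that only remembers its transactional id, and the per-thread and per-partition options are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a real Kafka producer; it only records its transactional id.
final class FakeProducer {
    final String transactionalId; // null means "not transactional"

    FakeProducer(String transactionalId) {
        this.transactionalId = transactionalId;
    }
}

// One shared producer when no transaction is involved,
// otherwise one pooled producer per concurrently running transaction.
final class ProducerPool {
    private final String txIdPrefix;
    private final AtomicInteger suffix = new AtomicInteger();
    private final BlockingQueue<FakeProducer> idle = new LinkedBlockingQueue<>();
    private FakeProducer shared;

    ProducerPool(String txIdPrefix) {
        this.txIdPrefix = txIdPrefix;
    }

    // Lazily creates the single non-transactional producer shared by all callers.
    synchronized FakeProducer sharedProducer() {
        if (shared == null) {
            shared = new FakeProducer(null);
        }
        return shared;
    }

    // Reuses an idle transactional producer or creates one with a fresh id suffix.
    FakeProducer acquireTransactional() {
        FakeProducer cached = idle.poll();
        if (cached != null) {
            return cached;
        }
        return new FakeProducer(txIdPrefix + suffix.getAndIncrement());
    }

    // Hands the producer back once its transaction is committed or aborted.
    void release(FakeProducer producer) {
        idle.offer(producer);
    }
}
```

Two transactions running concurrently get two different producers (and therefore two different transactional ids), while a producer released after commit or abort is handed out again to the next transaction instead of being recreated.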
Let's discuss how we could build support for Kafka transactions in Silverback.
An example of how to do this with the .NET Kafka library is shown here: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Transactions/Program.cs