Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce capability for publishing error Records to Dead letter topic #106

Merged
merged 7 commits into from
Nov 20, 2023

Conversation

nithin-pankaj
Copy link
Contributor

This PR contains the changes for publishing failed consumer Records polled from the Integration topic(s) to a configured error Event listener Topic.

The changes would effectively allow the Connector to publish the Error record to a topic based on the following criteria :-

  • If the Event-Listener topic (Block & Chaincode) has the optional field listenToFailedMessages set as true, the failed message will be published back to Event listener topic (this is the current behaviour).
  • If the Kafka properties of a dedicated Dead letter topic is configured, the errored message will be published to the configured dead letter topic.
  • If the configuration listenToFailedMessages is set to true as well as a dedicated Dead letter topic is configured, the dead letter topic will take priority and record will be dispatched to it.
  • If none of the above options are available, the listener will attempt retries and log the failed record by committing the offset.

@nithin-pankaj nithin-pankaj requested a review from a team as a code owner November 16, 2023 09:46
Comment on lines 55 to 67
KafkaTemplate<String, String> deadLetterPublisherTemplate =
new KafkaTemplate<>(
kafkaProducerConfig.eventProducerFactory(kafkaProperties.getFailedMessageListener()));

// Set the exclusive dead letter topic
deadLetterPublisherTemplate.setDefaultTopic(
kafkaProperties.getFailedMessageListener().getTopic());
deadLetterPublishingRecoverer =
new DeadLetterPublishingRecoverer(
deadLetterPublisherTemplate,
(cr, e) ->
new TopicPartition(
kafkaProperties.getFailedMessageListener().getTopic(), cr.partition()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] a method can be extracted for these lines as only listener is different and rest is same in both scenarios

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Done.

@@ -20,7 +20,8 @@
public class KafkaProperties {

private List<Consumer> integrationPoints;
private Producer eventListener;
private EventProducer eventListener;
private Producer failedMessageListener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the application.yml change for this in application.template file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

@nithin-pankaj nithin-pankaj merged commit 76caa88 into hyperledger-labs:main Nov 20, 2023
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants