Introduce capability for publishing error Records to Dead letter topic #106

Merged · 7 commits · Nov 20, 2023
src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java (new file)
@@ -0,0 +1,105 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.RetryableServiceException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@ConditionalOnProperty("kafka.integration-points[0].brokerHost")
@RefreshScope
@Slf4j
public class KafkaConsumerErrorHandler {

private static final long RETRY_INTERVAL_IN_MS = 1000L;
private static final int MAX_RETRY_ATTEMPTS = 5;
private static final List<Class<? extends Exception>> connectorRetryableExceptions =
Arrays.asList(RetryableServiceException.class, FabricTransactionException.class);

@Autowired private KafkaProperties kafkaProperties;

@Autowired private KafkaProducerConfig kafkaProducerConfig;

@Bean
public CommonErrorHandler topicTransactionErrorHandler() {

ConsumerAwareRecordRecoverer deadLetterPublishingRecoverer = null;

/*
* Check whether the runtime config has a valid failed-message listener. If a dedicated
* failed-message listener is available, the dead-letter configuration will publish the
* failed message to that dedicated dead-letter topic. Otherwise, check whether the event
* publisher topic can be used for publishing the failed message.
*/
if (Objects.nonNull(kafkaProperties.getFailedMessageListener())
&& StringUtils.isNotBlank(kafkaProperties.getFailedMessageListener().getTopic())) {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getFailedMessageListener());
} else if (Objects.nonNull(kafkaProperties.getEventListener())
&& kafkaProperties.getEventListener().isListenToFailedMessages()) {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getEventListener());
}

/*
If no topic is available for publishing the failed message, fall back to a no-op record
recoverer that simply logs the message once the retry attempts are exhausted.
*/
if (Objects.isNull(deadLetterPublishingRecoverer)) {
deadLetterPublishingRecoverer =
new ConsumerAwareRecordRecoverer() {
@Override
public void accept(
ConsumerRecord<?, ?> consumerRecord, Consumer<?, ?> consumer, Exception e) {
log.warn(
"Retries exhausted. Committing offset. Dead-letter record was not published, since the configuration neither uses the subscription topic for publishing failed messages nor has a dead-letter topic configured.");
}
};
}

DefaultErrorHandler defaultErrorHandler =
new DefaultErrorHandler(
deadLetterPublishingRecoverer,
new FixedBackOff(RETRY_INTERVAL_IN_MS, MAX_RETRY_ATTEMPTS));
defaultErrorHandler.setCommitRecovered(true);

for (Class<? extends Exception> retryableExceptionClass : connectorRetryableExceptions) {
defaultErrorHandler.addRetryableExceptions(retryableExceptionClass);
}

return defaultErrorHandler;
}

private DeadLetterPublishingRecoverer generateRecordRecovererWithPublisher(
KafkaProperties.Producer destination) {

KafkaTemplate<String, String> deadLetterPublisherTemplate =
new KafkaTemplate<>(kafkaProducerConfig.eventProducerFactory(destination));
deadLetterPublisherTemplate.setDefaultTopic(destination.getTopic());

return new DeadLetterPublishingRecoverer(
deadLetterPublisherTemplate,
(cr, e) -> new TopicPartition(destination.getTopic(), cr.partition()));
}
}
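One note on the destination resolver in generateRecordRecovererWithPublisher(): because it reuses the failed record's original partition, the dead-letter topic needs at least as many partitions as the source topic. A minimal alternative sketch, not part of this PR: Spring Kafka leaves the partition unset when the resolver returns a negative partition number, letting the producer's partitioner choose.

// Hypothetical variant of the resolver above: a negative partition number tells
// DeadLetterPublishingRecoverer to leave the ProducerRecord partition unset, so the
// producer's partitioner picks one. Useful when the dead-letter topic has fewer
// partitions than the source topic.
return new DeadLetterPublishingRecoverer(
    deadLetterPublisherTemplate,
    (consumerRecord, exception) -> new TopicPartition(destination.getTopic(), -1));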
src/main/java/hlf/java/rest/client/config/KafkaProperties.java
@@ -20,7 +20,8 @@
public class KafkaProperties {

private List<Consumer> integrationPoints;
private Producer eventListener;
private EventProducer eventListener;
private Producer failedMessageListener;
Contributor:

Can you add the application.yml change for this in application.template file

Contributor (Author):

Sure!
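For reference, the template addition might look roughly like this; a sketch only, with property names inferred from the KafkaProperties fields via Spring's relaxed binding and placeholder broker/topic values:

kafka:
  failed-message-listener:
    brokerHost: <broker-host:port>
    topic: <dead-letter-topic>        # dedicated dead-letter topic; takes precedence when set
  event-listener:
    brokerHost: <broker-host:port>
    topic: <event-publish-topic>
    listenToFailedMessages: true      # fall back to the event topic for failed records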


@Getter
@Setter
@@ -45,6 +46,12 @@ public String toString() {
}
}

@Getter
@Setter
public static class EventProducer extends Producer {
private boolean listenToFailedMessages;
}

/** The type Ssl properties is added for configuring SSL configuration for Kafka Cluster. */
@Getter
@Setter
3 changes: 3 additions & 0 deletions src/main/java/hlf/java/rest/client/exception/ErrorCode.java
@@ -51,6 +51,9 @@ public enum ErrorCode {
5002,
"Hyperledger Fabric chaincode operations request has illegal argument or argument is missing."),

HYPERLEDGER_FABRIC_CONNECTION_TIMEOUT_ERROR(
5000, "Hyperledger Fabric Connection timed-out during Transaction"),

HYPERLEDGER_FABRIC_TRANSACTION_ERROR(6000, "Hyperledger Fabric transaction related error"),

HYPERLEDGER_FABRIC_NOT_SUPPORTED(8000, "In Hyperledger Fabric this feature is not supported"),
src/main/java/hlf/java/rest/client/exception/RetryableServiceException.java (new file)
@@ -0,0 +1,8 @@
package hlf.java.rest.client.exception;

public class RetryableServiceException extends ServiceException {

public RetryableServiceException(ErrorCode code, String message, Throwable cause) {
super(code, message, cause);
}
}
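A sketch of the intended usage; the timeout scenario and the submitTransaction() helper are illustrative, not from this PR:

// Illustrative only: wrap a transient failure in RetryableServiceException so the
// Kafka consumer error handler retries the record before dead-lettering it.
try {
  submitTransaction(); // hypothetical helper that talks to the Fabric gateway
} catch (java.util.concurrent.TimeoutException te) {
  throw new RetryableServiceException(
      ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_TIMEOUT_ERROR,
      "Fabric connection timed out during transaction submission",
      te);
}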
DynamicKafkaListener.java
@@ -21,6 +21,7 @@
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
@@ -39,13 +40,15 @@ public class DynamicKafkaListener {

private List<ConcurrentMessageListenerContainer> existingContainers = new ArrayList<>();

@Autowired KafkaProperties kafkaProperties;
@Autowired private KafkaProperties kafkaProperties;

@Autowired KafkaConsumerConfig kafkaConsumerConfig;
@Autowired private KafkaConsumerConfig kafkaConsumerConfig;

@Autowired TransactionConsumer transactionConsumer;
@Autowired private TransactionConsumer transactionConsumer;

@Autowired TaskExecutor defaultTaskExecutor;
@Autowired private TaskExecutor defaultTaskExecutor;

@Autowired private CommonErrorHandler topicTransactionErrorHandler;

@EventListener
public void handleEvent(ContextRefreshedEvent event) {
@@ -78,6 +81,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) {

DefaultKafkaConsumerFactory<String, String> factory =
kafkaConsumerConfig.consumerFactory(consumer);

ContainerProperties containerProperties = new ContainerProperties(consumer.getTopic());
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

@@ -94,6 +98,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) {
}

container.setConcurrency(consumerListenerConcurrency);
container.setCommonErrorHandler(topicTransactionErrorHandler);

container.start();
existingContainers.add(container);
TransactionConsumer.java
@@ -6,7 +6,6 @@
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.metrics.EmitKafkaCustomMetrics;
import hlf.java.rest.client.model.MultiDataTransactionPayload;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.service.TransactionFulfillment;
import hlf.java.rest.client.util.FabricClientConstants;
import java.nio.charset.StandardCharsets;
@@ -30,11 +29,8 @@ public class TransactionConsumer {
private static final String PAYLOAD_KIND = "payload_kind";
private static final String PL_KIND_MULTI_DATA = "multi_data";

@Autowired TransactionFulfillment transactionFulfillment;
@Autowired ObjectMapper objectMapper;

@Autowired(required = false)
EventPublishService eventPublishServiceImpl;
@Autowired private TransactionFulfillment transactionFulfillment;
@Autowired private ObjectMapper objectMapper;

/**
* This method routes the kafka messages to appropriate methods and acknowledges once processing
@@ -125,9 +121,9 @@ public void listen(ConsumerRecord<String, String> message) {
if (isIdentifiableFunction(networkName, contractName, transactionFunctionName)
&& !transactionParams.isEmpty()) {

if (null != peerNames && !peerNames.isEmpty()) {
if (!peerNames.isEmpty()) {
List<String> lstPeerNames = Arrays.asList(peerNames.split(","));
if (null != lstPeerNames && !lstPeerNames.isEmpty()) {
if (!lstPeerNames.isEmpty()) {
if (StringUtils.isNotBlank(collections) && StringUtils.isNotBlank(transientKey)) {
transactionFulfillment.writePrivateTransactionToLedger(
networkName,
@@ -166,13 +162,19 @@
}

} else {
log.info("Incorrect Transaction Payload");
log.error("Incorrect Transaction Payload");
throw new ServiceException(
ErrorCode.VALIDATION_FAILED,
"Inbound transaction format is incorrect or doesn't contain valid parameters.");
}

} catch (FabricTransactionException fte) {
eventPublishServiceImpl.publishTransactionFailureEvent(
fte.getMessage(), networkName, contractName, transactionFunctionName, transactionParams);
log.error("Error in Submitting Transaction - Exception - " + fte.getMessage());
/*
If the error handler has dead-letter publishing enabled, the failed record's headers will be
enriched with the error cause and message extracted from the thrown exception.
*/
throw fte;
} catch (Exception ex) {
log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage());
throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage());
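For context on the comment above: DeadLetterPublishingRecoverer attaches the exception class, message, stack trace, and original record coordinates as kafka_dlt-* headers on the published record. A minimal sketch, with a hypothetical class name, of how a downstream consumer of the dead-letter topic could read them:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

public class FailedRecordInspector {

  /** Returns the failure message attached by DeadLetterPublishingRecoverer, if any. */
  public static String failureMessage(ConsumerRecord<String, String> failedRecord) {
    Header header = failedRecord.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
    return header == null ? "<none>" : new String(header.value(), StandardCharsets.UTF_8);
  }
}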
src/main/java/hlf/java/rest/client/service/EventPublishService.java
Expand Up @@ -4,7 +4,7 @@

/**
* The EventPublishService is a service class, which include the kafka template. It sends the
* Message to the the Event Kafka message topic
* Message to the Event Kafka message topic
*/
@ConditionalOnProperty("kafka.event-listener.brokerHost")
public interface EventPublishService {
@@ -49,19 +49,4 @@ boolean publishBlockEvents(
String chaincodeName,
String functionName,
Boolean isPrivateDataPresent);

/**
* @param errorMsg contents of the error message
* @param networkName channel name in Fabric
* @param contractName chaincode name in the Fabric
* @param functionName function name in a given chaincode.
* @param parameters parameters sent to the chaincode
* @return status of the published message to Kafka, successful or not
*/
boolean publishTransactionFailureEvent(
String errorMsg,
String networkName,
String contractName,
String functionName,
String parameters);
}
EventPublishServiceImpl.java
@@ -222,73 +222,4 @@ public void onFailure(Throwable ex) {

return status;
}

@Override
public boolean publishTransactionFailureEvent(
String errorMsg,
String networkName,
String contractName,
String functionName,
String parameters) {
boolean status = true;

try {

ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topicName, functionName, parameters);

producerRecord
.headers()
.add(new RecordHeader(FabricClientConstants.ERROR_MSG, errorMsg.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_CHAINCODE_NAME, contractName.getBytes()));
producerRecord
.headers()
.add(new RecordHeader(FabricClientConstants.FABRIC_CHANNEL_NAME, networkName.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_EVENT_TYPE,
FabricClientConstants.FABRIC_EVENT_TYPE_ERROR.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_EVENT_FUNC_NAME, functionName.getBytes()));

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);

future.addCallback(
new ListenableFutureCallback<SendResult<String, String>>() {

@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ parameters
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
}

@Override
public void onFailure(Throwable ex) {
log.error("Unable to send message=[" + parameters + "] due to : " + ex.getMessage());
}
});

} catch (Exception ex) {
status = false;
log.error("Error sending message - " + ex.getMessage());
}

return status;
}
}