diff --git a/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java new file mode 100644 index 00000000..bb464160 --- /dev/null +++ b/src/main/java/hlf/java/rest/client/config/KafkaConsumerErrorHandler.java @@ -0,0 +1,105 @@ +package hlf.java.rest.client.config; + +import hlf.java.rest.client.exception.FabricTransactionException; +import hlf.java.rest.client.exception.RetryableServiceException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +@Configuration +@ConditionalOnProperty("kafka.integration-points[0].brokerHost") +@RefreshScope +@Slf4j +public class KafkaConsumerErrorHandler { + + private static final long RETRY_INTERVAL_IN_MS = 1000L; + private static final int MAX_RETRY_ATTEMPTS = 5; + private static final List> connectorRetryableExceptions = + Arrays.asList(RetryableServiceException.class, FabricTransactionException.class); + + @Autowired private KafkaProperties kafkaProperties; + + @Autowired private KafkaProducerConfig kafkaProducerConfig; + + @Bean + public CommonErrorHandler topicTransactionErrorHandler() { + + ConsumerAwareRecordRecoverer deadLetterPublishingRecoverer = null; + + /** + * Check if the runtime config has a valid Failed Message Listener. If a dedicated Failed + * message listener is available, then the dead letter configuration would push the failed + * message to the exclusive dead letter topic. If a dedicated failed message listener is not + * configured, check if the event publisher topic can be utilised for publishing the failed + * message. + */ + if (Objects.nonNull(kafkaProperties.getFailedMessageListener()) + && StringUtils.isNotBlank(kafkaProperties.getFailedMessageListener().getTopic())) { + + deadLetterPublishingRecoverer = + generateRecordRecovererWithPublisher(kafkaProperties.getFailedMessageListener()); + } else if (Objects.nonNull(kafkaProperties.getEventListener()) + && kafkaProperties.getEventListener().isListenToFailedMessages()) { + + deadLetterPublishingRecoverer = + generateRecordRecovererWithPublisher(kafkaProperties.getEventListener()); + } + + /* + If no topics are available to publish the failed message, fall-back to a 'NO-OP' record recoverer that would simply log + the message after retry attempts + */ + if (Objects.isNull(deadLetterPublishingRecoverer)) { + deadLetterPublishingRecoverer = + new ConsumerAwareRecordRecoverer() { + @Override + public void accept( + ConsumerRecord consumerRecord, Consumer consumer, Exception e) { + log.warn( + "Retries exhausted.. Committing offset. Dead letter record is not published since the configuration doesnt use subscription topic for publishing failed messages " + + "nor it has a dead letter topic configured."); + } + }; + } + + DefaultErrorHandler defaultErrorHandler = + new DefaultErrorHandler( + deadLetterPublishingRecoverer, + new FixedBackOff(RETRY_INTERVAL_IN_MS, MAX_RETRY_ATTEMPTS)); + defaultErrorHandler.setCommitRecovered(true); + + for (Class retryableExceptionClass : connectorRetryableExceptions) { + defaultErrorHandler.addRetryableExceptions(retryableExceptionClass); + } + + return defaultErrorHandler; + } + + private DeadLetterPublishingRecoverer generateRecordRecovererWithPublisher( + KafkaProperties.Producer destination) { + + KafkaTemplate deadLetterPublisherTemplate = + new KafkaTemplate<>(kafkaProducerConfig.eventProducerFactory(destination)); + deadLetterPublisherTemplate.setDefaultTopic(destination.getTopic()); + + return new DeadLetterPublishingRecoverer( + deadLetterPublisherTemplate, + (cr, e) -> new TopicPartition(destination.getTopic(), cr.partition())); + } +} diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java index 04800e73..fa66f6ce 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java @@ -20,7 +20,8 @@ public class KafkaProperties { private List integrationPoints; - private Producer eventListener; + private EventProducer eventListener; + private Producer failedMessageListener; @Getter @Setter @@ -45,6 +46,12 @@ public String toString() { } } + @Getter + @Setter + public static class EventProducer extends Producer { + private boolean listenToFailedMessages; + } + /** The type Ssl properties is added for configuring SSL configuration for Kafka Cluster. */ @Getter @Setter diff --git a/src/main/java/hlf/java/rest/client/exception/ErrorCode.java b/src/main/java/hlf/java/rest/client/exception/ErrorCode.java index 32bb9ea9..3162b629 100644 --- a/src/main/java/hlf/java/rest/client/exception/ErrorCode.java +++ b/src/main/java/hlf/java/rest/client/exception/ErrorCode.java @@ -51,6 +51,9 @@ public enum ErrorCode { 5002, "Hyperledger Fabric chaincode operations request has illegal argument or argument is missing."), + HYPERLEDGER_FABRIC_CONNECTION_TIMEOUT_ERROR( + 5000, "Hyperledger Fabric Connection timed-out during Transaction"), + HYPERLEDGER_FABRIC_TRANSACTION_ERROR(6000, "Hyperledger Fabric transaction related error"), HYPERLEDGER_FABRIC_NOT_SUPPORTED(8000, "In Hyperledger Fabric this feature is not supported"), diff --git a/src/main/java/hlf/java/rest/client/exception/RetryableServiceException.java b/src/main/java/hlf/java/rest/client/exception/RetryableServiceException.java new file mode 100644 index 00000000..2a26e20e --- /dev/null +++ b/src/main/java/hlf/java/rest/client/exception/RetryableServiceException.java @@ -0,0 +1,8 @@ +package hlf.java.rest.client.exception; + +public class RetryableServiceException extends ServiceException { + + public RetryableServiceException(ErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } +} diff --git a/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java b/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java index 424e2ff7..128070f7 100644 --- a/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java +++ b/src/main/java/hlf/java/rest/client/listener/DynamicKafkaListener.java @@ -21,6 +21,7 @@ import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.listener.BatchAcknowledgingMessageListener; import org.springframework.kafka.listener.BatchListenerFailedException; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.Acknowledgment; @@ -39,13 +40,15 @@ public class DynamicKafkaListener { private List existingContainers = new ArrayList<>(); - @Autowired KafkaProperties kafkaProperties; + @Autowired private KafkaProperties kafkaProperties; - @Autowired KafkaConsumerConfig kafkaConsumerConfig; + @Autowired private KafkaConsumerConfig kafkaConsumerConfig; - @Autowired TransactionConsumer transactionConsumer; + @Autowired private TransactionConsumer transactionConsumer; - @Autowired TaskExecutor defaultTaskExecutor; + @Autowired private TaskExecutor defaultTaskExecutor; + + @Autowired private CommonErrorHandler topicTransactionErrorHandler; @EventListener public void handleEvent(ContextRefreshedEvent event) { @@ -78,6 +81,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) { DefaultKafkaConsumerFactory factory = kafkaConsumerConfig.consumerFactory(consumer); + ContainerProperties containerProperties = new ContainerProperties(consumer.getTopic()); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); @@ -94,6 +98,7 @@ public void generateAndStartConsumerGroup(KafkaProperties.Consumer consumer) { } container.setConcurrency(consumerListenerConcurrency); + container.setCommonErrorHandler(topicTransactionErrorHandler); container.start(); existingContainers.add(container); diff --git a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java index 021a4c5d..ef034b9a 100644 --- a/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java +++ b/src/main/java/hlf/java/rest/client/listener/TransactionConsumer.java @@ -6,7 +6,6 @@ import hlf.java.rest.client.exception.ServiceException; import hlf.java.rest.client.metrics.EmitKafkaCustomMetrics; import hlf.java.rest.client.model.MultiDataTransactionPayload; -import hlf.java.rest.client.service.EventPublishService; import hlf.java.rest.client.service.TransactionFulfillment; import hlf.java.rest.client.util.FabricClientConstants; import java.nio.charset.StandardCharsets; @@ -30,11 +29,8 @@ public class TransactionConsumer { private static final String PAYLOAD_KIND = "payload_kind"; private static final String PL_KIND_MULTI_DATA = "multi_data"; - @Autowired TransactionFulfillment transactionFulfillment; - @Autowired ObjectMapper objectMapper; - - @Autowired(required = false) - EventPublishService eventPublishServiceImpl; + @Autowired private TransactionFulfillment transactionFulfillment; + @Autowired private ObjectMapper objectMapper; /** * This method routes the kafka messages to appropriate methods and acknowledges once processing @@ -125,9 +121,9 @@ public void listen(ConsumerRecord message) { if (isIdentifiableFunction(networkName, contractName, transactionFunctionName) && !transactionParams.isEmpty()) { - if (null != peerNames && !peerNames.isEmpty()) { + if (!peerNames.isEmpty()) { List lstPeerNames = Arrays.asList(peerNames.split(",")); - if (null != lstPeerNames && !lstPeerNames.isEmpty()) { + if (!lstPeerNames.isEmpty()) { if (StringUtils.isNotBlank(collections) && StringUtils.isNotBlank(transientKey)) { transactionFulfillment.writePrivateTransactionToLedger( networkName, @@ -166,13 +162,19 @@ public void listen(ConsumerRecord message) { } } else { - log.info("Incorrect Transaction Payload"); + log.error("Incorrect Transaction Payload"); + throw new ServiceException( + ErrorCode.VALIDATION_FAILED, + "Inbound transaction format is incorrect or doesn't contain valid parameters."); } } catch (FabricTransactionException fte) { - eventPublishServiceImpl.publishTransactionFailureEvent( - fte.getMessage(), networkName, contractName, transactionFunctionName, transactionParams); log.error("Error in Submitting Transaction - Exception - " + fte.getMessage()); + /* + If the error handler has dead letter publish enabled, the errored Record header will be enriched by extracting + the error cause and message from the thrown exception. + */ + throw fte; } catch (Exception ex) { log.error("Error in Kafka Listener - Message Format exception - " + ex.getMessage()); throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, ex.getMessage()); diff --git a/src/main/java/hlf/java/rest/client/service/EventPublishService.java b/src/main/java/hlf/java/rest/client/service/EventPublishService.java index c70fbaa8..ce94f863 100644 --- a/src/main/java/hlf/java/rest/client/service/EventPublishService.java +++ b/src/main/java/hlf/java/rest/client/service/EventPublishService.java @@ -4,7 +4,7 @@ /** * The EventPublishService is a service class, which include the kafka template. It sends the - * Message to the the Event Kafka message topic + * Message to the Event Kafka message topic */ @ConditionalOnProperty("kafka.event-listener.brokerHost") public interface EventPublishService { @@ -49,19 +49,4 @@ boolean publishBlockEvents( String chaincodeName, String functionName, Boolean isPrivateDataPresent); - - /** - * @param errorMsg contents of the error message - * @param networkName channel name in Fabric - * @param contractName chaincode name in the Fabric - * @param functionName function name in a given chaincode. - * @param parameters parameters sent to the chaincode - * @return status of the published message to Kafka, successful or not - */ - boolean publishTransactionFailureEvent( - String errorMsg, - String networkName, - String contractName, - String functionName, - String parameters); } diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 04513a68..7b2d4d83 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -222,73 +222,4 @@ public void onFailure(Throwable ex) { return status; } - - @Override - public boolean publishTransactionFailureEvent( - String errorMsg, - String networkName, - String contractName, - String functionName, - String parameters) { - boolean status = true; - - try { - - ProducerRecord producerRecord = - new ProducerRecord(topicName, functionName, parameters); - - producerRecord - .headers() - .add(new RecordHeader(FabricClientConstants.ERROR_MSG, errorMsg.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_CHAINCODE_NAME, contractName.getBytes())); - producerRecord - .headers() - .add(new RecordHeader(FabricClientConstants.FABRIC_CHANNEL_NAME, networkName.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_EVENT_TYPE, - FabricClientConstants.FABRIC_EVENT_TYPE_ERROR.getBytes())); - - producerRecord - .headers() - .add( - new RecordHeader( - FabricClientConstants.FABRIC_EVENT_FUNC_NAME, functionName.getBytes())); - - ListenableFuture> future = kafkaTemplate.send(producerRecord); - - future.addCallback( - new ListenableFutureCallback>() { - - @Override - public void onSuccess(SendResult result) { - log.info( - "Sent message=[" - + parameters - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); - } - - @Override - public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + parameters + "] due to : " + ex.getMessage()); - } - }); - - } catch (Exception ex) { - status = false; - log.error("Error sending message - " + ex.getMessage()); - } - - return status; - } } diff --git a/src/main/java/hlf/java/rest/client/service/impl/TransactionFulfillmentImpl.java b/src/main/java/hlf/java/rest/client/service/impl/TransactionFulfillmentImpl.java index ce1d13d0..3b60e6e9 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/TransactionFulfillmentImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/TransactionFulfillmentImpl.java @@ -5,6 +5,7 @@ import hlf.java.rest.client.exception.ErrorConstants; import hlf.java.rest.client.exception.FabricTransactionException; import hlf.java.rest.client.exception.NotFoundException; +import hlf.java.rest.client.exception.RetryableServiceException; import hlf.java.rest.client.exception.ServiceException; import hlf.java.rest.client.model.AbstractModelValidator; import hlf.java.rest.client.model.BlockEventWriteSet; @@ -58,6 +59,11 @@ @Service public class TransactionFulfillmentImpl implements TransactionFulfillment { + // List of exceptions that would be wrapped as 'FabricTransactionException' once caught. + private static final List> fabricTransactionExceptionCandidates = + Arrays.asList( + GatewayRuntimeException.class, ContractException.class, InterruptedException.class); + private static final long DEFAULT_ORDERER_TIMEOUT = 60; private static final TimeUnit DEFAULT_ORDERER_TIMEOUT_UNIT = TimeUnit.SECONDS; @@ -189,7 +195,7 @@ public ResponseEntity writeTransactionToLedger( Optional> peerNames, String... transactionParams) { log.info("Initiate the Write Transaction to Ledger process"); - String resultString; + String resultString = StringUtils.EMPTY; List endorsingPeers = new ArrayList<>(); try { Network network = gateway.getNetwork(networkName); @@ -216,38 +222,8 @@ public ResponseEntity writeTransactionToLedger( resultString = new String(result, StandardCharsets.UTF_8); log.info("Transaction Successfully Submitted - Response: " + resultString); - } catch (GatewayRuntimeException gre) { - log.error("Action Failed: A problem occured with Gateway transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, gre.getMessage(), gre); - } catch (TimeoutException e) { - // Retry in case of timeout, add limit on number of times this section gets hit. - // For now, it is an infinite trials i.e. ServiceException would not acknowledge - // for the Kafka - // topic. - log.error("Action Failed: Timeout occurred while waiting to submit"); - throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_ERROR, e.getMessage(), e); - } catch (ContractException e) { - // SDK converts the nio exception into an instance of RuntimeException. The - // whole exception - // trace is converted into a ContractException before it is sent back. Rely on - // cause - // information to know if it was an IOException so that a retry can be - // attempted. - if (e.getCause() instanceof IOException - || fabricTxErrorList.stream().anyMatch(e.getMessage()::contains)) { - log.error("Action Failed: A problem occurred with the network connection"); - throw new ServiceException( - ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_ERROR, e.getMessage(), e); - } - log.error("Action Failed: A problem occured while submitting the transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); - } catch (InterruptedException e) { - log.error("Action Failed: A problem occured while submitting the transaction to the peer"); - Thread.currentThread().interrupt(); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); + } catch (Exception exception) { + handleTransactionException(exception); } return new ResponseEntity<>( new ClientResponseModel(ErrorConstants.NO_ERROR, resultString), HttpStatus.OK); @@ -281,7 +257,7 @@ public ResponseEntity writePrivateTransactionToLedger( Optional> peerNames, String jsonPayload) { log.info("Initiate the Write Transaction to Ledger process"); - String resultString; + String resultString = StringUtils.EMPTY; Collection endorsingPeers = new ArrayList<>(); Map transientParam = new HashMap<>(); try { @@ -310,19 +286,8 @@ public ResponseEntity writePrivateTransactionToLedger( byte[] result = fabricTransaction.submit(collection, transientKey); resultString = new String(result, StandardCharsets.UTF_8); log.info("Transaction Successfully Submitted - Response: " + resultString); - } catch (GatewayRuntimeException gre) { - log.error("Action Failed: A problem occurred with Gateway transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, gre.getMessage(), gre); - } catch (ContractException | TimeoutException e) { - log.error("Action Failed: A problem occurred while submitting the transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); - } catch (InterruptedException e) { - log.error("Action Failed: A problem occurred while submitting the transaction to the peer"); - Thread.currentThread().interrupt(); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); + } catch (Exception exception) { + handleTransactionException(exception); } return new ResponseEntity<>( new ClientResponseModel(ErrorConstants.NO_ERROR, resultString), HttpStatus.OK); @@ -485,7 +450,7 @@ public ResponseEntity writeMultiDataTransactionToLedger( validator.validate(multiDataTransactionPayload); Collection endorsingPeers; - String resultString; + String resultString = StringUtils.EMPTY; try { @@ -555,30 +520,47 @@ public ResponseEntity writeMultiDataTransactionToLedger( resultString = new String(result, StandardCharsets.UTF_8); log.info("Transaction Successfully Submitted - Response: " + resultString); - } catch (GatewayRuntimeException gre) { - log.error("Action Failed: A problem occurred with Gateway transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, gre.getMessage(), gre); - } catch (TimeoutException e) { - log.error("Action Failed: Timeout occurred while waiting to submit"); - throw new ServiceException(ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_ERROR, e.getMessage(), e); - } catch (ContractException e) { - if (e.getCause() instanceof IOException - || fabricTxErrorList.stream().anyMatch(e.getMessage()::contains)) { - log.error("Action Failed: A problem occurred with the network connection"); - throw new ServiceException( - ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_ERROR, e.getMessage(), e); - } - log.error("Action Failed: A problem occured while submitting the transaction to the peer"); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); - } catch (InterruptedException e) { - log.error("Action Failed: A problem occurred while submitting the transaction to the peer"); - Thread.currentThread().interrupt(); - throw new FabricTransactionException( - ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, e.getMessage(), e); + } catch (Exception exception) { + handleTransactionException(exception); } return new ResponseEntity<>( new ClientResponseModel(ErrorConstants.NO_ERROR, resultString), HttpStatus.OK); } + + private void handleTransactionException(Exception incomingException) { + + log.error( + "An error occurred while submitting the transaction to the Network. Error Type: {} & Error Message: {}", + incomingException.getCause(), + incomingException.getMessage()); + + if (fabricTransactionExceptionCandidates.contains(incomingException.getClass())) { + + if (Objects.nonNull(incomingException.getCause()) + && (incomingException.getCause() instanceof IOException + || fabricTxErrorList.stream().anyMatch(incomingException.getMessage()::contains))) { + throw new RetryableServiceException( + ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_ERROR, + incomingException.getMessage(), + incomingException); + } + + throw new FabricTransactionException( + ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, + incomingException.getMessage(), + incomingException); + } + + if (incomingException instanceof TimeoutException) { + throw new RetryableServiceException( + ErrorCode.HYPERLEDGER_FABRIC_CONNECTION_TIMEOUT_ERROR, + incomingException.getMessage(), + incomingException); + } + + throw new ServiceException( + ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_ERROR, + incomingException.getMessage(), + incomingException); + } } diff --git a/src/main/resources/application.template b/src/main/resources/application.template index d5acf8a6..cd50d739 100644 --- a/src/main/resources/application.template +++ b/src/main/resources/application.template @@ -36,8 +36,29 @@ fabric: - channelName: Name of the Channel chaincodeId: chaincode-id of the deployed chaincode in this Channel +kafka: + integration-points: + brokerHost: + topic: + ssl-enabled: boolean + security-protocol: + ssl-keystore-base64: + ssl-truststore-base64: + event-listener: + brokerHost: + topic: + ssl-enabled: boolean + security-protocol: + listenToFailedMessages: boolean - + failed-message-listener: + brokerHost: + topic: + ssl-enabled: boolean + security-protocol: --- spring: profiles: container