Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce dedupe for outbound events #148

Merged
merged 8 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.0-jre</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package hlf.java.rest.client.config;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.service.impl.DefaultCacheBasedRecencyTransactionContext;
import hlf.java.rest.client.service.impl.NoOpRecencyTransactionContext;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "dedupe")
@Slf4j
public class EventDedupeConfig {

private boolean enable;
private int recencyWindowSize;
private int recencyWindowExpiryInMinutes;

@Bean
public RecencyTransactionContext recencyTransactionContext() {

if (!this.isEnable()) {
log.info(
"Dedupe config is disabled. Events wont be validated prior to submission to publisher topic");
return new NoOpRecencyTransactionContext();
}

Cache<String, Object> recencyCache =
CacheBuilder.newBuilder()
.maximumSize(this.getRecencyWindowSize())
.expireAfterAccess(this.getRecencyWindowExpiryInMinutes(), TimeUnit.MINUTES)
.build();

log.info(
"Enabling recency check with cache size {} and TTL {} minutes",
recencyWindowSize,
recencyWindowExpiryInMinutes);

return new DefaultCacheBasedRecencyTransactionContext(recencyCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public ProducerFactory<String, String> eventProducerFactory(
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.FALSE);
props.put(
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerProperties.getEnableIdempotence());

// Azure event-hub config
configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static class Producer extends SSLProperties {
private String brokerHost;
private String topic;
private String saslJaasConfig;
private Boolean enableIdempotence;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class EventController {
public ResponseEntity<ClientResponseModel> replayEvents(
@RequestParam("block-number-start") Long startBlockNumber,
@RequestParam("block-number-end") Long endBlockNumber,
@RequestParam(value = "transaction-id", required = false) String transactionId,
@RequestParam("channel") @Validated String networkName,
@RequestParam("eventType") @Validated String eventType) {
log.info(
Expand All @@ -41,6 +42,7 @@ public ResponseEntity<ClientResponseModel> replayEvents(
endBlockNumber,
networkName,
eventType);
return eventFulfillment.replayEvents(startBlockNumber, endBlockNumber, networkName, eventType);
return eventFulfillment.replayEvents(
startBlockNumber, endBlockNumber, transactionId, networkName, eventType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import hlf.java.rest.client.model.EventType;
import hlf.java.rest.client.sdk.StandardCCEvent;
import hlf.java.rest.client.service.EventPublishService;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.util.FabricClientConstants;
import hlf.java.rest.client.util.FabricEventParseUtil;
import java.nio.charset.StandardCharsets;
Expand All @@ -29,6 +30,8 @@ public class ChaincodeEventListener {

@Autowired private FabricProperties fabricProperties;

@Autowired private RecencyTransactionContext recencyTransactionContext;

private static String eventTxnId = FabricClientConstants.FABRIC_TRANSACTION_ID;

public void chaincodeEventListener(ContractEvent contractEvent) {
Expand All @@ -43,7 +46,16 @@ public void chaincodeEventListener(ContractEvent contractEvent) {
? new String(contractEvent.getPayload().get(), StandardCharsets.UTF_8)
: StringUtils.EMPTY;

publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
if (recencyTransactionContext.validateAndRemoveTransactionContext(txId)) {
publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
return;
}

log.info(
"TxnID {} for Block Number {} qualifies as a duplicate event.. Discarding the payload {} from being published.",
txId,
blockNumber,
payload);
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@ public interface EventFulfillment {
* @return responseEntity ResponseEntity Transaction Response
*/
ResponseEntity<ClientResponseModel> replayEvents(
Long startBlockNumber, Long endBlockNumber, String networkName, String eventType);
Long startBlockNumber,
Long endBlockNumber,
String transactionId,
String networkName,
String eventType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package hlf.java.rest.client.service;

public interface RecencyTransactionContext {

void setTransactionContext(String transactionId);

boolean validateAndRemoveTransactionContext(String transactionId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package hlf.java.rest.client.service.impl;

import com.google.common.cache.Cache;
import hlf.java.rest.client.service.RecencyTransactionContext;

public class DefaultCacheBasedRecencyTransactionContext implements RecencyTransactionContext {

private Cache<String, Object> recencyCache;

public DefaultCacheBasedRecencyTransactionContext(Cache<String, Object> recencyCache) {
this.recencyCache = recencyCache;
}

@Override
public void setTransactionContext(String transactionId) {
recencyCache.put(transactionId, 1);
}

@Override
public boolean validateAndRemoveTransactionContext(String transactionId) {
synchronized (this) {
if (recencyCache.getIfPresent(transactionId) == null) {
return false;
}
recencyCache.invalidate(transactionId);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ public class EventFulfillmentImpl implements EventFulfillment {
*/
@Override
public ResponseEntity<ClientResponseModel> replayEvents(
Long startBlockNumber, Long endBlockNumber, String networkName, String eventType) {
Long startBlockNumber,
Long endBlockNumber,
String transactionId,
String networkName,
String eventType) {
log.info(
"Initiate the replay of events since {} until {} on channel {} and type {}",
startBlockNumber,
Expand Down Expand Up @@ -87,8 +91,17 @@ public ResponseEntity<ClientResponseModel> replayEvents(
.forEach(
transactionActionInfo -> {
ChaincodeEvent chaincodeEvent = transactionActionInfo.getEvent();
chaincodeEventListener.listener(
StringUtils.EMPTY, blockInfo, chaincodeEvent, networkName);

if (Objects.isNull(transactionId)
|| chaincodeEvent.getTxId().equals(transactionId)) {
chaincodeEventListener.listener(
StringUtils.EMPTY, blockInfo, chaincodeEvent, networkName);
} else {
log.info(
"Event TransactionID {} does not match the provided TransactionID filter {}. Skipping event.",
chaincodeEvent.getTxId(),
transactionId);
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ public boolean sendMessage(String msg, String fabricTxId, String eventName, Stri
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(
"Sent message=["
+ msg
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
"Sent message '{}' to partition {} for offset {}",
msg,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
log.warn("Unable to send message=[" + msg + "] due to : " + ex.getMessage());
log.error(
"Failed to send message event for Transaction ID {} due to {}",
fabricTxId,
ex.getMessage());
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package hlf.java.rest.client.service.impl;

import hlf.java.rest.client.service.RecencyTransactionContext;

public class NoOpRecencyTransactionContext implements RecencyTransactionContext {
@Override
public void setTransactionContext(String transactionId) {
// NO-OP
}

@Override
public boolean validateAndRemoveTransactionContext(String transactionId) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import hlf.java.rest.client.model.MultiDataTransactionPayload;
import hlf.java.rest.client.model.MultiPrivateDataTransactionPayloadValidator;
import hlf.java.rest.client.service.HFClientWrapper;
import hlf.java.rest.client.service.RecencyTransactionContext;
import hlf.java.rest.client.service.TransactionFulfillment;
import hlf.java.rest.client.util.FabricEventParseUtil;
import java.io.IOException;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class TransactionFulfillmentImpl implements TransactionFulfillment {

@Autowired private HFClientWrapper hfClientWrapper;

@Autowired private RecencyTransactionContext recencyTransactionContext;

@Override
public ResponseEntity<ClientResponseModel> initSmartContract(
String networkName,
Expand Down Expand Up @@ -218,6 +221,11 @@ public ResponseEntity<ClientResponseModel> writeTransactionToLedger(
}
}

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(transactionParams);
resultString = new String(result, StandardCharsets.UTF_8);
log.info("Transaction Successfully Submitted - Response: " + resultString);
Expand Down Expand Up @@ -283,6 +291,12 @@ public ResponseEntity<ClientResponseModel> writePrivateTransactionToLedger(

transientParam.put(transientKey, jsonPayload.getBytes());
fabricTransaction.setTransient(transientParam);

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(collection, transientKey);
resultString = new String(result, StandardCharsets.UTF_8);
log.info("Transaction Successfully Submitted - Response: " + resultString);
Expand Down Expand Up @@ -513,6 +527,11 @@ public ResponseEntity<ClientResponseModel> writeMultiDataTransactionToLedger(
// Map to String Array for dispatching via SDK method
String[] publicDataArgs = publicParamsList.toArray(new String[publicParamsList.size()]);

recencyTransactionContext.setTransactionContext(fabricTransaction.getTransactionId());

log.info(
"Performing Write Transaction to Ledger with Tx ID {}",
fabricTransaction.getTransactionId());
byte[] result = fabricTransaction.submit(publicDataArgs);

resultString = new String(result, StandardCharsets.UTF_8);
Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/application.template
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ kafka:
ssl-enabled: boolean
security-protocol: <Only supports SSL>
listenToFailedMessages: boolean <set as true if you wish to recieve errored Transaction records back to this topic>
enableIdempotence: boolean, enable strict Kafka producer idempotence

failed-message-listener: <Note, if you wish to receive errored Transactions to a dedicated topic, these details should be filled up>
brokerHost: <Comma separated list of boostrap servers>
topic: <topic to publish errored Records>
ssl-enabled: boolean
security-protocol: <Only supports SSL>
dedupe:
enable: boolean, if enabled, the runtime instance of Connector utilises an in-memory recency cache that would validate a recent submission of Transaction prior to emitting an event with the matching Transaction ID.
recency-window-size: applicable only if dedupe is enabled, defines the recency cache size.
recency-window-expiry-in-minutes: applicable only if dedupe is enabled, defines the recency cache TTL in minites
---
spring:
profiles: container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.model.ClientResponseModel;
import hlf.java.rest.client.service.RecencyTransactionContext;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class TransactionFulfillmentImplTest {
@Mock Network network;
@Mock Contract contract;
@Mock Transaction transaction;
@Mock RecencyTransactionContext recencyTransactionContext;

private String testNetworkString = "some string";
private String testContractString = "some string";
Expand Down Expand Up @@ -69,7 +71,9 @@ public void writeTransactionToLedgerTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenReturn(byteArrayResponse);
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenReturn(byteArrayResponse);
Expand All @@ -92,7 +96,9 @@ public void writeTransactionToLedgerContractExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new ContractException(""));
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new ContractException(""));
Expand All @@ -115,7 +121,9 @@ public void writeTransactionToLedgerTimeoutExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new TimeoutException());
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new TimeoutException());
Expand All @@ -138,7 +146,9 @@ public void writeTransactionToLedgerInterruptExceptionTest()
Mockito.when(gatewayConnetion.getNetwork(Mockito.anyString())).thenReturn(network);
Mockito.when(network.getContract(Mockito.anyString())).thenReturn(contract);
Mockito.when(contract.createTransaction(Mockito.anyString())).thenReturn(transaction);
Mockito.when(transaction.getTransactionId()).thenReturn("TX-1234");
Mockito.when(transaction.submit(Mockito.<String>any())).thenThrow(new InterruptedException());
Mockito.doNothing().when(recencyTransactionContext).setTransactionContext(Mockito.anyString());
/*
* Mockito.when(contract.submitTransaction(Mockito.anyString(),
* Mockito.<String>any())) .thenThrow(new InterruptedException());
Expand Down
Loading
Loading