Skip to content

Commit

Permalink
various small test fixes & more tests for claim processing (EthClaimM…
Browse files Browse the repository at this point in the history
…anager)
  • Loading branch information
pk910 committed Jun 26, 2023
1 parent d9ee6da commit 0b3b5bc
Show file tree
Hide file tree
Showing 22 changed files with 802 additions and 145 deletions.
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
],
"outputPath": "dist"
},
"nyc": {
"exclude": [
"libs/*.js",
"tests/**"
]
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.4",
"@types/chai": "^4.3.5",
Expand Down
7 changes: 4 additions & 3 deletions src/common/FaucetProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ export class FaucetProcess extends TypedEmitter<FaucetProcessEvents> {
});
}

private async shutdown(code: number) {
public async shutdown(code: number) {
try {
setTimeout(() => process.exit(code), 10 * 1000);
await ServiceManager.GetService(SessionManager).saveAllSessions();
await ServiceManager.GetService(FaucetDatabase).closeDatabase();
let dbsvc = ServiceManager.GetService(FaucetDatabase);
await ServiceManager.DisposeAllServices();
await dbsvc.closeDatabase();
} catch(ex) {}
process.exit(code);
}
Expand Down
12 changes: 10 additions & 2 deletions src/common/ServiceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@ export class ServiceManager {
return serviceObj;
}

public static ClearAllServices() {
public static DisposeAllServices(): Promise<void> {
let promises: Promise<void>[] = [];
this._serviceInstances.forEach((instanceArr) => {
if(instanceArr.length > 0)
if(instanceArr.length > 0) {
instanceArr.forEach((instance) => {
if(typeof (instance[1] as any).dispose === "function") {
promises.push((instance[1] as any).dispose());
}
});
instanceArr.splice(0, instanceArr.length);
}
});
return Promise.all(promises).then();
}

}
11 changes: 10 additions & 1 deletion src/db/FaucetDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export enum FaucetDbDriver {
export class FaucetDatabase {

private initialized: boolean;
private cleanupTimer: NodeJS.Timer;
private db: BaseDriver;
private dbWorker: Worker;
private moduleDBs: {[module: string]: FaucetModuleDB} = {};
Expand All @@ -34,11 +35,19 @@ export class FaucetDatabase {
this.initialized = true;

await this.initDatabase();
setInterval(() => {
this.cleanupTimer = setInterval(() => {
this.cleanStore();
}, (1000 * 60 * 60 * 2));
}

public dispose() {
if(!this.initialized)
return;
this.initialized = false;

clearInterval(this.cleanupTimer);
}

private async initDatabase(): Promise<void> {
switch(faucetConfig.database.driver) {
case "sqlite":
Expand Down
55 changes: 23 additions & 32 deletions src/eth/EthClaimManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { FaucetHttpServer } from '../webserv/FaucetHttpServer';
import { IncomingMessage } from 'http';
import { EthClaimNotificationClient, IEthClaimNotificationData } from './EthClaimNotificationClient';
import { FaucetOutflowModule } from '../modules/faucet-outflow/FaucetOutflowModule';
import { clearInterval } from 'timers';

export enum ClaimTxStatus {
QUEUE = "queue",
Expand Down Expand Up @@ -58,7 +59,7 @@ export interface EthClaimData {

export class EthClaimManager {
private initialized: boolean;

private queueInterval: NodeJS.Timer;
private claimTxDict: {[session: string]: EthClaimInfo} = {};
private claimTxQueue: EthClaimInfo[] = [];
private pendingTxQueue: {[hash: string]: EthClaimInfo} = {};
Expand Down Expand Up @@ -112,7 +113,16 @@ export class EthClaimManager {
ServiceManager.GetService(FaucetHttpServer).addWssEndpoint("claim", /^\/ws\/claim($|\?)/, (req, ws, ip) => this.processClaimNotificationWebSocket(req, ws, ip));

// start queue processing interval
setInterval(() => this.processQueue(), 2000);
this.queueInterval = setInterval(() => this.processQueue(), 2000);
}

public dispose() {
if(!this.initialized)
return;
this.initialized = false;

EthClaimNotificationClient.resetClaimNotification();
clearInterval(this.queueInterval);
}

private async processClaimNotificationWebSocket(req: IncomingMessage, ws: WebSocket, remoteIp: string) {
Expand All @@ -121,41 +131,25 @@ export class EthClaimManager {
let urlParts = req.url.split("?");
let url = new URLSearchParams(urlParts[1]);
sessionId = url.get("session");
} catch(ex) {
ws.send(JSON.stringify({
action: "error",
data: {
reason: "session id missing"
}
}));
ws.close();
return;
}

let sessionInfo: FaucetSessionStoreData
if(!sessionId || !(sessionInfo = await ServiceManager.GetService(FaucetDatabase).getSession(sessionId)))
throw "session not found";

let sessionInfo: FaucetSessionStoreData
if(!sessionId || !(sessionInfo = await ServiceManager.GetService(FaucetDatabase).getSession(sessionId))) {
ws.send(JSON.stringify({
action: "error",
data: {
reason: "session not found"
}
}));
ws.close();
return;
}
if(sessionInfo.status !== FaucetSessionStatus.CLAIMING)
throw "session not claiming";

if(sessionInfo.status !== FaucetSessionStatus.CLAIMING) {
new EthClaimNotificationClient(ws, sessionInfo.claim.claimIdx);
} catch(ex) {
ws.send(JSON.stringify({
action: "error",
data: {
reason: "session claiming"
reason: ex.toString()
}
}));
ws.close();
return;
}

new EthClaimNotificationClient(ws, sessionInfo.claim.claimIdx);
}

public getTransactionQueue(queueOnly?: boolean): EthClaimInfo[] {
Expand Down Expand Up @@ -193,8 +187,6 @@ export class EthClaimManager {
public async createSessionClaim(sessionData: FaucetSessionStoreData, userInput: any): Promise<EthClaimInfo> {
if(sessionData.status !== FaucetSessionStatus.CLAIMABLE)
throw new FaucetError("NOT_CLAIMABLE", "cannot claim session: not claimable (state: " + sessionData.status + ")");
if(this.claimTxDict[sessionData.sessionId])
throw new FaucetError("NOT_CLAIMABLE", "cannot claim session: already claiming");
if(BigInt(sessionData.dropAmount) < BigInt(faucetConfig.minDropAmount))
throw new FaucetError("AMOUNT_TOO_LOW", "drop amount lower than minimum");
if(BigInt(sessionData.dropAmount) > BigInt(faucetConfig.maxDropAmount))
Expand All @@ -218,7 +210,7 @@ export class EthClaimManager {

// prevent multi claim via race condition
if(this.claimTxDict[sessionData.sessionId])
throw new FaucetError("NOT_CLAIMABLE", "cannot claim session: already claiming (race condition)");
throw new FaucetError("RACE_CLAIMING", "cannot claim session: already claiming (race condition)");

claimInfo.claim = {
claimIdx: this.lastClaimTxIdx++,
Expand Down Expand Up @@ -294,7 +286,6 @@ export class EthClaimManager {
return;
}
if(
!walletState.ready ||
walletState.balance - BigInt(faucetConfig.spareFundsAmount) < BigInt(claimTx.amount) ||
walletState.nativeBalance <= BigInt(faucetConfig.ethTxGasLimit) * BigInt(faucetConfig.ethTxMaxFee)
) {
Expand Down Expand Up @@ -336,7 +327,7 @@ export class EthClaimManager {
claimTx.claim.txBlock = txData.block;
claimTx.claim.txFee = txData.fee.toString();

this.lastProcessedClaimTxIdx = claimTx.claim.claimIdx;
this.lastConfirmedClaimTxIdx = claimTx.claim.claimIdx;

claimTx.claim.claimStatus = ClaimTxStatus.CONFIRMED;
this.updateClaimStatus(claimTx);
Expand Down
52 changes: 21 additions & 31 deletions src/eth/EthClaimNotificationClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ export interface IEthClaimNotificationData {
}

export class EthClaimNotificationClient {
public static cfgPingInterval = 30;
public static cfgPingTimeout = 120;

private static activeClients: EthClaimNotificationClient[] = [];
private static lastNotificationData: IEthClaimNotificationData;

public static broadcastClaimNotification(data: IEthClaimNotificationData) {
this.lastNotificationData = data;
this.activeClients.forEach((client) => client.sendClaimNotification(data));
for(let i = this.activeClients.length - 1; i >= 0; i--) {
this.activeClients[i].sendClaimNotification(data);
}
}

public static resetClaimNotification() {
this.lastNotificationData = null;
}

private socket: WebSocket;
Expand All @@ -28,17 +37,15 @@ export class EthClaimNotificationClient {

this.socket.on("ping", (data) => {
this.lastPingPong = new Date();
if(this.socket)
this.socket.pong(data)
this.socket?.pong(data)
});
this.socket.on("pong", (data) => {
this.lastPingPong = new Date();
});
this.socket.on("error", (err) => {
ServiceManager.GetService(FaucetProcess).emitLog(FaucetLogLevel.WARNING, "WebSocket error: " + err.toString());
try {
if(this.socket)
this.socket.close();
this.socket?.close();
} catch(ex) {}
this.dispose();
});
Expand All @@ -53,10 +60,6 @@ export class EthClaimNotificationClient {
}
}

public isReady(): boolean {
return !!this.socket;
}

private dispose() {
this.socket = null;

Expand All @@ -72,45 +75,32 @@ export class EthClaimNotificationClient {
}

public killClient(reason?: string) {
if(!this.socket)
return;
try {
this.sendMessage("error", {
reason: reason,
});
this.socket.close();
this.socket?.close();
} catch(ex) {}
this.dispose();
}

private pingClientLoop() {
this.pingTimer = setInterval(() => {
if(!this.socket)
return;

let pingpongTime = Math.floor(((new Date()).getTime() - this.lastPingPong.getTime()) / 1000);
if(pingpongTime > 120) {
if(pingpongTime > EthClaimNotificationClient.cfgPingTimeout) {
this.killClient("ping timeout");
return;
}

this.socket.ping();
}, 60 * 1000);
this.socket?.ping();
}, EthClaimNotificationClient.cfgPingInterval * 1000);
}

private sendMessage(action: string, data?: any, rsp?: any) {
if(!this.socket)
return;

let message: any = {
action: action
};
if(data !== undefined)
message.data = data;
if(rsp !== undefined)
message.rsp = rsp;

this.socket.send(JSON.stringify(message));
private sendMessage(action: string, data: any) {
this.socket?.send(JSON.stringify({
action: action,
data: data,
}));
}

private sendClaimNotification(data: IEthClaimNotificationData) {
Expand Down
Loading

0 comments on commit 0b3b5bc

Please sign in to comment.