Skip to content

Commit

Permalink
refactor tasks and retry pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
barnjamin committed Oct 15, 2023
1 parent 04d6a5b commit 7530482
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 271 deletions.
20 changes: 9 additions & 11 deletions connect/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,15 @@ describe("Wormhole Tests", () => {
expect(vaa).toBeUndefined();
});

// TODO: Fix this test
//test("returns after first try fails", async () => {
// publicRpcMock.givenSignedVaaRequestWorksAfterRetry();
// const vaa = await wh.getVAABytes(
// "Base",
// testing.utils.makeChainAddress("Base").address,
// 1n,
// 5,
// );
// expect(vaa).toBeDefined();
//});
test("returns after first try fails", async () => {
publicRpcMock.givenSignedVaaRequestWorksAfterRetry();
const vaa = await wh.getVAABytes(
"Base",
testing.utils.makeChainAddress("Base").address,
1n,
);
expect(vaa).toBeDefined();
});
});
});

Expand Down
63 changes: 0 additions & 63 deletions connect/src/api.ts

This file was deleted.

2 changes: 2 additions & 0 deletions connect/src/circle-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export async function getCircleAttestation(
const attestation = mapCircleAttestation(response?.data);
return attestation.message === "PENDING" ? null : attestation.message;
} catch (error) {
// This is a 404 error, which means the attestation is not yet available
// since its not available yet, we return null signaling it can be tried again
if (!(axios.isAxiosError(error) && error?.response?.status === 404)) {
return null;
}
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions connect/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {
ChainsConfig,
} from "@wormhole-foundation/sdk-definitions";

export const DEFAULT_TASK_TIMEOUT = 60 * 1000; // 1 minute in milliseconds

/*
TODO:
add missing chains for each config
Expand Down
2 changes: 1 addition & 1 deletion connect/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export * from "./protocols/cctpTransfer";
export * from "./protocols/gatewayTransfer";

export * as circle from "./circle-api";
export * as api from "./api";
export * as api from "./whscan-api";

// Re-export from core packages
export {
Expand Down
151 changes: 66 additions & 85 deletions connect/src/protocols/cctpTransfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import {
TransferState,
WormholeTransfer,
} from "../wormholeTransfer";
import { retry } from "./retry";
import { retry } from "../tasks";
import { signSendWait } from "../common";
import { DEFAULT_TASK_TIMEOUT } from "../config";

export class CCTPTransfer implements WormholeTransfer {
private readonly wh: Wormhole;
Expand All @@ -43,7 +45,7 @@ export class CCTPTransfer implements WormholeTransfer {
transfer: CCTPTransferDetails;

// Populated after Initialized
txids?: TxHash[];
txids?: TransactionId[];

// Populated if !automatic and after initialized
circleAttestations?: {
Expand Down Expand Up @@ -75,16 +77,26 @@ export class CCTPTransfer implements WormholeTransfer {
static async from(
wh: Wormhole,
from: WormholeMessageId,
timeout?: number,
): Promise<CCTPTransfer>;
static async from(
wh: Wormhole,
from: CircleMessageId,
timeout?: number,
): Promise<CCTPTransfer>;
static async from(
wh: Wormhole,
from: TransactionId,
timeout?: number,
): Promise<CCTPTransfer>;
static async from(wh: Wormhole, from: CircleMessageId): Promise<CCTPTransfer>;
static async from(wh: Wormhole, from: TransactionId): Promise<CCTPTransfer>;
static async from(
wh: Wormhole,
from:
| CCTPTransferDetails
| WormholeMessageId
| CircleMessageId
| TransactionId,
timeout: number = DEFAULT_TASK_TIMEOUT,
): Promise<CCTPTransfer> {
// This is a new transfer, just return the object
if (isCCTPTransferDetails(from)) {
Expand All @@ -94,15 +106,15 @@ export class CCTPTransfer implements WormholeTransfer {
// This is an existing transfer, fetch the details
let tt: CCTPTransfer | undefined;
if (isWormholeMessageId(from)) {
tt = await CCTPTransfer.fromWormholeMessageId(wh, from);
tt = await CCTPTransfer.fromWormholeMessageId(wh, from, timeout);
} else if (isTransactionIdentifier(from)) {
tt = await CCTPTransfer.fromTransaction(wh, from);
tt = await CCTPTransfer.fromTransaction(wh, from, timeout);
} else if (isCircleMessageId(from)) {
tt = await CCTPTransfer.fromCircleMessageId(wh, from);
tt = await CCTPTransfer.fromCircleMessageId(wh, from, timeout);
} else {
throw new Error("Invalid `from` parameter for CCTPTransfer");
}
await tt.fetchAttestation();
await tt.fetchAttestation(timeout);

return tt;
}
Expand All @@ -111,6 +123,7 @@ export class CCTPTransfer implements WormholeTransfer {
private static async fromWormholeMessageId(
wh: Wormhole,
from: WormholeMessageId,
timeout: number,
): Promise<CCTPTransfer> {
const { chain, emitter, sequence } = from;
const vaa = await CCTPTransfer.getTransferVaa(wh, chain, emitter, sequence);
Expand Down Expand Up @@ -152,6 +165,7 @@ export class CCTPTransfer implements WormholeTransfer {
private static async fromCircleMessageId(
wh: Wormhole,
messageId: CircleMessageId,
timeout: number,
): Promise<CCTPTransfer> {
const [message, hash] = deserializeCircleMessage(
hexByteStringToUint8Array(messageId.message),
Expand Down Expand Up @@ -187,6 +201,7 @@ export class CCTPTransfer implements WormholeTransfer {
private static async fromTransaction(
wh: Wormhole,
from: TransactionId,
timeout: number,
): Promise<CCTPTransfer> {
const { chain, txid } = from;
const originChain = wh.getChain(chain);
Expand All @@ -201,7 +216,7 @@ export class CCTPTransfer implements WormholeTransfer {
// If we found a VAA message, use it
let ct: CCTPTransfer;
if (msgIds.length > 0) {
ct = await CCTPTransfer.fromWormholeMessageId(wh, msgIds[0]);
ct = await CCTPTransfer.fromWormholeMessageId(wh, msgIds[0], timeout);
} else {
// Otherwise try to parse out a circle message
const cb = await originChain.getCircleBridge();
Expand All @@ -217,7 +232,7 @@ export class CCTPTransfer implements WormholeTransfer {
}

ct.state = TransferState.Initiated;
ct.txids = [from.txid];
ct.txids = [from];
return ct;
}

Expand Down Expand Up @@ -257,28 +272,13 @@ export class CCTPTransfer implements WormholeTransfer {
);
}

let unsigned: UnsignedTransaction[] = [];
const txHashes: TxHash[] = [];
for await (const tx of xfer) {
unsigned.push(tx);
if (!tx.parallelizable) {
const signed = await signer.sign(unsigned);
txHashes.push(...(await fromChain.sendWait(signed)));
unsigned = [];
}
}
if (unsigned.length > 0) {
const signed = await signer.sign(unsigned);
txHashes.push(...(await fromChain.sendWait(signed)));
}

this.txids = txHashes;
this.txids = await signSendWait(fromChain, xfer, signer);
this.state = TransferState.Initiated;

return txHashes;
return this.txids!.map(({ txid }) => txid);
}

private async fetchWormholeAttestation(
private async _fetchWormholeAttestation(
timeout?: number,
): Promise<WormholeMessageId[]> {
if (!this.vaas || this.vaas.length == 0)
Expand All @@ -300,7 +300,7 @@ export class CCTPTransfer implements WormholeTransfer {
return this.vaas.map((v) => v.id);
}

private async fetchCircleAttestation(
private async _fetchCircleAttestation(
timeout?: number,
): Promise<CircleMessageId[]> {
if (!this.circleAttestations || this.circleAttestations.length == 0) {
Expand All @@ -315,7 +315,7 @@ export class CCTPTransfer implements WormholeTransfer {
const fromChain = this.wh.getChain(this.transfer.from.chain);

const cb = await fromChain.getCircleBridge();
const circleMessage = await cb.parseTransactionDetails(txid);
const circleMessage = await cb.parseTransactionDetails(txid.txid);
this.circleAttestations = [{ id: circleMessage.messageId }];
}

Expand Down Expand Up @@ -352,8 +352,8 @@ export class CCTPTransfer implements WormholeTransfer {
throw new Error("Invalid state transition in `fetchAttestation`");

const ids: AttestationId[] = this.transfer.automatic
? await this.fetchWormholeAttestation(timeout)
: await this.fetchCircleAttestation(timeout);
? await this._fetchWormholeAttestation(timeout)
: await this._fetchCircleAttestation(timeout);

this.state = TransferState.Attested;

Expand All @@ -372,71 +372,52 @@ export class CCTPTransfer implements WormholeTransfer {
if (this.state < TransferState.Attested)
throw new Error("Invalid state transition in `finish`");

// If its automatic, this does not need to be called
if (this.transfer.automatic) {
if (!this.vaas) throw new Error("No VAA details available");
if (this.vaas.length > 1)
throw new Error(`Expected a VAA, found ${this.vaas.length}`);

const toChain = this.wh.getChain(this.transfer.to.chain);

const txHashes: TxHash[] = [];
for (const cachedVaa of this.vaas) {
const { vaa } = cachedVaa;
if (!vaa) throw new Error("No Vaa found");
const tb = await toChain.getAutomaticCircleBridge();
//TODO: tb.redeem()
throw new Error("No method to redeem auto circle bridge tx (yet)");
}
return txHashes;
} else {
// If no circle attestations, fetch 'em
if (!this.circleAttestations) await this.fetchAttestation();

const toChain = this.wh.getChain(this.transfer.to.chain);

const txHashes: TxHash[] = [];
for (const cachedAttestation of this.circleAttestations!) {
const { id, attestation } = cachedAttestation;

if (!attestation)
throw new Error(`No Circle Attestation for ${id.hash}`);

const tb = await toChain.getCircleBridge();
const xfer = tb.redeem(
this.transfer.to.address,
id.message,
attestation,
);

let unsigned: UnsignedTransaction[] = [];
for await (const tx of xfer) {
unsigned.push(tx);
// If we get a non-parallelizable tx, sign and send the transactions
// we've gotten so far
if (!tx.parallelizable) {
const signed = await signer.sign(unsigned);
txHashes.push(...(await toChain.sendWait(signed)));
// reset unsigned
unsigned = [];
}
}

if (unsigned.length > 0) {
const signed = await signer.sign(unsigned);
txHashes.push(...(await toChain.sendWait(signed)));
}
}
return txHashes;
const { vaa } = this.vaas[0];
if (!vaa) throw new Error("No VAA found");

//const tb = await toChain.getAutomaticCircleBridge();
//const xfer = tb.redeem(vaa);
//const txids = await signSendWait(toChain, xfer, signer);
throw new Error("No method to redeem auto circle bridge tx (yet)");
}

if (!this.circleAttestations)
throw new Error("No Circle Attestations found");

if (this.circleAttestations.length > 1)
throw new Error(
`Expected a single circle attestation, found ${this.circleAttestations.length}`,
);

const toChain = this.wh.getChain(this.transfer.to.chain);

const { id, attestation } = this.circleAttestations[0];

if (!attestation) throw new Error(`No Circle Attestation for ${id.hash}`);

const tb = await toChain.getCircleBridge();
const xfer = tb.redeem(this.transfer.to.address, id.message, attestation);

const txids = await signSendWait(toChain, xfer, signer);
this.txids?.push(...txids);
return txids.map(({ txid }) => txid);
}

static async getTransferVaa(
wh: Wormhole,
chain: ChainName,
emitter: UniversalAddress | NativeAddress<PlatformName>,
sequence: bigint,
retries: number = 5,
timeout?: number,
): Promise<VAA<"CircleTransferRelay">> {
const vaaBytes = await wh.getVAABytes(chain, emitter, sequence, retries);
if (!vaaBytes) throw new Error(`No VAA available after ${retries} retries`);
const vaaBytes = await wh.getVAABytes(chain, emitter, sequence, timeout);
if (!vaaBytes) throw new Error(`No VAA available after timeout exhausted`);

const partial = deserialize("Uint8Array", vaaBytes);
switch (partial.payload[0]) {
Expand Down
Loading

0 comments on commit 7530482

Please sign in to comment.