Skip to content

Commit

Permalink
feat: update element backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
ipeleg committed Dec 12, 2024
1 parent 5d5c802 commit 19d4baa
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/jobs/element-sync/queues/backfill-queue-offers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ if (config.doBackfillWork && config.doElementWork) {
const { startTime, endTime }: Data = job.data;

try {
const lastCreatedAt = await fetchOrders("buy", startTime);
const cursor = await fetchOrders("buy", startTime);

logger.info(
BACKFILL_QUEUE_NAME,
`Element backfilled offers from startTime=${startTime} to endTime=${endTime}`
);

// If there are more order within th given time frame
if (lastCreatedAt <= endTime) {
job.data.newStartTime = lastCreatedAt;
if (cursor <= endTime) {
job.data.newStartTime = cursor;
}
} catch (error) {
logger.error(
Expand Down
11 changes: 8 additions & 3 deletions src/jobs/element-sync/queues/backfill-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ if (config.doBackfillWork && config.doElementWork) {
type Data = {
startTime: number;
endTime: number;
offset?: number;
};

const limit = 50;
const { startTime, endTime }: Data = job.data;
let offset = job.data?.offset ?? 0;

try {
const cursor = await fetchOrders("sell", startTime, endTime);
const cursor = await fetchOrders("sell", startTime, endTime, offset, limit);

logger.info(
BACKFILL_QUEUE_NAME,
Expand All @@ -44,6 +47,7 @@ if (config.doBackfillWork && config.doElementWork) {
// If there are more order within th given time frame
if (cursor >= startTime) {
job.data.newEndTime = cursor >= endTime ? endTime - 10 : cursor;
job.data.offset = offset + limit;
}
} catch (error) {
logger.error(
Expand All @@ -59,7 +63,7 @@ if (config.doBackfillWork && config.doElementWork) {
// If there's newEndTime schedule the next job
if (job.data.newEndTime) {
await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait to avoid rate-limiting
await addToElementBackfillQueue(job.data.startTime, job.data.newEndTime);
await addToElementBackfillQueue(job.data.startTime, job.data.newEndTime, job.data.offset );
}
});

Expand All @@ -71,6 +75,7 @@ if (config.doBackfillWork && config.doElementWork) {
export const addToElementBackfillQueue = async (
startTime: number,
endTime: number,
offset: number = 0,
delayMs: number = 0
) => {
// Make sure endTime is bigger than startTime
Expand All @@ -80,7 +85,7 @@ export const addToElementBackfillQueue = async (

await backfillQueue.add(
BACKFILL_QUEUE_NAME,
{ startTime, endTime },
{ startTime, endTime, offset },
{ delay: delayMs }
);
};
8 changes: 5 additions & 3 deletions src/jobs/element-sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import { Element, ElementOrder, SaleKind } from "../../utils/element";
export const fetchOrders = async (
side: "sell" | "buy",
listedAfter = 0,
listedBefore = 0
listedBefore = 0,
offset = 0,
limit = 50
) => {
logger.debug(
"fetch_orders_element",
`listedAfter = ${listedAfter} Fetching orders from Element`
);

const element = new Element();
let limit = 50;
let newCursor = 0;
let numOrders = 0;

Expand All @@ -34,6 +35,7 @@ export const fetchOrders = async (
listed_after: listedAfter > 0 ? listedAfter : undefined,
listed_before: listedBefore > 0 ? listedBefore : undefined,
limit,
offset
});

try {
Expand Down Expand Up @@ -126,7 +128,7 @@ export const fetchOrders = async (
const lastOrder = _.last(orders);

if (lastOrder) {
newCursor = lastOrder.createTime;
newCursor = lastOrder.listingTime;
}
}

Expand Down

0 comments on commit 19d4baa

Please sign in to comment.