diff --git a/src/jobs/element-sync/queues/backfill-queue-offers.ts b/src/jobs/element-sync/queues/backfill-queue-offers.ts index b4fe723..31aecaa 100644 --- a/src/jobs/element-sync/queues/backfill-queue-offers.ts +++ b/src/jobs/element-sync/queues/backfill-queue-offers.ts @@ -34,7 +34,7 @@ if (config.doBackfillWork && config.doElementWork) { const { startTime, endTime }: Data = job.data; try { - const lastCreatedAt = await fetchOrders("buy", startTime); + const cursor = await fetchOrders("buy", startTime); logger.info( BACKFILL_QUEUE_NAME, @@ -42,8 +42,8 @@ if (config.doBackfillWork && config.doElementWork) { ); // If there are more order within th given time frame - if (lastCreatedAt <= endTime) { - job.data.newStartTime = lastCreatedAt; + if (cursor <= endTime) { + job.data.newStartTime = cursor; } } catch (error) { logger.error( diff --git a/src/jobs/element-sync/queues/backfill-queue.ts b/src/jobs/element-sync/queues/backfill-queue.ts index dc66639..9a26583 100644 --- a/src/jobs/element-sync/queues/backfill-queue.ts +++ b/src/jobs/element-sync/queues/backfill-queue.ts @@ -29,12 +29,15 @@ if (config.doBackfillWork && config.doElementWork) { type Data = { startTime: number; endTime: number; + offset?: number; }; + const limit = 50; const { startTime, endTime }: Data = job.data; + let offset = job.data?.offset ?? 0; try { - const cursor = await fetchOrders("sell", startTime, endTime); + const cursor = await fetchOrders("sell", startTime, endTime, offset, limit); logger.info( BACKFILL_QUEUE_NAME, @@ -44,6 +47,7 @@ if (config.doBackfillWork && config.doElementWork) { // If there are more order within th given time frame if (cursor >= startTime) { job.data.newEndTime = cursor >= endTime ? endTime - 10 : cursor; + job.data.offset = offset + limit; } } catch (error) { logger.error( @@ -59,7 +63,7 @@ if (config.doBackfillWork && config.doElementWork) { // If there's newEndTime schedule the next job if (job.data.newEndTime) { await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait to avoid rate-limiting - await addToElementBackfillQueue(job.data.startTime, job.data.newEndTime); + await addToElementBackfillQueue(job.data.startTime, job.data.newEndTime, job.data.offset ); } }); @@ -71,6 +75,7 @@ if (config.doBackfillWork && config.doElementWork) { export const addToElementBackfillQueue = async ( startTime: number, endTime: number, + offset: number = 0, delayMs: number = 0 ) => { // Make sure endTime is bigger than startTime @@ -80,7 +85,7 @@ export const addToElementBackfillQueue = async ( await backfillQueue.add( BACKFILL_QUEUE_NAME, - { startTime, endTime }, + { startTime, endTime, offset }, { delay: delayMs } ); }; diff --git a/src/jobs/element-sync/utils.ts b/src/jobs/element-sync/utils.ts index 30bd932..67ced2e 100644 --- a/src/jobs/element-sync/utils.ts +++ b/src/jobs/element-sync/utils.ts @@ -14,7 +14,9 @@ import { Element, ElementOrder, SaleKind } from "../../utils/element"; export const fetchOrders = async ( side: "sell" | "buy", listedAfter = 0, - listedBefore = 0 + listedBefore = 0, + offset = 0, + limit = 50 ) => { logger.debug( "fetch_orders_element", @@ -22,7 +24,6 @@ export const fetchOrders = async ( ); const element = new Element(); - let limit = 50; let newCursor = 0; let numOrders = 0; @@ -34,6 +35,7 @@ export const fetchOrders = async ( listed_after: listedAfter > 0 ? listedAfter : undefined, listed_before: listedBefore > 0 ? listedBefore : undefined, limit, + offset }); try { @@ -126,7 +128,7 @@ export const fetchOrders = async ( const lastOrder = _.last(orders); if (lastOrder) { - newCursor = lastOrder.createTime; + newCursor = lastOrder.listingTime; } }