Skip to content

Commit

Permalink
feat: log epoch in consumption logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Candinya committed Jun 25, 2024
1 parent 7461072 commit 0c897fa
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion internal/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Client interface {
SaveBillingRecordWithdrawal(ctx context.Context, billingRecord *schema.BillingRecordWithdrawal) error
SaveBillingRecordCollected(ctx context.Context, billingRecord *schema.BillingRecordCollected) error

PrepareBillingCollectTokens(ctx context.Context, nowTime time.Time) (*map[common.Address]schema.BillingCollectDataPerAddress, error)
PrepareBillingCollectTokens(ctx context.Context, nowTime time.Time, epoch *big.Int) (*map[common.Address]schema.BillingCollectDataPerAddress, error)
PrepareBillingWithdrawTokens(ctx context.Context) (*map[common.Address]float64, error)
UpdateBillingRuLimit(ctx context.Context, succeededUsersWithRu map[common.Address]int64) error

Expand Down
9 changes: 6 additions & 3 deletions internal/database/dialer/cockroachdb/client_billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -40,7 +41,7 @@ func (c *client) SaveBillingRecordCollected(ctx context.Context, billingRecord *
return c.database.WithContext(ctx).Create(&value).Error
}

func (c *client) prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime time.Time, k *table.GatewayKey, tx *gorm.DB) error {
func (c *client) prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime time.Time, epoch *big.Int, k *table.GatewayKey, tx *gorm.DB) error {
var possibleExistLog table.GatewayConsumptionLog
err := tx.Where("consumption_date = ? AND key_id = ?", nowTime, k.ID).
First(&possibleExistLog).
Expand All @@ -51,6 +52,7 @@ func (c *client) prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime
if errors.Is(err, gorm.ErrRecordNotFound) {
// Fine, let's create it
err = tx.Create(&table.GatewayConsumptionLog{
Epoch: epoch.Uint64(),
KeyID: k.ID,
ConsumptionDate: nowTime,
RuUsed: k.RuUsedCurrent,
Expand All @@ -59,6 +61,7 @@ func (c *client) prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime
} else {
// TODO: Error happens, but we don't know what's this, create a new record for now.
err = tx.Create(&table.GatewayConsumptionLog{
Epoch: epoch.Uint64(),
KeyID: k.ID,
ConsumptionDate: nowTime,
RuUsed: k.RuUsedCurrent,
Expand All @@ -78,7 +81,7 @@ func (c *client) prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime
return err
}

func (c *client) PrepareBillingCollectTokens(ctx context.Context, nowTime time.Time) (*map[common.Address]schema.BillingCollectDataPerAddress, error) {
func (c *client) PrepareBillingCollectTokens(ctx context.Context, nowTime time.Time, epoch *big.Int) (*map[common.Address]schema.BillingCollectDataPerAddress, error) {
// Get all keys whose ru_used_current is > 0
var activeKeys []table.GatewayKey

Expand Down Expand Up @@ -108,7 +111,7 @@ func (c *client) PrepareBillingCollectTokens(ctx context.Context, nowTime time.T
k := k

// Create or update consumption log
err = c.prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime, &k, tx)
err = c.prepareBillingCollectTokensCreateOrUpdateConsumptionLog(nowTime, epoch, &k, tx)
if err != nil {
zap.L().Error("create or update consumption log", zap.Error(err), zap.Any("key", k))
// but no need to stop here - data error can be fixed later, let's focus on billing now
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- +goose Up
-- modify "consumption_log" table
ALTER TABLE "consumption_log" ADD COLUMN "epoch" bigint NULL;
-- create index "idx_consumption_log_epoch" to table: "consumption_log"
CREATE INDEX "idx_consumption_log_epoch" ON "consumption_log" ("epoch");

-- +goose Down
-- reverse: create index "idx_consumption_log_epoch" to table: "consumption_log"
DROP INDEX "idx_consumption_log_epoch";
-- reverse: modify "consumption_log" table
ALTER TABLE "consumption_log" DROP COLUMN "epoch";
3 changes: 2 additions & 1 deletion internal/database/dialer/cockroachdb/migration/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
h1:ZB34dfRTafUwvatIUV2sfG90qWrY9r2uYqwTGjWJ3Mo=
h1:NodNA4vCC47We5+GZ856N+lOSUshOma5QhzzeeXzzMA=
20240412045602_paymentprocessor_initialize.sql h1:g7m9g5t7iEBcov37BjXuSBuJ2ZU1TGr1nXp5BFLsXSY=
20240412060447_fix_billing_records.sql h1:loxzDAViwQIGG0FMizIx5JAVSNFo5bSBKezVPGM+m2w=
20240412091335_add_node_request_records.sql h1:dCOPKhH6bk+hGdwuBPd15FRE1VKw4upxKhEbNhlaDmQ=
20240606022850_alter_column_index_set_not_null.sql h1:c2TudjXeo9Zat/dI1IVIFHRCtQMkWGiTtKonuGehrfE=
20240606022851_use_composite_primary_key.sql h1:by+rptJXIC/AGyYo2rOoCotEJiWFAUkU04CUGnKxvl0=
20240625032621_log_epoch_in_consumption_logs.sql h1:Oj+Pn+Bzv8niB4/PIhzNqVU2JXqFMteasMDcFjdad78=
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type GatewayConsumptionLog struct {
UpdatedAt time.Time

ConsumptionDate time.Time `gorm:"index;column:consumption_date"`
Epoch uint64 `gorm:"index;column:epoch"`
RuUsed int64 `gorm:"column:ru_used"`
APICalls int64 `gorm:"column:api_calls"`

Expand Down
6 changes: 3 additions & 3 deletions internal/service/indexer/l2/trigger_billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *server) billingCollect(ctx context.Context, epoch *big.Int) ([]common.A
// billing collect tokens
nowTime := time.Now() // Epoch round identifier for billing

users, amounts, err := s.buildBillingCollectTokens(ctx, nowTime)
users, amounts, err := s.buildBillingCollectTokens(ctx, nowTime, epoch)

if err != nil {
zap.L().Error("build billing collect tokens", zap.Error(err))
Expand Down Expand Up @@ -164,10 +164,10 @@ func (s *server) billingUpdateRuLimit(ctx context.Context, usersRequireRuLimitRe
return nil
}

func (s *server) buildBillingCollectTokens(ctx context.Context, nowTime time.Time) ([]common.Address, []*big.Int, error) {
func (s *server) buildBillingCollectTokens(ctx context.Context, nowTime time.Time, epoch *big.Int) ([]common.Address, []*big.Int, error) {
zap.L().Debug("Build billing collect tokens")

collectTokensData, err := s.databaseClient.PrepareBillingCollectTokens(ctx, nowTime)
collectTokensData, err := s.databaseClient.PrepareBillingCollectTokens(ctx, nowTime, epoch)

if err != nil {
zap.L().Error("prepare billing data", zap.Error(err))
Expand Down

0 comments on commit 0c897fa

Please sign in to comment.