Skip to content

Commit

Permalink
Add saga latency metric (#224)
Browse files Browse the repository at this point in the history
records the execution time of the entire saga, from its creation until its deletion (in ms), having the "Service", "SagaType" and "Initiator" as labels
  • Loading branch information
kerenbe4 authored and Guy Baron committed Dec 2, 2019
1 parent 4fdd9fc commit d66df35
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 24 deletions.
9 changes: 5 additions & 4 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ grabbit exposes and reports the following metrics to Prometheus
| grabbit | handlers | result | records and counts each run of a handler, having the handler's name, message type and the result as labels|
| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
| grabbit | saga | latency | records the execution time of the entire saga, from its creation until its deletion (in ms), having the "Service", "SagaType" and "Initiator" as labels |
| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |

9 changes: 6 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
providerLogger := gb.Log().WithField("provider", "mysql")
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
if err != nil {
panic(err)
errMsg := fmt.Errorf("grabbit: transaction provider failed creating a transaction. %v", err)
panic(errMsg)
}
gb.TxProvider = mysqltx

Expand All @@ -91,7 +92,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
panic(errMsg)
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup, builder.busCfg.OutboxCfg)
Expand All @@ -110,7 +112,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
panic(errMsg)
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
Expand Down
27 changes: 16 additions & 11 deletions gbus/metrics/saga_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,22 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)

//SagaTimeoutCounter is the prometheus counter counting timed out saga instances
var SagaTimeoutCounter = newSagaTimeoutCounter()
// SagaTimeoutCounter is the prometheus counter counting timed out saga instances.
var SagaTimeoutCounter = promauto.NewCounter(
	prometheus.CounterOpts{
		Namespace: grabbitPrefix,
		Subsystem: "saga",
		Name:      "timedout_sagas",
		Help:      "counting the number of timedout saga instances",
	},
)

// SagaLatencySummary is the prometheus summary for the total duration of a saga.
// Observations are partitioned by the "Service", "SagaType" and "Initiator" labels.
var SagaLatencySummary = promauto.NewSummaryVec(
	prometheus.SummaryOpts{
		Namespace: grabbitPrefix,
		Subsystem: "saga",
		Name:      "latency",
		Help:      "The latency of the entire saga",
	},
	[]string{"Service", "SagaType", "Initiator"},
)

//GetSagaTimeoutCounterValue gets the counter value of timed out sagas reported to prometheus
func GetSagaTimeoutCounterValue() (float64, error) {
Expand All @@ -20,12 +34,3 @@ func GetSagaTimeoutCounterValue() (float64, error) {

return m.GetCounter().GetValue(), nil
}

// newSagaTimeoutCounter creates, through promauto, the counter that tracks the
// number of timed out saga instances.
func newSagaTimeoutCounter() prometheus.Counter {
	opts := prometheus.CounterOpts{
		Namespace: grabbitPrefix,
		Subsystem: "saga",
		Name:      "timedout_sagas",
		Help:      "counting the number of timedout saga instances",
	}
	return promauto.NewCounter(opts)
}
2 changes: 2 additions & 0 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"strings"
"sync"
"time"

"github.com/opentracing/opentracing-go"
slog "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -256,6 +257,7 @@ func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {

if instance.isComplete() {
imsm.Log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")
metrics.SagaLatencySummary.WithLabelValues(imsm.svcName, reflect.TypeOf(instance.UnderlyingInstance).String(), instance.StartedBy).Observe(float64(time.Since(instance.CreatedAt) / time.Millisecond))

deleteErr := imsm.sagaStore.DeleteSaga(tx, instance)
if deleteErr != nil {
Expand Down
4 changes: 4 additions & 0 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Instance struct {
StartedByMessageID string
//StartedByRPCID the rpc id of the message that created the saga
StartedByRPCID string

//CreatedAt the time.Now() timestamp when the saga was created
CreatedAt time.Time
}

func (si *Instance) log() logrus.FieldLogger {
Expand Down Expand Up @@ -151,6 +154,7 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instan
ID: xid.New().String(),
UnderlyingInstance: newSaga,
MsgToMethodMap: msgToMethodMap,
CreatedAt: time.Now(),
}
return newInstance
}
Expand Down
17 changes: 17 additions & 0 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ func sagaStoreAddRPCIDDetails(svcName string) *migrator.Migration {
}
}

// sagaStoreAddCreatedAtDetails returns the migration that adds the created_at
// column to the sagas table of the service identified by svcName.
//
// The column is declared as DATETIME NOT NULL DEFAULT NOW() and is positioned
// right after the version column.
func sagaStoreAddCreatedAtDetails(svcName string) *migrator.Migration {
	sagaTable := tx.GrabbitTableNameTemplate(svcName, "sagas")
	addCreatedAtSQL := `ALTER TABLE ` + sagaTable + ` ADD COLUMN created_at DATETIME NOT NULL DEFAULT NOW() AFTER version`

	// The transaction parameter is named dbTx so it does not shadow the
	// imported tx package used above.
	addColumn := func(dbTx *sql.Tx) error {
		_, err := dbTx.Exec(addCreatedAtSQL)
		return err
	}

	return &migrator.Migration{
		Name: "adding the created_at column to the saga table",
		Func: addColumn,
	}
}

func outboxMigrations(svcName string) *migrator.Migration {

tblName := tx.GrabbitTableNameTemplate(svcName, "outbox")
Expand Down Expand Up @@ -158,6 +174,7 @@ func EnsureSchema(db *sql.DB, svcName string) {
outboxChangeColumnLength(svcName),
sagaStoreAddSagaCreatorDetails(svcName),
sagaStoreAddRPCIDDetails(svcName),
sagaStoreAddCreatedAtDetails(svcName),
))
if err != nil {
panic(err)
Expand Down
19 changes: 13 additions & 6 deletions gbus/tx/sagastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"reflect"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/go-sql-driver/mysql"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/saga"
)
Expand All @@ -37,8 +39,9 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
var startedBySaga sql.NullString
var startedByMsgID sql.NullString
var startedByRPCID sql.NullString
var createdAt mysql.NullTime

error := rows.Scan(&sagaID, &sagaType, &sagaData, &startedBy, &startedByMsgID, &startedByRPCID, &startedBySaga, &version)
error := rows.Scan(&sagaID, &sagaType, &sagaData, &startedBy, &startedByMsgID, &startedByRPCID, &startedBySaga, &version, &createdAt)
if error == sql.ErrNoRows {
return nil, error
} else if error != nil {
Expand Down Expand Up @@ -67,6 +70,11 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
instance.StartedByRPCID = startedByRPCID.String
}

if createdAt.Valid {
value, _ := createdAt.Value()
instance.CreatedAt = value.(time.Time)
}

if decErr != nil {
store.Log().WithError(decErr).Error("failed to decode saga instance")
return nil, decErr
Expand All @@ -83,7 +91,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (instances []*saga.Instance, err error) {

tblName := GetSagatableName(store.SvcName)
selectSQL := "SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]
selectSQL := "SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]

rows, err := tx.Query(selectSQL, sagaType.String())
defer func() {
Expand Down Expand Up @@ -153,7 +161,7 @@ func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error {
func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) {

tblName := GetSagatableName(store.SvcName)
selectSQL := `SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``
selectSQL := `SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``

rows, err := tx.Query(selectSQL, sagaID)
defer func() {
Expand Down Expand Up @@ -187,14 +195,14 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance,
func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) (err error) {
store.RegisterSagaType(newInstance.UnderlyingInstance)
tblName := GetSagatableName(store.SvcName)
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

var buf []byte
if buf, err = store.serilizeSaga(newInstance); err != nil {
store.Log().WithError(err).WithField("saga_id", newInstance.ID).Error("failed to encode saga with sagaID")
return err
}
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedByMessageID, newInstance.StartedByRPCID, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl)
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedByMessageID, newInstance.StartedByRPCID, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl, newInstance.CreatedAt)
if err != nil {
store.Log().WithError(err).Error("failed saving new saga")
return err
Expand Down Expand Up @@ -251,4 +259,3 @@ func GetSagatableName(svcName string) string {

return strings.ToLower("grabbit_" + sanitized + "_sagas")
}

0 comments on commit d66df35

Please sign in to comment.