Skip to content

Commit

Permalink
Refactor Cassandra Namespace To Only Hold One Storage Type
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Nov 9, 2024
1 parent 7f87200 commit 7d65eb8
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 198 deletions.
1 change: 1 addition & 0 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extensions:
password: "cassandra"
tls:
insecure: true
is_archive: true
receivers:
otlp:
protocols:
Expand Down
99 changes: 33 additions & 66 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
Expand All @@ -50,22 +49,26 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
archiveConfig config.SessionBuilder
archiveSession cassandra.Session
config config.SessionBuilder
session cassandra.Session
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
func NewFactory(isArchive bool) *Factory {
var options *Options
if isArchive {
options = NewOptions(archiveStorageConfig)
options.IsArchive = true
} else {
options = NewOptions(primaryStorageConfig)
}
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
Options: options,
}
}

Expand All @@ -75,7 +78,7 @@ func NewFactoryWithConfig(
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f := NewFactory(opts.IsArchive)
// use this to help with testing
b := &withConfigBuilder{
f: f,
Expand Down Expand Up @@ -121,43 +124,30 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) {
// InitFromOptions initializes factory from options.
func (f *Factory) configureFromOptions(o *Options) {
f.Options = o
// TODO this is a hack because we do not define defaults in Options
if o.others == nil {
o.others = make(map[string]*NamespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
f.config = o.GetPrimary()
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
metricsNamespace := "cassandra"
if f.Options.IsArchive {
metricsNamespace = "cassandra-archive"
}
f.metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: metricsNamespace, Tags: nil})
f.logger = logger

primarySession, err := f.primaryConfig.NewSession()
primarySession, err := f.config.NewSession()
if err != nil {
return err
}
f.primarySession = primarySession

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
if err != nil {
return err
}
f.archiveSession = archiveSession
} else {
logger.Info("Cassandra archive storage configuration is empty, skipping")
}
f.session = primarySession

return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
return cSpanStore.NewSpanReader(f.session, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -166,33 +156,13 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...)
return cSpanStore.NewSpanWriter(f.session, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
version := cDepStore.GetDependencyVersion(f.primarySession)
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
options, err := writerOptions(f.Options)
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...)
version := cDepStore.GetDependencyVersion(f.session)
return cDepStore.NewDependencyStore(f.session, f.metricsFactory, f.logger, version)
}

// CreateLock implements storage.SamplingStoreFactory
Expand All @@ -203,12 +173,12 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}
f.logger.Info("Using unique participantName in the distributed lock", zap.String("participantName", hostId))

return cLock.NewLock(f.primarySession, hostId), nil
return cLock.NewLock(f.session, hostId), nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSamplingStore.New(f.session, f.metricsFactory, f.logger), nil
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
Expand Down Expand Up @@ -244,16 +214,13 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
if f.primarySession != nil {
f.primarySession.Close()
}
if f.archiveSession != nil {
f.archiveSession.Close()
if f.session != nil {
f.session.Close()
}

return nil
}

func (f *Factory) Purge(_ context.Context) error {
return f.primarySession.Query("TRUNCATE traces").Exec()
return f.session.Query("TRUNCATE traces").Exec()
}
66 changes: 16 additions & 50 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) {
}

func TestCassandraFactory(t *testing.T) {
logger, logBuf := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--cassandra-archive.enabled=true"})
logger, _ := testutils.NewLogger()
f := NewFactory(false)
v, _ := config.Viperize(f.AddFlags)
f.InitFromViper(v, zap.NewNop())

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
// so we override it with a mock.
f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
f.config = newMockSessionBuilder(nil, errors.New("made-up error"))
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

var (
Expand All @@ -57,13 +56,11 @@ func TestCassandraFactory(t *testing.T) {
session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
session.On("Close").Return()
query.On("Exec").Return(nil)
f.primaryConfig = newMockSessionBuilder(session, nil)
f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
f.config = newMockSessionBuilder(nil, errors.New("made-up error"))
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.archiveConfig = nil
f.config = newMockSessionBuilder(session, nil)
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping")

_, err := f.CreateSpanReader()
require.NoError(t, err)
Expand All @@ -74,21 +71,6 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateDependencyReader()
require.NoError(t, err)

_, err = f.CreateArchiveSpanReader()
require.EqualError(t, err, "archive storage not configured")

_, err = f.CreateArchiveSpanWriter()
require.EqualError(t, err, "archive storage not configured")

f.archiveConfig = newMockSessionBuilder(session, nil)
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanReader()
require.NoError(t, err)

_, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)

_, err = f.CreateLock()
require.NoError(t, err)

Expand All @@ -99,19 +81,17 @@ func TestCassandraFactory(t *testing.T) {
}

func TestExclusiveWhitelistBlacklist(t *testing.T) {
logger, logBuf := testutils.NewLogger()
f := NewFactory()
f := NewFactory(false)
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
"--cassandra-archive.enabled=true",
"--cassandra.index.tag-whitelist=a,b,c",
"--cassandra.index.tag-blacklist=a,b,c",
})
f.InitFromViper(v, zap.NewNop())

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
// so we override it with a mock.
f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
f.config = newMockSessionBuilder(nil, errors.New("made-up error"))
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

var (
Expand All @@ -120,22 +100,10 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) {
)
session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
query.On("Exec").Return(nil)
f.primaryConfig = newMockSessionBuilder(session, nil)
f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error"))
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.archiveConfig = nil
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping")
f.config = newMockSessionBuilder(session, nil)

_, err := f.CreateSpanWriter()
require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified")

f.archiveConfig = &mockSessionBuilder{}
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err = f.CreateArchiveSpanWriter()
require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified")
}

func TestWriterOptions(t *testing.T) {
Expand Down Expand Up @@ -181,13 +149,11 @@ func TestWriterOptions(t *testing.T) {
}

func TestConfigureFromOptions(t *testing.T) {
f := NewFactory()
o := NewOptions("foo", archiveStorageConfig)
o.others[archiveStorageConfig].Enabled = true
f := NewFactory(false)
o := NewOptions("foo")
f.configureFromOptions(o)
assert.Equal(t, o, f.Options)
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig)
assert.Equal(t, o.GetPrimary(), f.config)
}

func TestNewFactoryWithConfig(t *testing.T) {
Expand All @@ -201,7 +167,7 @@ func TestNewFactoryWithConfig(t *testing.T) {
},
},
}
f := NewFactory()
f := NewFactory(false)
b := &withConfigBuilder{
f: f,
opts: opts,
Expand All @@ -223,7 +189,7 @@ func TestNewFactoryWithConfig(t *testing.T) {
},
},
}
f := NewFactory()
f := NewFactory(false)
b := &withConfigBuilder{
f: f,
opts: opts,
Expand All @@ -242,14 +208,14 @@ func TestNewFactoryWithConfig(t *testing.T) {
}

func TestFactory_Purge(t *testing.T) {
f := NewFactory()
f := NewFactory(false)
var (
session = &mocks.Session{}
query = &mocks.Query{}
)
session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
query.On("Exec").Return(nil)
f.primarySession = session
f.session = session

err := f.Purge(context.Background())
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 7d65eb8

Please sign in to comment.