Skip to content

Commit

Permalink
introduces a new Schema Registry Client wrapper around the lib
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominik Liebler committed Jan 3, 2021
1 parent 7517e35 commit 0531050
Show file tree
Hide file tree
Showing 17 changed files with 331 additions and 166 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ In order to get help for a specific command, try `schema-registry-gitops <comman
The desired state is managed using this YAML definition:

```yaml
# sets default compatibility level (optional)
# sets global compatibility level (optional)
compatibility: FULL_TRANSITIVE
subjects:
# a subject that references a file for the schema definition
Expand Down
4 changes: 2 additions & 2 deletions examples/schema-registry.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
compatibility: FULL
compatibility: FULL_TRANSITIVE

subjects:
- name: foo-value
file: foo.avsc
compatibility: FULL_TRANSITIVE
compatibility: BACKWARD

- name: without-compatiblity
file: without_compatibility.avsc
Expand Down
13 changes: 7 additions & 6 deletions src/main/kotlin/dev/domnikl/schema_registry_gitops/Factory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@ import dev.domnikl.schema_registry_gitops.state.Dumper
import dev.domnikl.schema_registry_gitops.state.Persistence
import dev.domnikl.schema_registry_gitops.state.Validator
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.slf4j.LoggerFactory

class Factory {
fun createStateValidator(baseUrl: String): Validator {
fun createValidator(baseUrl: String): Validator {
return Validator(createClient(baseUrl))
}

fun createStateApplier(baseUrl: String): Applier {
fun createApplier(baseUrl: String): Applier {
return Applier(
createClient(baseUrl),
LoggerFactory.getLogger(Applier::class.java)
)
}

fun createStateDumper(baseUrl: String): Dumper {
fun createDumper(baseUrl: String): Dumper {
return Dumper(createClient(baseUrl))
}

fun createStatePersistence() = Persistence()
fun createPersistence() = Persistence()

private fun createClient(baseUrl: String) = CachedSchemaRegistryClient(RestService(baseUrl), 100)
private fun createClient(baseUrl: String) = SchemaRegistryClient(
CachedSchemaRegistryClient(baseUrl, 100)
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package dev.domnikl.schema_registry_gitops

import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException

class SchemaRegistryClient(private val schemaRegistryClient: SchemaRegistryClient) {
fun subjects(): List<String> {
return schemaRegistryClient.allSubjects.toList()
}

fun globalCompatibility(): Compatibility {
return Compatibility.valueOf(schemaRegistryClient.getCompatibility(""))
}

fun updateGlobalCompatibility(compatibility: Compatibility): Compatibility {
return Compatibility.valueOf(schemaRegistryClient.updateCompatibility("", compatibility.toString()))
}

fun compatibility(subject: String): Compatibility {
return handleNotExisting {
Compatibility.valueOf(schemaRegistryClient.getCompatibility(subject))
} ?: Compatibility.NONE
}

fun updateCompatibility(subject: Subject): Compatibility {
return Compatibility.valueOf(schemaRegistryClient.updateCompatibility(subject.name, subject.compatibility.toString()))
}

fun testCompatibility(subject: Subject): Boolean {
return schemaRegistryClient.testCompatibility(subject.name, subject.schema)
}

fun getLatestSchema(subject: String): AvroSchema {
return AvroSchema(schemaRegistryClient.getLatestSchemaMetadata(subject).schema)
}

fun create(subject: Subject): Int {
return schemaRegistryClient.register(subject.name, subject.schema)
}

fun evolve(subject: Subject): Int {
return schemaRegistryClient.register(subject.name, subject.schema)
}

fun version(subject: Subject): Int? {
return handleNotExisting {
schemaRegistryClient.getVersion(subject.name, subject.schema)
}
}

private fun <V> handleNotExisting(f: () -> V?): V? {
return try {
f()
} catch (e: RestClientException) {
when (e.errorCode) {
ERROR_CODE_SUBJECT_NOT_FOUND -> null
ERROR_CODE_VERSION_NOT_FOUND -> null
ERROR_CODE_SCHEMA_NOT_FOUND -> null
else -> throw e
}
}
}

companion object {
private const val ERROR_CODE_SUBJECT_NOT_FOUND = 40401
private const val ERROR_CODE_VERSION_NOT_FOUND = 40402
private const val ERROR_CODE_SCHEMA_NOT_FOUND = 40403
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class Apply(private val factory: Factory) : Callable<Int> {

override fun call(): Int {
val file = File(inputFile)
val state = factory.createStatePersistence().load(file.parentFile, file)
val stateApplier = factory.createStateApplier(cli.baseUrl)
val state = factory.createPersistence().load(file.parentFile, file)
val stateApplier = factory.createApplier(cli.baseUrl)

stateApplier.apply(state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class Dump(private val factory: Factory) : Callable<Int> {
}

override fun call(): Int {
val state = factory.createStateDumper(cli.baseUrl).dump()
val state = factory.createDumper(cli.baseUrl).dump()

factory.createStatePersistence().save(state, outputStream)
factory.createPersistence().save(state, outputStream)

return 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Validate(factory: Factory) : Callable<Int> {
@CommandLine.Parameters(description = ["path to input YAML file"])
private lateinit var inputFile: String

private val validator by lazy { factory.createStateValidator(cli.baseUrl) }
private val validator by lazy { factory.createValidator(cli.baseUrl) }

override fun call(): Int {
val file = File(inputFile)
Expand Down
31 changes: 11 additions & 20 deletions src/main/kotlin/dev/domnikl/schema_registry_gitops/state/Applier.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package dev.domnikl.schema_registry_gitops.state

import dev.domnikl.schema_registry_gitops.Compatibility
import dev.domnikl.schema_registry_gitops.SchemaRegistryClient
import dev.domnikl.schema_registry_gitops.State
import dev.domnikl.schema_registry_gitops.Subject
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import org.slf4j.Logger

class Applier(
private val client: SchemaRegistryClient,
private val logger: Logger
) {
fun apply(state: State) {
updateDefaultCompatibility(state)
updateGlobalCompatibility(state)

val registeredSubjects = client.allSubjects
val registeredSubjects = client.subjects()

state.subjects.forEach { subject ->
when (registeredSubjects.contains(subject.name)) {
Expand All @@ -25,7 +23,7 @@ class Applier(
}

private fun register(subject: Subject) {
val versionId = client.register(subject.name, subject.schema)
val versionId = client.create(subject)

logger.info("Created subject '${subject.name}' and registered new schema with version $versionId")

Expand All @@ -35,17 +33,10 @@ class Applier(
private fun evolve(subject: Subject) {
updateCompatibility(subject)

val versionBefore = try {
client.getVersion(subject.name, subject.schema)
} catch (e: RestClientException) {
when (e.errorCode) {
40403 -> null
else -> throw e
}
}
val versionBefore = client.version(subject)

if (versionBefore == null) {
val versionId = client.register(subject.name, subject.schema)
val versionId = client.evolve(subject)

logger.info("Evolved existing schema for subject '${subject.name}' to version $versionId")
} else {
Expand All @@ -55,24 +46,24 @@ class Applier(

private fun updateCompatibility(subject: Subject) {
if (subject.compatibility != null) {
val compatibility = client.updateCompatibility(subject.name, subject.compatibility.toString())
val compatibility = client.updateCompatibility(subject)

logger.info("Changed '${subject.name}' compatibility to $compatibility")
}
}

private fun updateDefaultCompatibility(state: State) {
private fun updateGlobalCompatibility(state: State) {
if (state.compatibility == null) return

val compatibilityBefore = Compatibility.valueOf(client.getCompatibility(""))
val compatibilityBefore = client.globalCompatibility()

if (compatibilityBefore == state.compatibility) {
logger.debug("Did not change compatibility level as it matched desired level ${state.compatibility}")
return
}

val compatibilityAfter = client.updateCompatibility("", state.compatibility.toString())
val compatibilityAfter = client.updateGlobalCompatibility(state.compatibility)

logger.info("Changed default compatibility level from $compatibilityBefore to $compatibilityAfter")
logger.info("Changed global compatibility level from $compatibilityBefore to $compatibilityAfter")
}
}
19 changes: 5 additions & 14 deletions src/main/kotlin/dev/domnikl/schema_registry_gitops/state/Dumper.kt
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
package dev.domnikl.schema_registry_gitops.state

import dev.domnikl.schema_registry_gitops.Compatibility
import dev.domnikl.schema_registry_gitops.SchemaRegistryClient
import dev.domnikl.schema_registry_gitops.State
import dev.domnikl.schema_registry_gitops.Subject
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException

class Dumper(private val client: SchemaRegistryClient) {
fun dump() = State(
Compatibility.valueOf(client.getCompatibility("")),
client.allSubjects.map { subject ->
val compatibility = try {
Compatibility.valueOf(client.getCompatibility(subject))
} catch (e: RestClientException) {
Compatibility.NONE
}

client.globalCompatibility(),
client.subjects().map { subject ->
Subject(
subject,
compatibility,
AvroSchema(client.getLatestSchemaMetadata(subject).schema)
client.compatibility(subject),
client.getLatestSchema(subject)
)
}
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package dev.domnikl.schema_registry_gitops.state

import dev.domnikl.schema_registry_gitops.SchemaRegistryClient
import dev.domnikl.schema_registry_gitops.State
import dev.domnikl.schema_registry_gitops.Subject
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient

class Validator(private val client: SchemaRegistryClient) {
fun validate(state: State) = state.subjects.filterNot { isCompatible(it) }.map { it.name }

private fun isCompatible(subject: Subject): Boolean {
// if subject does not yet exist, it's always valid (as long as the schema itself is valid)
if (!client.allSubjects.contains(subject.name)) {
if (!client.subjects().contains(subject.name)) {
return true
}

return client.testCompatibility(subject.name, subject.schema)
return client.testCompatibility(subject)
}
}
Loading

0 comments on commit 0531050

Please sign in to comment.