@@ -9,11 +9,11 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExcept
9
9
import org .apache .avro .Schema
10
10
11
11
/** Schema Provider / SerDe implementation that uses the Confluent Schema Registry to fetch schemas for topics.
12
- * Can be configured as: topic = "kafka://topic-name/registry_host=host/[registry_port=port]/[registry_scheme=http]/[subject=subject]"
13
- * Port, scheme and subject are optional. If port is missing, we assume the host is pointing to a LB address / such that
14
- * forwards to the right host + port. Scheme defaults to http. Subject defaults to the topic name + "-value" (based on schema
15
- * registry conventions).
16
- */
12
+ * Can be configured as: topic = "kafka://topic-name/registry_host=host/[registry_port=port]/[registry_scheme=http]/[subject=subject]"
13
+ * Port, scheme and subject are optional. If port is missing, we assume the host is pointing to a LB address / such that
14
+ * forwards to the right host + port. Scheme defaults to http. Subject defaults to the topic name + "-value" (based on schema
15
+ * registry conventions).
16
+ */
17
17
class SchemaRegistrySerDe (topicInfo : TopicInfo ) extends SerDe {
18
18
import SchemaRegistrySerDe ._
19
19
@@ -63,7 +63,7 @@ class SchemaRegistrySerDe(topicInfo: TopicInfo) extends SerDe {
63
63
throw new IllegalArgumentException (" Error connecting to and requesting schema details from the registry" , e)
64
64
}
65
65
require(parsedSchema.schemaType() == AvroSchema .TYPE ,
66
- s " Unsupported schema type: ${parsedSchema.schemaType()}. Only Avro is supported. " )
66
+ s " Unsupported schema type: ${parsedSchema.schemaType()}. Only Avro is supported. " )
67
67
val avroSchema : Schema = parsedSchema.asInstanceOf [AvroSchema ].rawSchema()
68
68
val chrononSchema : StructType = AvroConversions .toChrononSchema(avroSchema).asInstanceOf [StructType ]
69
69
(avroSchema, chrononSchema)
@@ -94,4 +94,4 @@ object SchemaRegistrySerDe {
94
94
val RegistrySchemeKey = " registry_scheme"
95
95
val RegistrySubjectKey = " subject"
96
96
val SchemaRegistryWireFormat = " schema_registry_wire_format"
97
- }
97
+ }
0 commit comments