Encode/Decode Avro Messages
Example: Read Avro Encoded Data in Confluent Cloud
Say you create an Avro schema definition like this:
{
  "type": "record",
  "namespace": "com.mycorp.mynamespace",
  "name": "sampleRecord",
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "name": "my_field1",
      "type": "int",
      "doc": "The int type is a 32-bit signed integer."
    },
    {
      "name": "my_field2",
      "type": "double",
      "doc": "The double type is a double precision (64-bit) IEEE 754 floating-point number."
    },
    {
      "name": "my_field3",
      "type": "string",
      "doc": "The string is a unicode character sequence."
    },
    {
      "name": "my_field4",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "use case"
    }
  ]
}
Create a topic in Confluent Cloud, then push data to it in Avro format with the following command:
confluent kafka topic produce $TOPIC --schema ~/Dev/schema.txt \
--schema-registry-endpoint https://psrc-ab123.us-east-2.aws.confluent.cloud \
--schema-registry-api-key $API_KEY \
--schema-registry-api-secret $API_SECRET \
--value-format avro
You can add messages line by line, for example:
{"my_field1":1,"my_field2":3.4,"my_field3":"hello","my_field4":1707954127790}
Now let's create an external stream in Proton to read such messages:
CREATE EXTERNAL STREAM avro_stream(
  my_field1 int32,
  my_field2 float64,
  my_field3 string,
  my_field4 int64
)
SETTINGS
  type = 'kafka',
  brokers = 'pkc-ab123.us-east-2.aws.confluent.cloud:9092',
  security_protocol = 'SASL_SSL',
  username = '$KEY',
  password = '$SECRET',
  topic = '$TOPIC',
  data_format = 'Avro',
  kafka_schema_registry_url = 'https://psrc-ab123.us-east-2.aws.confluent.cloud',
  kafka_schema_registry_credentials = '$API_KEY:$API_SECRET';
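External streams work in both directions. Assuming your Proton version supports producing Avro-encoded messages through the schema registry settings above (a capability worth verifying for your build), a minimal write sketch:

-- Hypothetical write example: each inserted row is encoded with the registered schema.
INSERT INTO avro_stream (my_field1, my_field2, my_field3, my_field4)
VALUES (2, 6.8, 'world', 1707954200000);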
After creating the external stream successfully, you can fetch the existing data via
SELECT * FROM avro_stream WHERE _tp_time > earliest_ts()
Or fetch only the incoming new messages via
SELECT * FROM avro_stream
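Since my_field4 is declared as int64, the timestamp-millis value arrives as plain epoch milliseconds. You can convert it inside the query; a sketch assuming the from_unix_timestamp64_milli conversion function is available in your build:

-- Convert epoch milliseconds to a datetime64(3) value (the function name is an assumption).
SELECT my_field3, from_unix_timestamp64_milli(my_field4) AS event_time
FROM avro_stream;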
Example: Read Avro Encoded Data in Confluent Platform
You can follow Confluent Docs to start Confluent Platform with Schema Registry via Docker Compose.
The Avro schema definition:
{
  "namespace": "io.confluent.examples.clients.basicavro",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
Follow the Schema Registry tutorial to create a new topic, transactions. Then create a $HOME/.confluent/java.config with the following content:
bootstrap.servers=localhost:9092
client.dns.lookup=use_all_dns_ips
session.timeout.ms=45000
acks=all
schema.registry.url=http://localhost:8081
Then use Maven to compile the sample code and produce Avro-encoded messages to the local Kafka server with Schema Registry:
mvn clean compile package
mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.basicavro.ProducerExample \
-Dexec.args="$HOME/.confluent/java.config"
Then create an external stream in Proton:
CREATE EXTERNAL STREAM transactions(
id string,
amount double
)
SETTINGS
type = 'kafka',
brokers = 'localhost:9092',
topic = 'transactions',
data_format = 'Avro',
kafka_schema_registry_url = 'http://localhost:8081';
After running this SQL successfully, you can fetch the existing data via
SELECT * FROM transactions WHERE _tp_time > earliest_ts()
Or fetch only the incoming new messages via
SELECT * FROM transactions
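You can also aggregate the decoded payments on the fly. For example, a per-minute total using Proton's tumble window over the default event time (the 1m window size is just for illustration):

SELECT window_start, count() AS payments, sum(amount) AS total_amount
FROM tumble(transactions, 1m)
GROUP BY window_start;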
Example: Read Avro Encoded Data in a Kafka Service on Aiven
The schema registry endpoint on Aiven is signed by a trusted CA, so it needs no extra TLS configuration, but you do need to provide ssl_ca_cert_file (the CA certificate for your Aiven service) for the broker.
CREATE EXTERNAL STREAM transactions(
id string,
amount double
)
SETTINGS
  type = 'kafka',
  brokers = 'name.a.aivencloud.com:28864',
  topic = 'transactions',
  security_protocol = 'SASL_SSL',
  sasl_mechanism = 'SCRAM-SHA-256',
  username = 'avnadmin',
  password = 'PASSWORD',
  ssl_ca_cert_file = '/kafka.cert',
  data_format = 'Avro',
  kafka_schema_registry_url = 'https://name.a.aivencloud.com:28856',
  kafka_schema_registry_credentials = 'avnadmin:PASSWORD';
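As in the earlier examples, you can verify the setup by reading from the beginning of the topic:

SELECT * FROM transactions WHERE _tp_time > earliest_ts()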