Encoding Data in Python DataClasses with Avro
Earlier, you saw a simple example of how we could take a regular Python class with type hints and generate an Avro schema from it on the fly. Schema generation was a bit cumbersome because of the limited type information available on the data fields.
Let's walk through the same example, this time with a Python Dataclass, and understand how we could dynamically generate an Avro schema.
A Simple Python Dataclass
We will redefine the previous PlainUser class as a dataclass:
from dataclasses import dataclass, asdict, fields
from typing import Optional

@dataclass
class PlainUser:
    first_name: str
    email: str
    last_name: str = None
    age: int = None
If you carefully observe the order of fields, you will notice that last_name has moved one position down, below email. This is because mandatory fields cannot follow optional fields in Python dataclasses.
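To see what goes wrong otherwise: declaring a required field after an optional one raises an error at class definition time. A minimal sketch (the class name is illustrative, reusing the dataclass import from above):

@dataclass
class BrokenUser:
    first_name: str
    last_name: str = None
    email: str  # TypeError: non-default argument 'email' follows default argument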
Initializing an instance of PlainUser and retrieving the object's data is straightforward:
# Example values are illustrative
user = PlainUser(first_name="Jane", email="jane@example.com")

asdict(user)
# {'first_name': 'Jane', 'email': 'jane@example.com', 'last_name': None, 'age': None}
Inspecting the Dataclass
We can now extract the type information with the help of the fields function:
fields(PlainUser)
Field objects carry valuable type information about the fields declared in the dataclass. The two relevant attributes for our purposes are Field.name and Field.type: the attribute's name and its type.
We will also use Field.default to detect whether a field is optional, i.e. has a default value of None. If it does, we will map the attribute to a union type in the Avro schema.
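A quick way to see these attributes side by side (a minimal sketch; note that fields declared without a default report the dataclasses.MISSING sentinel rather than None):

for f in fields(PlainUser):
    print(f.name, f.type, f.default)

# first_name <class 'str'> <dataclasses._MISSING_TYPE object at 0x...>
# email <class 'str'> <dataclasses._MISSING_TYPE object at 0x...>
# last_name <class 'str'> None
# age <class 'int'> None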
Dynamic Generation of Avro Schema
We can now write a simple transformation that gathers type information from the dataclass and generates an Avro JSON schema dynamically. Let's begin by initializing the schema with the common attributes: namespace, type, name, and the fields array:
json_schema = {
    "namespace": "ambitious.app",
    "type": "record",
    "name": PlainUser.__name__,
    "fields": [],
}
We then use a type mapping similar to the one before to populate the fields array:
# A naive mapping implementation
type_map = {str: "string", int: "int"}

for field in fields(PlainUser):
    if field.type in [str, int]:
        if field.default is None:
            # Optional field: use a union of null and the mapped type
            json_schema["fields"].append({"name": field.name, "type": ["null", type_map[field.type]]})
        else:
            # Required field (its default is dataclasses.MISSING, not None)
            json_schema["fields"].append({"name": field.name, "type": type_map[field.type]})
The generated JSON schema looks similar to the one we hand-constructed in the introduction:
json_schema
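For the PlainUser dataclass above, the generated schema should look like this:

{
    "namespace": "ambitious.app",
    "type": "record",
    "name": "PlainUser",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "last_name", "type": ["null", "string"]},
        {"name": "age", "type": ["null", "int"]},
    ],
}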
Encode/Decode data with Dynamic Schema
Next, we can use the schema just like before to write and read encoded data. To begin, serialize the schema dict to a JSON string and parse it into an Avro schema object:
import avro.schema
import json

json_schema_str = json.dumps(json_schema)
schema = avro.schema.parse(json_schema_str)
Then you can use the DataFileWriter class to encode and write the data into a file, and DataFileReader to decode and read it back later:
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# For a flat dataclass like this, asdict(user) would work equally well here
writer_buffer = DataFileWriter(open("users-encoded-from-python-dataclass.avro", "wb"), DatumWriter(), schema)
writer_buffer.append(user.__dict__)
writer_buffer.close()
reader_buffer = DataFileReader(open("users-encoded-from-python-dataclass.avro", "rb"), DatumReader())
for user_record in reader_buffer:
    print(user_record)
reader_buffer.close()
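With the example user created earlier, this should print a single record:

# {'first_name': 'Jane', 'email': 'jane@example.com', 'last_name': None, 'age': None}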
There you go!
The mapping can be enhanced to handle other datatypes and align better with the Avro spec. Still, this walkthrough gives a good idea of how to dynamically generate Avro schemas from Python dataclasses.
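As a starting point, here is a sketch of an extended type_map covering more of Avro's primitive types (the float-to-double choice is one common convention, not the only option):

# An extended, still naive, mapping of Python primitives to Avro primitives
type_map = {
    str: "string",
    int: "int",        # Avro also defines "long" for 64-bit integers
    float: "double",   # Avro's "float" is 32-bit; "double" matches Python floats
    bool: "boolean",
    bytes: "bytes",
}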