Encoding Data in Python DataClasses with Avro


Last Updated on Sep 28, 2022

Earlier, you saw a simple example of how we could take a regular Python class with type hints and generate an Avro schema from it on the fly. Schema generation was a bit cumbersome because a plain class exposes only limited type information about its data fields.

Let's walk through the same example, this time with a Python Dataclass, and understand how we could dynamically generate an Avro schema.

A Simple Python Dataclass

We will redefine the previous PlainUser class as a dataclass:

from dataclasses import dataclass, asdict, fields
from typing import Optional
@dataclass
class PlainUser:
    first_name: str
    email: str
    last_name: str = None
    age: int = None

If you look carefully at the order of fields, you will notice that last_name has moved down one position, below email. This is because, in Python dataclasses, fields without a default value cannot follow fields that have one.
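To see the ordering rule in action, here is a minimal sketch that declares a class with a required field after a defaulted one. The class name BadUser is hypothetical and exists only for this illustration. The dataclass decorator rejects the definition with a TypeError as soon as the class is created:

```python
from dataclasses import dataclass

error_message = None

try:
    @dataclass
    class BadUser:  # hypothetical class, used only to trigger the error
        first_name: str
        last_name: str = None
        email: str  # no default, but it follows a field that has one
except TypeError as exc:
    # Raised at class-definition time, before any instance is created
    error_message = str(exc)

print(error_message)
```

The exact wording of the message varies between Python versions, but it names the offending field.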

Initializing an instance of PlainUser and retrieving the object's data is straightforward:

user = PlainUser(first_name="John", last_name="Doe", email="[email protected]")
asdict(user)
{'first_name': 'John',
'email': '[email protected]',
'last_name': 'Doe',
'age': None}

Inspecting the Dataclass

We can now extract the type information with the help of the fields function:

fields(PlainUser)
(Field(name='first_name',type=<class 'str'>,default=<dataclasses._MISSING_TYPE object at 0x1094bd700>,default_factory=<dataclasses._MISSING_TYPE object at 0x1094bd700>,init=True,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=_FIELD),
Field(name='email',type=<class 'str'>,default=<dataclasses._MISSING_TYPE object at 0x1094bd700>,default_factory=<dataclasses._MISSING_TYPE object at 0x1094bd700>,init=True,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=_FIELD),
Field(name='last_name',type=<class 'str'>,default=None,default_factory=<dataclasses._MISSING_TYPE object at 0x1094bd700>,init=True,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=_FIELD),
Field(name='age',type=<class 'int'>,default=None,default_factory=<dataclasses._MISSING_TYPE object at 0x1094bd700>,init=True,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=_FIELD))

Field objects hold valuable type information about the fields declared in the dataclass. The two relevant attributes for our purposes are Field.name and Field.type: the attribute name and its type.

We will also use Field.default to detect if the field is optional and has a default value of None. If it does, we will associate the attribute with a Union type in the Avro schema.
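As a rough sketch of that check, the snippet below summarizes each field of PlainUser. The helper name describe is an assumption made for this example. Fields declared without a default carry the dataclasses.MISSING sentinel rather than None, which is what makes the None test reliable:

```python
from dataclasses import MISSING, dataclass, fields


@dataclass
class PlainUser:
    first_name: str
    email: str
    last_name: str = None
    age: int = None


def describe(field):
    """Return (name, type name, required?, nullable?) for one field."""
    # A field with no default at all holds the MISSING sentinel
    is_required = field.default is MISSING and field.default_factory is MISSING
    # A field is treated as nullable when its default is exactly None
    is_nullable = field.default is None
    return field.name, field.type.__name__, is_required, is_nullable


summary = [describe(f) for f in fields(PlainUser)]
for row in summary:
    print(row)
```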

Dynamic Generation of Avro Schema

We can now write a simple transformation that gathers type information from the dataclass and generates an Avro schema, expressed as JSON, dynamically. Let's begin by initializing the schema with the common attributes: namespace, type, name, and the fields array:

json_schema = {
    "namespace": "ambitious.app",
    "type": "record",
    "name": PlainUser.__name__,
    "fields": [],
}

We then use a mapping similar to the one we used before to fill in the fields of the Avro schema.

# A naive mapping implementation
type_map = {
    str: "string",
    int: "int",
}
for field in fields(PlainUser):
    # Fields whose type is not in the map are silently skipped
    if field.type in type_map:
        if field.default is None:
            # A default of None marks the field as nullable: emit a union with "null"
            json_schema["fields"].append({"name": field.name, "type": ["null", type_map[field.type]]})
        else:
            json_schema["fields"].append({"name": field.name, "type": type_map[field.type]})

The generated JSON schema looks similar to the one we hand-constructed in the introduction:

json_schema
{'namespace': 'ambitious.app',
'type': 'record',
'name': 'PlainUser',
'fields': [{'name': 'first_name', 'type': 'string'},
{'name': 'email', 'type': 'string'},
{'name': 'last_name', 'type': ['null', 'string']},
{'name': 'age', 'type': ['null', 'int']}]}
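The same steps can be collected into a reusable function. This is only a sketch: the name dataclass_to_avro_schema and its namespace parameter are assumptions made for illustration, and unlike the loop above it raises an error for unsupported types instead of skipping them.

```python
from dataclasses import dataclass, fields

# Same naive mapping as above: only str and int are supported
TYPE_MAP = {str: "string", int: "int"}


def dataclass_to_avro_schema(cls, namespace):
    """Build an Avro record schema (as a dict) from a dataclass."""
    schema_fields = []
    for field in fields(cls):
        if field.type not in TYPE_MAP:
            raise TypeError(f"Unsupported type for field {field.name!r}: {field.type!r}")
        avro_type = TYPE_MAP[field.type]
        if field.default is None:
            # Nullable field: a union of "null" and the mapped type
            avro_type = ["null", avro_type]
        schema_fields.append({"name": field.name, "type": avro_type})
    return {
        "namespace": namespace,
        "type": "record",
        "name": cls.__name__,
        "fields": schema_fields,
    }


@dataclass
class PlainUser:
    first_name: str
    email: str
    last_name: str = None
    age: int = None


generated = dataclass_to_avro_schema(PlainUser, "ambitious.app")
print(generated)
```

Failing loudly on an unknown type is a design choice here: a silently dropped field would only surface later, when a record no longer matches the schema.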

Encode/Decode data with Dynamic Schema

Next, we can use the schema just like before to write and read encoded data. To begin, serialize the schema dictionary to a JSON string and parse it into an Avro schema object:

import avro.schema
import json
json_schema_str = json.dumps(json_schema)
schema = avro.schema.parse(json_schema_str)

Then you can use the DataFileWriter class to encode the data and write it to a file, and the DataFileReader class to read and decode it later:

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
# Encode the user and write it to an Avro container file
writer_buffer = DataFileWriter(open("users-encoded-from-python-dataclass.avro", "wb"), DatumWriter(), schema)
writer_buffer.append(user.__dict__)
writer_buffer.close()
# Read the records back; the writer's schema is stored in the file itself
reader_buffer = DataFileReader(open("users-encoded-from-python-dataclass.avro", "rb"), DatumReader())
for user_record in reader_buffer:
    print(user_record)
reader_buffer.close()
{'first_name': 'John', 'email': '[email protected]', 'last_name': 'Doe', 'age': None}
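Each decoded record is a plain dictionary, so it can be turned back into a dataclass instance by unpacking it into the constructor. The sketch below uses a hand-written dictionary as a stand-in for a record yielded by DataFileReader, and the email address is a placeholder value chosen for this example:

```python
from dataclasses import asdict, dataclass


@dataclass
class PlainUser:
    first_name: str
    email: str
    last_name: str = None
    age: int = None


# Stand-in for one record read from the Avro file (placeholder values)
user_record = {
    "first_name": "John",
    "email": "john@example.com",
    "last_name": "Doe",
    "age": None,
}

# The record's keys match the dataclass field names, so ** unpacking works
restored_user = PlainUser(**user_record)
print(restored_user)
```

This only works while the schema's field names stay in step with the dataclass; a renamed or extra field would make the constructor raise a TypeError.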

There you go!

The mapping function can be enhanced to handle other data types and align better with the Avro specification. Still, this walkthrough gives a good idea of how to dynamically generate Avro schemas from Python dataclasses.
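One possible direction for such an enhancement is sketched below. It assumes fields are annotated with Optional[...] instead of relying on a None default, and it maps a few more Python types to Avro primitive types. The names PRIMITIVE_TYPES and avro_type_for are hypothetical, and the choice of "int" over "long" for Python integers simply follows the mapping used earlier in this walkthrough:

```python
from typing import Optional, Union, get_args, get_origin

# Python types mapped to Avro primitive type names
PRIMITIVE_TYPES = {
    str: "string",
    int: "int",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
}


def avro_type_for(annotation):
    """Translate a type annotation into an Avro type (sketch only)."""
    if get_origin(annotation) is Union:
        members = get_args(annotation)
        non_null = [m for m in members if m is not type(None)]
        # Only Optional[X], i.e. Union[X, None], is handled here
        if len(members) == 2 and len(non_null) == 1:
            return ["null", avro_type_for(non_null[0])]
        raise TypeError(f"Unsupported union: {annotation!r}")
    if annotation in PRIMITIVE_TYPES:
        return PRIMITIVE_TYPES[annotation]
    raise TypeError(f"Unsupported type: {annotation!r}")


print(avro_type_for(str))
print(avro_type_for(Optional[int]))
print(avro_type_for(float))
```

Nested records, lists, and the newer X | None syntax are left out of this sketch and would need their own branches.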


© 2022 Ambitious Systems. All Rights Reserved.