# Source code for pyschema_extensions.avro

# Copyright (c) 2013 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Extension for generating Avro schemas from PySchema Record classes

Usage:

>>> class MyRecord(pyschema.Record):
>>>     foo = Text()
>>>     bar = Integer()
>>>
>>> [pyschema_extensions.avro.]get_schema_string(MyRecord)

'{"fields": [{"type": "string", "name": "foo"},
{"type": "long", "name": "bar"}],
"type": "record", "name": "MyRecord"}'

"""
from pyschema import core
from pyschema.types import Field, Boolean, Integer, Float
from pyschema.types import Bytes, Text, Enum, List, Map, SubRecord
try:
    import simplejson as json
except ImportError:
    import json


# Attach a constant `avro_type_name` to the field classes whose Avro type
# name is a fixed string. (Float and Integer get theirs from property-based
# mixins further down in this module instead.)
Boolean.avro_type_name = "boolean"
Bytes.avro_type_name = "bytes"
Text.avro_type_name = "string"
# "ENUM" is the avro 'type name' of all enums generated by pyschema.
# This is a pyschema convention, not an avro one, so it might change if
# need be.
Enum.avro_type_name = "ENUM"
List.avro_type_name = "array"
Map.avro_type_name = "map"


@Float.mixin
class FloatMixin:
    """Avro type naming mixed into PySchema ``Float`` fields."""

    @property
    def avro_type_name(self):
        """Avro type name selected from the field's ``size`` attribute.

        Returns ``'float'`` when ``self.size`` is 4 or less and ``'double'``
        for anything larger.
        """
        return 'float' if self.size <= 4 else 'double'
@Integer.mixin
class IntegerMixin:
    """Avro type naming mixed into PySchema ``Integer`` fields."""

    @property
    def avro_type_name(self):
        """Avro type name selected from the field's ``size`` attribute.

        Returns ``'int'`` when ``self.size`` is 4 or less and ``'long'``
        for anything larger.
        """
        return 'int' if self.size <= 4 else 'long'
@Field.mixin
class FieldMixin:
    """Default Avro behaviour mixed into the PySchema ``Field`` base class.

    More specific mixins in this module override individual methods.
    """

    def avro_type_schema(self, state):
        """Full type specification for the field

        I.e. the same as would go into the "type" field.

        For most fields, only simplified_avro_type_schema has to be
        implemented.

        Non-nullable fields return the simplified schema unchanged;
        nullable fields return a two-element union list with "null".
        """
        base_schema = self.simplified_avro_type_schema(state)
        if not self.nullable:
            return base_schema
        # first value in union needs to be same as default
        if self.default in (None, core.NO_DEFAULT):
            return ["null", base_schema]
        return [base_schema, "null"]

    def simplified_avro_type_schema(self, state):
        """The basic avro type for this field

        Not including nullability. `state` is unused here.
        """
        return self.avro_type_name

    def avro_dump(self, o):
        """Convert value `o` to its JSON-compatible Avro form.

        `None` passes through as `None`. Values of nullable fields are
        wrapped in a single-key dict keyed by `avro_type_name`; other
        values are returned as `self.dump(o)` produces them.
        """
        if o is None:
            return None
        # relying on the reference json dump behavior
        # could be a bit dangerous
        if not self.nullable:
            return self.dump(o)
        return {self.avro_type_name: self.dump(o)}

    def avro_load(self, o):
        """Inverse of `avro_dump`: unwrap (if nullable) and `self.load`."""
        if o is None:
            return None
        payload = o[self.avro_type_name] if self.nullable else o
        return self.load(payload)

    def avro_default_value(self):
        """Value emitted as the field's "default" in the schema."""
        return self.default
@List.mixin
class ListMixin:
    """Avro schema generation and (de)serialization for ``List`` fields."""

    def simplified_avro_type_schema(self, state):
        """Return the Avro "array" schema, delegating items to the element field."""
        item_schema = self.field_type.avro_type_schema(state)
        return {"type": "array", "items": item_schema}

    def avro_dump(self, obj):
        """Dump each element with the element field's `avro_dump`.

        `None` passes through. For nullable lists the result is wrapped in
        a single-key dict keyed by `avro_type_name`.
        """
        if obj is None:
            return None
        dumped_items = [self.field_type.avro_dump(item) for item in obj]
        if self.nullable:
            return {self.avro_type_name: dumped_items}
        return dumped_items

    def avro_load(self, obj):
        """Inverse of `avro_dump`: unwrap (if nullable) and load each element."""
        if obj is None:
            return None
        items = obj[self.avro_type_name] if self.nullable else obj
        return [self.field_type.avro_load(item) for item in items]


### `Enum` extensions
@Enum.mixin
class EnumMixin:
    """Avro schema generation for ``Enum`` fields."""

    def simplified_avro_type_schema(self, state):
        """Return the Avro "enum" schema listing this field's values as symbols.

        `state` is unused here.
        """
        enum_schema = {"type": "enum"}
        enum_schema["name"] = self.avro_type_name
        enum_schema["symbols"] = list(self.values)
        return enum_schema
@SubRecord.mixin
class SubRecordMixin:
    """Avro schema generation and (de)serialization for ``SubRecord`` fields."""

    def simplified_avro_type_schema(self, state):
        """Return the schema of the nested record via `get_schema_dict`.

        `state` is forwarded so that a record already declared during this
        generation run is emitted as a name reference.
        """
        return get_schema_dict(self._schema, state)

    @property
    def avro_type_name(self):
        """Name of the nested record, prefixed with its namespace if it has one."""
        schema = self._schema
        if not hasattr(schema, '_avro_namespace_'):
            return schema._schema_name
        return '.'.join([schema._avro_namespace_, schema._schema_name])

    def avro_dump(self, obj):
        """Dump a nested record with `to_json_compatible`.

        `None` passes through. For nullable fields the result is wrapped in
        a single-key dict keyed by `avro_type_name`.
        """
        if obj is None:
            return None
        if not self.nullable:
            return to_json_compatible(obj)
        return {self.avro_type_name: to_json_compatible(obj)}

    def avro_load(self, obj):
        """Inverse of `avro_dump`: unwrap (if nullable) and build the record."""
        if obj is None:
            return None
        payload = obj[self.avro_type_name] if self.nullable else obj
        return from_json_compatible(self._schema, payload)
@Map.mixin
class MapMixin:
    """Avro schema generation and (de)serialization for ``Map`` fields."""

    def simplified_avro_type_schema(self, state):
        """Return the Avro "map" schema, delegating values to the value field.

        Asserts that the key field is a ``Text`` instance.
        """
        assert isinstance(self.key_type, Text)
        return {
            "type": "map",
            "values": self.value_type.avro_type_schema(state)
        }

    def avro_dump(self, obj):
        """Dump a mapping, converting every key and value.

        `None` passes through. For nullable maps the result is wrapped in
        a single-key dict keyed by `avro_type_name`.
        """
        if obj is None:
            return None
        # `.items()` instead of the Python 2-only `.iteritems()` so that the
        # method works on both Python 2 and Python 3 mappings.
        dumped = dict(
            (
                # using json dumper for key is kind of a hack
                # since this isn't an actual type in avro (always text)
                self.key_type.dump(key),
                self.value_type.avro_dump(value),
            )
            for key, value in obj.items()
        )
        if self.nullable:
            return {self.avro_type_name: dumped}
        return dumped

    def avro_load(self, obj):
        """Inverse of `avro_dump`: unwrap (if nullable) and load keys and values."""
        if obj is None:
            return None
        if self.nullable:
            obj = obj[self.avro_type_name]
        # `.items()` keeps this working on Python 3, where `.iteritems()`
        # no longer exists.
        return dict(
            (
                # using json loader for key is kind of a hack
                # since this isn't an actual type in avro (always text)
                self.key_type.load(key),
                self.value_type.avro_load(value),
            )
            for key, value in obj.items()
        )


# Schema generation
class SchemaGeneratorState(object):
    """Mutable bookkeeping shared across one schema generation run."""

    def __init__(self):
        # Names of the records that have already been emitted in full;
        # starts out empty for every new state object.
        self.declared_records = set()
def get_schema_dict(record, state=None):
    """Build the Avro schema for a PySchema record class.

    :param record: record class exposing `_schema_name`, `_fields` and,
        optionally, `_avro_namespace_`.
    :param state: generation state tracking which records were already
        declared; a fresh `SchemaGeneratorState` is used when omitted.
    :return: a dict describing the record, or just the (namespace
        qualified) record name if the record was already declared in
        `state`.
    """
    state = state or SchemaGeneratorState()

    if hasattr(record, '_avro_namespace_'):
        namespace = record._avro_namespace_
        record_name = namespace + '.' + record._schema_name
    else:
        namespace = None
        record_name = record._schema_name

    # A record is only spelled out once per run; later occurrences refer
    # to it by name.
    if record_name in state.declared_records:
        return record_name
    state.declared_records.add(record_name)

    avro_record = {
        "type": "record",
        "name": record._schema_name,
    }
    if namespace:
        avro_record["namespace"] = namespace

    avro_fields = []
    # `.items()` instead of the Python 2-only `.iteritems()` so that schema
    # generation also works on Python 3.
    for field_name, field_type in record._fields.items():
        field_spec = {
            "name": field_name,
            "type": field_type.avro_type_schema(state)
        }
        # Only fields with an explicit default get a "default" entry.
        if field_type.default is not core.NO_DEFAULT:
            field_spec["default"] = field_type.avro_default_value()
        avro_fields.append(field_spec)

    avro_record["fields"] = avro_fields
    return avro_record
def get_schema_string(record):
    """Return the Avro schema of `record` serialized as a JSON string."""
    schema_dict = get_schema_dict(record)
    return json.dumps(schema_dict)
def dumps(record):
    """Serialize a record instance to a JSON string in its Avro-compatible form."""
    json_compatible = to_json_compatible(record)
    return json.dumps(json_compatible)
def to_json_compatible(record):
    """Convert a record instance into a dict of JSON-encodable values.

    Every field listed in `record._fields` is read off the record with
    `getattr` and converted with that field type's `avro_dump`.

    :param record: record instance exposing `_fields` (name -> field type).
    :return: dict mapping each field name to its dumped value.
    """
    # `.items()` instead of the Python 2-only `.iteritems()` so that this
    # works on both Python 2 and Python 3.
    return dict(
        (name, fieldtype.avro_dump(getattr(record, name)))
        for name, fieldtype in record._fields.items()
    )
def from_json_compatible(schema, dct):
    """Load from json-encodable

    Each key of `dct` is looked up in `schema._fields` and its value is
    converted with that field type's `avro_load`; the converted values are
    then passed to `schema` as keyword arguments.

    Raises `core.ParseError` for a key that is not a field of `schema`.
    """
    loaded = {}
    for key in dct:
        field_type = schema._fields.get(key)
        if field_type is None:
            message = "Unexpected field encountered in line for record %s: %s" % (schema.__name__, key)
            raise core.ParseError(message)
        loaded[key] = field_type.avro_load(dct[key])
    return schema(**loaded)
def loads(s, record_store=None, schema=None):
    """Load a record from the string `s`.

    Thin wrapper around `core.loads` that forwards all arguments and
    supplies `from_json_compatible` as the fourth positional argument
    (presumably the dict-to-record loader; confirm against `core.loads`).
    """
    record_loader = from_json_compatible
    return core.loads(s, record_store, schema, record_loader)