# Source code for pyschema_extensions.avro_to_pyschema

# Copyright (c) 2014 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""
Helper functions for converting an Avro schema definition (JSON) to
a PySchema Python source definition.

TODO: Another idea is to read the Avro schema and create Python classes
dynamically, without generating Python source code.
"""

# Avro primitive type name -> source text of the PySchema field class to
# instantiate for it.
field_map = {
    'boolean': 'pyschema.Boolean',
    'int': 'pyschema.Integer',
    'long': 'pyschema.Integer',
    'float': 'pyschema.Float',
    'double': 'pyschema.Float',
    'string': 'pyschema.Text',
}

# Avro primitive type name -> extra constructor argument (as source text)
# appended to the generated field definition. Types not listed here get no
# extra argument.
extra_args_map = {
    'int': 'size=4',
    'float': 'size=4',
}

# Avro complex type name -> source text of the PySchema field class that
# wraps it.
complex_field_map = {
    'record': 'pyschema.SubRecord',
    'array': 'pyschema.List',
    'map': 'pyschema.Map',
}


def get_first_type(field_type):
    """Resolve an Avro type declaration to a single (non-union) type.

    :param field_type: the value of an Avro ``type`` attribute: a type name
        (string), a complex type definition (dict) or a union (list).
    :returns: ``field_type`` itself when it is not a list. For a list, the
        first branch that is not the string ``'null'``; if every branch is
        ``'null'``, the first element.
    :raises IndexError: if ``field_type`` is an empty list.
    """
    if isinstance(field_type, list):
        # Nullable fields are commonly declared as ["null", T]; blindly
        # taking element 0 would resolve such a field to "null".
        for branch in field_type:
            if branch != 'null':
                return branch
        return field_type[0]
    return field_type
def get_name(field):
    """Return the name to use for the schema/field described by ``field``.

    :param field: dict with at least a ``type`` key. ``type`` may be a type
        name (string), a complex type definition (dict) or a union (list).
    :returns: ``field['name']`` when the (resolved) type is a plain type
        name; the ``name`` of the type definition when the (resolved) type
        is a dict; ``None`` for any other kind of ``type`` value.
    :raises IndexError: if ``type`` is an empty list.
    """
    field_type = field['type']
    if isinstance(field_type, list):
        # Union: use the first non-null branch so that both
        # [T, "null"] and ["null", T] resolve to T.
        branches = [branch for branch in field_type if branch != 'null']
        field_type = branches[0] if branches else field_type[0]
    if isinstance(field_type, dict):
        return field_type['name']
    # (str, type(u'')) covers str/unicode on Python 2 and str on Python 3;
    # the Python 2-only name `basestring` raises NameError on Python 3.
    if isinstance(field_type, (str, type(u''))):
        return field['name']
    return None
def is_nullable(field_type):
    """Tell whether an Avro type declaration is treated as nullable.

    A declaration counts as nullable exactly when it is a list (an Avro
    union); the contents of the list are not inspected.

    :param field_type: the value of an Avro ``type`` attribute.
    :returns: ``True`` if ``field_type`` is a list, ``False`` otherwise.
    """
    return isinstance(field_type, list)
def get_field_type_name(field_type):
    """Return the Avro type name behind a type declaration.

    The declaration is first reduced to a single type with
    ``get_first_type``. If the result is a complex type definition (dict),
    its ``type`` entry is returned; otherwise the result itself is returned.

    :param field_type: the value of an Avro ``type`` attribute.
    """
    resolved = get_first_type(field_type)
    return resolved['type'] if isinstance(resolved, dict) else resolved
def nullable_str(field_type):
    """Return the ``nullable`` constructor argument for a type declaration.

    :param field_type: the value of an Avro ``type`` attribute.
    :returns: the empty string when ``is_nullable(field_type)`` is true,
        otherwise the source text ``'nullable=False'``.
    """
    return '' if is_nullable(field_type) else 'nullable=False'
def get_sub_fields_name(sub_type):
    """Return the key under which a complex Avro type stores its contents.

    :param sub_type: one of ``'record'``, ``'array'`` or ``'map'``.
    :returns: ``'fields'``, ``'items'`` or ``'values'`` respectively.
    :raises KeyError: for any other ``sub_type``.
    """
    content_keys = dict(record='fields', array='items', map='values')
    return content_keys[sub_type]
def get_sub_field(field):
    """Return the schema of what a complex field contains.

    :param field: dict with a ``type`` key describing a record, array or
        map, optionally wrapped in a union (list).
    :returns: ``field['fields']`` for a record; for an array or map, the
        ``items`` / ``values`` entry of the type definition. When that entry
        is itself a union (list) it is reduced to a single type with
        ``get_first_type``.
    :raises KeyError: if the type name is not record, array or map, or the
        expected entry is missing.
    """
    field_type = get_field_type_name(field['type'])
    if field_type == 'record':
        return field['fields']
    # field['type'] is a list when the array/map is declared inside a
    # union; unwrap it before looking up 'items' / 'values', otherwise the
    # string index below raises TypeError on the list.
    type_def = get_first_type(field['type'])
    sub_field = type_def[get_sub_fields_name(field_type)]
    if isinstance(sub_field, list):
        return get_first_type(sub_field)
    return sub_field
def get_field_definition(field, sub_records):
    """Build the PySchema source text that defines one Avro field.

    :param field: either a type name (string) or a field dict with a
        ``type`` key.
    :param sub_records: list collecting the generated class source of every
        nested record; this function appends to it.
    :returns: source text such as ``pyschema.Integer(nullable=False, size=4)``.
        A string ``field`` that is not a known primitive is returned
        unchanged. ``None`` is returned when the field's type is neither a
        known primitive nor a known complex type.
    """
    # (str, type(u'')) covers str/unicode on Python 2 and str on Python 3;
    # the Python 2-only name `basestring` raises NameError on Python 3.
    if isinstance(field, (str, type(u''))):
        if field in field_map:
            return field_map[field] + '()'
        return field

    nullable = nullable_str(field['type'])
    field_type = get_field_type_name(field['type'])

    # simple types
    if field_type in field_map:
        args = [
            arg for arg in (nullable, extra_args_map.get(field_type, ''))
            if arg
        ]
        return "%s(%s)" % (field_map[field_type], ', '.join(args))

    # complex types
    # NOTE: when `nullable` is empty the results below end in ", )", which is
    # still a valid call expression; the format is kept as is so generated
    # output does not change.
    if field_type == 'record':
        name = get_name(field)
        # Generate the nested class first (it may append deeper records),
        # then register it.
        sub_rec = get_pyschema_record(field, sub_records)
        sub_records.append(sub_rec)
        return "%s(%s, %s)" % (complex_field_map[field_type], name, nullable)

    if field_type in complex_field_map:
        sub_definition = get_field_definition(
            get_sub_field(field), sub_records)
        return "%s(%s, %s)" % (
            complex_field_map[field_type], sub_definition, nullable)

    # Type not present in either lookup table.
    return None
def get_pyschema_record(schema, sub_records):
    """Generate the source text of a ``pyschema.Record`` class for a record.

    :param schema: an Avro record schema, or a field dict whose ``type`` is
        a record definition (dict) or a union (list) containing one.
    :param sub_records: list collecting the class source of nested records;
        passed through to ``get_field_definition``, which appends to it.
    :returns: the class definition as a string, one line per field.
    """
    record_name = get_name(schema)

    # The field list lives on the type definition when the record is
    # declared inline in a field (directly or inside a union), and on the
    # schema itself for a top-level record whose `type` is a plain name.
    type_def = get_first_type(schema['type'])
    if isinstance(type_def, dict):
        fields = type_def['fields']
    else:
        fields = schema['fields']

    # NOTE(review): the one-space body indent is reproduced as found in the
    # source; confirm it was not meant to be four spaces.
    lines = ["class %s(pyschema.Record):\n" % record_name]
    for field in fields:
        lines.append(" %s = %s\n" % (
            field['name'], get_field_definition(field, sub_records)))
    if not fields:
        # A class statement needs a body to be valid Python.
        lines.append(" pass\n")
    return ''.join(lines)