# Copyright (c) 2013 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
""" Schema definition toolkit using Python classes

Usage example:

>>> class Foo(Record):
...     bin = Bytes()
...
... class MyRecord(Record):
...     a_string = Text()
...     a_float = Float()
...     record = List(SubRecord(Foo))
...
... rec = MyRecord(a_string="hej")
... rec.record = [Foo(bin="bar")]
...
... s = dumps(rec)
... print loads(s)

Internals:

A valid PySchema class contains the following class variables:

`_fields`
    An OrderedDict of `field_name` => `field_type`
    where `field_type` is an instance of a Field subclass

`_schema_name`
    The qualifying name for this schema. This is used for registering a record
    in a `SchemaStore` and for auto-identification of serialized records.
    Should be unique within a specific SchemaStore, so if auto registering is
    used it should be unique within the execution chain of the current program.
"""
from __future__ import absolute_import
from abc import ABCMeta, abstractmethod
from itertools import izip
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
import warnings
import types
try:
import simplejson as json
except ImportError:
import json
# Key under which a record's schema name is stored in its serialized form.
# `dumps` writes it and `load_json_dct` reads (and strips) it; it can be
# changed at runtime with `set_schema_name_field`.
SCHEMA_FIELD_NAME = "$schema"
def set_schema_name_field(name):
    """Change the key used to store the schema name in serialized records

    Rebinds the module-level `SCHEMA_FIELD_NAME`, so the change applies to
    every later serialization and deserialization done through this module.

    :param name:
        New key to use instead of the current `SCHEMA_FIELD_NAME`
    """
    global SCHEMA_FIELD_NAME
    SCHEMA_FIELD_NAME = name
class ParseError(Exception):
    """ Generic exception type for Record parse errors """
class SchemaStore(object):
    """Registry of record classes, keyed by class name

    Records are stored under `schema.__name__`, so registering a second class
    with the same name replaces the first one (a warning is issued).
    """

    def __init__(self):
        # maps record class name => record class
        self._schema_map = {}

    def __str__(self):
        # list() gives a plain list of names on every Python version
        # (dict.keys() is a view object on Python 3)
        return str(list(self._schema_map))

    def add_record(self, schema, _bump_stack_level=False):
        """ Add record class to record store for retrieval at record load time.

        Can be used as a class decorator

        :param schema:
            Record class to register under its `__name__`
        :param _bump_stack_level:
            Internal. When true, a replacement warning is attributed one
            stack frame further up than the direct caller.
        :return:
            `schema`, unchanged
        """
        name = schema.__name__
        existing = self._schema_map.get(name)
        # Compare against None instead of relying on truthiness, which a
        # registered class could customize through its metaclass.
        if existing is not None:
            warnings.warn(
                "{new_module}.{class_name} replaces record from {prev_module}"
                .format(class_name=name,
                        prev_module=existing.__module__,
                        new_module=schema.__module__),
                stacklevel=3 if _bump_stack_level else 2)
        self._schema_map[name] = schema
        return schema

    def remove_record(self, schema):
        """Unregister `schema`; raises KeyError if its name isn't registered"""
        del self._schema_map[schema.__name__]

    def get(self, record_name):
        """Return the record class registered as `record_name`

        Raises KeyError if there is no such record.
        """
        return self._schema_map[record_name]

    def clear(self):
        """Remove every registered record"""
        self._schema_map.clear()

    def clone(self):
        """Return a new SchemaStore holding a shallow copy of the registrations"""
        r = SchemaStore()
        r._schema_map = self._schema_map.copy()
        return r

    def __contains__(self, schema):
        # membership is tested on the record *class*, not on its name
        return schema in self._schema_map.values()
# NO_DEFAULT is a special value to signify that a field has no default value
# and should fail to serialize unless a value has been assigned
# it's the default default-value for all non-nullable fields
NO_DEFAULT = object()
# _UNTOUCHED is the placeholder for the `default` argument of Field.__init__;
# it lets the constructor tell "no default was passed" apart from an
# explicit `default=None`
_UNTOUCHED = object()
class Field(object):
    """Abstract base class for the field types of a Record

    Subclasses implement `dump` and `load`. Every instance gets a running
    `_index` at construction time, which is used to keep fields in
    declaration order.
    """
    __metaclass__ = ABCMeta
    # counter shared by all Field instances, see `_index` below
    _next_index = 0

    def __init__(self, description=None, nullable=True, default=_UNTOUCHED):
        """
        :param description:
            Free-form description of the field
        :param nullable:
            Whether the field is nullable; this decides the implicit default
        :param default:
            Default value of the field. When omitted it is `None` for
            nullable fields and `NO_DEFAULT` for all others.
        """
        self.description = description
        self._index = Field._next_index
        self.nullable = nullable
        if default is _UNTOUCHED:
            # if default isn't explicitly set
            # use None for Nullables, and NO_DEFAULT for others
            if nullable:
                default = None
            else:
                default = NO_DEFAULT
        self.default = default
        Field._next_index += 1  # used for arg order in initialization

    def __repr__(self):
        return self.__class__.__name__

    def set_parent(self, schema):
        """Hook that is called with the record class the field belongs to"""
        # no-op by default but can be overridden by types
        # that need parent references
        pass

    @abstractmethod
    def dump(self, obj):
        """Convert the field value `obj` to its serializable form"""
        pass

    @abstractmethod
    def load(self, obj):
        """Convert the serialized form `obj` back to a field value"""
        pass

    @classmethod
    def mixin(cls, mixin_cls):
        """Decorator for mixing in additional functionality into field type

        Example:

        >>> @Integer.mixin
        ... class IntegerPostgresExtensions:
        ...     postgres_type = 'INT'
        ...
        ...     def postgres_dump(self, obj):
        ...         return self.dump(obj) + "::integer"

        Is roughly equivalent to:

        >>> Integer.postgres_type = 'INT'
        ...
        ... def postgres_dump(self, obj):
        ...     return self.dump(obj) + "::integer"
        ...
        ... Integer.postgres_dump = postgres_dump

        :param mixin_cls:
            Class whose attributes (all but the double-underscore ones)
            are copied onto this field type
        :return:
            `mixin_cls`, unchanged
        """
        for item_name in dir(mixin_cls):
            if item_name.startswith("__"):
                # don't copy magic properties
                continue
            item = getattr(mixin_cls, item_name)
            if isinstance(item, types.MethodType):
                # a method object would stay tied to mixin_cls, so copy the
                # underlying function instead. `__func__` exists on both
                # Python 2.6+ and Python 3, unlike the older `im_func`.
                item = item.__func__
            setattr(cls, item_name, item)
        return mixin_cls

    def default_value(self):
        """Return the default value used for this field on new records"""
        return self.default
# Module-wide default SchemaStore: the PySchema metaclass adds every new
# record class to it while auto registration is enabled, and `load_json_dct`
# falls back to it when no `record_store` is passed
auto_store = SchemaStore()
class PySchema(ABCMeta):
    """Metaclass for Records

    Builds schema on Record declaration and remembers Record types
    for easy generic parsing
    """
    # when true, every class created by this metaclass is added to the
    # module-level `auto_store`
    auto_register = True

    def __new__(metacls, name, bases, dct):
        schema_attrs = metacls._get_schema_attributes(
            name=name,
            bases=bases,
            dct=dct
        )
        dct.update(schema_attrs)
        cls = ABCMeta.__new__(metacls, name, bases, dct)

        # allow self-references etc.
        for field in cls._fields.values():
            field.set_parent(cls)

        if metacls.auto_register:
            auto_store.add_record(cls, _bump_stack_level=True)
        return cls

    @classmethod
    def _field_dupe_warning(metacls, name, fields):
        # stacklevel=4 attributes the warning to the code that declares the
        # class: warn <- here <- _get_schema_attributes <- __new__ <- caller
        warnings.warn(
            "{schema}: Duplicate field definition for field{plural} {field}"
            .format(
                schema=name,
                field=fields,
                plural="s" if len(fields) > 1 else ""
            ),
            stacklevel=4
        )

    @classmethod
    def _get_schema_attributes(metacls, name, bases, dct):
        """Collect the schema attributes for a class that is being created

        Fields inherited from PySchema bases come first (in base order),
        followed by the fields declared in `dct`, in declaration order.
        Redefining a field name triggers a warning and the later definition
        wins.

        :return:
            dict with the `_fields` and `_schema_name` class attributes
        """
        fields = OrderedDict()
        for b in bases:
            if not isinstance(b, metacls):
                continue
            field_intersection = set(fields) & set(b._fields)
            if field_intersection:
                metacls._field_dupe_warning(name, field_intersection)
            fields.update(b._fields)

        new_fields = [
            (field_name, field_def)
            for field_name, field_def in dct.items()
            if isinstance(field_def, Field)
        ]
        # the class namespace isn't ordered, so restore declaration order
        # from the creation index of the Field instances
        new_fields.sort(key=lambda fd: fd[1]._index)

        for field_name, field_def in new_fields:
            if field_name in fields:
                metacls._field_dupe_warning(name, (field_name,))
            fields[field_name] = field_def

        return {
            "_fields": fields,
            "_schema_name": name,
        }

    @classmethod
    def from_class(metacls, cls, auto_store=True):
        """Create proper PySchema class from cls

        Any methods and attributes will be transferred to the
        new object

        :param cls:
            Plain class to convert; the result is a new class with the same
            name that derives from `Record`
        :param auto_store:
            When false, the new class is kept out of the automatic record
            store even if auto registration is currently enabled
        """
        namespace = dict(cls.__dict__)
        # `__dict__` and `__weakref__` are descriptors bound to the original
        # class; copying them would shadow the ones inherited from Record
        # and make e.g. `instance.__dict__` raise TypeError.
        namespace.pop("__dict__", None)
        namespace.pop("__weakref__", None)

        if auto_store:
            return metacls.__new__(
                metacls, cls.__name__, (Record,), namespace)

        previous = PySchema.auto_register
        PySchema.auto_register = False
        try:
            return metacls.__new__(
                metacls, cls.__name__, (Record,), namespace)
        finally:
            # restore the flag even if class creation fails
            PySchema.auto_register = previous
def disable_auto_register():
    """Stop adding newly declared record classes to `auto_store`"""
    PySchema.auto_register = False
def enable_auto_register():
    """Add newly declared record classes to `auto_store` (the default)"""
    PySchema.auto_register = True
def no_auto_store():
    """ Temporarily disable automatic registration of records in the auto_store

    Decorator factory. This is _NOT_ thread safe

    >>> @no_auto_store()
    ... class BarRecord(Record):
    ...     pass
    >>> BarRecord in auto_store
    False

    Registration is switched off when the factory is called, i.e. before the
    decorated class body is executed, and the previous setting is restored
    when the returned decorator is applied to the class.
    """
    original_auto_register_value = PySchema.auto_register
    disable_auto_register()

    def decorator(cls):
        PySchema.auto_register = original_auto_register_value
        return cls

    return decorator
@no_auto_store()
class Record(object):
    """Abstract base class for structured logging records

    The fields of a record class are available as `_fields`; only those
    names can be assigned on an instance. Records are compared by schema
    name first and then field by field, in field order.
    """
    __metaclass__ = PySchema

    def __init__(self, *args, **kwargs):
        if args:
            # The idea behind only allowing keyword arguments
            # is to prevent accidental misuse of a changed schema
            raise TypeError('Non-keyword arguments not allowed'
                            ' when initializing Records')

        for k, field_type in self._fields.items():
            # the names come straight from _fields, so the membership check
            # done by Record.__setattr__ can be bypassed here
            object.__setattr__(self, k, field_type.default_value())

        for k, v in kwargs.items():
            setattr(self, k, v)

    def __setattr__(self, name, value):
        # only declared fields may be assigned
        if name not in self._fields:
            raise AttributeError(
                "No field %r in %s"
                % (name, self._schema_name)
            )
        super(Record, self).__setattr__(name, value)

    def __unicode__(self):
        return str(self).decode('ascii')

    def __str__(self):
        return repr(self)

    def __repr__(self):
        strings = ('%s=%r' % (fname, getattr(self, fname))
                   for fname in self._fields)
        return self._schema_name + '(' + ', '.join(strings) + ')'

    def __cmp__(self, other):
        if not isinstance(other, Record):
            # return default implementation cmp value
            return cmp(id(self), other)

        if self._schema_name != other._schema_name:
            return cmp(self._schema_name, other._schema_name)

        # same schema name: the first field that differs decides
        for key in self._fields:
            r = cmp(getattr(self, key), getattr(other, key))
            if r:
                return r
        return 0

    def __eq__(self, other):
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        return self.__cmp__(other) != 0
def to_json_compatible(record):
    """Dump record in json-encodable object format

    :param record:
        Record instance to dump
    :return:
        Dictionary of field name => value as returned by the `dump` method
        of the field type. Fields whose value is `None` are left out.
    """
    dumped = {}
    for fname, field_type in record._fields.items():
        val = getattr(record, fname)
        if val is not None:
            dumped[fname] = field_type.dump(val)
    return dumped
def from_json_compatible(schema, dct):
    """Load from json-encodable

    :param schema:
        Record class to instantiate
    :param dct:
        Dictionary of field name => json-compatible value; each value is
        converted with the `load` method of the corresponding field type
    :return:
        Instance of `schema`
    :raises ParseError:
        If `dct` contains a key that isn't a field of `schema`
    """
    kwargs = {}
    for key, value in dct.items():
        field_type = schema._fields.get(key)
        if field_type is None:
            raise ParseError("Unexpected field encountered in line for record %s: %s" % (schema.__name__, key))
        kwargs[key] = field_type.load(value)
    return schema(**kwargs)
def ispyschema(schema):
    """ Is object PySchema instance?

    Returns true for PySchema Record *classes*
    i.e. NOT when schema is a *Record* instance

    >>> class FooRecord(Record):
    ...     pass
    >>> ispyschema(FooRecord)
    True
    >>> ispyschema(FooRecord())
    False
    """
    return isinstance(schema, PySchema)
def load_json_dct(
    dct,
    record_store=None,
    schema=None,
    loader=from_json_compatible
):
    """ Create a Record instance from a json-compatible dictionary

    The dictionary values should have types that are json compatible,
    as if just loaded from a json serialized record string.

    :param dct:
        Python dictionary with key/value pairs for the record.
        It is not modified.
    :param record_store:
        Record store to use for schema lookups (when $schema field is present)
        Defaults to the module-level `auto_store`.
    :param schema:
        PySchema Record class for the record to load.
        This will override any $schema fields specified in `dct`
    :param loader:
        Callable taking `(schema, dct)` that builds the record once the
        schema identifier has been stripped from the dictionary
    :raises ParseError:
        If no `schema` is given and the schema identifier is either missing
        from `dct` or unknown to the record store
    """
    # Work on a shallow copy: the schema identifier is stripped below, and
    # doing that on the caller's dictionary would make a second load of the
    # same dictionary fail.
    dct = dict(dct)

    if schema is None:
        if record_store is None:
            record_store = auto_store
        try:
            schema_name = dct.pop(SCHEMA_FIELD_NAME)
        except KeyError:
            raise ParseError((
                "Serialized record missing '{0}' "
                "record identifier and no schema supplied")
                .format(SCHEMA_FIELD_NAME)
            )
        try:
            schema = record_store.get(schema_name)
        except KeyError:
            raise ParseError(
                "Can't recognize record type %r"
                % (schema_name,), schema_name)
    else:
        # if schema is explicit, use that instead of SCHEMA_FIELD_NAME
        dct.pop(SCHEMA_FIELD_NAME, None)

    record = loader(schema, dct)
    return record
def loads(
    s,
    record_store=None,
    schema=None,
    loader=from_json_compatible
):
    """ Create a Record instance from a json serialized dictionary

    :param s:
        String with a json-serialized dictionary. Anything that isn't
        `unicode` is decoded as UTF-8 first.
    :param record_store:
        Record store to use for schema lookups (when $schema field is present)
    :param schema:
        PySchema Record class for the record to load.
        This will override any $schema fields specified in `s`
    :param loader:
        Passed on to `load_json_dct`
    :raises ParseError:
        If `s` doesn't start with "{" (leading whitespace is not skipped),
        or for the reasons listed for `load_json_dct`
    """
    if not isinstance(s, unicode):
        s = s.decode('utf8')

    # cheap sanity check before handing the string to the json parser
    if not s.startswith(u"{"):
        raise ParseError("Not a json record")

    json_dct = json.loads(s)
    return load_json_dct(json_dct, record_store, schema, loader)
def dumps(obj, attach_schema_name=True):
    """ Serialize a Record instance to a json string

    :param obj:
        Record instance to serialize
    :param attach_schema_name:
        When true, the schema name of the record is stored in the output
        under `SCHEMA_FIELD_NAME`, so that the record type can be identified
        when the string is loaded again
    :return:
        String with a json-serialized dictionary
    """
    json_dct = to_json_compatible(obj)
    if attach_schema_name:
        json_dct[SCHEMA_FIELD_NAME] = obj._schema_name
    return json.dumps(json_dct)