Up

sparktk.dtypes module

definitions for Data Types

# vim: set encoding=utf-8

#  Copyright (c) 2016 Intel Corporation 
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
definitions for Data Types
"""

# TODO - consider server providing types, similar to commands

__all__ = ['dtypes', 'ignore', 'unknown', 'float32', 'float64', 'int32', 'int64', 'vector', 'unit', 'datetime', 'matrix']

import numpy as np
import json
import re
from pyspark.sql import types
import logging
logger = logging.getLogger('sparktk')

# alias numpy types
# todo: bring back the numpy types
# this is temporary until the serialization gets completely figured out...
# float32 = np.float32
# float64 = np.float64
# int32 = np.int32
# int64 = np.int64
float32 = float
float64 = float
int32 = int
int64 = long

from datetime import datetime
import dateutil.parser as datetime_parser
import pytz
# Chose python's datetime over numpy.datetime64 because of time zone support and string serialization
# Here's a long thread discussing numpy's datetime64 timezone problem:
#   http://mail.scipy.org/pipermail/numpy-discussion/2013-April/066038.html
# If need be, UDFs can create numpy objects from x using: numpy.datatime64(x.isoformat())


class _Matrix(object):
    base_type = np.ndarray
    re_pattern = "matrix"

    def __init__(self):
        self.constructor = self._get_constructor()

    def _get_constructor(self):

        def constructor(value):
            """
            Creates a numpy ndarray from a value, which can be one of many types
            """
            if value is None:
                return None
            else:
                return np.array(value, dtype=np.float64)
        return constructor

    @property
    def is_complex_type(self):
        return True

    @staticmethod
    def get_from_string(data_type_str):
        if _Matrix.re_pattern != data_type_str:
            raise "Invalid data type"
        return _Matrix()

    def __repr__(self):
        return "matrix"

matrix = _Matrix()

class _Vector(object):

    base_type = np.ndarray
    re_pattern = re.compile(r"^vector\((\d+)\)$")

    def __init__(self, length):
        self.length = int(length)
        self.constructor = self._get_constructor()

    def _get_constructor(self):
        length = self.length

        def constructor(value):
            """
            Creates a numpy array from a value, which can be one of many types
            """
            if value is None:
                return None
            try:
                # first try numpy's constructor
                array = np.array(value, dtype=np.float64)  # ensures the array is entirely made of doubles
            except:
                # also support json or comma-sep string
                if dtypes.value_is_string(value):
                    try:
                        value = json.loads(value)
                    except:
                        value = [np.float64(item.strip()) for item in value.split(',') if item]
                    array = np.array(value, dtype=np.float64)  # ensures the array is entirely made of doubles
                else:
                    raise

            array = np.atleast_1d(array)  # numpy thing, so that vectors of size 1 will still have dimension and length
            if len(array) != length:
                raise ValueError("Could not construct vector in Python Client.  Expected vector of length %s, but received length %d" % (length, len(array)))
            return array
        return constructor

    @property
    def is_complex_type(self):
        return True

    @staticmethod
    def get_from_string(data_type_str):
        return _Vector(_Vector.re_pattern.match(data_type_str).group(1))

    def __repr__(self):
        return "vector(%d)" % self.length

vector = _Vector


class _Unit(object):
    """Ignore type used for schemas during file import"""
    pass

unit = _Unit


class _Ignore(object):
    """Ignore type used for schemas during file import"""
    pass

ignore = _Ignore


class _Unknown(object):
    """Unknown type used when type is indeterminate"""
    pass

unknown = _Unknown


# map types to their string identifier
_primitive_type_to_str_table = {
    #bool: "bool", TODO
    #bytearray: "bytearray", TODO
    #dict: "dict", TODO
    float32: "float32",
    float64: "float64",
    int32: "int32",
    int64: "int64",
    #list: "list", TODO
    unicode: "unicode",
    ignore: "ignore",
    datetime: "datetime",
}

# map the pyspark sql type to the primitive data type
_pyspark_type_to_primitive_type_table = {
    #types.BooleanType : bool,
    types.LongType : int,
    types.IntegerType : int,
    types.DoubleType : float,
    types.DecimalType : float,
    types.StringType : str,
    types.TimestampType : datetime
}

# map data type to pyspark sql type
_data_type_to_pyspark_type_table = {
    int: types.IntegerType(),
    long: types.LongType(),
    float: types.DoubleType(),
    str: types.StringType(),
    unicode: types.StringType(),
    datetime: types.TimestampType()
}

# build reverse map string -> type
_primitive_str_to_type_table = dict([(s, t) for t, s in _primitive_type_to_str_table.iteritems()])

_primitive_alias_type_to_type_table = {
    float: float64,
    int: int32,
    long: int64,
    str: unicode,
    list: vector,
    np.ndarray: matrix,
}

_primitive_alias_str_to_type_table = dict([(alias.__name__, t) for alias, t in _primitive_alias_type_to_type_table.iteritems()])
_primitive_alias_str_to_type_table["string"] = unicode


def datetime_to_ms(date_time):
    """
    Returns the number of milliseconds since epoch (1970-01-01).
    :param date_time: (datetime) Date/time to convert to timestamp
    :return: Timestamp (number of ms since epoch)
    """
    if isinstance(date_time, datetime):
        ms = long(date_time.strftime("%s")) * 1000.0
        ms += date_time.microsecond // 1000
        return long(ms)
    else:
        raise TypeError("Unable to calculate the number of milliseconds since epoch for type: %s" % type(date_time))

def ms_to_datetime_str(ms):
    """
    Returns the date/time string for the specified timestamp (milliseconds since epoch).
    :param ms: Milliseconds since epoch (int or long)
    :return: Date/time string
    """
    if isinstance(ms, long) or isinstance(ms, int):
        return datetime.fromtimestamp(ms/1000.0, tz=pytz.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    else:
        raise TypeError("Unable to convert timestamp milliseconds to a date/time string, because the value provided " +
                        "is not a long/int.  Unsupported type: %" % type(ms))

def datetime_constructor(value):
    """Creates special constructor for datetime parsing.  Returns the number of ms since epoch."""
    if dtypes.value_is_string(value):
        return datetime_to_ms(datetime_parser.parse(value))
    elif isinstance(value, long) or isinstance(value, int):
        return value
    else:
        try:
            return datetime_to_ms(datetime(*value))
        except:
            raise TypeError("cannot convert type to the datetime")


def numpy_to_bson_friendly(obj):
    """take an object and convert it to a type that can be serialized to bson if neccessary."""
    if isinstance(obj, float32) or isinstance(obj, float64):
        return float(obj)
    if isinstance(obj, int32):
        return int(obj)
    if isinstance(obj, vector.base_type):
        return obj.tolist()
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, dict):
        return dict([(numpy_to_bson_friendly(key), numpy_to_bson_friendly(value)) for key, value in obj.items()])
    # Let the base class default method raise the TypeError
    return obj


class _DataTypes(object):
    """
    Provides functions with define and operate on supported data types.
    """

    def __contains__(self, item):
        try:
            self.validate(item)
            return True
        except ValueError:
            return False

    def __repr__(self):
        aliases = "\n(and aliases: %s)" % (", ".join(sorted(["%s->%s" % (alias.__name__, self.to_string(data_type)) for alias, data_type in _primitive_alias_type_to_type_table.iteritems()])))
        return ", ".join(sorted(_primitive_str_to_type_table.keys() + ["vector(n)"]+["matrix"])) + aliases

    @staticmethod
    def value_is_string(value):
        """get bool indication that value is a string, whether str or unicode"""
        return isinstance(value, basestring)

    @staticmethod
    def value_is_missing_value(value):
        return value is None

    @staticmethod
    def get_primitive_data_types():
        return _primitive_type_to_str_table.keys()

    @staticmethod
    def to_string(data_type):
        """
        Returns the string representation of the given type

        Parameters
        ----------
        data_type : type
            valid data type; if invalid, a ValueError is raised

        Returns
        -------
        result : str
            string representation

        Examples
        --------
        >>> dtypes.to_string(float32)
        'float32'
        """
        valid_data_type = _DataTypes.get_from_type(data_type)
        try:
            return _primitive_type_to_str_table[valid_data_type]
        except KeyError:
            # complex data types should use their repr
            return repr(valid_data_type)

    @staticmethod
    def get_from_string(data_type_str):
        """
        Returns the data type for the given type string representation

        Parameters
        ----------
        data_type_str : str
            valid data type str; if invalid, a ValueError is raised

        Returns
        -------
        result : type
            type represented by the string

        Examples
        --------
        >>> dtypes.get_from_string('unicode')
        unicode
        """
        try:
            return _primitive_str_to_type_table[data_type_str]
        except KeyError:
            try:
                return _primitive_alias_str_to_type_table[data_type_str]
            except KeyError:
                try:
                    if data_type_str == 'matrix':
                        return matrix.get_from_string(data_type_str)
                    else:
                        return vector.get_from_string(data_type_str)
                except:
                    raise ValueError("Unsupported type string '%s' " % data_type_str)

    @staticmethod
    def is_primitive_type(data_type):
        return data_type in _primitive_type_to_str_table or data_type in _primitive_alias_type_to_type_table

    @staticmethod
    def is_complex_type(data_type):
        try:
            return data_type.is_complex_type
        except AttributeError:
            return False

    @staticmethod
    def is_primitive_alias_type(data_type):
        return data_type in _primitive_alias_type_to_type_table

    @staticmethod
    def is_int(data_type):
        return data_type in [int, int32, int64]

    @staticmethod
    def is_float(data_type):
        return data_type in [float, float32, float64]

    @staticmethod
    def get_from_type(data_type):
        """
        Returns the data type for the given type (often it will return the same type)

        Parameters
        ----------
        data_type : type
            valid data type or type that may be aliased for a valid data type;
            if invalid, a ValueError is raised

        Returns
        -------
        result : type
            valid data type for given type

        Examples
        --------
        >>> dtypes.get_from_type(int)
        numpy.int32
        """
        if _DataTypes.is_primitive_alias_type(data_type):
            return _primitive_alias_type_to_type_table[data_type]
        if _DataTypes.is_primitive_type(data_type) or _DataTypes.is_complex_type(data_type):
            return data_type
        raise ValueError("Unsupported type %s" % data_type)

    @staticmethod
    def validate(data_type):
        """Raises a ValueError if data_type is not a valid data_type"""
        _DataTypes.get_from_type(data_type)

    @staticmethod
    def get_constructor(to_type):
        """gets the constructor for the to_type"""
        try:
            return to_type.constructor
        except AttributeError:
            if to_type == datetime:
                return datetime_constructor

            def constructor(value):
                if value is None:
                    return None
                try:
                    return to_type(value)
                except Exception as e:
                    print "ERROR for value %s.  %s" % (value, e)
                    raise
            return constructor

    @staticmethod
    def cast(value, to_type):
        """
        Returns the given value cast to the given type.  None is always returned as None

        Parameters
        ----------
        value : object
            value to convert by casting

        to_type : type
            valid data type to use for the cast

        Returns
        -------
        results : object
            the value cast to the to_type

        Examples
        --------
        >>> dtypes.cast(3, float64)
        3.0
        >>> dtypes.cast(4.5, str)
        '4.5'
        >>> dtypes.cast(None, str)
        None
        >>> dtypes.cast(np.inf, float32)
        None
        """
        if _DataTypes.value_is_missing_value(value):  # Special handling for missing values
            return None
        elif _DataTypes.is_primitive_type(to_type) and type(value) is to_type:  # Optimization
            return value
        try:
            constructor = _DataTypes.get_constructor(to_type)
            result = constructor(value)
            return None if _DataTypes.value_is_missing_value(result) else result
        except Exception as e:
            raise ValueError(("Unable to cast to type %s\n" % to_type) + str(e))

    @staticmethod
    def datetime_from_iso(iso_string):
        """create datetime object from ISO 8601 string"""
        return datetime_parser.parse(iso_string)

    @staticmethod
    def get_primitive_type_from_pyspark_type(pyspark_type):
        """
        Get the primitive type for the specified pyspark sql data type.
        
        :param pyspark_type: pyspark.sql.types data type
        :return: Primitive data type
        """
        if _pyspark_type_to_primitive_type_table.has_key(pyspark_type):
            return _pyspark_type_to_primitive_type_table[pyspark_type]
        else:
            raise ValueError("Unable to cast pyspark type %s to primitive type." % str(pyspark_type))

    @staticmethod
    def merge_types (type_a, type_b):
        """
        Returns the lowest common denominator data type for the specified types.
        :param type_a: Data type a to compare
        :param type_b: Data type b t compare
        :return: Merged data type
        """
        merged = unicode
        numeric_types = [float, long, int, bool]       # numeric types in rank order
        if type_a == type_b:
            merged = type_a
        elif type_a == unicode or type_b == unicode:
            merged = unicode
        elif type_a == str or type_b == str:
            merged = str
        elif type_a in numeric_types and type_b in numeric_types:
            if numeric_types.index(type_a) > numeric_types.index(type_b):
                merged = type_b
            else:
                merged = type_a
        elif isinstance(type_a, vector) and isinstance(type_b, vector):
            if type_a.length != type_b.length:
                raise ValueError("Vectors must all be the same length (found vectors with length %s and %s)." % (type_a.length, type_b.length))
            merged = type_a
        return merged


dtypes = _DataTypes()
#valid_data_types = dtypes  # consider for backwards comp

Module variables

var dtypes

var matrix

Classes

class datetime

datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints or longs.

Ancestors (in MRO)

  • datetime
  • datetime.date
  • __builtin__.object

Class variables

var day

var hour

var max

var microsecond

var min

var minute

var month

var resolution

var second

var tzinfo

var year

class float32

float(x) -> floating point number

Convert a string or number to a floating point number, if possible.

Ancestors (in MRO)

Class variables

var imag

var real

class float64

float(x) -> floating point number

Convert a string or number to a floating point number, if possible.

Ancestors (in MRO)

Class variables

var imag

Inheritance: float32.imag

var real

Inheritance: float32.real

class ignore

Ignore type used for schemas during file import

class _Ignore(object):
    """Ignore type used for schemas during file import"""
    pass

Ancestors (in MRO)

class int32

int(x=0) -> int or long int(x, base=10) -> int or long

Convert a number or string to an integer, or return 0 if no arguments are given. If x is floating point, the conversion truncates towards zero. If x is outside the integer range, the function returns a long instead.

If x is not a number or if base is given, then x must be a string or Unicode object representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

int('0b100', base=0) 4

Ancestors (in MRO)

  • int32
  • __builtin__.object

Class variables

var denominator

var imag

var numerator

var real

class int64

long(x=0) -> long long(x, base=10) -> long

Convert a number or string to a long integer, or return 0L if no arguments are given. If x is floating point, the conversion truncates towards zero.

If x is not a number or if base is given, then x must be a string or Unicode object representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

int('0b100', base=0) 4L

Ancestors (in MRO)

  • int64
  • __builtin__.object

Class variables

var denominator

var imag

var numerator

var real

class unit

Ignore type used for schemas during file import

class _Unit(object):
    """Ignore type used for schemas during file import"""
    pass

Ancestors (in MRO)

  • unit
  • __builtin__.object

class unknown

Unknown type used when type is indeterminate

class _Unknown(object):
    """Unknown type used when type is indeterminate"""
    pass

Ancestors (in MRO)

class vector

class _Vector(object):

    base_type = np.ndarray
    re_pattern = re.compile(r"^vector\((\d+)\)$")

    def __init__(self, length):
        self.length = int(length)
        self.constructor = self._get_constructor()

    def _get_constructor(self):
        length = self.length

        def constructor(value):
            """
            Creates a numpy array from a value, which can be one of many types
            """
            if value is None:
                return None
            try:
                # first try numpy's constructor
                array = np.array(value, dtype=np.float64)  # ensures the array is entirely made of doubles
            except:
                # also support json or comma-sep string
                if dtypes.value_is_string(value):
                    try:
                        value = json.loads(value)
                    except:
                        value = [np.float64(item.strip()) for item in value.split(',') if item]
                    array = np.array(value, dtype=np.float64)  # ensures the array is entirely made of doubles
                else:
                    raise

            array = np.atleast_1d(array)  # numpy thing, so that vectors of size 1 will still have dimension and length
            if len(array) != length:
                raise ValueError("Could not construct vector in Python Client.  Expected vector of length %s, but received length %d" % (length, len(array)))
            return array
        return constructor

    @property
    def is_complex_type(self):
        return True

    @staticmethod
    def get_from_string(data_type_str):
        return _Vector(_Vector.re_pattern.match(data_type_str).group(1))

    def __repr__(self):
        return "vector(%d)" % self.length

Ancestors (in MRO)

Class variables

var base_type

var re_pattern

Static methods

def get_from_string(

data_type_str)

@staticmethod
def get_from_string(data_type_str):
    return _Vector(_Vector.re_pattern.match(data_type_str).group(1))

Instance variables

var constructor

var is_complex_type

var length

Methods

def __init__(

self, length)

def __init__(self, length):
    self.length = int(length)
    self.constructor = self._get_constructor()