sparktk.frame.ops.to_pandas module
# vim: set encoding=utf-8
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
def to_pandas(self, n=None, offset=0, columns=None):
    """
    Brings data into a local pandas DataFrame.

    Similar to the 'take' function, but puts the data into a pandas DataFrame.

    Parameters
    ----------

    :param n: (Optional(int)) The number of rows to get from the frame (warning: do not overwhelm the python
              session by taking too much)
    :param offset: (Optional(int)) The number of rows to skip before copying. Defaults to 0.
    :param columns: (Optional(List[str])) Column filter. The list of names to be included. Default is all columns.
    :return: (pandas.DataFrame) A new pandas DataFrame object containing the taken frame data.

    Examples
    --------

    Consider the following spark-tk frame, where we have columns for name and phone number:

        >>> frame.inspect()
        [#]  name      phone
        =======================
        [0]  Fred      555-1234
        [1]  Susan     555-0202
        [2]  Thurston  555-4510
        [3]  Judy      555-2183

        >>> frame.schema
        [('name', <type 'str'>), ('phone', <type 'str'>)]

    The frame to_pandas() method is used to get a pandas DataFrame that contains the data from the spark-tk frame.
    Note that since no parameters are provided when to_pandas() is called, the default values are used for the
    number of rows, the row offset, and the columns.

        >>> pandas_frame = frame.to_pandas()
        >>> pandas_frame
               name     phone
        0      Fred  555-1234
        1     Susan  555-0202
        2  Thurston  555-4510
        3      Judy  555-2183

    """
    try:
        import pandas
    except ImportError:
        raise RuntimeError("pandas module not found, unable to download.  Install pandas or try the take command.")
    from sparktk.frame.ops.take import take_rich

    result = take_rich(self, n, offset, columns)
    headers, data_types = zip(*result.schema)
    frame_data = result.data

    from sparktk import dtypes
    import datetime

    date_time_columns = [i for i, x in enumerate(self.schema) if x[1] in (dtypes.datetime, datetime.datetime)]
    has_date_time = len(date_time_columns) > 0

    # translate our datetime long to datetime, so that it gets into the pandas df as a datetime column
    def long_to_date_time(row):
        for i in date_time_columns:
            if isinstance(row[i], long):
                row[i] = datetime.datetime.fromtimestamp(row[i] // 1000).replace(microsecond=row[i] % 1000 * 1000)
        return row

    if has_date_time:
        frame_data = map(long_to_date_time, frame_data)

    # create pandas df
    pandas_df = pandas.DataFrame(frame_data, columns=headers)

    for i, dtype in enumerate(data_types):
        dtype_str = _sparktk_dtype_to_pandas_str(dtype)
        try:
            pandas_df[[headers[i]]] = pandas_df[[headers[i]]].astype(dtype_str)
        except (TypeError, ValueError):
            if dtype_str.startswith("int"):
                # pandas DataFrames do not handle missing values in int columns.  If we get this
                # error, use the 'object' dtype instead.
                print "WARNING - Encountered problem casting column %s to %s, possibly due to missing values (i.e. presence of None).  Continued by casting column %s as 'object'" % (headers[i], dtype_str, headers[i])
                pandas_df[[headers[i]]] = pandas_df[[headers[i]]].astype("object")
            else:
                raise
    return pandas_df
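
# NOTE (illustrative, not part of the original source): spark-tk represents
# datetime values as epoch milliseconds held in a long.  long_to_date_time above
# splits such a value into whole seconds for fromtimestamp() and a millisecond
# remainder that is re-attached as microseconds.  For a hypothetical value:
#
#     millis = 1452539454123
#     seconds, remainder_ms = millis // 1000, millis % 1000   # 1452539454 and 123
#     dt = datetime.datetime.fromtimestamp(seconds).replace(microsecond=remainder_ms * 1000)
#
# dt then carries a .123000 fractional second; the wall-clock portion depends on
# the local timezone, since fromtimestamp() converts to local time.
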
def _sparktk_dtype_to_pandas_str(dtype):
    """maps spark-tk schema types to types understood by pandas, returns string"""
    from sparktk import dtypes
    if dtype == dtypes.datetime:
        return "datetime64[ns]"
    elif dtypes.dtypes.is_primitive_type(dtype):
        return dtypes.dtypes.to_string(dtype)
    return "object"
Functions
def to_pandas(self, n=None, offset=0, columns=None)

Brings data into a local pandas DataFrame.

Similar to the 'take' function, but puts the data into a pandas DataFrame.

Parameters:

    n (Optional(int)):
        The number of rows to get from the frame (warning: do not overwhelm the
        python session by taking too much)
    offset (Optional(int)):
        The number of rows to skip before copying. Defaults to 0.
    columns (Optional(List[str])):
        Column filter. The list of names to be included. Default is all columns.

Returns (pandas.DataFrame):

    A new pandas DataFrame object containing the taken frame data.
Examples:
Consider the following spark-tk frame, where we have columns for name and phone number:

    >>> frame.inspect()
    [#]  name      phone
    =======================
    [0]  Fred      555-1234
    [1]  Susan     555-0202
    [2]  Thurston  555-4510
    [3]  Judy      555-2183

    >>> frame.schema
    [('name', <type 'str'>), ('phone', <type 'str'>)]

The frame to_pandas() method is used to get a pandas DataFrame that contains the data from the spark-tk frame.
Note that since no parameters are provided when to_pandas() is called, the default values are used for the number
of rows, the row offset, and the columns.

    >>> pandas_frame = frame.to_pandas()
    >>> pandas_frame
           name     phone
    0      Fred  555-1234
    1     Susan  555-0202
    2  Thurston  555-4510
    3      Judy  555-2183
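
Although not shown in the original example, the parameters can be combined to copy
just a slice of the frame. For instance, taking two rows starting at offset 1 and
keeping only the name column would look like this (output derived from the sample
frame above):

    >>> pandas_frame = frame.to_pandas(n=2, offset=1, columns=['name'])
    >>> pandas_frame
           name
    0     Susan
    1  Thurston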