
sparktk.sparkconf module

Sets up Spark Context

# vim: set encoding=utf-8

#  Copyright (c) 2016 Intel Corporation 
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""Sets up Spark Context"""

import os
import shutil
import atexit
import glob2
from pyspark import SparkContext, SparkConf
from zip import zip_sparktk

LIB_DIR="dependencies"
SPARK_ASSEMBLY_SEARCH="**/spark-assembly*.jar"
CORE_TARGET="sparktk-core/target"
import logging
logger = logging.getLogger('sparktk')

def get_source_code_target_dir():
    """gets the core/target folder as if this is running from source code"""
    d = os.path.dirname
    root = os.path.join(d(d(d(os.path.abspath(__file__)))))
    target = os.path.join(root, CORE_TARGET)
    return target


# default values -- DO NOT CHANGE freely, instead change the environ variables
default_spark_home = '/opt/cloudera/parcels/CDH/lib/spark'
default_sparktk_home = get_source_code_target_dir()
default_spark_master = 'local[4]'


def set_env(name, value):
    """helper to set env w/ log"""
    logger.info("sparktk.sparkconf making $%s=%s" % (name, value))
    os.environ[name] = value


def get_jars_and_classpaths(dirs):
    """
    Helper which creates a tuple of two strings for the given dirs:

    1. jars string - a comma-separated list of all the .jar files in the given directories
    2. classpath string - a colon-separated list of all the given directories with a /* wildcard added

    :param dirs: a str or list of str specifying the directories to use for building the jar strings
    :return: (jars, classpath)
    """
    classpath = ':'.join(["%s/*" % d for d in dirs])

    # list of tuples with the directory and jar file
    dir_jar = [(d, f) for d in dirs for f in os.listdir(d) if f.endswith('.jar')]

    # Get jar file list without any duplicate jars (use the one from the first directory it's found in).  If
    # we don't remove duplicates, we get warnings about the jar already having been registered.
    distinct_jars = set()
    jar_files = []
    for dir, jar in dir_jar:
        if jar not in distinct_jars:
            jar_files.append(os.path.join(dir, jar))
            distinct_jars.add(jar)

    jars = ','.join(jar_files)
    return jars, classpath

def get_spark_dirs():
    try:
        spark_home = os.environ['SPARK_HOME']
    except KeyError:
        raise RuntimeError("Missing value for environment variable SPARK_HOME.")

    spark_assembly_search = glob2.glob(os.path.join(spark_home,SPARK_ASSEMBLY_SEARCH))
    if len(spark_assembly_search) > 0:
        spark_assembly = os.path.dirname(spark_assembly_search[0])
    else:
        raise RuntimeError("Couldn't find spark assembly jar")

    return [spark_assembly]


def get_sparktk_dirs():
    """returns the folders which contain all the jars required to run sparktk"""
    # todo: revisit when packaging is resolved, right now this assumes source code/build folder structure

    try:
        sparktk_home = os.environ['SPARKTK_HOME']
    except KeyError:
        raise RuntimeError("Missing value for SPARKTK_HOME.  Try setting $SPARKTK_HOME or the kwarg 'sparktk_home'")

    dirs = [sparktk_home,
            os.path.join(sparktk_home, LIB_DIR)]   # the /dependencies folder
    return dirs


def print_bash_cmds_for_sparktk_env():
    """prints export cmds for each env var set by set_env_for_sparktk, for use in a bash script"""
    # see ../gopyspark.sh
    for name in ['SPARK_HOME',
                 'SPARKTK_HOME',
                 'PYSPARK_PYTHON',
                 'PYSPARK_DRIVER_PYTHON',
                 'PYSPARK_SUBMIT_ARGS',
                 'SPARK_JAVA_OPTS',
                 ]:
        value = os.environ.get(name, None)
        if value:
            print "export %s='%s'" % (name, value)  # require the single-quotes because of spaces in the values


def set_env_for_sparktk(spark_home=None,
                        sparktk_home=None,
                        pyspark_submit_args=None,
                        other_libs=None,
                        debug=None):

    """Set env vars necessary to start up a Spark Context with sparktk"""

    if spark_home:
        set_env('SPARK_HOME', spark_home)
    elif 'SPARK_HOME' not in os.environ:
        set_env('SPARK_HOME', default_spark_home)

    if sparktk_home:
        set_env('SPARKTK_HOME', sparktk_home)
    elif 'SPARKTK_HOME' not in os.environ:
        set_env('SPARKTK_HOME', default_sparktk_home)

    if not os.environ.get('PYSPARK_DRIVER_PYTHON'):
        set_env('PYSPARK_DRIVER_PYTHON', 'python2.7')

    if not os.environ.get('PYSPARK_PYTHON'):
        set_env('PYSPARK_PYTHON', 'python2.7')

    # Validate other libraries to verify they have the required functions
    other_libs = _validate_other_libs(other_libs)

    # Everything else goes into PYSPARK_SUBMIT_ARGS
    spark_dirs = get_spark_dirs()
    spark_dirs.extend(get_sparktk_dirs())

    # Get library directories from other_libs
    if other_libs is not None:
        for other_lib in other_libs:
            other_lib_dirs = other_lib.get_library_dirs()
            spark_dirs.extend(other_lib_dirs)

    jars, driver_class_path = get_jars_and_classpaths(spark_dirs)

    if not pyspark_submit_args:
        using_env = True
        pyspark_submit_args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
    else:
        using_env = False

    pieces = pyspark_submit_args.split()
    if ('--jars' in pieces) ^ ('--driver-class-path' in pieces):
        # Pyspark bug where --jars doesn't add to driver path  https://github.com/apache/spark/pull/11687
        # fix targeted for Spark 2.0, back-port to 1.6 unlikely
        msg = "If setting --jars or --driver-class-path in pyspark_submit_args, both must be set (due to Spark): "
        if using_env:
            msg += "$PYSPARK_SUBMIT_ARGS=%s" % os.environ['PYSPARK_SUBMIT_ARGS']
        else:
            msg += "pyspark_submit_args=%s" % pyspark_submit_args
        raise ValueError(msg)

    jars_value_index = next((i for i, x in enumerate(pieces) if x == '--jars'), -1) + 1
    if jars_value_index > 0:
        pieces[jars_value_index] = ','.join([pieces[jars_value_index], jars])
        driver_class_path_value_index = pieces.index('--driver-class-path') + 1
        pieces[driver_class_path_value_index] = ':'.join([pieces[driver_class_path_value_index], driver_class_path])
    else:
        pieces = ['--jars', jars, '--driver-class-path', driver_class_path]

    pyspark_submit_args = ' '.join(pieces)

    set_env('PYSPARK_SUBMIT_ARGS', pyspark_submit_args)

    if debug:
        print "Adding args for remote java debugger"
        try:
            address = int(debug)
        except:
            address = 5005  # default
        details = '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s' % address
        set_env('SPARK_JAVA_OPTS', details)


def create_sc(master=None,
              py_files=None,
              spark_home=None,
              sparktk_home=None,
              pyspark_submit_args=None,
              app_name="sparktk",
              other_libs=None,
              extra_conf=None,
              use_local_fs=False,
              debug=None):
    """
    Creates a SparkContext with sparktk defaults

    Many parameters can be overridden

    :param master: (str) spark master setting; for ex. 'local[4]' or 'yarn-client'
    :param py_files: (list) list of str of paths to python dependencies; Note that the current python
    package will be freshly zipped up and put in a tmp folder for shipping by spark, and then removed
    :param spark_home: (str) override $SPARK_HOME, the location of spark
    :param sparktk_home: (str) override $SPARKTK_HOME, the location of spark-tk
    :param pyspark_submit_args: (str) extra args passed to the pyspark submit
    :param app_name: (str) name of spark app that will be created
    :param other_libs: (list) other libraries (actual packages/modules) that are compatible with spark-tk,
                       which need to be added to the spark context.  These libraries must be developed for usage with
                       spark-tk and have particular methods implemented.  (See sparkconf.py _validate_other_libs)
    :param extra_conf: (dict) dict for any extra spark conf settings, for ex. {"spark.hadoop.fs.default.name": "file:///"}
    :param use_local_fs: (bool) simpler way to specify using local file system, rather than hdfs or other
    :param debug: (int or str) provide a port address to attach a debugger to the JVM that gets started
    :return: pyspark SparkContext
    """

    set_env_for_sparktk(spark_home, sparktk_home, pyspark_submit_args, other_libs, debug)

    # bug/behavior of PYSPARK_SUBMIT_ARGS requires 'pyspark-shell' on the end --check in future spark versions
    set_env('PYSPARK_SUBMIT_ARGS', ' '.join([os.environ['PYSPARK_SUBMIT_ARGS'], 'pyspark-shell']))

    if not master:
        master = default_spark_master
        logger.info("sparktk.create_sc() master not specified, setting to %s", master)

    conf = SparkConf().setMaster(master).setAppName(app_name)
    if extra_conf:
        for k, v in extra_conf.items():
            conf = conf.set(k, v)

    if use_local_fs:
        conf.set("spark.hadoop.fs.default.name", "file:///")

    if not py_files:
        py_files = []

    # zip up the relevant pieces of sparktk and put it in the py_files...
    path = zip_sparktk()
    tmp_dir = os.path.dirname(path)
    logger.info("sparkconf created tmp dir for sparktk.zip %s" % tmp_dir)
    atexit.register(shutil.rmtree, tmp_dir)  # make python delete this folder when it shuts down

    py_files.append(path)

    msg = '\n'.join(["=" * 80,
                     "Creating SparkContext with the following SparkConf",
                     "pyFiles=%s" % str(py_files),
                     conf.toDebugString(),
                     "=" * 80])
    logger.info(msg)

    sc = SparkContext(conf=conf, pyFiles=py_files)

    return sc

def _validate_other_libs(other_libs):
    """
    Validates the other_libs parameter.  Makes it a list, if it isn't already and verifies that all the items in the
    list are python modules with the required functions.

    Raises a TypeError, if the other_libs parameter is not valid.

    :param other_libs: parameter to validate
    :return: validated other_libs parameter
    """
    if other_libs is not None:
        if not isinstance(other_libs, list):
            other_libs = [other_libs]
        import types
        # todo: formalize and document the 'other_libs' for integration with spark-tk
        required_functions = ["get_loaders","get_main_object","get_library_dirs"]
        for lib in other_libs:
            if not isinstance(lib, types.ModuleType):
                raise TypeError("Expected other_libs to contain python modules, but received %s." % type(lib) )
            for required_function in required_functions:
                if not hasattr(lib, required_function):
                    raise TypeError("other_lib '%s' is missing %s() function." % (lib.__name__,required_function))
    return other_libs

Module variables

var CORE_TARGET

var LIB_DIR

var default_spark_home

var default_spark_master

var default_sparktk_home

var logger

Functions

def create_sc(master=None, py_files=None, spark_home=None, sparktk_home=None, pyspark_submit_args=None, app_name='sparktk', other_libs=None, extra_conf=None, use_local_fs=False, debug=None)

Creates a SparkContext with sparktk defaults

Many parameters can be overridden

master (str): spark master setting; for ex. 'local[4]' or 'yarn-client'
py_files (list): list of str of paths to python dependencies; Note that the current python package will be freshly zipped up and put in a tmp folder for shipping by spark, and then removed
spark_home (str): override $SPARK_HOME, the location of spark
sparktk_home (str): override $SPARKTK_HOME, the location of spark-tk
pyspark_submit_args (str): extra args passed to the pyspark submit
app_name (str): name of the spark app that will be created
other_libs (list): other libraries (actual packages/modules) that are compatible with spark-tk, which need to be added to the spark context. These libraries must be developed for usage with spark-tk and have particular methods implemented. (See sparkconf.py _validate_other_libs)
extra_conf (dict): dict for any extra spark conf settings, for ex. {"spark.hadoop.fs.default.name": "file:///"}
use_local_fs (bool): simpler way to specify using the local file system, rather than hdfs or other
debug (int or str): provide a port address to attach a debugger to the JVM that gets started

Returns: pyspark SparkContext
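
A minimal usage sketch; the paths, master setting, and app name below are illustrative assumptions, and the spark/spark-tk installations must actually exist:

from sparktk.sparkconf import create_sc

# hypothetical local setup -- adjust spark_home/sparktk_home to your installation
sc = create_sc(master='local[4]',
               spark_home='/opt/cloudera/parcels/CDH/lib/spark',
               sparktk_home='/opt/spark-tk',
               app_name='my-sparktk-app',
               use_local_fs=True)   # sets spark.hadoop.fs.default.name to file:///

print sc.version   # from here on it is an ordinary pyspark SparkContext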

def create_sc(master=None,
              py_files=None,
              spark_home=None,
              sparktk_home=None,
              pyspark_submit_args=None,
              app_name="sparktk",
              other_libs=None,
              extra_conf=None,
              use_local_fs=False,
              debug=None):
    """
    Creates a SparkContext with sparktk defaults

    Many parameters can be overridden

    :param master: (str) spark master setting; for ex. 'local[4]' or 'yarn-client'
    :param py_files: (list) list of str of paths to python dependencies; Note that the current python
    package will be freshly zipped up and put in a tmp folder for shipping by spark, and then removed
    :param spark_home: (str) override $SPARK_HOME, the location of spark
    :param sparktk_home: (str) override $SPARKTK_HOME, the location of spark-tk
    :param pyspark_submit_args: (str) extra args passed to the pyspark submit
    :param app_name: (str) name of spark app that will be created
    :param other_libs: (list) other libraries (actual packages/modules) that are compatible with spark-tk,
                       which need to be added to the spark context.  These libraries must be developed for usage with
                       spark-tk and have particular methods implemented.  (See sparkconf.py _validate_other_libs)
    :param extra_conf: (dict) dict for any extra spark conf settings, for ex. {"spark.hadoop.fs.default.name": "file:///"}
    :param use_local_fs: (bool) simpler way to specify using local file system, rather than hdfs or other
    :param debug: (int or str) provide a port address to attach a debugger to the JVM that gets started
    :return: pyspark SparkContext
    """

    set_env_for_sparktk(spark_home, sparktk_home, pyspark_submit_args, other_libs, debug)

    # bug/behavior of PYSPARK_SUBMIT_ARGS requires 'pyspark-shell' on the end --check in future spark versions
    set_env('PYSPARK_SUBMIT_ARGS', ' '.join([os.environ['PYSPARK_SUBMIT_ARGS'], 'pyspark-shell']))

    if not master:
        master = default_spark_master
        logger.info("sparktk.create_sc() master not specified, setting to %s", master)

    conf = SparkConf().setMaster(master).setAppName(app_name)
    if extra_conf:
        for k, v in extra_conf.items():
            conf = conf.set(k, v)

    if use_local_fs:
        conf.set("spark.hadoop.fs.default.name", "file:///")

    if not py_files:
        py_files = []

    # zip up the relevant pieces of sparktk and put it in the py_files...
    path = zip_sparktk()
    tmp_dir = os.path.dirname(path)
    logger.info("sparkconf created tmp dir for sparktk.zip %s" % tmp_dir)
    atexit.register(shutil.rmtree, tmp_dir)  # make python delete this folder when it shuts down

    py_files.append(path)

    msg = '\n'.join(["=" * 80,
                     "Creating SparkContext with the following SparkConf",
                     "pyFiles=%s" % str(py_files),
                     conf.toDebugString(),
                     "=" * 80])
    logger.info(msg)

    sc = SparkContext(conf=conf, pyFiles=py_files)

    return sc

def get_jars_and_classpaths(dirs)

Helper which creates a tuple of two strings for the given dirs:

  1. jars string - a comma-separated list of all the .jar files in the given directories
  2. classpath string - a colon-separated list of all the given directories with a /* wildcard added
dirs: a str or list of str specifying the directories to use for building the jar strings

Returns: (jars, classpath)
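
A small sketch of the return shape; the directories and jar names are hypothetical:

from sparktk.sparkconf import get_jars_and_classpaths

# suppose /opt/libs/one holds a.jar and /opt/libs/two holds b.jar (hypothetical dirs)
jars, classpath = get_jars_and_classpaths(['/opt/libs/one', '/opt/libs/two'])
# jars      -> '/opt/libs/one/a.jar,/opt/libs/two/b.jar'  (comma-separated jar files)
# classpath -> '/opt/libs/one/*:/opt/libs/two/*'          (colon-separated wildcards)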

def get_jars_and_classpaths(dirs):
    """
    Helper which creates a tuple of two strings for the given dirs:

    1. jars string - a comma-separated list of all the .jar files in the given directories
    2. classpath string - a colon-separated list of all the given directories with a /* wildcard added

    :param dirs: a str or list of str specifying the directories to use for building the jar strings
    :return: (jars, classpath)
    """
    classpath = ':'.join(["%s/*" % d for d in dirs])

    # list of tuples with the directory and jar file
    dir_jar = [(d, f) for d in dirs for f in os.listdir(d) if f.endswith('.jar')]

    # Get jar file list without any duplicate jars (use the one from the first directory it's found in).  If
    # we don't remove duplicates, we get warnings about the jar already having been registered.
    distinct_jars = set()
    jar_files = []
    for dir, jar in dir_jar:
        if jar not in distinct_jars:
            jar_files.append(os.path.join(dir, jar))
            distinct_jars.add(jar)

    jars = ','.join(jar_files)
    return jars, classpath

def get_source_code_target_dir()

gets the core/target folder as if this is running from source code
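
For illustration (the repository path is a hypothetical assumption):

# If this module lived at /home/user/spark-tk/python/sparktk/sparkconf.py,
# the three dirname() calls walk up to the repo root and CORE_TARGET is appended:
#   get_source_code_target_dir()  ->  '/home/user/spark-tk/sparktk-core/target'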

def get_source_code_target_dir():
    """gets the core/target folder as if this is running from source code"""
    d = os.path.dirname
    root = os.path.join(d(d(d(os.path.abspath(__file__)))))
    target = os.path.join(root, CORE_TARGET)
    return target

def get_spark_dirs()
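
Returns a one-element list containing the directory that holds the Spark assembly jar found under $SPARK_HOME, raising RuntimeError if the variable is unset or no jar is found. A sketch, with a hypothetical CDH layout:

import os
from sparktk.sparkconf import get_spark_dirs

os.environ['SPARK_HOME'] = '/opt/cloudera/parcels/CDH/lib/spark'   # hypothetical install
print get_spark_dirs()
# e.g. ['/opt/cloudera/parcels/CDH/lib/spark/lib'] -- whichever folder holds spark-assembly*.jar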

def get_spark_dirs():
    try:
        spark_home = os.environ['SPARK_HOME']
    except KeyError:
        raise RuntimeError("Missing value for environment variable SPARK_HOME.")

    spark_assembly_search = glob2.glob(os.path.join(spark_home,SPARK_ASSEMBLY_SEARCH))
    if len(spark_assembly_search) > 0:
        spark_assembly = os.path.dirname(spark_assembly_search[0])
    else:
        raise RuntimeError("Couldn't find spark assembly jar")

    return [spark_assembly]

def get_sparktk_dirs()

returns the folders which contain all the jars required to run sparktk
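
A sketch of the returned folders; the SPARKTK_HOME value is a hypothetical assumption:

import os
from sparktk.sparkconf import get_sparktk_dirs

os.environ['SPARKTK_HOME'] = '/opt/spark-tk'   # hypothetical install location
print get_sparktk_dirs()
# ['/opt/spark-tk', '/opt/spark-tk/dependencies']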

def get_sparktk_dirs():
    """returns the folders which contain all the jars required to run sparktk"""
    # todo: revisit when packaging is resolved, right now this assumes source code/build folder structure

    try:
        sparktk_home = os.environ['SPARKTK_HOME']
    except KeyError:
        raise RuntimeError("Missing value for SPARKTK_HOME.  Try setting $SPARKTK_HOME or the kwarg 'sparktk_home'")

    dirs = [sparktk_home,
            os.path.join(sparktk_home, LIB_DIR)]   # the /dependencies folder
    return dirs

def print_bash_cmds_for_sparktk_env()

prints export cmds for each env var set by set_env_for_sparktk, for use in a bash script
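
A sketch of capturing the exports for a shell script, assuming the sparktk env vars have already been populated (the values shown are illustrative):

from sparktk import sparkconf

sparkconf.set_env_for_sparktk()               # fill in the env vars first
sparkconf.print_bash_cmds_for_sparktk_env()   # redirect stdout to a file to source it later
# prints lines such as:
#   export SPARK_HOME='/opt/cloudera/parcels/CDH/lib/spark'
#   export PYSPARK_SUBMIT_ARGS='--jars ... --driver-class-path ...'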

def print_bash_cmds_for_sparktk_env():
    """prints export cmds for each env var set by set_env_for_sparktk, for use in a bash script"""
    # see ../gopyspark.sh
    for name in ['SPARK_HOME',
                 'SPARKTK_HOME',
                 'PYSPARK_PYTHON',
                 'PYSPARK_DRIVER_PYTHON',
                 'PYSPARK_SUBMIT_ARGS',
                 'SPARK_JAVA_OPTS',
                 ]:
        value = os.environ.get(name, None)
        if value:
            print "export %s='%s'" % (name, value)  # require the single-quotes because of spaces in the values

def set_env(name, value)

helper to set env w/ log

def set_env(name, value):
    """helper to set env w/ log"""
    logger.info("sparktk.sparkconf making $%s=%s" % (name, value))
    os.environ[name] = value

def set_env_for_sparktk(spark_home=None, sparktk_home=None, pyspark_submit_args=None, other_libs=None, debug=None)

Set env vars necessary to start up a Spark Context with sparktk
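
A minimal sketch of calling it directly before building your own SparkConf; the paths are hypothetical and must point at real Spark and spark-tk installations:

import os
from sparktk.sparkconf import set_env_for_sparktk

set_env_for_sparktk(spark_home='/opt/cloudera/parcels/CDH/lib/spark',   # hypothetical paths
                    sparktk_home='/opt/spark-tk')

# PYSPARK_SUBMIT_ARGS now carries matching --jars and --driver-class-path entries
print os.environ['PYSPARK_SUBMIT_ARGS']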

def set_env_for_sparktk(spark_home=None,
                        sparktk_home=None,
                        pyspark_submit_args=None,
                        other_libs=None,
                        debug=None):

    """Set env vars necessary to start up a Spark Context with sparktk"""

    if spark_home:
        set_env('SPARK_HOME', spark_home)
    elif 'SPARK_HOME' not in os.environ:
        set_env('SPARK_HOME', default_spark_home)

    if sparktk_home:
        set_env('SPARKTK_HOME', sparktk_home)
    elif 'SPARKTK_HOME' not in os.environ:
        set_env('SPARKTK_HOME', default_sparktk_home)

    if not os.environ.get('PYSPARK_DRIVER_PYTHON'):
        set_env('PYSPARK_DRIVER_PYTHON', 'python2.7')

    if not os.environ.get('PYSPARK_PYTHON'):
        set_env('PYSPARK_PYTHON', 'python2.7')

    # Validate other libraries to verify they have the required functions
    other_libs = _validate_other_libs(other_libs)

    # Everything else goes into PYSPARK_SUBMIT_ARGS
    spark_dirs = get_spark_dirs()
    spark_dirs.extend(get_sparktk_dirs())

    # Get library directories from other_libs
    if other_libs is not None:
        for other_lib in other_libs:
            other_lib_dirs = other_lib.get_library_dirs()
            spark_dirs.extend(other_lib_dirs)

    jars, driver_class_path = get_jars_and_classpaths(spark_dirs)

    if not pyspark_submit_args:
        using_env = True
        pyspark_submit_args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
    else:
        using_env = False

    pieces = pyspark_submit_args.split()
    if ('--jars' in pieces) ^ ('--driver-class-path' in pieces):
        # Pyspark bug where --jars doesn't add to driver path  https://github.com/apache/spark/pull/11687
        # fix targeted for Spark 2.0, back-port to 1.6 unlikely
        msg = "If setting --jars or --driver-class-path in pyspark_submit_args, both must be set (due to Spark): "
        if using_env:
            msg += "$PYSPARK_SUBMIT_ARGS=%s" % os.environ['PYSPARK_SUBMIT_ARGS']
        else:
            msg += "pyspark_submit_args=%s" % pyspark_submit_args
        raise ValueError(msg)

    jars_value_index = next((i for i, x in enumerate(pieces) if x == '--jars'), -1) + 1
    if jars_value_index > 0:
        pieces[jars_value_index] = ','.join([pieces[jars_value_index], jars])
        driver_class_path_value_index = pieces.index('--driver-class-path') + 1
        pieces[driver_class_path_value_index] = ':'.join([pieces[driver_class_path_value_index], driver_class_path])
    else:
        pieces = ['--jars', jars, '--driver-class-path', driver_class_path]

    pyspark_submit_args = ' '.join(pieces)

    set_env('PYSPARK_SUBMIT_ARGS', pyspark_submit_args)

    if debug:
        print "Adding args for remote java debugger"
        try:
            address = int(debug)
        except:
            address = 5005  # default
        details = '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s' % address
        set_env('SPARK_JAVA_OPTS', details)