sparktk.loggers module
Logging - simple helpers for now
# vim: set encoding=utf-8
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Logging - simple helpers for now
"""
import logging
import sys
# Constants
LINE_FORMAT = '%(asctime)s|%(levelname)-5s|%(name)s|%(message)s'
# add a null handler to root logger to avoid handler warning messages
class NullHandler(logging.Handler):
    """A handler that silently discards every record (compat shim; logging.NullHandler arrived in 2.7)."""
    name = "NullHandler"

    def emit(self, record):
        """Intentionally drop the record without producing any output."""
        return None
# Attach a do-nothing handler to the root logger so that client code which
# never configures logging does not see the 'no handlers could be found'
# warning message.
_null_handler = NullHandler()
_null_handler.name = ''  # add name explicitly for python 2.6 (older Handler lacks the name property)
logging.getLogger('').addHandler(_null_handler)  # '' is the root logger
class Loggers(object):
    """
    Collection of loggers to stderr, wrapped for simplicity

    Tracks which loggers were configured through :meth:`set` so that
    ``repr(loggers)`` can show a summary table of level/name/handler-count.
    """

    # map first character of level to actual level setting, for convenience
    _level_map = {'c': logging.CRITICAL,
                  'f': logging.FATAL,
                  'e': logging.ERROR,
                  'w': logging.WARN,
                  'i': logging.INFO,
                  'd': logging.DEBUG,
                  'n': logging.NOTSET}

    def __init__(self):
        # names of the loggers configured through self.set; drives __repr__
        self._user_logger_names = []

    def __repr__(self):
        """Render a table of the user-configured loggers: level, name, handler count."""
        header = ["{0:<8} {1:<50} {2:<14}".format("Level", "Logger", "# of Handlers"),
                  "{0:<8} {1:<50} {2:<14}".format("-"*8, "-"*50, "-"*14)]
        entries = [self._get_repr_line(name, None) for name in self._user_logger_names]
        return "\n".join(header + entries)

    @staticmethod
    def _get_repr_line(name, alias):
        """Format one table row for logger `name`; `alias` (if truthy) is shown in parens."""
        logger = logging.getLogger(name)
        if alias:
            name += " (%s)" % alias
        return "{0:<8} {1:<50} {2:<14}".format(logging.getLevelName(logger.level),
                                               name,
                                               len(logger.handlers))

    @staticmethod
    def get(logger_name):
        """returns the logger of the given name"""
        return logging.getLogger(logger_name)

    def set(self, level=logging.DEBUG, logger_name='', output=None, line_format=None):
        """
        Sets the level and adds handlers to the given logger

        Parameters
        ----------
        level : int, str or logging.*, optional
            The level to which the logger will be set. May be 0,10,20,30,40,50
            or "DEBUG", "INFO", etc. (only first letter is required)
            Setting to None disables the logging to stderr
            See `https://docs.python.org/2/library/logging.html`
            If not specified, DEBUG is used
            To turn OFF the logger, set level to 0 or None
        logger_name: str, optional
            The name of the logger. If empty string, then the sparktk root logger is set
        output: file or str, or list of such, optional
            The file object or name of the file to log to. If empty, then stderr is used

        Returns
        -------
        logging.Logger
            The configured logger (or the last one configured, for list output)

        Examples
        --------
        # to enable INFO level logging to file 'log.txt' and no printing to stderr:

        >>> loggers.set('INFO', 'sparktk.frame','log.txt', False)
        """
        # 'root' is accepted as an alias for the python root logger (named '')
        logger_name = logger_name if logger_name != 'root' else ''
        if not level:
            # level of None or 0 means turn this logger off entirely
            return self._turn_logger_off(logger_name)
        line_format = line_format if line_format is not None else LINE_FORMAT
        logger = logging.getLogger(logger_name)
        if not output:
            output = sys.stderr
        if isinstance(output, basestring):
            # a string is treated as a file name
            handler = logging.FileHandler(output)
        elif isinstance(output, (list, tuple)):
            # multiple outputs: configure each one; return the logger from the last call
            logger = None
            for o in output:
                logger = self.set(level, logger_name, o, line_format)
            return logger
        else:
            try:
                handler = logging.StreamHandler(output)
            except Exception:
                # (fix: was a bare except, which also swallowed SystemExit/KeyboardInterrupt)
                raise ValueError("Bad output argument %s. Expected stream or file name." % output)
        try:
            handler_name = output.name  # file-like objects expose a .name attribute
        except AttributeError:
            # (fix: was a bare except; only the missing-attribute case is expected here)
            handler_name = str(output)
        if isinstance(level, basestring):
            c = str(level)[0].lower()  # only require first letter
            level = self._level_map[c]
        logger.setLevel(level)
        self._add_handler_to_logger(logger, handler, handler_name, line_format)
        # store logger name
        if logger_name not in self._user_logger_names:
            self._user_logger_names.append(logger_name)
        return logger

    @staticmethod
    def _logger_has_handler(logger, handler_name):
        """True if `logger` already has a handler named `handler_name`."""
        # (fix: previously any([h.name for h in ... if h.name == handler_name]) --
        # a matching handler whose name was itself falsy (e.g. '') made any() False)
        return bool(logger.handlers) and any(h.name == handler_name for h in logger.handlers)

    @staticmethod
    def _add_handler_to_logger(logger, handler, handler_name, line_format):
        """Attach `handler` (named, DEBUG-level, formatted with `line_format`) to `logger`."""
        handler.setLevel(logging.DEBUG)  # handler passes everything; the logger's level filters
        handler.name = handler_name
        formatter = logging.Formatter(line_format, '%y-%m-%d %H:%M:%S')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    def _turn_logger_off(self, logger_name):
        """Silence the named logger: raise its level to CRITICAL and strip all handlers."""
        logger = logging.getLogger(logger_name)
        logger.level = logging.CRITICAL
        # copy the handler list, since removeHandler mutates it during iteration
        victim_handlers = [x for x in logger.handlers]
        for h in victim_handlers:
            logger.removeHandler(h)
        try:
            self._user_logger_names.remove(logger_name)
        except ValueError:
            pass  # name was never registered; nothing to forget
        return logger

    @staticmethod
    def set_spark(sc, level):
        """
        Controls the logging level for the messages generated by Spark

        :param sc: (SparkContext) valid SparkContext (python)
        :param level: (str) log level, like "info", "debug", "warn", etc.
        """
        from pyspark import SparkContext
        if not isinstance(sc, SparkContext):
            raise TypeError("set_spark requires a valid SparkContext object for first arg, received type=%s" % type(sc))
        logger = sc._jvm.org.apache.log4j
        new_level = Loggers._get_scala_logger_level(sc, level)
        # Spark's own chatter comes from the 'org' and 'akka' log4j loggers
        logger.LogManager.getLogger("org").setLevel(new_level)
        logger.LogManager.getLogger("akka").setLevel(new_level)

    @staticmethod
    def set_sparktk_scala(tc, level):
        """
        Controls the logging level for the messages generated by the Scala part of sparktk

        :param tc: (TkContext) valid TkContext (python)
        :param level: (str) log level, like "info", "debug", "warn", etc.
        """
        tc._jtc.setLoggerLevel(str(Loggers._get_scala_logger_level(tc.sc, level)))

    @staticmethod
    def _get_scala_logger_level(sc, level_string):
        """
        Maps a level string to the corresponding JVM log4j Level object

        :param sc: (SparkContext) valid SparkContext, used to reach the JVM's log4j
        :param level_string: (str) log level, like "info", "debug", "warn" (first letter suffices)
        :raises ValueError: if level_string is empty, None, or not a recognized level
        """
        logger = sc._jvm.org.apache.log4j
        valid_levels = {
            "d": logger.Level.DEBUG,
            "e": logger.Level.ERROR,
            "w": logger.Level.WARN,
            "i": logger.Level.INFO,
            "o": logger.Level.OFF,
        }
        try:
            return valid_levels[level_string.lower()[0]]
        except (KeyError, IndexError, AttributeError):
            # (fix: the old code assigned a dead `level = ""` for falsy input and then
            # let None/"" escape as a raw AttributeError/IndexError; now every bad
            # input raises this informative ValueError)
            raise ValueError("Bad logging level '%s'. Valid levels include: %s" % (level_string, sorted(map(lambda lev: str(lev.toString()).lower(), valid_levels.values()))))
def log_load(module, logger_name='sparktk'):
    """
    Logs an INFO message announcing that a module is being imported.

    Typical usage is a one-liner at the top of a .py file:

    ``from sparktk.loggers import log_load; log_load(__name__); del log_load``

    :param module: name of the module, usually __name__
    :param logger_name: override the logger name
    """
    logging.getLogger(logger_name).info("load module %s" % module)
loggers = Loggers()  # module-level singleton; importable as `from sparktk.loggers import loggers`

# Logging backdoor
#
# If env variable is set, we will call loggers.set immediately, so the loggers
# can run during the rest of the sparktk package import
#
# The value of this env var is a JSON list of maps, each of which
# represents a call to loggers.set.  The map holds the **kwargs for the
# call to loggers.set
#
# Example: This sets the module logger to debug for core/frame.py
#
# $ export SPARK_TK_LOGGERS='[{"logger_name": "sparktk", "level": "debug"}]'
#
import os
loggers_env_name = "SPARK_TK_LOGGERS"
loggers_env = os.getenv(loggers_env_name)
if loggers_env:
    # echo the setting so interactive users can see the backdoor took effect
    print "$SPARK_TK_LOGGERS=%s" % loggers_env
    try:
        import json
        # each entry is a dict of kwargs for one loggers.set(...) call
        for entry in json.loads(loggers_env):
            loggers.set(**entry)
    except Exception as e:
        import sys
        # report which env var was bad, then re-raise so the failure is visible
        sys.stderr.write("!! Error trying to ingest logging env variable $%s\n" % loggers_env_name)
        raise
Module variables
var LINE_FORMAT
var loggers
var loggers_env
var loggers_env_name
Functions
def log_load(
module, logger_name='sparktk')
intended to log when a module is imported
Usage is to put a line like this at the top of the .py file:
from sparktk.loggers import log_load; log_load(__name__); del log_load
module: | name of the module, usually __name__ |
:param logger_name: override the logger name
def log_load(module, logger_name='sparktk'):
"""
intended to log when a module is imported
Usage is to put a line like this at the top of the .py file:
``from sparktk.loggers import log_load; log_load(__name__); del log_load``
:param module: name of the module, usually __name__
:param logger_name: override the logger name
"""
logger = logging.getLogger(logger_name)
logger.info("load module %s" % module)
Classes
class Loggers
Collection of loggers to stderr, wrapped for simplicity
class Loggers(object):
"""
Collection of loggers to stderr, wrapped for simplicity
"""
# map first character of level to actual level setting, for convenience
_level_map = {'c': logging.CRITICAL,
'f': logging.FATAL,
'e': logging.ERROR,
'w': logging.WARN,
'i': logging.INFO,
'd': logging.DEBUG,
'n': logging.NOTSET}
def __init__(self):
self._user_logger_names = []
def __repr__(self):
header = ["{0:<8} {1:<50} {2:<14}".format("Level", "Logger", "# of Handlers"),
"{0:<8} {1:<50} {2:<14}".format("-"*8, "-"*50, "-"*14)]
entries = []
for name in self._user_logger_names:
entries.append(self._get_repr_line(name, None))
return "\n".join(header + entries)
@staticmethod
def _get_repr_line(name, alias):
logger = logging.getLogger(name)
if alias:
name += " (%s)" % alias
return "{0:<8} {1:<50} {2:<14}".format(logging.getLevelName(logger.level),
name,
len(logger.handlers))
@staticmethod
def get(logger_name):
"""returns the logger of the given name"""
return logging.getLogger(logger_name)
def set(self, level=logging.DEBUG, logger_name='', output=None, line_format=None):
"""
Sets the level and adds handlers to the given logger
Parameters
----------
level : int, str or logging.*, optional
The level to which the logger will be set. May be 0,10,20,30,40,50
or "DEBUG", "INFO", etc. (only first letter is required)
Setting to None disables the logging to stderr
See `https://docs.python.org/2/library/logging.html`
If not specified, DEBUG is used
To turn OFF the logger, set level to 0 or None
logger_name: str, optional
The name of the logger. If empty string, then the sparktk root logger is set
output: file or str, or list of such, optional
The file object or name of the file to log to. If empty, then stderr is used
Examples
--------
# to enable INFO level logging to file 'log.txt' and no printing to stderr:
>>> loggers.set('INFO', 'sparktk.frame','log.txt', False)
"""
logger_name = logger_name if logger_name != 'root' else ''
if not level:
return self._turn_logger_off(logger_name)
line_format = line_format if line_format is not None else LINE_FORMAT
logger = logging.getLogger(logger_name)
if not output:
output = sys.stderr
if isinstance(output, basestring):
handler = logging.FileHandler(output)
elif isinstance(output, list) or isinstance(output, tuple):
logger = None
for o in output:
logger = self.set(level, logger_name, o, line_format)
return logger
else:
try:
handler = logging.StreamHandler(output)
except:
raise ValueError("Bad output argument %s. Expected stream or file name." % output)
try:
handler_name = output.name
except:
handler_name = str(output)
if isinstance(level, basestring):
c = str(level)[0].lower() # only require first letter
level = self._level_map[c]
logger.setLevel(level)
self._add_handler_to_logger(logger, handler, handler_name, line_format)
# store logger name
if logger_name not in self._user_logger_names:
self._user_logger_names.append(logger_name)
return logger
@staticmethod
def _logger_has_handler(logger, handler_name):
return logger.handlers and any([h.name for h in logger.handlers if h.name == handler_name])
@staticmethod
def _add_handler_to_logger(logger, handler, handler_name, line_format):
handler.setLevel(logging.DEBUG)
handler.name = handler_name
formatter = logging.Formatter(line_format, '%y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)
logger.addHandler(handler)
def _turn_logger_off(self, logger_name):
logger = logging.getLogger(logger_name)
logger.level = logging.CRITICAL
victim_handlers = [x for x in logger.handlers]
for h in victim_handlers:
logger.removeHandler(h)
try:
self._user_logger_names.remove(logger_name)
except ValueError:
pass
return logger
@staticmethod
def set_spark(sc, level):
"""
Controls the logging level for the messages generated by Spark
:param sc: (SparkContext) valid SparkContext (python)
:param level: (str) log level, like "info", "debug", "warn", etc.
"""
from pyspark import SparkContext
if not isinstance(sc, SparkContext):
raise TypeError("set_spark requires a valid SparkContext object for first arg, received type=%s" % type(sc))
logger = sc._jvm.org.apache.log4j
new_level = Loggers._get_scala_logger_level(sc, level)
logger.LogManager.getLogger("org").setLevel(new_level)
logger.LogManager.getLogger("akka").setLevel(new_level)
@staticmethod
def set_sparktk_scala(tc, level):
"""
Controls the logging level for the messages generated by the Scala part of sparktk
:param tc: (TkContext) valid TkContext (python)
:param level: (str) log level, like "info", "debug", "warn", etc.
"""
tc._jtc.setLoggerLevel(str(Loggers._get_scala_logger_level(tc.sc, level)))
@staticmethod
def _get_scala_logger_level(sc, level_string):
logger = sc._jvm.org.apache.log4j
if not level_string:
level = ""
valid_levels = {
"d": logger.Level.DEBUG,
"e": logger.Level.ERROR,
"w": logger.Level.WARN,
"i": logger.Level.INFO,
"o": logger.Level.OFF,
}
try:
return valid_levels[level_string.lower()[0]]
except KeyError:
raise ValueError("Bad logging level '%s'. Valid levels include: %s" % (level_string, sorted(map(lambda lev: str(lev.toString()).lower(), valid_levels.values()))))
Ancestors (in MRO)
- Loggers
- __builtin__.object
Static methods
def get(
logger_name)
returns the logger of the given name
@staticmethod
def get(logger_name):
"""returns the logger of the given name"""
return logging.getLogger(logger_name)
def set_spark(
sc, level)
Controls the logging level for the messages generated by Spark
sc | (SparkContext): | valid SparkContext (python) |
level | (str): | log level, like "info", "debug", "warn", etc. |
@staticmethod
def set_spark(sc, level):
"""
Controls the logging level for the messages generated by Spark
:param sc: (SparkContext) valid SparkContext (python)
:param level: (str) log level, like "info", "debug", "warn", etc.
"""
from pyspark import SparkContext
if not isinstance(sc, SparkContext):
raise TypeError("set_spark requires a valid SparkContext object for first arg, received type=%s" % type(sc))
logger = sc._jvm.org.apache.log4j
new_level = Loggers._get_scala_logger_level(sc, level)
logger.LogManager.getLogger("org").setLevel(new_level)
logger.LogManager.getLogger("akka").setLevel(new_level)
def set_sparktk_scala(
tc, level)
Controls the logging level for the messages generated by the Scala part of sparktk
tc | (TkContext): | valid TkContext (python) |
level | (str): | log level, like "info", "debug", "warn", etc. |
@staticmethod
def set_sparktk_scala(tc, level):
"""
Controls the logging level for the messages generated by the Scala part of sparktk
:param tc: (TkContext) valid TkContext (python)
:param level: (str) log level, like "info", "debug", "warn", etc.
"""
tc._jtc.setLoggerLevel(str(Loggers._get_scala_logger_level(tc.sc, level)))
Methods
def __init__(
self)
def __init__(self):
self._user_logger_names = []
def set(
self, level=10, logger_name='', output=None, line_format=None)
Sets the level and adds handlers to the given logger
level : int, str or logging.*, optional
The level to which the logger will be set. May be 0,10,20,30,40,50
or "DEBUG", "INFO", etc. (only first letter is required)
Setting to None disables the logging to stderr
See https://docs.python.org/2/library/logging.html
If not specified, DEBUG is used
To turn OFF the logger, set level to 0 or None
logger_name: str, optional
The name of the logger. If empty string, then the sparktk root logger is set
output: file or str, or list of such, optional
The file object or name of the file to log to. If empty, then stderr is used
to enable INFO level logging to file 'log.txt' and no printing to stderr:
loggers.set('INFO', 'sparktk.frame','log.txt', False)
def set(self, level=logging.DEBUG, logger_name='', output=None, line_format=None):
"""
Sets the level and adds handlers to the given logger
Parameters
----------
level : int, str or logging.*, optional
The level to which the logger will be set. May be 0,10,20,30,40,50
or "DEBUG", "INFO", etc. (only first letter is required)
Setting to None disables the logging to stderr
See `https://docs.python.org/2/library/logging.html`
If not specified, DEBUG is used
To turn OFF the logger, set level to 0 or None
logger_name: str, optional
The name of the logger. If empty string, then the sparktk root logger is set
output: file or str, or list of such, optional
The file object or name of the file to log to. If empty, then stderr is used
Examples
--------
# to enable INFO level logging to file 'log.txt' and no printing to stderr:
>>> loggers.set('INFO', 'sparktk.frame','log.txt', False)
"""
logger_name = logger_name if logger_name != 'root' else ''
if not level:
return self._turn_logger_off(logger_name)
line_format = line_format if line_format is not None else LINE_FORMAT
logger = logging.getLogger(logger_name)
if not output:
output = sys.stderr
if isinstance(output, basestring):
handler = logging.FileHandler(output)
elif isinstance(output, list) or isinstance(output, tuple):
logger = None
for o in output:
logger = self.set(level, logger_name, o, line_format)
return logger
else:
try:
handler = logging.StreamHandler(output)
except:
raise ValueError("Bad output argument %s. Expected stream or file name." % output)
try:
handler_name = output.name
except:
handler_name = str(output)
if isinstance(level, basestring):
c = str(level)[0].lower() # only require first letter
level = self._level_map[c]
logger.setLevel(level)
self._add_handler_to_logger(logger, handler, handler_name, line_format)
# store logger name
if logger_name not in self._user_logger_names:
self._user_logger_names.append(logger_name)
return logger
class NullHandler
class NullHandler(logging.Handler):
name = "NullHandler"
def emit(self, record):
pass
Ancestors (in MRO)
- NullHandler
- logging.Handler
- logging.Filterer
- __builtin__.object
Class variables
var name
Methods
def __init__(
self, level=0)
Initializes the instance - basically setting the formatter to None and the filter list to empty.
def __init__(self, level=NOTSET):
"""
Initializes the instance - basically setting the formatter to None
and the filter list to empty.
"""
Filterer.__init__(self)
self._name = None
self.level = _checkLevel(level)
self.formatter = None
# Add the handler to the global _handlerList (for cleanup on shutdown)
_addHandlerRef(self)
self.createLock()
def acquire(
self)
Acquire the I/O thread lock.
def acquire(self):
"""
Acquire the I/O thread lock.
"""
if self.lock:
self.lock.acquire()
def addFilter(
self, filter)
Add the specified filter to this handler.
def addFilter(self, filter):
"""
Add the specified filter to this handler.
"""
if not (filter in self.filters):
self.filters.append(filter)
def close(
self)
Tidy up any resources used by the handler.
This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
def close(self):
"""
Tidy up any resources used by the handler.
This version removes the handler from an internal map of handlers,
_handlers, which is used for handler lookup by name. Subclasses
should ensure that this gets called from overridden close()
methods.
"""
#get the module data lock, as we're updating a shared structure.
_acquireLock()
try: #unlikely to raise an exception, but you never know...
if self._name and self._name in _handlers:
del _handlers[self._name]
finally:
_releaseLock()
def createLock(
self)
Acquire a thread lock for serializing access to the underlying I/O.
def createLock(self):
"""
Acquire a thread lock for serializing access to the underlying I/O.
"""
if thread:
self.lock = threading.RLock()
else:
self.lock = None
def emit(
self, record)
def emit(self, record):
pass
def filter(
self, record)
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.
def filter(self, record):
"""
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto
this and the record is then dropped. Returns a zero value if a record
is to be dropped, else non-zero.
"""
rv = 1
for f in self.filters:
if not f.filter(record):
rv = 0
break
return rv
def flush(
self)
Ensure all logging output has been flushed.
This version does nothing and is intended to be implemented by subclasses.
def flush(self):
"""
Ensure all logging output has been flushed.
This version does nothing and is intended to be implemented by
subclasses.
"""
pass
def format(
self, record)
Format the specified record.
If a formatter is set, use it. Otherwise, use the default formatter for the module.
def format(self, record):
"""
Format the specified record.
If a formatter is set, use it. Otherwise, use the default formatter
for the module.
"""
if self.formatter:
fmt = self.formatter
else:
fmt = _defaultFormatter
return fmt.format(record)
def get_name(
self)
def get_name(self):
return self._name
def handle(
self, record)
Conditionally emit the specified logging record.
Emission depends on filters which may have been added to the handler. Wrap the actual emission of the record with acquisition/release of the I/O thread lock. Returns whether the filter passed the record for emission.
def handle(self, record):
"""
Conditionally emit the specified logging record.
Emission depends on filters which may have been added to the handler.
Wrap the actual emission of the record with acquisition/release of
the I/O thread lock. Returns whether the filter passed the record for
emission.
"""
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record)
finally:
self.release()
return rv
def handleError(
self, record)
Handle errors which occur during an emit() call.
This method should be called from handlers when an exception is encountered during an emit() call. If raiseExceptions is false, exceptions get silently ignored. This is what is mostly wanted for a logging system - most users will not care about errors in the logging system, they are more interested in application errors. You could, however, replace this with a custom handler if you wish. The record which was being processed is passed in to this method.
def handleError(self, record):
"""
Handle errors which occur during an emit() call.
This method should be called from handlers when an exception is
encountered during an emit() call. If raiseExceptions is false,
exceptions get silently ignored. This is what is mostly wanted
for a logging system - most users will not care about errors in
the logging system, they are more interested in application errors.
You could, however, replace this with a custom handler if you wish.
The record which was being processed is passed in to this method.
"""
if raiseExceptions and sys.stderr: # see issue 13807
ei = sys.exc_info()
try:
traceback.print_exception(ei[0], ei[1], ei[2],
None, sys.stderr)
sys.stderr.write('Logged from file %s, line %s\n' % (
record.filename, record.lineno))
except IOError:
pass # see issue 5971
finally:
del ei
def release(
self)
Release the I/O thread lock.
def release(self):
"""
Release the I/O thread lock.
"""
if self.lock:
self.lock.release()
def removeFilter(
self, filter)
Remove the specified filter from this handler.
def removeFilter(self, filter):
"""
Remove the specified filter from this handler.
"""
if filter in self.filters:
self.filters.remove(filter)
def setFormatter(
self, fmt)
Set the formatter for this handler.
def setFormatter(self, fmt):
"""
Set the formatter for this handler.
"""
self.formatter = fmt
def setLevel(
self, level)
Set the logging level of this handler.
def setLevel(self, level):
"""
Set the logging level of this handler.
"""
self.level = _checkLevel(level)
def set_name(
self, name)
def set_name(self, name):
_acquireLock()
try:
if self._name in _handlers:
del _handlers[self._name]
self._name = name
if name:
_handlers[name] = self
finally:
_releaseLock()