summaryrefslogtreecommitdiff
path: root/lib/spack/external/nose/plugins/logcapture.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/spack/external/nose/plugins/logcapture.py')
-rw-r--r--lib/spack/external/nose/plugins/logcapture.py245
1 files changed, 245 insertions, 0 deletions
diff --git a/lib/spack/external/nose/plugins/logcapture.py b/lib/spack/external/nose/plugins/logcapture.py
new file mode 100644
index 0000000000..4c9a79f6fd
--- /dev/null
+++ b/lib/spack/external/nose/plugins/logcapture.py
@@ -0,0 +1,245 @@
+"""
+This plugin captures logging statements issued during test execution. When an
+error or failure occurs, the captured log messages are attached to the running
+test in the test.capturedLogging attribute, and displayed with the error failure
+output. It is enabled by default but can be turned off with the option
+``--nologcapture``.
+
+You can filter captured logging statements with the ``--logging-filter`` option.
+If set, it specifies which logger(s) will be captured; loggers that do not match
+will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp``
+will ensure that only statements logged via sqlalchemy.engine, myapp
+or myapp.foo.bar logger will be logged.
+
+You can remove other installed logging handlers with the
+``--logging-clear-handlers`` option.
+"""
+
+import logging
+from logging import Handler
+import threading
+
+from nose.plugins.base import Plugin
+from nose.util import anyp, ln, safe_str
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+log = logging.getLogger(__name__)
+
+class FilterSet(object):
+ def __init__(self, filter_components):
+ self.inclusive, self.exclusive = self._partition(filter_components)
+
+ # @staticmethod
+ def _partition(components):
+ inclusive, exclusive = [], []
+ for component in components:
+ if component.startswith('-'):
+ exclusive.append(component[1:])
+ else:
+ inclusive.append(component)
+ return inclusive, exclusive
+ _partition = staticmethod(_partition)
+
+ def allow(self, record):
+ """returns whether this record should be printed"""
+ if not self:
+ # nothing to filter
+ return True
+ return self._allow(record) and not self._deny(record)
+
+ # @staticmethod
+ def _any_match(matchers, record):
+ """return the bool of whether `record` starts with
+ any item in `matchers`"""
+ def record_matches_key(key):
+ return record == key or record.startswith(key + '.')
+ return anyp(bool, map(record_matches_key, matchers))
+ _any_match = staticmethod(_any_match)
+
+ def _allow(self, record):
+ if not self.inclusive:
+ return True
+ return self._any_match(self.inclusive, record)
+
+ def _deny(self, record):
+ if not self.exclusive:
+ return False
+ return self._any_match(self.exclusive, record)
+
+
+class MyMemoryHandler(Handler):
+ def __init__(self, logformat, logdatefmt, filters):
+ Handler.__init__(self)
+ fmt = logging.Formatter(logformat, logdatefmt)
+ self.setFormatter(fmt)
+ self.filterset = FilterSet(filters)
+ self.buffer = []
+ def emit(self, record):
+ self.buffer.append(self.format(record))
+ def flush(self):
+ pass # do nothing
+ def truncate(self):
+ self.buffer = []
+ def filter(self, record):
+ if self.filterset.allow(record.name):
+ return Handler.filter(self, record)
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state['lock']
+ return state
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+ self.lock = threading.RLock()
+
+
+class LogCapture(Plugin):
+ """
+ Log capture plugin. Enabled by default. Disable with --nologcapture.
+ This plugin captures logging statements issued during test execution,
+ appending any output captured to the error or failure output,
+ should the test fail or raise an error.
+ """
+ enabled = True
+ env_opt = 'NOSE_NOLOGCAPTURE'
+ name = 'logcapture'
+ score = 500
+ logformat = '%(name)s: %(levelname)s: %(message)s'
+ logdatefmt = None
+ clear = False
+ filters = ['-nose']
+
+ def options(self, parser, env):
+ """Register commandline options.
+ """
+ parser.add_option(
+ "--nologcapture", action="store_false",
+ default=not env.get(self.env_opt), dest="logcapture",
+ help="Disable logging capture plugin. "
+ "Logging configuration will be left intact."
+ " [NOSE_NOLOGCAPTURE]")
+ parser.add_option(
+ "--logging-format", action="store", dest="logcapture_format",
+ default=env.get('NOSE_LOGFORMAT') or self.logformat,
+ metavar="FORMAT",
+ help="Specify custom format to print statements. "
+ "Uses the same format as used by standard logging handlers."
+ " [NOSE_LOGFORMAT]")
+ parser.add_option(
+ "--logging-datefmt", action="store", dest="logcapture_datefmt",
+ default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt,
+ metavar="FORMAT",
+ help="Specify custom date/time format to print statements. "
+ "Uses the same format as used by standard logging handlers."
+ " [NOSE_LOGDATEFMT]")
+ parser.add_option(
+ "--logging-filter", action="store", dest="logcapture_filters",
+ default=env.get('NOSE_LOGFILTER'),
+ metavar="FILTER",
+ help="Specify which statements to filter in/out. "
+ "By default, everything is captured. If the output is too"
+ " verbose,\nuse this option to filter out needless output.\n"
+ "Example: filter=foo will capture statements issued ONLY to\n"
+ " foo or foo.what.ever.sub but not foobar or other logger.\n"
+ "Specify multiple loggers with comma: filter=foo,bar,baz.\n"
+ "If any logger name is prefixed with a minus, eg filter=-foo,\n"
+ "it will be excluded rather than included. Default: "
+ "exclude logging messages from nose itself (-nose)."
+ " [NOSE_LOGFILTER]\n")
+ parser.add_option(
+ "--logging-clear-handlers", action="store_true",
+ default=False, dest="logcapture_clear",
+ help="Clear all other logging handlers")
+ parser.add_option(
+ "--logging-level", action="store",
+ default='NOTSET', dest="logcapture_level",
+ help="Set the log level to capture")
+
+ def configure(self, options, conf):
+ """Configure plugin.
+ """
+ self.conf = conf
+ # Disable if explicitly disabled, or if logging is
+ # configured via logging config file
+ if not options.logcapture or conf.loggingConfig:
+ self.enabled = False
+ self.logformat = options.logcapture_format
+ self.logdatefmt = options.logcapture_datefmt
+ self.clear = options.logcapture_clear
+ self.loglevel = options.logcapture_level
+ if options.logcapture_filters:
+ self.filters = options.logcapture_filters.split(',')
+
+ def setupLoghandler(self):
+ # setup our handler with root logger
+ root_logger = logging.getLogger()
+ if self.clear:
+ if hasattr(root_logger, "handlers"):
+ for handler in root_logger.handlers:
+ root_logger.removeHandler(handler)
+ for logger in logging.Logger.manager.loggerDict.values():
+ if hasattr(logger, "handlers"):
+ for handler in logger.handlers:
+ logger.removeHandler(handler)
+ # make sure there isn't one already
+ # you can't simply use "if self.handler not in root_logger.handlers"
+ # since at least in unit tests this doesn't work --
+ # LogCapture() is instantiated for each test case while root_logger
+ # is module global
+ # so we always add new MyMemoryHandler instance
+ for handler in root_logger.handlers[:]:
+ if isinstance(handler, MyMemoryHandler):
+ root_logger.handlers.remove(handler)
+ root_logger.addHandler(self.handler)
+ # to make sure everything gets captured
+ loglevel = getattr(self, "loglevel", "NOTSET")
+ root_logger.setLevel(getattr(logging, loglevel))
+
+ def begin(self):
+ """Set up logging handler before test run begins.
+ """
+ self.start()
+
+ def start(self):
+ self.handler = MyMemoryHandler(self.logformat, self.logdatefmt,
+ self.filters)
+ self.setupLoghandler()
+
+ def end(self):
+ pass
+
+ def beforeTest(self, test):
+ """Clear buffers and handlers before test.
+ """
+ self.setupLoghandler()
+
+ def afterTest(self, test):
+ """Clear buffers after test.
+ """
+ self.handler.truncate()
+
+ def formatFailure(self, test, err):
+ """Add captured log messages to failure output.
+ """
+ return self.formatError(test, err)
+
+ def formatError(self, test, err):
+ """Add captured log messages to error output.
+ """
+ # logic flow copied from Capture.formatError
+ test.capturedLogging = records = self.formatLogRecords()
+ if not records:
+ return err
+ ec, ev, tb = err
+ return (ec, self.addCaptureToErr(ev, records), tb)
+
+ def formatLogRecords(self):
+ return map(safe_str, self.handler.buffer)
+
+ def addCaptureToErr(self, ev, records):
+ return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \
+ records + \
+ [ln('>> end captured logging <<')])