Source code for pyleus.storm.component

"""Module containing the base class for all pyleus components and a wrapper
class around Storm configurations.
"""
from __future__ import absolute_import

import argparse
from collections import deque
import logging
import logging.config
import os
import sys
import traceback

try:
    import simplejson as json
    _ = json # pyflakes
except ImportError:
    import json

from pyleus.storm import DEFAULT_STREAM
from pyleus.storm import LOG_TRACE
from pyleus.storm import LOG_DEBUG
from pyleus.storm import LOG_INFO
from pyleus.storm import LOG_WARN
from pyleus.storm import LOG_ERROR
from pyleus.storm import StormTuple
from pyleus.storm.serializers.msgpack_serializer import MsgpackSerializer
from pyleus.storm.serializers.json_serializer import JSONSerializer


# Please keep in sync with the Java TopologyBuilder
DESCRIBE_OPT = "--describe"
COMPONENT_OPTIONS_OPT = "--options"
PYLEUS_CONFIG_OPT = "--pyleus-config"

DEFAULT_LOGGING_CONFIG_PATH = "pyleus_logging.conf"

JSON_SERIALIZER = "json"
MSGPACK_SERIALIZER = "msgpack"
SERIALIZERS = {
    JSON_SERIALIZER: JSONSerializer,
    MSGPACK_SERIALIZER: MsgpackSerializer,
}


log = logging.getLogger(__name__)


def _is_namedtuple(obj):
    return (type(obj) is type and
            issubclass(obj, tuple) and
            hasattr(obj, "_fields"))

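# A minimal illustration of what _is_namedtuple accepts: it matches namedtuple
# *classes*, not their instances. The ``Fields`` namedtuple below is
# hypothetical, defined only for this sketch:
#
#   >>> from collections import namedtuple
#   >>> Fields = namedtuple("Fields", "word count")
#   >>> _is_namedtuple(Fields)
#   True
#   >>> _is_namedtuple(Fields("storm", 1))   # an instance, not a class
#   False
#   >>> _is_namedtuple(("word", "count"))    # a plain tuple
#   False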

def _serialize(obj):
    """Given a list, a tuple or a namedtuple, return it as a list. In case of
    None, simply return None.
    """
    if obj is None:
        return None
    # obj is a namedtuple "class"
    elif _is_namedtuple(obj):
        return list(obj._fields)
    # obj is a list or a tuple
    return list(obj)

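# Sketch of how _serialize treats the supported inputs, reusing the
# hypothetical ``Fields`` namedtuple from the sketch above:
#
#   >>> _serialize(Fields)               # namedtuple class -> its field names
#   ['word', 'count']
#   >>> _serialize(("word", "count"))    # tuple -> list
#   ['word', 'count']
#   >>> _serialize(["word", "count"])    # list -> list (copy)
#   ['word', 'count']
#   >>> _serialize(None) is None
#   True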

def _expand_output_fields(obj):
    """Expand all allowed notations for defining OUTPUT_FIELDS into the
    extended one.
    """
    # if single-stream notation
    if not isinstance(obj, dict):
        return {DEFAULT_STREAM: _serialize(obj)}

    # if multiple-streams notation
    for key, value in obj.items():
        obj[key] = _serialize(value)
    return obj

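# Sketch of the two OUTPUT_FIELDS notations accepted by _expand_output_fields.
# The stream names are made up, and the single-stream result assumes
# DEFAULT_STREAM is Storm's "default" stream id:
#
#   _expand_output_fields(["word", "count"])
#   # -> {"default": ["word", "count"]}
#
#   _expand_output_fields({"counts": ["word", "count"], "errors": ["msg"]})
#   # -> {"counts": ["word", "count"], "errors": ["msg"]}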

class StormConfig(dict):
    """Add some convenience properties to a configuration ``dict`` from
    Storm.

    You can access the Storm configuration dictionary within a component
    through ``self.conf``.
    """

    def __init__(self, conf):
        super(StormConfig, self).__init__()
        self.update(conf)

    @property
    def tick_tuple_freq(self):
        """Helper property to access the value of the tick tuple frequency
        stored in the Storm configuration.

        :return: tick tuple frequency for the component
        :rtype: ``float`` or ``None``

        .. note::
           Bolts not specifying a tick tuple frequency default to ``None``,
           while spouts are not supposed to use tick tuples at all.
        """
        return self.get("topology.tick.tuple.freq.secs")
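
# Minimal sketch of StormConfig. Inside a component the same object is
# available as ``self.conf``; the literal dict below stands in for what Storm
# actually sends during the handshake:
#
#   conf = StormConfig({"topology.tick.tuple.freq.secs": 30})
#   conf.tick_tuple_freq                    # -> 30
#   conf["topology.tick.tuple.freq.secs"]   # plain dict access still works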


class Component(object):
    """Base class for all pyleus components."""

    COMPONENT_TYPE = None  # One of "bolt", "spout"

    #: ``list`` or ``dict`` of output fields for the component.
    #:
    #: .. note:: Specify in subclass.
    #:
    #: .. seealso:: :ref:`groupings`
    OUTPUT_FIELDS = None

    #: ``list`` of user-defined options for the component.
    #:
    #: .. note:: Specify in subclass.
    OPTIONS = None

    # Populated in Component.run()

    #: ``dict`` containing options passed to the component in the yaml
    #: definition file.
    options = None

    #: :class:`~.StormConfig` containing the Storm configuration for the
    #: component.
    conf = None

    #: ``dict`` containing the Storm context for the component.
    context = None

    pyleus_config = None

    def __init__(self, input_stream=None, output_stream=None):
        """The Storm component will parse the command line in order to
        figure out whether it has been queried for a description or for an
        actual run.
        """
        super(Component, self).__init__()

        if input_stream is None:
            input_stream = sys.stdin

        if output_stream is None:
            output_stream = sys.stdout

        self._input_stream = input_stream
        self._output_stream = output_stream
        self._pending_commands = deque()
        self._pending_taskids = deque()
        self._serializer = None

    def describe(self):
        """Print to stdout a JSON description of the component. The Java
        TopologyBuilder will use the JSON description for topology
        configuration and validation.
        """
        print(json.dumps({
            "component_type": self.COMPONENT_TYPE,
            "output_fields": _expand_output_fields(self.OUTPUT_FIELDS),
            "options": _serialize(self.OPTIONS)}))

    def initialize_logging(self):
        """Load the logging configuration file from the command line
        configuration (if provided) and initialize logging for the
        component.
        """
        logging_config_path = self.pyleus_config.get('logging_config_path')
        if logging_config_path:
            logging.config.fileConfig(logging_config_path)
        elif os.path.isfile(DEFAULT_LOGGING_CONFIG_PATH):
            logging.config.fileConfig(DEFAULT_LOGGING_CONFIG_PATH)

    def initialize_serializer(self):
        """Load the serializer type from the command line configuration and
        instantiate the associated
        :class:`~pyleus.storm.serializers.serializer.Serializer`.
        """
        serializer = self.pyleus_config.get('serializer')
        if serializer in SERIALIZERS:
            self._serializer = SERIALIZERS[serializer](
                self._input_stream, self._output_stream)
        else:
            raise ValueError("Unknown serializer: {0}".format(serializer))

    def setup_component(self):
        """Storm component setup before execution. It will also call the
        initialization method implemented in the subclass.
        """
        self.conf, self.context = self._init_component()
        self.initialize()
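
    # Sketch of the JSON passed via --pyleus-config and consumed by
    # initialize_logging() and initialize_serializer(); the path below is
    # made up for illustration:
    #
    #   {"serializer": "msgpack", "logging_config_path": "my_logging.conf"}
    #
    # Only the two keys read above are meaningful to this module; any other
    # key in the dict is simply not read here.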

    def initialize(self):
        """Called after the component has been launched, but before
        processing any tuples. You can use this method to set up your
        component.

        .. note:: Implement in subclass.
        """
        pass

    def run(self):
        """Entry point for the component running logic. You need to call it
        as shown in the example below, otherwise the topology will not run.

        :Example:

        .. code-block:: python

            if __name__ == '__main__':
                MyComponent().run()
        """
        parser = argparse.ArgumentParser(add_help=False)
        parser.add_argument(DESCRIBE_OPT, action="store_true", default=False)
        parser.add_argument(COMPONENT_OPTIONS_OPT, default=None)
        parser.add_argument(PYLEUS_CONFIG_OPT, default=None)
        args = parser.parse_args()

        if args.describe:
            self.describe()
            return

        self.options = json.loads(args.options) if args.options else {}
        self.pyleus_config = json.loads(args.pyleus_config) \
            if args.pyleus_config else {}

        try:
            self.initialize_logging()
            self.initialize_serializer()
            self.setup_component()
            self.run_component()
        except:
            log.exception("Exception in {0}.run".format(self.COMPONENT_TYPE))
            self.error(traceback.format_exc())

    def run_component(self):
        """Run the main loop of the component. Implemented in the Bolt and
        Spout subclasses.
        """
        raise NotImplementedError

    def _msg_is_command(self, msg):
        """Storm differentiates between commands and taskids by whether the
        message is a ``dict`` or a ``list``.
        """
        return isinstance(msg, dict)

    def _msg_is_taskid(self, msg):
        """.. seealso:: :meth:`~._msg_is_command`"""
        return isinstance(msg, list)

    def read_command(self):
        """Return the next command from the input stream, either from the
        _pending_commands queue or, if the queue is empty, directly from the
        stream. In the latter case, queue any taskids received until the
        next command comes in.
        """
        if self._pending_commands:
            return self._pending_commands.popleft()

        msg = self._serializer.read_msg()

        while self._msg_is_taskid(msg):
            self._pending_taskids.append(msg)
            msg = self._serializer.read_msg()

        return msg

    def read_taskid(self):
        """Like :meth:`~.read_command`, but return the next taskid and queue
        any commands received while reading the input stream.
        """
        if self._pending_taskids:
            return self._pending_taskids.popleft()

        msg = self._serializer.read_msg()

        while self._msg_is_command(msg):
            self._pending_commands.append(msg)
            msg = self._serializer.read_msg()

        return msg

    def read_tuple(self):
        """Read and parse a command into a StormTuple object."""
        cmd = self.read_command()
        return StormTuple(
            cmd['id'], cmd['comp'], cmd['stream'], cmd['task'], cmd['tuple'])

    def _create_pidfile(self, pid_dir, pid):
        """Create a file named after the pid, used by Storm to watch over
        the Python process.
        """
        open(os.path.join(pid_dir, str(pid)), 'a').close()

    def _init_component(self):
        """Receive the setup_info dict from the Storm task and report back
        with our pid; also touch a pidfile in the pidDir specified in
        setup_info.
        """
        setup_info = self._serializer.read_msg()
        pid = os.getpid()
        self._serializer.send_msg({'pid': pid})
        self._create_pidfile(setup_info['pidDir'], pid)
        return StormConfig(setup_info['conf']), setup_info['context']

    def send_command(self, command, opts_dict=None):
        """Merge the command with its options and send the message through
        the :class:`~pyleus.storm.serializers.serializer.Serializer`.
        """
        if opts_dict is not None:
            command_dict = dict(opts_dict)
            command_dict['command'] = command
        else:
            command_dict = dict(command=command)

        self._serializer.send_msg(command_dict)
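
    # Sketch of the message shapes handled above. Storm sends commands as
    # dicts and taskids as lists on the same stream, which is why
    # read_command()/read_taskid() buffer whichever kind they were not asked
    # for. The field values below are placeholders:
    #
    #   command (dict):  {"id": "<tuple id>", "comp": "<component>",
    #                     "stream": "default", "task": 3,
    #                     "tuple": ["word", 1]}
    #   taskid (list):   [3]
    #
    # read_tuple() unpacks a command dict into a StormTuple in that order:
    # id, comp, stream, task, tuple.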

    def log(self, msg, level=LOG_INFO):
        """Send a log message.

        :param msg: log message
        :type msg: ``str``
        :param level: log level, defined as constants in :mod:`pyleus.storm`.
         Allowed: ``LOG_TRACE``, ``LOG_DEBUG``, ``LOG_INFO``, ``LOG_WARN``,
         ``LOG_ERROR``. Default: ``LOG_INFO``
        :type level: ``int``
        """
        self.send_command('log', {
            'msg': msg,
            'level': level,
        })

    def log_trace(self, msg):
        """Send a log message with level LOG_TRACE.

        :param msg: log message
        :type msg: ``str``
        """
        self.log(msg, level=LOG_TRACE)

    def log_debug(self, msg):
        """Send a log message with level LOG_DEBUG.

        :param msg: log message
        :type msg: ``str``
        """
        self.log(msg, level=LOG_DEBUG)

    def log_info(self, msg):
        """Send a log message with level LOG_INFO.

        :param msg: log message
        :type msg: ``str``
        """
        self.log(msg, level=LOG_INFO)

    def log_warn(self, msg):
        """Send a log message with level LOG_WARN.

        :param msg: log message
        :type msg: ``str``
        """
        self.log(msg, level=LOG_WARN)

    def log_error(self, msg):
        """Send a log message with level LOG_ERROR.

        :param msg: log message
        :type msg: ``str``
        """
        self.log(msg, level=LOG_ERROR)

    def error(self, msg):
        """Send an error message.

        :param msg: error message
        :type msg: ``str``
        """
        self.send_command('error', {
            'msg': msg,
        })
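

# End-to-end sketch of how a concrete component would use this base class.
# ``WordCounter`` and its fields are hypothetical, and it is assumed to
# inherit from a concrete subclass (e.g. a bolt from pyleus.storm) that
# provides run_component():
#
#   class WordCounter(SomeConcreteBolt):
#
#       OUTPUT_FIELDS = ["word", "count"]   # or a {stream: fields} dict
#       OPTIONS = ["case_sensitive"]
#
#       def initialize(self):
#           self.case_sensitive = self.options.get("case_sensitive", False)
#           self.log_info("WordCounter initialized")
#
#   if __name__ == '__main__':
#       WordCounter().run()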