Source code for pyleus.storm.bolt

"""Module containing the implementation of the Bolt component and a subclassed
SimpleBolt component which takes care of acking/failing tuples and exposing a
nicer API for handling tick tuples.
"""
from __future__ import absolute_import

import logging

from pyleus.storm import is_tick, is_heartbeat, StormWentAwayError
from pyleus.storm.component import Component

log = logging.getLogger(__name__)


class Bolt(Component):
    """Base class for bolt components.

    Extends :class:`~pyleus.storm.component.Component` with the tuple
    processing loop and the ``ack``/``fail``/``sync``/``emit`` commands.
    """

    COMPONENT_TYPE = "bolt"

    def process_tuple(self, tup):
        """Handle one incoming tuple.

        :param tup: pyleus tuple representing the message to be processed
        :type tup: :class:`~pyleus.storm.StormTuple`

        .. note::
           Implement in subclass. The base implementation does nothing.
        """
        pass

    def _process_tuple(self, tup):
        """Dispatch a tuple read from Storm.

        Heartbeat tuples are answered with :meth:`sync`; every other tuple
        is handed to :meth:`process_tuple` and its result is returned.

        Bolt middleware classes such as SimpleBolt should override this to
        inject functionality around tuple processing without changing the
        API for downstream bolt implementations.

        .. note::
           Implement in Bolt middleware subclass.
        """
        if not is_heartbeat(tup):
            return self.process_tuple(tup)
        self.sync()

    def run_component(self):
        """Bolt main loop.

        Reads and dispatches tuples forever; returns after logging a
        warning once ``StormWentAwayError`` is raised.
        """
        try:
            while True:
                self._process_tuple(self.read_tuple())
        except StormWentAwayError:
            log.warning("Disconnected from Storm. Exiting.")

    def ack(self, tup):
        """Ack a tuple.

        :param tup: tuple to ack
        :type tup: :class:`~pyleus.storm.StormTuple`

        .. note::
           All tuples need to be acked or failed, independently whether
           you are using Storm reliability features or not. If you are
           directly using :class:`~.Bolt` instead of :class:`~.SimpleBolt`,
           you must call this method or your topology will eventually run
           out of memory or hang.
        """
        payload = {'id': tup.id}
        self.send_command('ack', payload)

    def fail(self, tup):
        """Fail a tuple.

        :param tup: tuple to fail
        :type tup: :class:`~pyleus.storm.StormTuple`

        .. note::
           All tuples need to be acked or failed, independently whether
           you are using Storm reliability features or not. If you are
           directly using :class:`~.Bolt` instead of :class:`~.SimpleBolt`,
           you must call this method or your topology will eventually run
           out of memory or hang.
        """
        payload = {'id': tup.id}
        self.send_command('fail', payload)

    def sync(self):
        """Respond to heartbeat by sending the ``sync`` command."""
        self.send_command('sync')

    def emit(
            self, values, stream=None, anchors=None,
            direct_task=None, need_task_ids=True):
        """Build and send an output tuple command dict and return the ids
        of the tasks to which the tuple was sent by Storm.

        :param values: pyleus tuple values to be emitted
        :type values: ``tuple`` or ``list``
        :param stream: output stream the message is going to belong to,
            default ``DEFAULT``
        :type stream: ``str``
        :param anchors: list of pyleus tuples the message should be
            anchored to, default ``None``
        :type anchors: ``list`` of pyleus tuples
        :param direct_task: task message will be sent to, default None
        :type direct_task: ``int``
        :param need_task_ids: whether emit should return the ids of the
            task the message has been sent to, default ``True``
        :type need_task_ids: ``bool``
        :return: the value of ``read_taskid()`` when ``need_task_ids`` is
            true, otherwise ``None``

        .. tip::
           Setting ``need_task_ids`` to ``False`` really helps in achieving
           better performances. You should always do that if your
           application does not leverage task ids.

        .. danger::
           ``direct_task`` is not yet supported.
        """
        assert isinstance(values, (list, tuple))

        anchor_ids = [
            anchor.id for anchor in (() if anchors is None else anchors)
        ]

        payload = {
            'anchors': anchor_ids,
            # simplejson versions disagree on how namedtuples are
            # serialized; a plain tuple behaves the same under msgpack,
            # json and simplejson.
            'tuple': tuple(values),
        }

        # Optional fields are only included when explicitly provided.
        for key, value in (('stream', stream), ('task', direct_task)):
            if value is not None:
                payload[key] = value

        # Storm reports the receiving task ids unless the message carries
        # an explicit need_task_ids=False.
        wants_ids = bool(need_task_ids)
        if not wants_ids:
            payload['need_task_ids'] = False

        self.send_command('emit', payload)

        if wants_ids:
            return self.read_taskid()
class SimpleBolt(Bolt):
    """A Bolt that acks every tuple automatically once it has been handled.

    Override :meth:`process_tick` in a subclass to react to tick tuples
    through a dedicated hook instead of inspecting them in
    ``process_tuple``.
    """

    def process_tick(self):
        """Code to be executed when a tick tuple reaches the component.

        .. note::
           Implement in subclass. The base implementation does nothing.
        """
        pass

    def _process_tuple(self, tup):
        """SimpleBolt middleware level tuple processing.

        Heartbeats are answered with ``sync`` and are not acked. Tick
        tuples go to :meth:`process_tick`, all other tuples go to
        ``process_tuple``; in both cases the tuple is acked afterwards.
        """
        if is_heartbeat(tup):
            self.sync()
            return

        if is_tick(tup):
            self.process_tick()
        else:
            self.process_tuple(tup)

        # Reached only when the handler above returned without raising.
        self.ack(tup)