"""Module containing the implementation of the Spout component."""
from __future__ import absolute_import
import logging
from pyleus.storm import StormWentAwayError
from pyleus.storm.component import Component
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class Spout(Component):
    """Spout component class.

    Inherits from :class:`~pyleus.storm.component.Component`.

    Subclasses are expected to override :meth:`next_tuple` and, when
    reliability tracking is used, :meth:`ack` and :meth:`fail`.
    """

    COMPONENT_TYPE = "spout"

    def next_tuple(self):
        """Emit the next tuple into the topology.

        Called by :meth:`_handle_command` each time a ``next`` command is
        received.

        .. note:: Implement in subclass.
        """
        pass

    def ack(self, tup_id):
        """Ack a tuple to the source.

        :param tup_id: tuple identifier
        :type tup_id: ``str`` or ``long``

        .. note:: Implement in subclass. Default behaviour is ``pass``.
        """
        pass

    def fail(self, tup_id):
        """Fail a tuple to the source.

        :param tup_id: tuple identifier
        :type tup_id: ``str`` or ``long``

        .. note:: Implement in subclass. Default behaviour is ``pass``.
        """
        pass

    def _handle_command(self, msg):
        """Dispatch an incoming message on the value of its ``command`` key.

        ``next`` triggers :meth:`next_tuple`; ``ack`` and ``fail`` forward
        ``msg['id']`` to :meth:`ack` and :meth:`fail` respectively. Any other
        command is silently ignored.

        :param msg: message read from Storm; must contain ``command`` and,
            for ``ack``/``fail``, ``id``
        :type msg: ``dict``
        """
        command = msg['command']

        if command == 'next':
            self.next_tuple()
        elif command == 'ack':
            self.ack(msg['id'])
        elif command == 'fail':
            self.fail(msg['id'])

    def _sync(self):
        """Send a sync message."""
        self.send_command('sync')

    def run_component(self):
        """Spout main loop.

        Repeatedly read a command, handle it and send a ``sync`` message
        back. The loop only ends when
        :class:`~pyleus.storm.StormWentAwayError` is raised, which is logged
        as a warning and swallowed so that the method returns normally.
        """
        try:
            while True:
                msg = self.read_command()
                self._handle_command(msg)
                # A sync is sent after every handled command, whatever its
                # type.
                self._sync()
        except StormWentAwayError:
            log.warning("Disconnected from Storm. Exiting.")

    def emit(
            self, values,
            stream=None, tup_id=None,
            direct_task=None, need_task_ids=True):
        """Build and send an output tuple command dict and return the ids of
        the tasks to which the tuple was sent by Storm.

        :param values: pyleus tuple values to be emitted
        :type values: ``tuple`` or ``list``
        :param stream:
            output stream the message is going to belong to, default
            ``DEFAULT``
        :type stream: ``str``
        :param tup_id:
            identifier that will be used by Storm for tracking the tuple for
            reliability purpose. It will be passed as argument to both
            :meth:`~.ack` and :meth:`~.fail` when the tuple terminates its
            lifecycle. Default ``None``
        :type tup_id: ``str`` or ``long``
        :param direct_task: task message will be sent to, default None
        :type direct_task: ``int``
        :param need_task_ids:
            whether emit should return the ids of the task the message has
            been sent to, default ``True``
        :type need_task_ids: ``bool``
        :return:
            the result of ``self.read_taskid()`` when ``need_task_ids`` is
            true, otherwise ``None``
        :raises AssertionError:
            if ``values`` is neither a ``list`` nor a ``tuple`` (the check is
            an ``assert`` and is therefore skipped under ``python -O``)

        .. note:: ``tup_id`` should be JSON-serializable.

        .. note::
            Omitting ``tup_id`` will disable reliability tracking for that
            tuple. If you provide a value for ``tup_id``, then you also need
            to run at least one Storm **acker** (see :ref:`reliability`),
            otherwise your topology will hang.

        .. tip::
            Setting ``need_task_ids`` to ``False`` really helps in achieving
            better performances. You should always do that if your
            application does not leverage task ids.

        .. danger::
            ``direct_task`` is not yet supported.
        """
        assert isinstance(values, (list, tuple)), \
            "values must be a list or a tuple"

        command_dict = {
            # Different versions of simplejson serialize namedtuples
            # differently. Cast to tuple in order to have consistent
            # behavior between msgpack, json and simplejson.
            'tuple': tuple(values),
        }

        # Optional keys are only added when the caller supplied a value.
        if stream is not None:
            command_dict['stream'] = stream

        if tup_id is not None:
            command_dict['id'] = tup_id

        if direct_task is not None:
            command_dict['task'] = direct_task

        # By default, Storm sends back to the component the task ids of the
        # tasks receiving the tuple. If need_task_ids is set to False, Storm
        # won't send the task ids for that message
        if not need_task_ids:
            command_dict['need_task_ids'] = False

        self.send_command('emit', command_dict)

        # Only wait for the task ids when they were requested; otherwise
        # there is nothing to read back.
        if need_task_ids:
            return self.read_taskid()
        return None