"""JSON-specialized Bolt component."""
from __future__ import absolute_import
import logging
# Prefer simplejson when it is installed; otherwise fall back to the
# standard-library json module. Both are bound to the name ``json``.
try:
    import simplejson as json
    _ = json  # pyflakes: reference the name so the import is not flagged unused
except ImportError:
    import json
from .storm import SimpleBolt
log = logging.getLogger(__name__)
class JSONFieldsBolt(SimpleBolt):
    """JSON-specialized SimpleBolt abstracting JSON-parsing
    and tuple processing from the actual application logic.

    Subclasses implement :meth:`extract_fields`; :meth:`process_tuple`
    decodes the incoming tuple value as JSON and emits whatever that hook
    returns.
    """

    def extract_fields(self, json_dict):
        """Implement in a subclass to select the values to emit.

        :param json_dict: decoded JSON value of the incoming tuple
        :return: the values to emit, or ``None`` to emit nothing
        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError(
            "{0} must implement extract_fields()".format(type(self).__name__)
        )

    def process_tuple(self, tup):
        """Extract JSON representation of incoming tuple value and emit based
        on the result of :meth:`~.extract_fields`.

        :param tup: incoming tuple; ``tup.values`` must hold exactly one
            element, the JSON document to decode

        :raises ValueError: if ``tup.values`` does not contain exactly one
            element, or if that element is not valid JSON (the decoder's
            error is not caught here)

        .. note:: Emitted tuples are automatically anchored.
        .. note:: Nothing is emitted when :meth:`~.extract_fields` returns
           ``None``.
        """
        # Single-target unpacking doubles as a check that exactly one value
        # was received.
        line, = tup.values
        json_dict = json.loads(line)

        fields = self.extract_fields(json_dict)
        if fields is None:
            # The hook chose to drop this tuple.
            return

        # Anchor the outgoing tuple to the incoming one.
        self.emit(fields, anchors=[tup])