What is PubNub?
PubNub is a real-time messaging platform that uses the publish/subscribe pattern. You can find out more about PubNub by visiting www.pubnub.com.
What is Storm?
Storm is a free and open source distributed realtime computation system. It can be used for realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. To learn more about the Storm project, visit the Storm Project website.
PubNub Spout for Storm
Once you are familiar with Storm, you will know that it uses a concept called spouts. A spout is a source of streams in a Storm topology: it reads tuples from an external source and emits them into the topology. Please refer to the Storm Wiki to read more about spouts.
This post shows how to use PubNub as the real-time external data source for a Storm topology by building a PubNub spout.
The Design
Basically, we create a Storm spout that subscribes to a PubNub channel. Once data is published to that channel, the spout receives it and emits it into the Storm topology.
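The handoff at the heart of this design can be tried out with the JDK alone. The sketch below is only an illustration under assumptions: the class QueueHandoffSketch and its methods onMessage, pollNext and demo are hypothetical names, and a plain thread stands in for the PubNub client. It shows a callback thread putting messages into a bounded queue while a second thread polls that queue without blocking, which is roughly what a spout needs to do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the spout's internals; no Storm or PubNub classes are used.
public class QueueHandoffSketch {

    // Bounded buffer shared by the callback thread and the polling thread.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);

    // Plays the role of the subscribe callback: invoked on the messaging client's thread.
    public void onMessage(String message) {
        queue.offer(message); // non-blocking; returns false when the buffer is full
    }

    // Plays the role of the spout's polling method: returns null when nothing is waiting.
    public String pollNext() {
        return queue.poll();
    }

    // Publishes from a separate thread, then drains the buffer on the calling thread.
    public static List<String> demo(List<String> messages) {
        QueueHandoffSketch sketch = new QueueHandoffSketch();
        Thread publisher = new Thread(() -> {
            for (String m : messages) {
                sketch.onMessage(m);
            }
        });
        publisher.start();
        try {
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for publisher", e);
        }

        List<String> emitted = new ArrayList<>();
        String next;
        while ((next = sketch.pollNext()) != null) {
            emitted.add(next); // a real spout would emit a tuple here
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("first", "second", "third")));
    }
}
```

The queue decouples the two threads: the callback never waits on the consumer, and the consumer never waits on the network.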
The Code
package com.kohls.trending.spout;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.kohls.trending.TrendingProperties;
import com.pubnub.api.Callback;
import com.pubnub.api.Pubnub;
import com.pubnub.api.PubnubError;
import com.pubnub.api.PubnubException;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * This is the PUBNUB Spout. This works as the input data stream for this Topology
 */
@SuppressWarnings({"rawtypes", "serial"})
public class PubnubSpout extends BaseRichSpout {

    Pubnub _pubnub;
    private SpoutOutputCollector collector;
    private LinkedBlockingQueue<String> queue;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _pubnub = new Pubnub(TrendingProperties.getInstance().PUBNUB_PUB_KEY,
                TrendingProperties.getInstance().PUBNUB_SUB_KEY, false);
        queue = new LinkedBlockingQueue<String>(1000);
        this.collector = collector;
        try {
            _pubnub.subscribe(new String[]{TrendingProperties.getInstance().PUBNUB_SUB_CHANNEL}, new Callback() {
                @Override
                public void successCallback(String channel, Object message) {
                    queue.offer(message.toString());
                }

                @Override
                public void errorCallback(String channel, PubnubError error) {
                }
            });
        } catch (PubnubException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        String ret = queue.poll();
        if (ret == null) {
            Utils.sleep(50);
        } else {
            collector.emit(new Values(ret));
        }
    }

    @Override
    public void close() {
        _pubnub.unsubscribe(TrendingProperties.getInstance().PUBNUB_SUB_CHANNEL);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("json"));
    }
}
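One detail of the spout worth knowing: the queue is capped at 1000 entries and the result of offer() is ignored, so if messages arrive faster than the topology consumes them, the extra ones are silently discarded. The sketch below is one possible way to make that visible, not part of the original spout. The class BoundedBufferSketch and its methods are hypothetical, and it uses a tiny capacity so the effect is easy to see.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper illustrating how a bounded queue behaves when it overflows.
public class BoundedBufferSketch {

    private final LinkedBlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    public BoundedBufferSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Buffers the message if there is room; otherwise counts it as dropped.
    public boolean accept(String message) {
        boolean accepted = queue.offer(message); // false when the queue is full
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    public int bufferedCount() {
        return queue.size();
    }

    public long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(2);
        for (int i = 0; i < 5; i++) {
            buffer.accept("msg-" + i);
        }
        // prints "buffered=2 dropped=3"
        System.out.println("buffered=" + buffer.bufferedCount()
                + " dropped=" + buffer.droppedCount());
    }
}
```

Whether dropping is acceptable depends on the use case; a counter like this at least tells you when the buffer size or the topology's throughput needs another look.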
If you find this useful, please leave a comment below.