In the previous post I presented an overview of the topology used to analyse Twitter streams with Kafka and Storm. Now it’s time to cover the technical details of the Twitter topology.
Twitter Topology
Here is the declaration of the Storm topology, using a KafkaSpout to read the tweets from a Kafka queue:
private void configureKafkaSpout(TopologyBuilder topology) {
    BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("zookeeper.host"));

    SpoutConfig spoutConfig = new SpoutConfig(
            hosts,
            topologyConfig.getProperty("kafka.twitter.raw.topic"),
            topologyConfig.getProperty("kafka.zkRoot"),
            topologyConfig.getProperty("kafka.consumer.group"));
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    topology.setSpout("twitterSpout", kafkaSpout);
}
private void configureBolts(TopologyBuilder topology) {
    // filtering
    topology.setBolt("twitterFilter", new TwitterFilterBolt(), 4)
            .shuffleGrouping("twitterSpout");

    // sanitization
    topology.setBolt("textSanitization", new TextSanitizationBolt(), 4)
            .shuffleGrouping("twitterFilter");

    // sentiment analysis
    topology.setBolt("sentimentAnalysis", new SentimentAnalysisBolt(), 4)
            .shuffleGrouping("textSanitization");

    // persist tweets with analysis to Cassandra
    topology.setBolt("sentimentAnalysisToCassandra", new SentimentAnalysisToCassandraBolt(topologyConfig), 4)
            .shuffleGrouping("sentimentAnalysis");

    // split each tweet into its hashtags
    topology.setBolt("hashtagSplitter", new HashtagSplitterBolt(), 4)
            .shuffleGrouping("textSanitization");

    // count hashtag occurrences
    topology.setBolt("hashtagCounter", new HashtagCounterBolt(), 4)
            .fieldsGrouping("hashtagSplitter", new Fields("tweet_hashtag"));

    // compute the top hashtags
    topology.setBolt("topHashtag", new TopHashtagBolt())
            .globalGrouping("hashtagCounter");

    // persist the top hashtags to Cassandra
    topology.setBolt("topHashtagToCassandra", new TopHashtagToCassandraBolt(topologyConfig), 4)
            .shuffleGrouping("topHashtag");
}
private void buildAndSubmit() throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    configureKafkaSpout(builder);
    configureBolts(builder);
    // ... remaining topology configuration and submission
}

public static void main(String[] args) throws Exception {
    TwitterProcessorTopology topology = new TwitterProcessorTopology(configFileLocation);
    topology.buildAndSubmit();
}
1. Filter Bolt
First of all, we filter the tweets we are interested in. Since we perform sentiment analysis only on tweets written in English, we filter on that property.
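The filter bolt code is not reproduced here, but a minimal sketch of the idea could look like the following. It assumes the raw tweet arrives as a JSON string, is parsed with json-simple and is kept only when the lang field reported by Twitter is "en"; the output field name and the Storm 1.x package names are assumptions.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

// Hypothetical sketch: keep only the tweets written in English.
public class TwitterFilterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // the spout emits the raw tweet JSON as a single string field
        JSONObject tweet = (JSONObject) JSONValue.parse(input.getString(0));
        if (tweet != null && "en".equals(tweet.get("lang"))) {
            collector.emit(new Values(tweet.toJSONString()));
        }
        // non-English tweets are simply dropped
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}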
In the sentiment analysis bolt we score the tweet word by word using SentiWordNet. That is not the most accurate approach: since each word is classified independently, it can produce false positives and negatives and it does not take the tweet’s context, sarcasm, etc. into account, but it is good enough for a sample 🙂
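A simplified sketch of this word-by-word scoring is shown below. It assumes the SentiWordNet scores have already been loaded into a word-to-score map (the loading code is omitted), and the field names and output labels are assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: score a tweet word by word with SentiWordNet.
public class SentimentAnalysisBolt extends BaseBasicBolt {

    // word -> SentiWordNet score (positive > 0, negative < 0)
    private Map<String, Double> sentiWordNet;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        sentiWordNet = new HashMap<>(); // loading of the SentiWordNet dictionary omitted
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String text = input.getStringByField("tweet_text");
        double score = 0;
        for (String word : text.toLowerCase().split("\\s+")) {
            score += sentiWordNet.getOrDefault(word, 0.0);
        }
        String sentiment = score > 0 ? "POSITIVE" : score < 0 ? "NEGATIVE" : "NEUTRAL";
        collector.emit(new Values(text, sentiment));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet_text", "tweet_sentiment"));
    }
}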
The hashtag splitter branch of the topology is responsible for extracting the hashtags of each tweet and emitting one tuple per hashtag to the next bolt. That is why this bolt extends BaseRichBolt: we ack the input tuple manually, only after all of its hashtags have been emitted.
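A minimal sketch of such a splitter, assuming the input field is called tweet_text and using the tweet_hashtag output field declared in the topology above, could look like this:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: emit one tuple per hashtag, then ack the input tuple manually.
public class HashtagSplitterBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String text = input.getStringByField("tweet_text");
        for (String word : text.split("\\s+")) {
            if (word.startsWith("#")) {
                // anchor each emitted hashtag to the input tuple for reliability
                collector.emit(input, new Values(word.toLowerCase()));
            }
        }
        // ack only once all the hashtags of this tweet have been emitted
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet_hashtag"));
    }
}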
For the hashtag counter bolt we use fields grouping because we want to partition the stream by hashtag: the same hashtag always goes to the same task. That way we can keep a simple in-memory map to count the number of occurrences of each hashtag.
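A minimal sketch of such a counter, reusing the tweet_hashtag field from the splitter (the output field names are assumptions), could be:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: the fields grouping guarantees that a given hashtag
// always reaches the same task, so a local map is enough to keep its count.
public class HashtagCounterBolt extends BaseBasicBolt {

    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String hashtag = input.getStringByField("tweet_hashtag");
        long count = counts.merge(hashtag, 1L, Long::sum);
        collector.emit(new Values(hashtag, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag", "count"));
    }
}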
For the top hashtag bolt we use global grouping, which means that all hashtag counts go to the same task. For this use case we need a sliding window so we can emit the top 20 hashtags every 10 seconds, and we rely on Storm’s tick tuple feature: normal tuples update the ranking of hashtags, and when a tick tuple is received (configured to arrive every 10 seconds) we emit the ranking calculated over that window of time.
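The sketch below illustrates the tick tuple mechanism. It keeps the latest count received per hashtag (as emitted by the counter sketch above) and emits the current top 20 at every tick, rather than maintaining a true sliding window; the field names are the ones assumed earlier.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical sketch: rank hashtags and emit the top 20 on every tick tuple.
public class TopHashtagBolt extends BaseBasicBolt {

    private static final int TOP_N = 20;
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // ask Storm to send this bolt a tick tuple every 10 seconds
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (isTickTuple(input)) {
            // on each tick, emit the current ranking of hashtags
            List<String> ranking = counts.entrySet().stream()
                    .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                    .limit(TOP_N)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            collector.emit(new Values(new ArrayList<>(ranking)));
        } else {
            counts.put(input.getStringByField("hashtag"), input.getLongByField("count"));
        }
    }

    private boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("top_hashtags"));
    }
}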
Finally, we store the top N hashtags per day in Cassandra. For that we use the row partitioning pattern: one row per day, holding the top hashtags for each time bucket (20 seconds).
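As an illustration of this pattern, the persistence bolt could look roughly like the sketch below, using the DataStax Java driver. The keyspace, table definition, column names and contact point are all assumptions, and the real bolt would read its connection settings from the topology config passed to its constructor rather than hardcoding them.

import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Hypothetical sketch of the persistence bolt, writing to a table such as:
//   CREATE TABLE top_hashtags (
//       day         text,
//       time_bucket timestamp,
//       hashtags    list<text>,
//       PRIMARY KEY (day, time_bucket)
//   );
// One partition (row) per day, one entry per time bucket.
public class TopHashtagToCassandraBolt extends BaseBasicBolt {

    private Cluster cluster;
    private Session session;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("twitter_sentiment");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        @SuppressWarnings("unchecked")
        List<String> topHashtags = (List<String>) input.getValueByField("top_hashtags");
        String day = java.time.LocalDate.now().toString();
        session.execute(
                "INSERT INTO top_hashtags (day, time_bucket, hashtags) VALUES (?, ?, ?)",
                day, new Date(), topHashtags);
    }

    @Override
    public void cleanup() {
        cluster.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to emit downstream
    }
}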
Using Storm Trident: in the next post I will show how to use Storm’s high-level abstraction, which processes a stream as a sequence of small batches of data (aka micro-batching) and is a better fit for the top hashtags example.