In the previous post I presented an overview of the topology used to analyse Twitter streams with Kafka and Storm. Now it's time to cover the technical details of the Twitter topology.
Here is the declaration of the Storm topology, which uses a KafkaSpout to read the tweets from a Kafka queue:
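A sketch of what the wiring could look like (the bolt classes, component names and the Storm 1.x package layout are assumptions for illustration, not the original code; `spoutConfig` would hold the Kafka connection details):

```java
TopologyBuilder builder = new TopologyBuilder();

// KafkaSpout reads raw tweets from the Kafka topic
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);

// main branch: filter -> sanitise -> score -> store
builder.setBolt("lang-filter", new LanguageFilterBolt(), 2)
       .shuffleGrouping("kafka-spout");
builder.setBolt("sanitizer", new SanitizerBolt(), 2)
       .shuffleGrouping("lang-filter");
builder.setBolt("sentiment", new SentimentBolt(), 2)
       .shuffleGrouping("sanitizer");
builder.setBolt("tweet-writer", new CassandraWriterBolt(), 1)
       .shuffleGrouping("sentiment");

// hashtag branch: split -> count (partitioned) -> rank (single task)
builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 2)
       .shuffleGrouping("sanitizer");
builder.setBolt("hashtag-counter", new HashtagCounterBolt(), 2)
       .fieldsGrouping("hashtag-splitter", new Fields("hashtag"));
builder.setBolt("hashtag-ranker", new HashtagRankerBolt(), 1)
       .globalGrouping("hashtag-counter");
```

The groupings mirror the discussion below: shuffle grouping where any task can process any tuple, fields grouping to partition by hashtag, and global grouping to funnel everything into a single ranking task.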
First of all, we are going to filter the tweets that we are interested in. Since we will perform sentiment analysis only on tweets in English, we filter on this property:
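The core of that filter is a single check on the tweet's `lang` field (the field name matches the Twitter API payload; the class and method names here are illustrative, and the Storm tuple plumbing is omitted):

```java
import java.util.Map;

public class LanguageFilter {
    // The bolt emits the tweet downstream only when this returns true.
    public static boolean shouldEmit(Map<String, ?> tweet) {
        return "en".equals(tweet.get("lang"));
    }
}
```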
Then we will sanitise our tweets by converting accented characters into their unaccented equivalents and by removing single letters and numbers:
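This step can be sketched with the JDK's `java.text.Normalizer` (the class and method names are illustrative, not the original bolt's): decompose accented characters, strip the combining marks, and drop one-character tokens.

```java
import java.text.Normalizer;
import java.util.Arrays;
import java.util.stream.Collectors;

public class TweetSanitizer {
    public static String sanitize(String text) {
        // "café" -> "cafe": NFD decomposition, then removal of the
        // combining diacritical marks left behind
        String unaccented = Normalizer.normalize(text, Normalizer.Form.NFD)
                .replaceAll("\\p{M}", "");
        // keep only tokens longer than one character (drops "a", "1", ...)
        return Arrays.stream(unaccented.split("\\s+"))
                .filter(token -> token.length() > 1)
                .collect(Collectors.joining(" "));
    }
}
```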
In this bolt we score the tweet by looking up each of its words in SentiWordNet. That's not the best approach, as it can produce false positives or negatives: classifying word by word independently ignores the tweet's context, sarcasm and so on. But that's OK for a sample 🙂
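The word-by-word scoring can be sketched like this. Note the real bolt looks words up in SentiWordNet; here a tiny hard-coded lexicon stands in for it, so the words and scores below are purely illustrative:

```java
import java.util.Map;

public class SentimentScorer {
    // Stand-in for SentiWordNet: positive scores > 0, negative < 0.
    private static final Map<String, Double> LEXICON = Map.of(
            "good", 0.75, "happy", 0.5, "bad", -0.625, "sad", -0.5);

    // Sum of per-word scores; unknown words contribute nothing. This is
    // exactly the limitation described above: no context, no sarcasm.
    public static double score(String sanitizedTweet) {
        double total = 0.0;
        for (String word : sanitizedTweet.toLowerCase().split("\\s+")) {
            total += LEXICON.getOrDefault(word, 0.0);
        }
        return total;
    }
}
```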
Finally, we will store the tweet in Cassandra along with its score and its hashtags, if any:
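An illustrative CQL schema for this step (the table and column names are assumptions, not the post's actual schema):

```sql
CREATE TABLE IF NOT EXISTS tweets_sentiment (
    tweet_id bigint PRIMARY KEY,
    tweet_text text,
    score double,
    hashtags set<text>
);
```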
This branch of the topology graph is responsible for splitting out the hashtags and emitting one tuple per hashtag to the next bolt. We inherit from BaseRichBolt so that we can manually ack the input tuple after all of its hashtags have been emitted.
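The splitting itself can be sketched as a regex extraction (class, method and pattern are illustrative). In the actual bolt, each entry of the returned list is emitted as its own tuple, and only then is the input tuple ack'ed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HashtagSplitter {
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // One list entry per hashtag found; the bolt emits one tuple each.
    public static List<String> split(String tweet) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweet);
        while (m.find()) {
            tags.add(m.group(1).toLowerCase());
        }
        return tags;
    }
}
```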
In this particular case, we are using Fields Grouping because we want to partition the stream by hashtag. This means that the same hashtag will always go to the same task, so we can use a plain HashMap to count the number of occurrences of each hashtag:
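The per-task counting boils down to something like this sketch (names are illustrative). Because fields grouping guarantees that every tuple for a given hashtag reaches the same task, an in-memory map needs no cross-task synchronisation:

```java
import java.util.HashMap;
import java.util.Map;

public class HashtagCounter {
    private final Map<String, Long> counts = new HashMap<>();

    // Bumps the count for a hashtag and returns the new total,
    // which the bolt emits downstream alongside the hashtag.
    public long increment(String hashtag) {
        return counts.merge(hashtag, 1L, Long::sum);
    }
}
```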
In this bolt, we are using Global Grouping to compute the top 20 hashtags. Global grouping means that all hashtags will go to the same task of the bolt. For this use case we need a sliding window in order to emit the top 20 hashtags every 10 seconds, so we rely on Storm's Tick Tuple feature. For normal tuples we just update the ranking of hashtags, and when a tick tuple is received (configured to fire every 10 seconds) we emit the ranking calculated over that window of time.
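The ranking side of that pattern can be sketched as below (names are illustrative). In the bolt's `execute()`, normal tuples update the counts, and only tick tuples trigger `topN(...)` — recent Storm versions expose `TupleUtils.isTick(tuple)` for that check:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HashtagRanker {
    // Returns the n hashtags with the highest counts, best first.
    public static List<String> topN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```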
Finally, we store the top N hashtags per day in Cassandra. For that we use the row partitioning pattern: one row per day, holding the top hashtags for each time bucket (20 seconds).
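An illustrative schema for that pattern (names are assumptions, not the post's actual schema): the day is the partition key, so each day is one wide row, and each time bucket is a clustered entry within it.

```sql
CREATE TABLE IF NOT EXISTS top_hashtags (
    day text,              -- partition key, e.g. '2015-03-01'
    time_bucket timestamp, -- start of the 20-second bucket
    hashtags list<text>,   -- top N hashtags for that bucket
    PRIMARY KEY (day, time_bucket)
);
```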
Although the hashtag counter works, I would not say it is entirely correct, and there are better ways to do it:
- You can take a look at this excellent resource: http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
- Using Storm Trident: in the next post I will show how to use this high-level abstraction from Storm, which processes a stream as a sequence of small batches of data (aka micro-batching) and fits the top-hashtags example better.