Introduction to MLflow

MLflow is a tool for managing the lifecycle of machine learning projects. It is composed of three components:

  • Tracking: Records parameters, metrics and artifacts of each run of a model
  • Projects: A format for packaging data science projects and their dependencies
  • Models: A generic format for packaging ML models and serving them through a REST API or other means

ML Tracking using XGBoost

Let’s work through a quick sample to demonstrate the benefits of MLflow by tracking an ML experiment using XGBoost and the Census Income Data Set.

# imports used by this sample
import mlflow
import mlflow.sklearn
import xgboost as xgb
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# train_features, train_labels, test_features and test_labels are assumed
# to be prepared beforehand from the Census Income Data Set

with mlflow.start_run():
    # model parameters
    params = {'learning_rate': 0.1, 'n_estimators': 100, 'seed': 0, 'subsample': 1, 'colsample_bytree': 1,
              'objective': 'binary:logistic', 'max_depth': 3}

    # log model parameters
    for key in params:
        mlflow.log_param(key, params[key])

    # train XGBoost model
    gbtree = XGBClassifier(**params)
    gbtree.fit(train_features, train_labels)

    # feature importances from the underlying booster
    importances = gbtree.get_booster().get_fscore()
    print(importances)

    # get predictions
    y_pred = gbtree.predict(test_features)

    accuracy = accuracy_score(test_labels, y_pred)
    print("Accuracy: %.1f%%" % (accuracy * 100.0))

    # log accuracy metric
    mlflow.log_metric("accuracy", accuracy)

    # plot and save feature importance
    sns.set(font_scale=1.5)
    xgb.plot_importance(gbtree)
    plt.savefig("importance.png", dpi=200, bbox_inches="tight")

    # log the plot as an artifact
    mlflow.log_artifact("importance.png")

    # log model
    mlflow.sklearn.log_model(gbtree, "model")

In this example, we’re using the MLflow Python API to track the experiment parameters, the accuracy metric, the artifacts (our plot) and the XGBoost model itself.

When we run the experiment for the first time and open the MLflow UI (launched locally with the mlflow ui command), we see the following:

With our initial parameters, we see that the accuracy metric is 0.866 (86.6%).

If we select the run, we can see our artifact:

Next, we will change our max_depth parameter to 6 and see what happens:

And we see that our accuracy has improved: 0.874 (87.4%).

All the history is tracked, as well as the model itself, which means we keep the full history of our experiments and the performance of the model at any given moment in time.
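Since every run is recorded, we can also query the experiment history programmatically. A minimal sketch, assuming a recent MLflow version where mlflow.search_runs returns a pandas DataFrame:

import mlflow

# list past runs of the current experiment, best accuracy first
runs = mlflow.search_runs(order_by=["metrics.accuracy DESC"])
print(runs[["run_id", "params.max_depth", "metrics.accuracy"]])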

You can check the full sample on GitHub: https://github.com/mserrate/mlflow-sample.


Using an external Hive Metastore in Azure Databricks

If you already have a Hive metastore, such as the one used by Azure HDInsight, you can use Spark SQL to query its tables the same way you would in Hive, with the advantage of having a centralized metastore to manage your table schemas from both Databricks and HDInsight.

There are a couple of options to set in the Spark cluster configuration.
Apart from the database connection options, you need to specify the Hive metastore version (for the current HDInsight version this is 2.1.1) and make sure to set hive.metastore.schema.verification.record.version and hive.metastore.schema.verification to true, so that the Spark cluster doesn’t update the metastore schema.
You will also need to grant access to the right Azure Storage account(s) in order to have permission to access the underlying physical data (remember that the metastore is only that, metadata, not the actual data).

spark.hadoop.javax.jdo.option.ConnectionDriverName com.microsoft.sqlserver.jdbc.SQLServerDriver
spark.hadoop.javax.jdo.option.ConnectionURL jdbc:sqlserver://YOUDBSERVER:1433;database=metastore;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=300
spark.hadoop.javax.jdo.option.ConnectionUserName YOURDBUSERNAME
spark.hadoop.javax.jdo.option.ConnectionPassword YOURDBPASSWORD
hive.metastore.schema.verification.record.version true
spark.sql.hive.metastore.jars maven
hive.metastore.schema.verification true
spark.sql.hive.metastore.version 2.1.1
fs.azure.account.key.YOURACCOUNTNAME.blob.core.windows.net YOURACCOUNTKEY
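Once the cluster starts with this configuration, tables registered in the external metastore become queryable from Databricks with plain Spark SQL. A minimal sketch from a notebook, where mydb and mytable are hypothetical names:

# databases and tables defined from HDInsight should now be visible here
spark.sql("SHOW DATABASES").show()
spark.sql("SELECT * FROM mydb.mytable LIMIT 10").show()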

Logistic Regression with TensorFlow

In the previous post we’ve seen the basics of Logistic Regression & Binary classification.

Now we’re going to see an example with Python and TensorFlow.

In this example we’re going to use a dataset that shows the probability of passing an exam, taking into account 2 features: hours studied vs hours slept.

First, we’re going to import the dependencies:

# Import dependencies
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import sklearn
from sklearn.model_selection import train_test_split

%matplotlib inline
data = np.genfromtxt('data_classification.csv', delimiter=',')

# Get the 2 features (hours studied & hours slept)
X = data[:, 0:2]
# Get the result (0 failed - 1 passed)
Y = data[:, 2]


# Plotting
pos = np.where(Y == 1)
neg = np.where(Y == 0)
plt.scatter(X[pos, 0], X[pos, 1], marker='o', c='b')
plt.scatter(X[neg, 0], X[neg, 1], marker='x', c='r')
plt.xlabel('Hours studied')
plt.ylabel('Hours slept')
plt.legend(['Passed', 'Failed'])
plt.show()

# Split the data into train & test sets
Y_reshape = data[:, 2].reshape(data[:, 2].shape[0], 1)
x_train, x_test, y_train, y_test = train_test_split(data[:, 0:2], Y_reshape)

print("x_train shape: " + str(x_train.shape))
print("y_train shape: " + str(y_train.shape))
print("x_test shape: " + str(x_test.shape))
print("y_test shape: " + str(y_test.shape))

num_features = x_train.shape[1]

Now we’re building the logistic regression model with TensorFlow:

learning_rate = 0.01
training_epochs = 1000

tf.reset_default_graph()

# Having 2 features: hours studied & hours slept
X = tf.placeholder(tf.float32, [None, num_features], name="X")
Y = tf.placeholder(tf.float32, [None, 1], name="Y")

# Initialize our weights & bias
W = tf.get_variable("W", [num_features, 1], initializer = tf.contrib.layers.xavier_initializer())
b = tf.get_variable("b", [1], initializer = tf.zeros_initializer())

Z = tf.add(tf.matmul(X, W), b)
prediction = tf.nn.sigmoid(Z)

# Calculate the cost
cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits = Z, labels = Y))

# Use Adam as the optimization method
optimizer = tf.train.AdamOptimizer(learning_rate).minimize(cost)

init = tf.global_variables_initializer()

cost_history = np.empty(shape=[1], dtype=float)

with tf.Session() as sess:
    sess.run(init)

    for epoch in range(training_epochs):
        _, c = sess.run([optimizer, cost], feed_dict={X: x_train, Y: y_train})
        print("Epoch:", '%04d' % (epoch+1), "cost=", "{:.9f}".format(c), \
              "W=", sess.run(W), "b=", sess.run(b))
        cost_history = np.append(cost_history, c)


    # Calculate the correct predictions
    correct_prediction = tf.to_float(tf.greater(prediction, 0.5))

    # Calculate accuracy on the train and test sets
    accuracy = tf.reduce_mean(tf.to_float(tf.equal(Y, correct_prediction)))

    print("Train Accuracy:", accuracy.eval({X: x_train, Y: y_train}))
    print("Test Accuracy:", accuracy.eval({X: x_test, Y: y_test}))

Our accuracy is 86%, not too bad for a dataset of only 100 elements. The optimization of the cost function looks as follows:
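The plot can be reproduced from the cost_history array recorded in the training loop, for example (skipping the first element, which np.empty leaves uninitialized):

plt.plot(cost_history[1:])
plt.xlabel('Epoch')
plt.ylabel('Cost')
plt.show()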

So, our logistic regression example ends up looking as follows:


Logistic Regression for Deep Learning

In this post we’re going to cover some basic intuition to work on logistic regression for Deep Learning algorithms.

Logistic regression is an algorithm for binary classification, used when you want your model to return 0 or 1. Some examples: is this image a cat? Is this email spam?

The basic equation is:
$$
\begin{align}
\hat{y} = w^T x + b \label{basic}
\end{align}
$$

where:

  • $\mathbf{\hat{y}}$: is the value that our model predicts
  • $\mathbf{w \in \mathbb{R}^n}$: is a vector of $\mathbf{n}$ parameters representing the weights.
  • $\mathbf{x \in \mathbb{R}^n}$: is a vector of $\mathbf{n}$ parameters representing the features.
  • $\mathbf{b \in \mathbb{R}}$: is a scalar representing the bias or intercept term

$\mathbf{w}$ and $\mathbf{b}$ are the parameters that control the behavior of the model. We can think of $\mathbf{w}$ as the weights that determine how each feature $\mathbf{x_i}$ affects the prediction.

The objective of the machine learning algorithm is to learn the parameters $\mathbf{w}$ and $\mathbf{b}$ so that $\mathbf{\hat{y}}$ becomes a good estimate of the probability that $\mathbf{y}$ equals 1.

Activation function: Sigmoid

The output of equation ($\ref{basic}$) is a linear function. So, how do we transform this linear regression result into a non-linear one? The answer is the sigmoid function, which squashes our input into an output between 0 and 1:

$$
\begin{align}
\hat{y} = \sigma(w^T x + b)
\end{align}
$$

where

$$
\begin{align}
\sigma(z) = \frac{1}{1 + e^{-z}}
\end{align}
$$

The sigmoid function can be represented as:

As you can see, this activation function allows us to map results to 0 or 1:

  • For large positive values of $\mathbf{z}$, $\mathbf{\sigma(z)}$ will be near 1
  • For large negative values of $\mathbf{z}$, $\mathbf{\sigma(z)}$ will be near 0
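A quick numerical check in Python (a minimal sketch using NumPy):

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

print(sigmoid(10))    # ~0.99995 -> maps to 1
print(sigmoid(-10))   # ~0.00005 -> maps to 0
print(sigmoid(0))     # 0.5, the decision boundary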

Cost Function

First of all, we have the loss function, which is computed for a single training example:
$$
\begin{align}
\mathcal{L}(\hat{y}, y) = - \bigl(y\log\hat{y} + (1 - y) \log(1 - \hat{y})\bigr)
\end{align}
$$

Note that when $y = 1$ the loss reduces to $-\log\hat{y}$ (pushing $\hat{y}$ towards 1), and when $y = 0$ it reduces to $-\log(1 - \hat{y})$ (pushing $\hat{y}$ towards 0). The cost function then measures how you are performing over the entire training set:
$$
\begin{align}
\mathcal{J}(w, b) = \frac1m \sum_{i=1}^m \mathcal{L}(\hat{y}^{(i)}, y^{(i)})
\end{align}
$$

As we want the best possible performance, we are going to try to find the $w$ and $b$ values that minimize this cost function. And that is basically what gradient descent does for us.

Gradient descent

Gradient descent is one of the most popular optimization methods for neural networks because of its simplicity (although it can have convergence problems due to local minima). Other optimization methods include Adam and RMSprop.
The basic idea of gradient descent is that on each iteration the weights are updated incrementally in the direction given by the slope (the derivative), scaled by a learning rate $\alpha$.

A visual interpretation of gradient descent is the following:

Gradient descent (source: sebastianraschka.com, https://sebastianraschka.com/faq/docs/closed-form-vs-gd.html)

Given our cost function $\mathcal{J}(w, b)$, weights and bias are updated with the following formula:

$$
\begin{align}
w = w - \alpha\,\frac{\partial\,\mathcal{J}(w, b)}{\partial w}
\end{align}
$$
$$
\begin{align}
b = b - \alpha\,\frac{\partial\,\mathcal{J}(w, b)}{\partial b}
\end{align}
$$

where $\partial\,\mathcal{J}(w, b)/\partial w$ and $\partial\,\mathcal{J}(w, b)/\partial b$ denote the partial derivatives of the cost function with respect to each parameter.
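As a small preview, here is a minimal NumPy sketch of a single gradient-descent update for logistic regression, where X is an m x n feature matrix and y an m x 1 label vector (the derivative formulas follow from the cost function above):

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def gradient_step(w, b, X, y, alpha):
    m = X.shape[0]
    y_hat = sigmoid(X @ w + b)      # predictions for the whole training set
    dw = (X.T @ (y_hat - y)) / m    # dJ(w, b)/dw
    db = np.sum(y_hat - y) / m      # dJ(w, b)/db
    return w - alpha * dw, b - alpha * db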

In the next post, we will see how to apply this theory with an example written in Python & TensorFlow.


Preview of Kafka Streams

The preview of Kafka Streams, which is one of the main features of the upcoming Apache Kafka 0.10, was announced by Jay Kreps this week.

Kafka joins the Stream Processing club

Kafka Streams is a library for building streaming applications that use Kafka topics as input/output. It is in the same league as other streaming systems such as Apache Storm, Apache Flink and, not surprisingly, Apache Samza, which also uses Kafka for stream input and output.

One of the main advantages is that if you’re already using Apache Kafka and you need real-time processing, you just need to use the library and you are good to go.

Other important features are stateful processing, windowing, and the ability to be deployed using your preferred solution: a simple command line, Mesos, YARN, or Kubernetes and Docker if you’re a container party boy.

Streams and Tables

One of the key concepts in Kafka Streams is the support of KStream and KTable.

This isn’t a new concept if you come from the Event Sourcing world: the KStream is the append-only event store, whose state is given by replaying the events from the beginning of time up to the last one, whereas the KTable is a snapshot or projection of the current state of the stream at a given point in time.

Example: Twitter Hashtags Job

Kafka Streams: KStream and KTable

Show me the code!

You can find the complete example here: https://github.com/mserrate/kafka-streams-app

KStream<String, JsonNode> source = builder.stream(stringDeserializer, jsonDeserializer, "streams-hashtag-input");

KTable<String, Long> counts = source
        .filter(new HashtagFilter())
        .flatMapValues(new HashtagSplitter())
        .map(new HashtagMapper())
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");

counts.to("streams-hashtag-count-output", stringSerializer, longSerializer);

For this example I’ve been using a simple TweetProducer that connects to the Twitter Streaming API and sends JSON events to a Kafka topic.

This topic is read as a KStream and then we begin the processing:

  1. Filter out the tweets without hashtags
  2. Apply a flatMapValues (we are just interested in the values, not the keys) to split the different hashtags in a tweet
  3. Apply a map to return a key (hashtag) value (hashtag) as we want to aggregate by hashtag
  4. Aggregate the streams per key (the hashtag) and count them


Finally, we send the KTable to the output topic.


Sentiment analysis of tweets

In the previous post I presented an overview of the topology used to analyse Twitter streams with Kafka and Storm. Now it’s time to cover the technical details of the Twitter topology.

Twitter Topology

Twitter Streaming

The declaration of the Storm topology, using a KafkaSpout to read the tweets from a Kafka topic:

public class TwitterProcessorTopology extends BaseTopology {

    public TwitterProcessorTopology(String configFileLocation) throws Exception {
        super(configFileLocation);
    }

    private void configureKafkaSpout(TopologyBuilder topology) {
        BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("zookeeper.host"));

        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,
                topologyConfig.getProperty("kafka.twitter.raw.topic"),
                topologyConfig.getProperty("kafka.zkRoot"),
                topologyConfig.getProperty("kafka.consumer.group"));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        topology.setSpout("twitterSpout", kafkaSpout);
    }

    private void configureBolts(TopologyBuilder topology) {
        // filtering
        topology.setBolt("twitterFilter", new TwitterFilterBolt(), 4)
                .shuffleGrouping("twitterSpout");

        // sanitization
        topology.setBolt("textSanitization", new TextSanitizationBolt(), 4)
                .shuffleGrouping("twitterFilter");

        // sentiment analysis
        topology.setBolt("sentimentAnalysis", new SentimentAnalysisBolt(), 4)
                .shuffleGrouping("textSanitization");

        // persist tweets with their sentiment to Cassandra
        topology.setBolt("sentimentAnalysisToCassandra", new SentimentAnalysisToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("sentimentAnalysis");

        // split the hashtags out of each tweet
        topology.setBolt("hashtagSplitter", new HashtagSplitterBolt(), 4)
                .shuffleGrouping("textSanitization");

        // count hashtag occurrences
        topology.setBolt("hashtagCounter", new HashtagCounterBolt(), 4)
                .fieldsGrouping("hashtagSplitter", new Fields("tweet_hashtag"));

        // rank the top hashtags
        topology.setBolt("topHashtag", new TopHashtagBolt())
                .globalGrouping("hashtagCounter");

        // persist the top hashtags to Cassandra
        topology.setBolt("topHashtagToCassandra", new TopHashtagToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("topHashtag");
    }

    private void buildAndSubmit() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureBolts(builder);

        Config config = new Config();

        // set producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", topologyConfig.getProperty("kafka.broker.list"));
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("twitter-processor", config, builder.createTopology());
    }

    public static void main(String[] args) throws Exception {
        String configFileLocation = args[0];

        TwitterProcessorTopology topology = new TwitterProcessorTopology(configFileLocation);
        topology.buildAndSubmit();
    }
}



Analysis of twitter streams with Kafka and Storm

Following my last post, I will present a real-time processing sample with Kafka and Storm using the Twitter Streaming API.

Overview

Twitter Streaming

The solution consists of the following:

  • twitter-kafka-producer: A very basic producer that reads tweets from the Twitter Streaming API and stores them in Kafka.
  • twitter-storm-topology: A Storm topology that reads tweets from Kafka and, after applying filtering and sanitization, processes the messages in parallel for:
    • Sentiment Analysis: Using a sentiment analysis algorithm to classify the tweet into a positive or negative feeling.
    • Top Hashtags: Calculates the top 20 hashtags using a sliding window (see the sketch just below).
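To make the sliding-window idea concrete, here is an illustrative Python sketch of keeping the top 20 hashtags over the last N events (this is just the underlying idea, not the actual Storm bolt; WINDOW is an arbitrary size):

from collections import Counter, deque

WINDOW = 1000  # hypothetical window size, in number of events
window = deque()
counts = Counter()

def on_hashtag(tag):
    window.append(tag)
    counts[tag] += 1
    if len(window) > WINDOW:
        oldest = window.popleft()
        counts[oldest] -= 1
        if counts[oldest] == 0:
            del counts[oldest]
    return counts.most_common(20)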

Storm Topology

Twitter Topology

The Storm topology consists of the following elements:

  • Kafka Spout: The spout implementation to read messages from Kafka.

  • Filtering: Filtering out all non-english language tweets.

  • Sanitization: Text normalization in order to be processed properly by the sentiment analysis algorithm.

  • Sentiment Analysis: The algorithm that analyses the text of the tweet word by word, giving a value between -1 and 1.

  • Sentiment Analysis to Cassandra: Stores the tweets and their sentiment value in Cassandra.

  • Hashtag Splitter: Splits the different hashtags appearing in a tweet.

  • Hashtag Counter: Counts hashtag occurrences.

  • Top Hashtag: Ranks the top 20 hashtags over a sliding window (using the Tick Tuple feature of Storm).

  • Top Hashtag to Cassandra: Stores the top 20 hashtags in Cassandra.

Summary

In this post we have seen the benefits of using Apache Kafka & Apache Storm to ingest and process streams of data. In upcoming posts we will look at the implementation details and provide some analytical insight from the data stored in Cassandra.

The sample can be found on GitHub: https://github.com/mserrate/twitter-streaming-app


Big Data: streams and lambdas

I’ve been working for some years now in distributed systems and event-driven architectures, from the misunderstood SOA (or its refurbished version known as Microservices) to Event Sourcing.

Some of the concepts presented in these systems related to events, like immutability, perpetuity and versioning, are valid for stream processing as well. Stream processing, along with batch processing, is what is sometimes referred to as Big Data.

Big Data

When we think about Big Data, the first thing that comes to mind is Hadoop for batch processing. Although Hadoop has a big capacity to process indecent amounts of data, it also comes with high-latency responses.

Although this latency won’t be a problem for a lot of use cases, it may be a problem when we need to get real (or near-real) time feedback.

That’s where the Lambda Architecture (by Nathan Marz) comes in, describing how to design a system where most of our data is processed by the batch layer but, while that process is running, we are still able to process the streams coming into our system:

Lambda architecture

Where we can say that:

Current View = Query(Batch View) + Query(Stream View)
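As a toy Python sketch of that merge, where batch_view holds the precomputed batch results and stream_view the increments accumulated since the last batch run (all names are illustrative):

def query(key, batch_view, stream_view):
    # batch-computed value plus whatever the stream layer
    # has accumulated since the last batch run
    return batch_view.get(key, 0) + stream_view.get(key, 0)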

Batch Layer

The batch processing layer computes arbitrary sets of data using the entire historical dataset. The obvious example of batch processing is Hadoop or, to be more precise, the distributed file system HDFS together with a processing tool like MapReduce or Pig.

The result of this process is stored in a database that should support batch writes (ElephantDB, HBase) but not random writes. That makes the database architecture extremely simple by removing features like online compaction or concurrency control.

Stream Layer

The stream processing layer computes events one by one, giving immediate feedback. Depending on the number of events or the throughput needed, we may use different technologies: Spark Streaming (although it’s micro-batch, the latency may be sufficient for many use cases), Storm, Samza or Flink.

The result of this process is stored in a database that supports random writes; one option may be Cassandra.

In the following posts I will present concrete examples with Docker images using some of the technologies I’ve used: Kafka, Storm, Cassandra and Druid.


Speaking at DotNetSpain conference

Too long without posting… Anyway, a short post to say that I will be speaking at DotNetSpain 2015 about Complex Event Processing, Immutability and Projections with EventStore.

So, if you are interested come and say hi!


Webcast about SOA, DDD & CQRS with NServiceBus

I will be giving a webcast tomorrow about SOA, DDD & CQRS with NServiceBus, in Spanish. In this talk I will cover DDD strategic design, bounded contexts and how to model domain logic through NServiceBus Sagas.

You can see the details in the following link:
