Saturday, July 15, 2017

Regression model outputting probability density distribution

For a classification problem (let's say the output is one of the labels R, G, B), how do we predict?

There are two formats that we can report our prediction
  1. Output a single value, namely the most probable outcome.  e.g. output "B" if P(B) > P(R) and P(B) > P(G)
  2. Output the probability estimate of each label.  (e.g. R=0.2, G=0.3, B=0.5)
But if we look at a regression problem (let's say we output a numeric value v), most regression models only output a single value (the one that minimizes the RMSE).  In this article, we will look at some use cases where outputting a probability density function is much preferred.

Predict the event occurrence time

As an illustrative example, we want to predict when a student will finish her work given that she has already spent some time s on it.  In other words, we want to estimate E[t | t > s], where t is a random variable representing the total duration and s is the elapsed time so far.

Estimating time t is hard to keep up to date if the model only outputs an expectation.  Notice that the model sees the same set of features, except that the elapsed time changes continuously as time passes.

Let's look at how we can train a prediction model that outputs a density distribution.

Let's say our raw data has the schema [feature, duration]:
  • f1, 13.30
  • f2, 14.15
  • f3, 15.35
  • f4, 15.42
Take a look at the range (i.e. min and max) of the output value.  We transform the raw data into training data with the following schema:
[feature, dur<13, dur<14, dur<15, dur<16]
  • f1, 0, 1, 1, 1
  • f2, 0, 0, 1, 1
  • f3, 0, 0, 0, 1
  • f4, 0, 0, 0, 1
After that, we train 4 classification models, one per threshold (a code sketch follows the list below).
  • feature, dur<13
  • feature, dur<14
  • feature, dur<15
  • feature, dur<16
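
Here is a minimal sketch of this transformation and training step in Python, assuming scikit-learn and a small made-up in-memory dataset (a few more rows than the table above, so that every threshold sees both classes):

import numpy as np
from sklearn.linear_model import LogisticRegression

# Illustrative raw data: feature vectors plus the observed total duration (hours)
X = np.array([[0.2, 1.1], [0.5, 0.9], [0.7, 1.8], [0.4, 2.0],
              [0.1, 0.7], [0.9, 1.5], [0.6, 1.2], [0.3, 1.9]])
durations = np.array([12.50, 13.30, 13.80, 14.15, 14.60, 15.35, 15.42, 16.30])

thresholds = [13, 14, 15, 16]            # as in the table above

# Train one binary classifier per threshold; the target is 1 when duration < threshold
models = {}
for th in thresholds:
    y = (durations < th).astype(int)     # e.g. th=14 gives [1, 1, 1, 0, 0, 0, 0, 0]
    models[th] = LogisticRegression().fit(X, y)

# For a new observation, each model outputs P(duration < threshold),
# i.e. one point of the cumulative distribution
x_new = np.array([[0.3, 1.5]])
cdf = [models[th].predict_proba(x_new)[0, 1] for th in thresholds]

Note that because the four models are trained independently, the predicted cumulative probabilities are not guaranteed to be monotonic; in practice they may need a monotonic correction before differencing.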


Now, given a new observation with its corresponding features, we can invoke these 4 models to output the probability of each binary classification (which together form the cumulative probability).  If we want the probability density, simply take the differences between consecutive cumulative probabilities (i.e. a discrete differentiation of the cumulative distribution).

At this point, we can output a probability distribution given the input features.



Now, we can easily estimate the remaining time from the expected time in the shaded region.  As time passes, we just need to slide the red line continuously and recalculate the expected time; we don't need to re-execute the prediction model unless the input features have changed.
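
Continuing the sketch above, here is a hedged example of how the remaining time could be estimated from the discretized distribution: difference the cumulative probabilities to get the mass of each bucket, drop the mass to the left of the elapsed time s (the red line), renormalize, and take the expectation.  The bucket midpoints and the numbers below are illustrative assumptions.

import numpy as np

# Bucket edges (hours) and the cumulative probabilities P(t < edge) from the classifiers,
# padded with 0 at the lower edge and 1 at an assumed upper bound
edges = np.array([12.0, 13.0, 14.0, 15.0, 16.0, 17.0])
cdf   = np.array([0.00, 0.10, 0.40, 0.75, 0.95, 1.00])

pdf  = np.diff(cdf)                      # probability mass of each bucket
mids = (edges[:-1] + edges[1:]) / 2      # represent each bucket by its midpoint

def expected_total_time(s):
    """Coarse estimate of E[t | t > s]: drop the mass left of s and renormalize."""
    mask = mids > s
    return float((mids[mask] * pdf[mask]).sum() / pdf[mask].sum())

s = 13.5                                 # elapsed time so far
print(expected_total_time(s) - s)        # expected remaining time

Only this cheap recomputation is needed as s grows; the classifiers are re-invoked only when the other input features change.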

Predict cancellation before commitment 

As an illustrative example, let's say a restaurant customer has reserved a table at 8:00pm.  It is now 7:55pm and the customer still hasn't arrived; what is the chance of a no-show?

More generally, given a customer (with features x) who still hasn't arrived at the current time, we want to predict the probability that this customer will show up at all.

Let's say our raw data has the schema [feature, arrival], where arrival is the arrival time in minutes relative to the reservation (infinity means the customer never showed up):
  • f1, -15.42
  • f2, -15.35
  • f3, -14.15
  • f4, -13.30
  • f5, infinity
  • f6, infinity
We transform this into training data with the following schema:
[feature, arr<-16, arr<-15, arr<-14, arr<-13]
  • f1, 0, 1, 1, 1
  • f2, 0, 1, 1, 1
  • f3, 0, 0, 1, 1
  • f4, 0, 0, 0, 1
  • f5, 0, 0, 0, 0
  • f6, 0, 0, 0, 0
After that, we train 4 classification models.
  • feature, arr<-16
  • feature, arr<-15
  • feature, arr<-14
  • feature, arr<-13
Notice that P(arr < 0) can be smaller than 1 because the customer can be a no-show.
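
Below is a small sketch of how the no-show probability could then be computed from these cumulative models.  The extra P(arr < 0) value, the numbers and the helper are illustrative assumptions, with time measured in minutes relative to the reservation (so 7:55pm is -5):

# Cumulative probabilities for one customer, as returned by the classifiers:
# P(arr < -16), P(arr < -15), P(arr < -14), P(arr < -13), plus P(arr < 0) from an
# additional model trained with "arrived at all before the reservation" as the target
p_arrive_by = {-16: 0.05, -15: 0.25, -14: 0.55, -13: 0.70, 0: 0.85}

def no_show_probability(current_time):
    """P(no-show | not yet arrived at current_time) = P(no-show) / P(not arrived by current_time)."""
    # coarse approximation: use the largest trained threshold that is <= current_time
    usable = [t for t in sorted(p_arrive_by) if t <= current_time]
    p_already_arrived = p_arrive_by[usable[-1]] if usable else 0.0
    p_no_show = 1.0 - p_arrive_by[0]           # mass that never arrives (the "infinity" rows)
    return p_no_show / (1.0 - p_already_arrived)

print(no_show_probability(-5))                 # 7:55pm for an 8:00pm reservation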





In this post, we discussed some use cases where we need a regression model to output not just a single value prediction but also a probability density distribution, and we illustrated how to build such a prediction model.

Sunday, July 2, 2017

How AI differs from ML

AI is not a new term; it is multiple decades old, dating back to when computer scientists designed algorithms that could "learn" and "mimic human behavior".

On the "learning" side, the most significant algorithm is Neural Network, which is not very successful due to overfitting (the model is too powerful but not enough data).  Nevertheless, in some more specific tasks, the idea of "using data to fit a function" has gained significant success and this form the foundation of "machine learning" today.

On the "mimic" side, people have focus in "image recognition", "speech recognition", "natural language processing", experts have been spending tremendous amount of time to create features like "edge detection", "color profile", "N-grams", "Syntax tree" ... etc.  Nevertheless, the success is moderate.

Traditional Machine Learning

Machine Learning (ML) techniques have played a significant role in prediction, and ML has undergone multiple generations, with a rich set of model structures, such as

  • Linear regression
  • Logistic regression
  • Decision tree
  • Support Vector Machine
  • Bayesian model
  • Regularization model
  • Ensemble model
  • Neural network

Each of these predictive models is based on a certain algorithmic structure, with parameters as tunable knobs.  Training a predictive model involves the following

  1. Choose a model structure (e.g. Logistic regression, or Random forest, or ...)
  2. Feed the model with training data (with both input and output)
  3. The learning algorithm will output the optimal model (i.e. the model with specific parameter values that minimize the training error)

Each model has its own characteristics and will perform well in some tasks and poorly in others.  But generally, we can group them into low-power (simple) models and high-power (complex) models.  Choosing between different models is a very tricky question.

Traditionally, using a low power / simple model is preferred over the use of a high power / complex model for the following reasons

  • Until we have massive processing power, training a high-power model takes too long
  • Until we have a massive amount of data, training a high-power model causes the overfitting problem (since the high-power model has rich parameters and can fit a wide range of data shapes, we may end up training a model that fits the current training data too specifically and does not generalize well enough to predict future data).

However, choosing a low-power model suffers from the so-called "under-fit" problem, where the model structure is too simple to fit the training data when the underlying relationship is more complex.  (Imagine the underlying data has a quadratic relationship y = 5 * x^2; there is no way to fit it with a linear regression y = a*x + b, no matter what a and b we pick.)

To mitigate the "under-fit" problem, data scientists typically apply their "domain knowledge" to come up with "input features" that have a more direct relationship with the output.  (e.g. Going back to the quadratic relationship y = 5 * x^2: if you create a feature z = x^2, then you can fit a linear regression y = a*z + b by picking a = 5 and b = 0.)
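
Here is a minimal numerical sketch of this feature engineering idea (plain NumPy least-squares fits on synthetic data):

import numpy as np

x = np.linspace(-3, 3, 50)
y = 5 * x**2                                  # underlying quadratic relationship

# Fitting y = a*x + b directly under-fits badly
a, b = np.polyfit(x, y, 1)
print("linear fit on x:      a=%.2f b=%.2f" % (a, b))        # poor fit

# Engineer the feature z = x^2 and the *same* linear model fits perfectly
z = x**2
a, b = np.polyfit(z, y, 1)
print("linear fit on z=x^2:  a=%.2f b=%.2f" % (a, b))        # recovers a=5, b=0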

The major obstacle of "Machine Learning" is this "feature engineering" step, which requires deep domain experts to identify important signals before feeding the data into the training process.  The feature engineering step is very manual and demands a lot of scarce domain expertise, and therefore becomes the major bottleneck of most machine learning tasks today.

In other words, if we don't have enough processing power and enough data, then we have to use the low-power / simpler model, which requires us to spend significant time and effort creating appropriate input features.  This is where most data scientists spend their time today.

Return of Neural Network

In the early 2000s, machine processing power increased tremendously with the advancement of cloud computing and massively parallel processing infrastructure, together with the big data era in which massive amounts of fine-grained event data are collected.  We are no longer restricted to low-power / simple models.  For example, two of the most popular, mainstream machine learning models today are Random Forest and Gradient Boosting Trees.  Nevertheless, although both of them are very powerful and provide non-linear fits to the training data, data scientists still need to carefully create features in order to achieve good performance.

At the same time, computer scientists revisited the use of many-layer Neural Networks for these human-mimicking tasks.  This gave new birth to DNNs (Deep Neural Networks) and provided significant breakthroughs in image classification and speech recognition tasks.  The major difference of a DNN is that you can feed the raw signals (e.g. the RGB pixel values) directly into the network without creating any domain-specific input features.  Through many layers of neurons (hence the name "deep" neural network), a DNN can "automatically" generate the appropriate features layer by layer and finally provide a very good prediction.  This significantly reduces the "feature engineering" effort, which was a major bottleneck for data scientists.

DNNs have also evolved into many different network topologies, so we have CNNs (Convolutional Neural Networks), RNNs (Recurrent Neural Networks), LSTM (Long Short-Term Memory), GANs (Generative Adversarial Networks), transfer learning, attention models ... etc.  The whole spectrum is called Deep Learning, which is capturing the whole machine learning community's attention today.

Reinforcement Learning

Another key component is how a person (or animal) learns.  Imagine the very natural perceive/act/reward cycle of animal behavior.  A person or animal first understands the environment by sensing what "state" he is in.  Based on that, he picks an "action" that brings him to another "state".  Then he receives a "reward".  The cycle repeats until he dies.  This way of learning (called "Reinforcement Learning") is quite different from the "curve fitting" approach of traditional supervised machine learning.  In particular, learning in RL is very fast because every new piece of feedback (such as performing an action and receiving a reward) is used immediately to influence subsequent decisions.  Reinforcement Learning has gained tremendous success in self-driving cars as well as AlphaGo (the Go-playing program).

Reinforcement Learning also provides a smooth integration between "prediction" and "optimization", because it maintains a belief about the current state and the transition probabilities of taking different actions, and then decides which action can lead to the best outcome.

AI = DL + RL

Compared to classical ML techniques, DL provides a more powerful prediction model that usually produces good prediction accuracy.  Compared to a classical optimization model using LP (linear programming), RL provides a much faster learning mechanism that is also more adaptive to changes in the environment.

Saturday, April 29, 2017

An output of a truly random process

Recently I had a discussion with my data science team about whether we can challenge whether a set of observations follows a random process or not.  Basically, data science is all about learning the hidden patterns that affect the observations.  If the observations follow a pure random process, then there is nothing we can learn.  Let me walk through an example to illustrate.

Let's say someone claims that he is throwing a fair die (with faces 1 to 6) sequentially, i.e. each throw has an equal chance of producing any number from 1 to 6.

He then throws the die 12 times and shows you the output sequence.  From the output, can you judge whether this really came from sequential throws of a fair die?  In other words, does the output really follow a random process as expected?

Let's look at 3 situations:
  • Situation 1 output is [4, 1, 3, 1, 2, 6, 3, 5, 5, 1, 2, 4]
  • Situation 2 output is [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
  • Situation 3 output is [1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6]
At first glance, the output of situation 1 looks like the result of a random process.  Situation 2 definitely doesn't look like it.  Situation 3 is harder to judge.  If you look at the proportions of the output numbers, the frequency of each number in situation 3 definitely follows the uniform distribution of a fair die.  But if you look at the ordering, situation 3 follows a well-defined pattern that doesn't seem random at all.  Therefore, I don't think the output of situation 3 comes from a random process.

However, this seems to be a very arbitrary choice.  Why would I look at the ordering at all?  Should I look for more properties, such as ...
  • Whether the numbers at even positions are even
  • Average gap between consecutive throws
  • Whether the number in the 3rd position is always smaller than the one in the 10th position
  • ...
As you can see, depending on my imagination, the list can go on and on.  How can I tell whether situation 3 follows a random process or not?

Method 1: Randomization Test

This is based on the hypothesis testing methodology.  We establish the null hypothesis H0 that situation 3 follows a random process.

First, I define an arbitrary list of statistics of my choice:
  • statisticA = proportion of even numbers at even positions
  • statisticB = average gap between consecutive output numbers
  • statisticC = ...
Second, I run a simulation to generate 12 numbers from a random process and calculate the corresponding statistics defined above.

Third, I repeat the simulation N times and output the mean and standard deviation of each statistic.

If statistic A, B or C of situation 3 is too many standard deviations away from the simulated mean of that statistic (based on the corresponding p-value), then we conclude that situation 3 does not follow a random process.  Otherwise, we don't have enough evidence that the null hypothesis is violated, so we accept that situation 3 follows a random process.
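
Here is a minimal sketch of this randomization test in Python; the two statistics and the number of simulations are arbitrary choices of mine, as discussed above:

import numpy as np

observed = [1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6]       # situation 3

def statistic_a(seq):
    """Proportion of even numbers sitting at even positions (1-based)."""
    evens = [v for i, v in enumerate(seq, start=1) if i % 2 == 0]
    return sum(v % 2 == 0 for v in evens) / len(evens)

def statistic_b(seq):
    """Average absolute gap between consecutive throws."""
    return np.mean(np.abs(np.diff(seq)))

rng = np.random.default_rng(0)
N = 10000
sims = rng.integers(1, 7, size=(N, 12))               # N simulated fair-die sequences

for name, stat in [("A", statistic_a), ("B", statistic_b)]:
    sim_values = np.array([stat(s) for s in sims])
    z = (stat(observed) - sim_values.mean()) / sim_values.std()
    print("statistic %s: observed=%.3f, simulated mean=%.3f, z=%.2f"
          % (name, stat(observed), sim_values.mean(), z))
# A |z| far larger than ~2-3 is evidence against the "random process" hypothesis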

Method 2: Predictability Test

This is based on the theory of predictive analytics.

First, I pick a particular machine learning algorithm, let's say time series forecasting using ARIMA.  Notice that I could also choose to use a RandomForest and create some arbitrary input features (such as the previous output number, the maximum of the last 3 numbers ... etc.).

Second, I train my selected predictive model on the output data of situation 3 (in this example, situation 3 has only 12 data points, but imagine we had many more).

Third, I evaluate my model on a test set and see whether its predictions are much better than a random guess.  For example, I can measure the lift of my model by comparing the RMSE (root mean square error) of my predictions against the standard deviation of the test data.  If the lift is insignificant, then I conclude that situation 3 results from a random process, because my predictive model didn't learn any pattern.
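
Here is a sketch of this predictability test, using a RandomForest on simple lag features as suggested above (the long repeating sequence stands in for a "situation 3" with many more than 12 data points):

import numpy as np
from sklearn.ensemble import RandomForestRegressor

seq = np.array([1, 2, 3, 4, 5, 6] * 50)               # a long "situation 3"-like sequence

# Arbitrary input features: the previous 3 throws predict the next one
X = np.array([seq[i-3:i] for i in range(3, len(seq))])
y = seq[3:]

split = int(len(y) * 0.8)
model = RandomForestRegressor(n_estimators=100, random_state=0)
model.fit(X[:split], y[:split])

pred = model.predict(X[split:])
rmse = np.sqrt(np.mean((pred - y[split:]) ** 2))
baseline = y[split:].std()                             # error of a "random guess" at the mean

print("RMSE=%.3f vs baseline std=%.3f" % (rmse, baseline))
# An RMSE much smaller than the baseline means the sequence is predictable,
# i.e. it does not come from a random process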


Tuesday, August 25, 2015

Common techniques in optimization

Optimization is a frequently encountered problem in real life.  We need to make a decision to achieve something within a set of constraints, while maximizing or minimizing an objective based on some measurement.  For example, a restaurant may need to decide how many workers (of each position) to hire to serve its customers, with the constraint that workers cannot work overtime, and the objective of minimizing cost.  A car manufacturer may need to decide how many units of each model to produce, within the constraint of its warehouse storage capacity, while maximizing the profit of its sales.

Exhaustive Search

If there aren't many choices, an exhaustive search (e.g. breadth-first, depth-first, best-first, A* ... etc.) can be employed to evaluate each option, check whether it meets all the constraints (i.e. is a feasible solution), and then compute its objective value.  We then sort all feasible solutions by their objective values and pick the one with the max (or min) objective value as our decision.  Unfortunately, real-world problems usually involve a large number of choices (exponentially large due to combinatorial explosion), making exhaustive search impractical in many cases.

When this happens, two other solution approaches are commonly used:
1) Mathematical Programming
2) Greedy Local Search

Mathematical Programming

Mathematical programming is a classical way to solve optimization problems.  It is a family of approaches including linear programming, integer programming, quadratic programming and even non-linear programming.  The development process usually goes through the following steps ...

From a problem description, the modeler expresses the problem as a mathematical structure containing 3 parts:
  • Variables:  there are two types of variables.  Data variables contain the current values of your business environment (e.g. the cost of hiring a waiter, the price of a car), and decision variables hold the decisions you make to optimize your objective (e.g. how many staff to hire, how many cars to make).
  • Constraints:  a set of rules that you cannot break.  Effectively, constraints disallow certain combinations of your decision variables and are mainly used to filter out infeasible solutions.  In a typical setting, constraints are expressed as a system of linear inequalities, where a linear expression of decision variables is specified on the left-hand side and a value is specified on the right-hand side of an inequality comparison.
  • Objective function:  it encapsulates the quantitative measure of how well our goal has been achieved.  In a typical setting, the objective function is expressed as a single linear (or quadratic) combination of decision variables.
After the mathematical structure is defined, the modeler will submit it to a solver, which will output the best solution.  The process can be depicted as follows.


Expressing the problem in the mathematical structure is the key design of the solution.  There are many elements to be considered, which we describe below.

The first consideration is how to represent your decision, especially whether the decision is a quantity (real number), a number of units (integer) or a binary decision (integer 0 or 1).

The next step is to represent the constraints as inequalities over linear combinations of decision variables.  You need to think about whether each constraint is hard or soft.  Hard constraints should be expressed in the constraint part of the problem.  Notice that the solver will not consider any solution that violates any constraint.  Therefore, if the solver cannot find a feasible solution that fulfills all constraints, it will simply abort.  In other words, it won't return a solution that violates the smallest number of constraints.  If you want the solver to do that because you have room to relax some constraints, you should model these soft constraints in the objective function rather than in the constraint section.  Typically, you define an objective function that quantifies the degree of violation.  The solver will then give you the solution that violates the constraints the least, rather than just telling you that no solution was found.

Finally, you define the objective function.  In most real-life problems, you have multiple goals in mind (e.g. you want to minimize your customers' wait time in the queue, but you also want to minimize the cost of hiring staff).  First you express each goal as a linear expression of decision variables, and then take a weighted average of the different goals (which is also a linear combination of all decision variables) to form the final objective function.  How to choose the weights among the goals is a business question, based on how much you are willing to trade off between conflicting goals.

There are some objectives that cannot be expressed as a linear expression of decision variables.  One frequently encountered example is minimizing the absolute deviation from a target value (i.e. regardless of whether the deviation is positive or negative).  A common way is to minimize the sum of the squared differences, but after we square them, the objective is no longer a linear expression.  To address this requirement, there is a more powerful class of solvers called "quadratic programming", which relaxes the objective function to allow a degree-2 polynomial expression.

After you have expressed the problem in this mathematical structure, you can pass it to a solver (there are many open-source and commercial solvers available), which you can treat as a magical black box.  The solver will output an optimal solution (with a value assigned to each decision variable) that fulfills all constraints and maximizes (or minimizes) the objective function.
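
As a concrete sketch, here is a toy version of the restaurant staffing example expressed for the open-source scipy.optimize.linprog solver.  All the numbers and constraints are made up, and a real staffing problem would use integer programming rather than this LP relaxation:

from scipy.optimize import linprog

# Decision variables: x = [number of waiters, number of cooks]
# Objective: minimize 15*waiters + 20*cooks (hourly wage cost)
c = [15, 20]

# Constraints (linprog uses A_ub @ x <= b_ub, so ">=" rows are negated):
#   waiters + cooks >= 10       (enough total staff)
#   2*waiters + cooks >= 15     (enough front-of-house capacity)
A_ub = [[-1, -1],
        [-2, -1]]
b_ub = [-10, -15]

result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=[(0, None), (0, None)])
print(result.x, result.fun)     # optimal staffing levels and the minimum cost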

Once you receive the solution from the solver, you need to evaluate how "reliable" this optimal solution is.  There may be fluctuations in the data variables due to collection errors, the data may be volatile and change rapidly, or the data may be an estimation of another unknown quantity.




Ideally, the optimal solution doesn't change much when we perturb the data values within their error bounds.  In this case, our optimal solution is stable against errors in our data variables and is therefore a good one.  However, if the optimal solution changes drastically when the data variables fluctuate, we say the optimal solution is unreliable and we cannot use it.  In that case, we usually modify each data variable one at a time to figure out which one is causing the big swing in the optimal solution, and try to reduce our estimation error for that variable.

This sensitivity analysis is an important step in evaluating the stability, and hence the quality, of our optimal solution.  It also provides guidance on where we need to invest effort to make the estimation more accurate.

Mathematical programming allows you to specify your optimization problem in a very declarative manner and outputs an optimal solution if one exists.  It should be the first approach to try.  The downside of mathematical programming is that it requires linear constraints and linear (or quadratic) objectives.  It also has limits on the number of decision variables and constraints it can handle (and this limitation varies among implementations).  Although there are non-linear solvers, the number of variables they can take is even smaller.

Usually the first thing to do is to test whether your problem is small enough to fit into a mathematical programming solver.  If not, you may need to use another approach.

Greedy Local Search

The key words here are "local" and "greedy".  Greedy local search starts at a particular selection of decision variables, then looks around the surrounding neighborhood (hence the term "local") and finds the best solution within it (hence the term "greedy").  It then moves the current point to this best neighbor and repeats the process until the solution stays in the same place (i.e. no better neighbor is found).
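
Here is a minimal sketch of greedy local search (hill climbing) on a made-up objective, with the neighborhood defined as the adjacent integer grid points:

def greedy_local_search(start, objective, neighbors):
    """Move to the best neighbor until no neighbor improves the objective."""
    current = start
    while True:
        best = max(neighbors(current), key=objective, default=current)
        if objective(best) <= objective(current):
            return current                      # no better neighbor: local optimum
        current = best

# Toy example: maximize -(x-3)^2 - (y+1)^2 over integer grid points
objective = lambda p: -(p[0] - 3) ** 2 - (p[1] + 1) ** 2
neighbors = lambda p: [(p[0] + dx, p[1] + dy)
                       for dx in (-1, 0, 1) for dy in (-1, 0, 1)
                       if (dx, dy) != (0, 0)]

print(greedy_local_search((0, 0), objective, neighbors))   # converges to (3, -1)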

If the initial point is a non-feasible solution, the greedy search will first try to find a feasible solution by looking for neighbors with fewer constraint violations.  After a feasible solution is found, the greedy search will only consider neighbors that fulfill all constraints, and among them find the one with the best objective value.  Another good initialization strategy is to choose an initial point that is already a feasible (though of course not optimal) solution and start the local search from there.

Because local search limits its search to a neighborhood, it can control the degree of complexity simply by limiting the scope of the neighborhood.  Greedy local search can evaluate a large number of variables by walking across a chain of neighborhoods.  However, because local search only navigates towards the best neighbor within a limited scope, it loses the overall picture and relies on the path being convex.  If there are valleys along the path, the local search will stop there and never reach the global optimum.  This is called a local optimum trap, because the search is trapped at a local maximum and unable to escape.  There are some techniques to escape from a local optimum that I describe in my previous blog post.

When performing greedy local search, it is important to pick the right greedy function (also called the heuristic function) as well as the right neighborhood.  Although it is common to choose the greedy function to be the objective function itself, they don't have to be the same.  In good practice, the greedy function is often chosen to be one that is highly correlated with the objective function but can be computed much faster.  On the other hand, evaluating the objective function of every point in the neighborhood can also be a very expensive operation, especially when the data has high dimensionality and the number of neighbors can be exponentially large.  A good neighborhood function can be combined with a good greedy function so that we don't need to evaluate each neighbor to figure out which one has the best objective value.

Combining the two approaches

In my experience, combining the two approaches can itself be a very powerful solution.  At the outer layer, we use greedy local search to navigate towards better solutions.  However, when we search for the best neighbor within the neighborhood, we use mathematical programming, taking the greedy function as the objective function and using the constraints directly.  This way, we have a very effective way to search for the best solution within the neighborhood, and we can use the flexible neighborhood scoping of local search to control the complexity of the mathematical programming solver.

Saturday, June 27, 2015

When machines replace humans

Recently, a good friend sent me an article from Harvard Business Review called "Beyond Automation", written by Thomas H. Davenport and Julia Kirby.  The article talks about how automation affects our workforce and displaces value from human workers.  It proposes 5 strategies for getting prepared to stay competitive in the automation era.  It is a very good article and triggered a lot of thoughts for me.

I want to explore a fundamental question:  "Can machines replace humans in the future?"

Let's start by looking at what machines are and aren't doing today.  Machines operate under a human's program, and therefore they can only solve those problems that we humans can express or codify in a structured form.  Don't underestimate the power underneath, though.  With good abstract thinking, the smartest humans in the world have partitioned a large number of problems (by their nature) into different problem categories.  Each category is expressed in the form of a "generic problem", and subsequently a "general solution" is developed.  Notice that computer scientists have been doing this for many decades and have come up with powerful algorithms such as "sorting", "finding the shortest path", "heuristic search" ... etc.

By grouping concrete problems by their nature into a "generic, abstract problem", we can significantly reduce the number of cases/scenarios while still covering a large area of ground.  The "generic solution" we develop can then be specialized for each concrete problem scenario.  After that, we can develop a software program that can be executed on a large cluster of machines equipped with fast CPUs and a lot of memory.  Compare this automated solution with what a human can do manually.  In these areas, once problems are well-defined and solutions are automated by software programs, computers with much more powerful CPUs and memory will always beat humans by many orders of magnitude.  There is no question that human jobs in these areas will be eliminated.

In terms of capturing our experience in abstract data structures and algorithms, computer scientists are very far from done.  There is still a very large body of problems that even the smartest humans haven't completely figured out how to put into a structured form yet.  Things that involve "perception", "intuition", "decision making", "estimation" and "creativity" are primarily done by humans today.  I believe these types of jobs will continue to be done by human workers over the next decade.  On the other hand, with the latest technology research, we continuously push the boundary of automation into some of these areas.  "Face recognition" and "voice recognition", which involve a high degree of perception, can now be done very accurately by software.  With "machine learning" technology, we can make "predictions" and judgements in a more objective way than a human.  Together with "planning" and "optimization" algorithms, a large percentage of decision making can be automated, and the result is usually better because it is less biased and more data-driven.

However, in these frontier areas where the latest software technology is unable to automate every step, humans are needed in the loop to make the final decision, or to intervene in exceptional situations that the software is not programmed to handle.  These are jobs where a human and a machine can work together to produce a better outcome.  This is what the article calls "augmentation".  Some examples are artists using advanced software to touch up their photos, using computer graphics to create movies, using machine learning for genome sequence processing, using robots to perform surgery, driverless vehicles ... etc.

Whether computer programs can replace humans completely remains to be seen, but I don't think this will happen in the next two decades.  We humans are unique and good at perceiving things at multiple levels of abstraction and from different angles.  We are good at connecting the dots between unrelated areas.  We can invent new things.  These are things that machines will find very hard to do, or that will at least take a long time, if they are possible at all.

"When can we program a machine that can write program ?"

The HBR article suggests a person can consider five strategies (step up, step aside, step in, step narrowly and step forward) to retain value in the automation era.  I favor the "step forward" strategy, because that person is driving the trend rather than passively reacting to it.  Looking back at history, humanity's value system has shifted across the industrial revolution, the internet revolution, etc.  At the end of the day, it is more-sophisticated humans who take away jobs (and value) from less-sophisticated humans, and it is always the people who drive the movement who are the winners of this value shift.  It happened in the past and will continue into the future.


Sunday, February 22, 2015

Big Data Processing in Spark

In the traditional 3-tier architecture, data processing is performed by the application server while the data itself is stored in the database server.  The application server and the database server are typically two different machines.  Therefore, the processing cycle proceeds as follows:
  1. The application server sends a query to the database server to retrieve the necessary data
  2. The application server performs processing on the received data
  3. The application server saves the changed data back to the database server
In the traditional data processing paradigm, we move data to the code.
It can be depicted as follows ...




Then the big data phenomenon arrived.  Because the data volume is huge, it cannot be held by a single database server.  Big data is typically partitioned and stored across many physical DB server machines.  On the other hand, more application servers need to be added to increase the processing power for big data.


However, as we increase the number of app servers and DB servers for storing and processing the big data, more data needs to be transferred back and forth across the network during the processing cycle, up to the point where the network becomes the major bottleneck.

Moving code to data

To overcome the network bottleneck, we need a new computing paradigm.  Instead of moving the data to the code, we move the code to the data and perform the processing where the data is stored.



Notice the change in program structure:
  • The program execution starts at a driver, which orchestrates the execution happening remotely across many worker servers within a cluster.
  • Data is no longer transferred to the driver program; the driver program holds a data reference in its variables rather than the data itself.  The data reference is basically an id used to locate the corresponding data residing in the database server.
  • Code is shipped from the program to the database server, where the execution happens, and data is modified at the database server without leaving that server machine.
  • Finally, the program requests a save of the modified data.  Since the modified data resides in the database server, no data transfer happens over the network.
By moving the code to the data, the volume of data transferred over the network is significantly reduced.  This is an important paradigm shift for big data processing.

In the following sections, I will use Apache Spark to illustrate how this big data processing paradigm is implemented.

RDD

Resilient Distributed Dataset (RDD) is how Spark implements the data reference concept.  An RDD is a logical reference to a dataset that is partitioned across many server machines in the cluster.


To make a clear distinction between the data reference and the data itself, a Spark program is organized as a sequence of execution steps, each of which is either a "transformation" or an "action".

Programming Model

A typical program is organized as follows
  1. From an environment variable (the "context"), create some initial data reference RDD objects
  2. Transform the initial RDD objects to create more RDD objects.  Transformation is expressed in a functional programming style, where a code block is shipped from the driver program to multiple remote worker servers, each of which holds a partition of the RDD.  A variable appearing inside the code block can either be an item of the RDD or a local variable of the driver program, which gets serialized over to the worker machine.  After the code (and the copies of the serialized variables) is received by the remote worker server, it is executed there against the items of the RDD partition residing on that server.  Notice that the result of a transformation is a brand new RDD (the original RDD is not mutated)
  3. Finally, the RDD object (the data reference) needs to be materialized.  This is achieved through an "action", which dumps the RDD into storage or returns its value data to the driver program.
Here is a word count example

# Get the initial RDD from the context (sc is the SparkContext)
file = sc.textFile("hdfs://...")

# Three consecutive transformations of the RDD
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))

# Materialize the RDD using an action
counts.saveAsTextFile("hdfs://...")

When the driver program starts its execution, it builds up a graph where the nodes are RDDs and the edges are transformation steps.  However, no execution happens on the cluster until an action is encountered.  At that point, the driver program ships the execution graph as well as the code blocks to the cluster, where every worker server gets a copy.

The execution graph is a DAG (directed acyclic graph).
  • Each DAG is an atomic unit of execution.
  • Each source node (no incoming edge) is an external data source or driver memory
  • Each intermediate node is an RDD
  • Each sink node (no outgoing edge) is an external data source or driver memory
  • A green edge connecting to an RDD represents a transformation.  A red edge connecting to a sink node represents an action



Data Shuffling

Although we ship the code to the worker servers where the data processing happens, data movement cannot be completely eliminated.  For example, if the processing requires data residing in different partitions to be grouped first, then we need to shuffle data among the worker servers.

Spark carefully distinguishes between two types of "transformation" operations.
  • "Narrow transformation" refers to processing where the logic depends only on data already residing in the partition, so data shuffling is unnecessary.  Examples of narrow transformations include filter(), sample(), map(), flatMap() ... etc.
  • "Wide transformation" refers to processing where the logic depends on data residing in multiple partitions, so data shuffling is needed to bring it together in one place.  Examples of wide transformations include groupByKey(), reduceByKey() ... etc.



Joining two RDDs can also affect the amount of data being shuffled.  Spark provides two ways to join data.  In a shuffle join implementation, data from the two RDDs with the same key is redistributed to the same partition.  In other words, the items in each RDD are shuffled across the worker servers.


Besides the shuffle join, Spark provides another alternative called the broadcast join.  In this case, one of the RDDs is broadcast and copied to every partition.  Imagine the situation where one of the RDDs is significantly smaller than the other: a broadcast join reduces the network traffic because only the small RDD needs to be copied to all worker servers, while the large RDD doesn't need to be shuffled at all.
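
With the RDD API, a broadcast join can be sketched by hand: collect the small RDD to the driver, broadcast it, and do a map-side lookup on the large RDD (a sketch, assuming sc is the SparkContext and both RDDs are key-value pairs):

# small_rdd is tiny, large_rdd is huge; both are (key, value) pair RDDs
small_as_dict = dict(small_rdd.collect())          # pull the small RDD to the driver
small_bc = sc.broadcast(small_as_dict)             # copy it once to every worker

# Map-side join: no shuffle of large_rdd is needed
joined = large_rdd.map(
    lambda kv: (kv[0], (kv[1], small_bc.value.get(kv[0]))))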



In some cases, transformations can be re-ordered to reduce the amount of data shuffling.  Below is an example of a JOIN between two huge RDDs followed by a filter.



Plan 1 is a naive implementation that follows the given order.  It first joins the two huge RDDs and then applies the filter on the join result.  This ends up causing a lot of data shuffling because the two RDDs are huge, even though the result after filtering is small.

Plan 2 offers a smarter way, using the "predicate push-down" technique: we first apply the filter to both RDDs before joining them.  Since the filtering reduces the number of items of each RDD significantly, the join processing will be much cheaper.
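
In RDD terms, the two plans might look like this (a sketch; rdd1 and rdd2 are assumed to be key-value pair RDDs and the predicates are placeholders):

# Plan 1: join first, filter later -- both huge RDDs get shuffled for the join
plan1 = rdd1.join(rdd2).filter(joined_predicate)

# Plan 2: push the predicates down -- filter each RDD first, then join the small remainders
plan2 = rdd1.filter(predicate1).join(rdd2.filter(predicate2))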

Execution planning

As explained above, data shuffling incurs the most significant cost in the overall data processing flow.  Spark provides a mechanism that generates an execution plan from the DAG that minimizes the amount of data shuffling.
  1. Analyze the DAG to determine the order of transformations.  Notice that we start from the action (terminal node) and trace back to all dependent RDDs.
  2. To minimize data shuffling, we group the narrow transformations together into a "stage", where all transformation tasks can be performed within the partition and no data shuffling is needed.  The transformations become tasks that are chained together within a stage.
  3. Wide transformations sit at the boundary between two stages, which requires data to be shuffled to a different worker server.  When a stage finishes its execution, it persists the data into different files (one per partition) on the local disks.  Worker nodes of the subsequent stage will come to pick up these files, and this is where data shuffling happens.
 

Reliability and Fault Resiliency

Since the DAG defines deterministic transformation steps between the different partitions of data within each RDD, fault recovery is very straightforward.  Whenever a worker server crashes during the execution of a stage, another worker server can simply re-execute the stage from the beginning by pulling the input data from its parent stage, which has its output data stored in local files.  In case the result of the parent stage is not accessible (e.g. the worker server lost the files), the parent stage needs to be re-executed as well.  Imagine this as a lineage of transformation steps: any failure of a step triggers a re-execution starting from the latest step whose output is still available.

Since the DAG itself is an atomic unit of execution, all the RDD values are forgotten after the DAG finishes its execution.  Therefore, after the driver program finishes an action (which executes a DAG to completion), all the RDD values are forgotten, and if the program accesses an RDD again in a subsequent statement, the RDD needs to be recomputed from its dependencies.  To reduce this repetitive processing, Spark provides a caching mechanism to remember RDDs in worker server memory (or on local disk).  Once the execution planner finds that an RDD is already cached in memory, it will use that RDD right away without tracing back to its parent RDDs.  This way, we prune the DAG once we reach an RDD that is in the cache.
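
For example (a small sketch), marking an RDD with cache() before running two actions on it avoids recomputing its whole lineage for the second action:

words = sc.textFile("hdfs://...").flatMap(lambda line: line.split(" "))
words.cache()                      # keep this RDD in worker memory after first materialization

print(words.count())               # first action: computes and caches the RDD
print(words.distinct().count())    # second action: reuses the cached RDD, no re-read from HDFS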


Overall, Apache Spark provides a powerful framework for big data processing.  With its caching mechanism that holds previous computation results in memory, Spark significantly outperforms Hadoop because it doesn't need to persist all the data to disk for each round of parallel processing.  Although it is still very new, I think Spark will take off as the mainstream approach to processing big data.

Friday, November 28, 2014

Spark Streaming

In this post, we'll discuss another important topic of big data processing: real-time stream processing.  This is an area where Hadoop falls short because of its high latency, and another open-source framework, Storm, was developed to cover the need for real-time processing.  Unfortunately, Hadoop and Storm provide quite different programming models, resulting in high development and maintenance costs.

Continuing from my previous post on Spark, which provides a highly efficient parallel processing framework: Spark Streaming is a natural extension of Spark's core programming paradigm to provide large-scale, real-time data processing.  The biggest benefit of using Spark Streaming is that it is based on a programming paradigm similar to the core's, so there is no need to develop and maintain a completely different programming paradigm for batch and real-time processing.


Spark Core Programming Paradigm Recap

The core Spark programming paradigm consists of the following steps ...
  1. Take input data from an external data source and create an RDD (a data set distributed across many servers)
  2. Transform the RDD into other RDDs (these transformations define a directed acyclic graph of dependencies between RDDs)
  3. Output the final RDD to an external data source


Notice that RDDs are immutable, so the sequence of transformations is deterministic; recovering from an intermediate processing failure is simply a matter of tracing back to the parent of the failed node (in the DAG) and redoing the processing from there.


Spark Streaming

Spark Streaming introduces a data structure called a DStream, which is basically a sequence of RDDs where each RDD contains the data associated with a time interval.  A DStream is created with a frequency parameter (the batch interval), which defines how often a new RDD is added to the sequence.

Transformation of a DStream boils down to transformations of each RDD (within the sequence of RDDs that the DStream contains).  Within the transformation, the RDDs inside the DStream can "join" with another RDD (outside the DStream), providing a mixed processing paradigm between the DStream and other RDDs.  Also, since each transformation produces an output RDD, transforming a DStream results in another sequence of RDDs that defines an output DStream.

Here is the basic transformation, where each RDD in the output DStream has a one-to-one correspondence with an RDD in the input DStream.


Instead of performing a one-to-one transformation of each RDD in the DStream, Spark Streaming also enables a sliding window operation by defining a WINDOW that groups consecutive RDDs along the time dimension (a code sketch follows the list below).  A window is defined by 2 parameters ...
  1. Window length: defines how many consecutive RDDs are combined when performing the transformation.
  2. Slide interval: defines how many RDDs are skipped before the next transformation executes.
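
Here is a sketch of a windowed word count using these two parameters, assuming sc is an existing SparkContext, a 1-second batch interval and a socket text source:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)                      # a new RDD is created every 1 second
lines = ssc.socketTextStream("localhost", 9999)
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1))

# Window length = 30s (30 RDDs combined), slide interval = 10s (recompute every 10 seconds)
counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 10)

counts.pprint()
ssc.start()
ssc.awaitTermination()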


By providing a similar set of transformation operations for both RDDs and DStreams, Spark enables a unified programming paradigm across both batch and real-time processing, and hence reduces the corresponding development and maintenance cost.