Tuesday, July 4, 2017

Teach yourself Spark Streaming in Five Lines of Code

 

Here you are! You've heard about it and want to know more, but you don't want to spend a week going through complicated tutorials or even the documentation.
This is a minimal, concise way of learning the concepts, with little boilerplate code and few secondary concepts.

What is Spark Streaming?

Spark Structured Streaming allows you to perform ETL of your data using plain old SQL, and to obtain query results in near real-time.
Your data can come from a variety of sources like CSV, JSON, or Parquet. For this example, we will be using JSON.

Streaming Queries

The central concept in Spark Streaming is the Streaming Query. A Streaming Query is a Spark SQL query that is executed periodically on a table that can dynamically expand and shrink as new elements are added to the stream and old elements are consumed. That's all.

Code Example

The complete code can be found here. But in this tutorial we are going straight to the point. First we read the data,
In line 2, we just read the first JSON file in the folder to get the schema needed to create the stream in lines 5-7. Notice that we pass a pattern that matches all JSON files in the folder pointed to by the resourcesPath string variable.
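A minimal sketch of the reading step just described, under a few assumptions: the helper name readPeopleStream and the file name people_1.json are hypothetical, and the line numbers mentioned in the text refer to the original listing, not to this sketch.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadPeopleStream {
  // Builds a streaming DataFrame over every JSON file in `resourcesPath`.
  // `firstFile` is a hypothetical name for the file used to infer the schema.
  def readPeopleStream(spark: SparkSession,
                       resourcesPath: String,
                       firstFile: String = "people_1.json"): DataFrame = {
    // Batch read of a single file, only to let Spark infer the schema.
    val staticDf = spark.read.json(s"$resourcesPath/$firstFile")

    // By default, file-based streams require an explicit schema,
    // so the inferred one is reused here.
    spark.readStream
      .schema(staticDf.schema)
      .json(s"$resourcesPath/*.json") // pattern matching all JSON files
  }
}
```

Nothing is read continuously at this point; the returned DataFrame only describes the stream.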
Next, we hook up our transformation,

which is a simple SELECT operation that takes the name column and transforms the age column to year of birth.
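One way such a SELECT could be written, as a sketch: the view name people, the helper name toBirthYear, and the reference year 2017 (the year of this post) are assumptions, not taken from the original listing.

```scala
import org.apache.spark.sql.DataFrame

object BirthYear {
  // Keeps `name` and turns `age` into a year of birth using plain SQL.
  // `referenceYear` is an assumed parameter; it defaults to 2017.
  def toBirthYear(people: DataFrame, referenceYear: Int = 2017): DataFrame = {
    // Temporary views can be registered on batch and streaming DataFrames alike.
    people.createOrReplaceTempView("people")
    people.sparkSession.sql(
      s"SELECT name, $referenceYear - age AS birthyear FROM people")
  }
}
```

The same function works on a static DataFrame, which makes the transformation easy to try out before attaching it to a stream.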
Then we set it running,

Line 2 is pretty self-explanatory: we go from a DataFrame to a DataStreamWriter, a new abstraction that allows the processing to happen. Then, in line 4, we call trigger on the DataStreamWriter to specify how often we want the stream to process data. Line 6 specifies that we want the output to go to the console.
So far nothing has happened. It is not until line 7 that the stream actually starts. Finally, the awaitTermination() call in line 10 prevents the program from finishing right away. It gives us 2 minutes to play with it.
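The steps above can be sketched as follows. This is not the original listing, so the line numbers in the text do not apply to it. The 5-second trigger interval is an assumed value, the helper name run is hypothetical, and Trigger.ProcessingTime assumes Spark 2.2 or later.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

object RunStream {
  // Starts a console query on a streaming DataFrame and waits up to `timeoutMs`.
  def run(transformed: DataFrame, timeoutMs: Long = 2 * 60 * 1000L): Unit = {
    val query = transformed.writeStream              // DataFrame -> DataStreamWriter
      .trigger(Trigger.ProcessingTime("5 seconds"))  // assumed processing interval
      .format("console")                             // print each batch to the console
      .start()                                       // nothing runs before this call

    // Block the caller until the query stops or the timeout elapses (2 minutes by default).
    query.awaitTermination(timeoutMs)
    query.stop()
  }
}
```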
As soon as this starts running, we will get the following output in console,
+----------+---------+
|      name|birthyear|
+----------+---------+
|     Maxim|     1995|
|Aleksander|     1994|
|      Paul|     1973|
+----------+---------+
This is just the transformation applied to our first JSON file. To feed more elements into the stream, we add our second JSON file to that same folder,

and we will get the following,
+-------+---------+
|   name|birthyear|
+-------+---------+
|   Juan|     1995|
| Giulio|     1994|
|Rudolph|     1973|
+-------+---------+
Notice that the data from the first file didn't appear again. This shows the 'streaming' character of the computation. Simple, right? Enjoy!

Thursday, May 4, 2017

Custom Joins in Spark SQL


Spark has become a really valuable tool for running transformations on structured and unstructured data sets. It allows for distributed parallel computations that can be composed from transformations that resemble those available in Scala collections.
In a nutshell, when data is weakly structured we may use Spark's RDDs to process it, but when it has a stronger structure, we can leverage DataFrames, Spark SQL's basic abstraction for working with tabular data.
The benefits of this over a traditional RDBMS are well known and understood by the community: the ability to process larger amounts of data, scalability, and the flexibility to connect to many different sources and combine data in a single place.
These benefits also bring a few new problems that are typically overlooked by the documentation. In this article, I will explain Custom Spark Joins, which can be implemented using User Defined Functions (UDFs), and use that example to highlight an often ignored fact about Spark: the lack of built-in indexes.
The article will use Scala to show code examples, but the knowledge presented can be transferred to other languages with little effort.

UDFs and Custom Joins


Remember SQL joins? Let's refresh our memory by looking at an example,

This is pretty easy to understand: you're trying to get a list of movies in Hulu that belong to the same genre as the ones in Netflix.
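A join of this kind might look like the sketch below. It assumes that HuluMovies and NetflixMovies are registered as views and that both expose title and genre columns; those column names, like the helper name sameGenre, are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object GenreJoin {
  // Hulu movies whose genre also appears among the Netflix movies.
  // Assumes views HuluMovies and NetflixMovies with `title` and `genre` columns.
  def sameGenre(spark: SparkSession): DataFrame =
    spark.sql(
      """SELECT DISTINCT h.title, h.genre
        |FROM HuluMovies h
        |JOIN NetflixMovies n ON h.genre = n.genre""".stripMargin)
}
```

The join condition is a plain equality on genre, so two movies either match exactly or not at all.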
But how good is this? Isn't there a better way to compare these items? Suppose that, instead, you want to run the comparison between sets of tags. In this setting, you'll have a pair of tag lists for every record, one belonging to NetflixMovies and the other to HuluMovies.
To make it clear, our goal is to achieve the following,

{'crime police murder', 'crime police murder'} -> match
{'romantic, comedy', 'crime police murder'} -> do not match 

We realize that our join operation as described above falls short of handling these cases. Also, we would like an operation with "smoother" behavior, allowing us to specify a certain "degree of similarity" between the two items.

{'comedy police', 'crime police murder'} -> match.
{'crime police', 'crime police murder'} -> also a match, but stronger than the one above.

What if we had a method that could tell us whether two sets of tags coming from two movies are similar to each other according to some predefined distance criterion? To make our ideas clearer, let's materialize our wishes in the Scala function below,
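One way such a function could look, as a sketch. Jaccard similarity (shared tags divided by all distinct tags) is an assumption chosen here for illustration, not necessarily the measure used in the original listing, and the names TagSimilarity and similarity are hypothetical.

```scala
object TagSimilarity {
  // Splits a tag string on whitespace and commas into a set of lower-case tags.
  private def tags(s: String): Set[String] =
    s.toLowerCase.split("[\\s,]+").filter(_.nonEmpty).toSet

  // Jaccard similarity: |A ∩ B| / |A ∪ B|, from 0.0 (disjoint) to 1.0 (identical).
  def similarity(a: String, b: String): Double = {
    val (ta, tb) = (tags(a), tags(b))
    val union = ta union tb
    if (union.isEmpty) 0.0 else (ta intersect tb).size.toDouble / union.size
  }
}
```

With this measure, 'comedy police' against 'crime police murder' scores 1/4 = 0.25, 'crime police' scores 2/3, identical tag lists score 1.0, and 'romantic, comedy' scores 0.0, which reproduces the ordering of the examples above.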
In this way, we can write our SQL like this,
where the 0.2 is a threshold that was chosen empirically.
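A sketch of how a similarity function can be registered as a UDF and used as the join condition. The UDF name tag_similarity, the tags and title columns, and the Jaccard measure inside the function are assumptions made for illustration; only the 0.2 threshold comes from the text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SimilarityJoin {
  // Jaccard similarity between two tag strings (an assumed measure).
  val tagSimilarity: (String, String) => Double = (a, b) => {
    // Null-safe split on whitespace and commas.
    def tags(s: String): Set[String] =
      Option(s).getOrElse("").toLowerCase.split("[\\s,]+").filter(_.nonEmpty).toSet
    val (ta, tb) = (tags(a), tags(b))
    val union = ta union tb
    if (union.isEmpty) 0.0 else (ta intersect tb).size.toDouble / union.size
  }

  // Pairs of Hulu and Netflix movies whose tags are similar enough.
  // Assumes views HuluMovies and NetflixMovies with `title` and `tags` columns.
  def similarMovies(spark: SparkSession, threshold: Double = 0.2): DataFrame = {
    spark.udf.register("tag_similarity", tagSimilarity)
    spark.sql(
      s"""SELECT h.title AS hulu_title, n.title AS netflix_title
         |FROM HuluMovies h
         |JOIN NetflixMovies n
         |  ON tag_similarity(h.tags, n.tags) > $threshold""".stripMargin)
  }
}
```

Because the condition is not an equality on a key, Spark cannot hash or sort the two sides to find matches and has to evaluate the function on pairs of rows, which is where the missing indexes start to matter.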
Now we're finally including the missing movies!

A working code example