Thursday, May 4, 2017

Custom Joins in Spark SQL


Spark has become a really valuable tool for running transformations on structured and unstructured data sets. It allows for distributed parallel computations that can be composed from transformations that resemble those available in Scala collections.
In a nutshell, when data is weakly structured we may use Spark's RDDs to process it; when it has a stronger structure, we can leverage DataFrames, Spark SQL's basic abstraction for working with tabular data.
The benefits over a traditional RDBMS are well known and understood by the community: the ability to process larger amounts of data, scalability, and the flexibility to connect to many different sources and combine their data in a single place.
These benefits also bring a few new problems that are typically overlooked by the documentation. In this article, I will explain custom Spark joins, which can be implemented using User Defined Functions (UDFs), and use that example to highlight an often ignored fact about Spark: the lack of built-in indexes.
The article will use Scala to show code examples, but the knowledge presented can be transferred to other languages with little effort.

UDFs and Custom Joins


Remember SQL joins? Let's refresh our memory by looking at an example.

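The snippet from the original post did not survive; a plausible sketch of the query being discussed (table and column names are assumptions) is:

```scala
// Hypothetical schema: NetflixMovies(title, genre) and HuluMovies(title, genre),
// both registered as temporary views. The join matches movies by exact genre.
val genreJoin =
  """SELECT h.title
    |FROM HuluMovies h
    |JOIN NetflixMovies n ON h.genre = n.genre""".stripMargin

// With a SparkSession in scope:
// spark.sql(genreJoin).show()
```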
This is pretty easy to understand: you're trying to get a list of movies on Hulu that belong to the same genre as the ones on Netflix.
But how good is this? Isn't there a better way to compare these items? Suppose that instead, you want to run the comparison over sets of tags. In this setting, every record pairs two tag lists: one belonging to NetflixMovies and another to HuluMovies.
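For concreteness, the records might be shaped like this (a hypothetical sketch; names and sample values are invented for illustration):

```scala
// Each movie carries a free-form, space-separated list of tags
// instead of a single genre column.
case class NetflixMovie(title: String, tags: String)
case class HuluMovie(title: String, tags: String)

val netflixExample = NetflixMovie("Some Netflix title", "crime police murder")
val huluExample    = HuluMovie("Some Hulu title", "crime police")
```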
To make it clear, our goal is to achieve the following:

{'crime police murder', 'crime police murder'} -> match
{'romantic comedy', 'crime police murder'} -> do not match

We realize that the join operation described above falls short of handling these cases. We would also like an operation with a "smoother" behavior, one that lets us specify a certain "degree of similarity" between the two items:

{'comedy police', 'crime police murder'} -> match.
{'crime police', 'crime police murder'} -> also a match, but stronger than the one above.

What if we had a method that would somehow tell us that two sets of tags coming from two movies are similar to each other, according to some predefined distance criterion? To make our ideas clearer, let's materialize our wishes in the Scala function below.
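A minimal sketch of such a function, using Jaccard similarity over space-separated tag strings (the metric and the names are illustrative choices, not necessarily the original author's):

```scala
// Jaccard similarity: |intersection| / |union| of the two tag sets.
// Returns a value in [0.0, 1.0]; identical tag sets score 1.0.
def similarity(tagsA: String, tagsB: String): Double = {
  val a = tagsA.split("\\s+").filter(_.nonEmpty).toSet
  val b = tagsB.split("\\s+").filter(_.nonEmpty).toSet
  val union = a.union(b)
  if (union.isEmpty) 0.0
  else a.intersect(b).size.toDouble / union.size
}

similarity("crime police murder", "crime police murder") // 1.0
similarity("crime police", "crime police murder")        // ~0.67
similarity("romantic comedy", "crime police murder")     // 0.0
```

Note that both `{'comedy police', 'crime police murder'}` and `{'crime police', 'crime police murder'}` now match, with the latter scoring higher, exactly the "smoother" behavior we asked for.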
In this way, we can write our SQL like this, where 0.2 is a threshold that was chosen empirically.
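Registering that function as a UDF and using it as a join predicate could look like the following sketch (the similarity body is repeated so the snippet is self-contained; all names are assumptions):

```scala
// The pure function that backs the UDF (Jaccard similarity over tag sets).
def similarity(tagsA: String, tagsB: String): Double = {
  val a = tagsA.split("\\s+").filter(_.nonEmpty).toSet
  val b = tagsB.split("\\s+").filter(_.nonEmpty).toSet
  val union = a.union(b)
  if (union.isEmpty) 0.0 else a.intersect(b).size.toDouble / union.size
}

// With a SparkSession in scope:
// spark.udf.register("similarity", similarity _)
// spark.sql("""
//   SELECT h.title
//   FROM HuluMovies h
//   JOIN NetflixMovies n ON similarity(h.tags, n.tags) > 0.2
// """).show()
```

Because the predicate is an opaque UDF, Spark cannot narrow the search with any equality-based join strategy; without indexes it must evaluate the function over every pair of rows, which connects back to the lack of built-in indexes mentioned above.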
Now, we're finally including the missing movies!

A working code example