Thursday, May 4, 2017

Custom Joins in Spark SQL


Spark has become a really valuable tool for running transformations on structured and unstructured data sets. It allows for distributed parallel computations composed from transformations that resemble those available on Scala collections.
In a nutshell, when data is weakly structured we may use Spark's RDDs to process it, but when it has a stronger structure, we can leverage DataFrames, Spark SQL's basic abstraction for working with tabular data.
The benefits of this over a traditional RDBMS are well known and understood by the community: the ability to process greater amounts of data, scalability, and the flexibility to connect to many different sources and combine data in a single place.
These benefits also bring a few new problems that are typically overlooked by the documentation. In this article, I will explain custom Spark joins, which can be implemented using User Defined Functions (UDFs), and use that example to highlight an often ignored fact about Spark: the lack of built-in indexes.
The article will use Scala to show code examples, but the knowledge presented can be transferred to other languages with little effort.

UDFs and Custom Joins


Remember SQL joins? Let's refresh our memory by looking at an example.
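(The original query is not embedded in this post; the following is a sketch of such a join, assuming two tables named netflixmovies and hulumovies, each with a genre column.)

SELECT netflixmovies.title, hulumovies.title
FROM netflixmovies
JOIN hulumovies
  ON netflixmovies.genre = hulumovies.genre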

This is pretty easy to understand: you're trying to get a list of movies on Hulu that belong to the same genre as the ones on Netflix.
But how good is this? Isn't there a better way to compare these items? Suppose that instead, you want to run the comparison between sets of tags. In this setting, for every pair of records you'll have a pair of tag lists: one belonging to NetflixMovies and the other to HuluMovies.
To make it clear, our goal is to achieve the following,

{'crime police murder', 'crime police murder'} -> match
{'romantic comedy', 'crime police murder'} -> do not match

We realize that our join operation as described above falls short of handling these cases. We would also like an operation with a "smoother" behavior, one that allows us to specify a certain "degree of similarity" between the two items.

{'comedy police', 'crime police murder'} -> match.
{'crime police', 'crime police murder'} -> also a match, but stronger than the one above.

What if we had a method that would somehow tell us that two sets of tags coming from two movies are similar to each other, according to some predefined distance criterion? To make our ideas clearer, let's materialize our wishes in the Scala function below.
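(A minimal sketch of such a function, using a Jaccard index over whitespace-separated tag strings; the name tagSimilarity and the exact metric shown here are illustrative rather than the original implementation.)

// Illustrative sketch: similarity between two whitespace-separated tag strings,
// computed as a Jaccard index in [0, 1].
def tagSimilarity(tagsA: String, tagsB: String): Double = {
  val a = tagsA.toLowerCase.split("\\s+").filter(_.nonEmpty).toSet
  val b = tagsB.toLowerCase.split("\\s+").filter(_.nonEmpty).toSet
  if (a.isEmpty || b.isEmpty) 0.0
  else a.intersect(b).size.toDouble / a.union(b).size.toDouble
}

With this metric, tagSimilarity("crime police", "crime police murder") is roughly 0.67, while tagSimilarity("comedy police", "crime police murder") is 0.25, matching the "stronger match" intuition from the examples above.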
In this way, we can write our SQL like this.
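(Again, a sketch rather than the original query; it assumes the UDF is registered under the name tag_similarity and that each table exposes a tags column.)

SELECT netflixmovies.title, hulumovies.title,
       tag_similarity(netflixmovies.tags, hulumovies.tags) AS score
FROM netflixmovies
JOIN hulumovies
  ON tag_similarity(netflixmovies.tags, hulumovies.tags) > 0.2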
Here, 0.2 is a threshold that was chosen empirically.
Now we're finally including the missing movies!

A working code example 

 

We put all this together in the following script, which you can find along with the build files in this git repo. Let's walk through it together.
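(The script itself is not embedded in this post, so the line numbers mentioned in the walkthrough below refer to the version in the repo. What follows is only a condensed sketch of its overall shape; the loadDF() helper body, the file paths, the local master setting, and the column names shown here are illustrative and may differ from the repo.)

import org.apache.spark.sql.{DataFrame, SparkSession}

object CustomJoinExample {

  // Placeholder for the repo's loadDF() helper; here it simply reads a CSV with a header row.
  def loadDF(spark: SparkSession, path: String): DataFrame =
    spark.read.option("header", "true").csv(path)

  // Similarity between two whitespace-separated tag strings, as a Jaccard index in [0, 1].
  def tagSimilarity(tagsA: String, tagsB: String): Double = {
    val a = tagsA.toLowerCase.split("\\s+").filter(_.nonEmpty).toSet
    val b = tagsB.toLowerCase.split("\\s+").filter(_.nonEmpty).toSet
    if (a.isEmpty || b.isEmpty) 0.0
    else a.intersect(b).size.toDouble / a.union(b).size.toDouble
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("custom-joins")
      .master("local[*]")
      // May be needed so the non-equi join can be planned as a cartesian product.
      .config("spark.sql.crossJoin.enabled", "true")
      .getOrCreate()
    val sqlContext = spark.sqlContext

    // Load each dataset and register it as a temporary view so we can query it with SQL.
    loadDF(spark, "src/main/resources/netflix_movies.csv").createOrReplaceTempView("netflixmovies")
    loadDF(spark, "src/main/resources/hulu_movies.csv").createOrReplaceTempView("hulumovies")

    // Register the UDF; note the partially applied function with typed parameters.
    sqlContext.udf.register("tag_similarity", tagSimilarity(_: String, _: String))

    val sql =
      """SELECT netflixmovies.title, hulumovies.title,
        |       tag_similarity(netflixmovies.tags, hulumovies.tags) AS score
        |FROM netflixmovies JOIN hulumovies
        |  ON tag_similarity(netflixmovies.tags, hulumovies.tags) > 0.2""".stripMargin

    // Execute the join and print the matching pairs with their scores.
    sqlContext.sql(sql).show(false)

    spark.stop()
  }
}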

In lines 1 and 2, we create the Spark session and the Spark SQL context. Then in lines 5, 6, 8, and 9, we load each dataframe using the loadDF() function (for more detail, check the code on GitHub) and register a temporary view with the name we're going to use to refer to that table in SQL syntax.
The datasets are made up by me; you can take a look at them under /resources.
In lines 12-18 we define our UDF as explained above. In line 20 we register the UDF with Spark. Notice the weird syntax? That's a partially applied function with typed parameters.
In lines 22-24 we have the join, similar to the one above, but this time we project the titles of the movies coming from each dataset (netflixmovies.title, hulumovies.title) and the score that each pair gets from our ranking function.
Finally, in line 27 we pass the SQL string to the sqlContext, and in line 30 we execute the join and show the result.
The build files and imports are included in the complete project on GitHub.

Pitfalls


So far so good, but the cost of this computation grows with the cartesian product of the rows of the two tables. It would be awesome if we could go back to 1985 in Doc Emmett Brown's DeLorean and ask our DB admin what we can do about it. If we did, the answer we would get would be plain and simple: use indexes! Well, it may be because of a problem with the Earth's gravitational pull, but in the future, we don't have indexes!
So, what were we thinking when we created the SQL engine of the future without indexes? Simple: computations happen in memory, not in the secondary storage that Marty McFly would have run his queries on.
Is this enough to overcome the cost of computing the cartesian product of the records in two tables? In my experience, it is not; anything larger than a modest-sized dataset will cause computation times to rise unacceptably.
Fortunately, some folks have already realized this and have created projects that provide improvements over plain RDDs, including regular indexes as well as spatial indexes. Stay tuned for my next post, where I will explore these options.
