Tuesday, July 4, 2017

Teach yourself Spark Streaming in Five Lines of Code

 

So here you are: you've heard about it and want to know more, but you don't want to spend a week going through complicated tutorials or even the documentation.
This is a concise, minimalistic way of learning the core concepts, with as little boilerplate code and as few secondary concepts as possible.

What is Spark Streaming?

Spark Structured Streaming allows you to perform ETL of your data using plain old SQL, and to obtain query results in near real-time.
Your data can come from a variety of sources like CSV, JSON, or Parquet. For this example, we will be using JSON.

Streaming Queries

The central concept in Spark Streaming is that of Streaming Queries. Streaming Queries are Spark SQL queries that are executed periodically against a table which dynamically expands and shrinks as new elements get added to the stream and old elements get consumed. That's all.

Code Example

The complete code can be found here, but in this tutorial we go straight to the point. First, we read the data,
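and here is a minimal sketch of what that read step can look like (the spark session, the people-1.json file name, and the exact layout are assumptions; the snippet is laid out so that the line numbers mentioned below still apply):

// read a single JSON file just to obtain its schema (the file name here is hypothetical)
val staticDF = spark.read.json(s"$resourcesPath/people-1.json")

// create the stream, reusing that schema and matching every JSON file in the folder
val streamingDF = spark.readStream
  .schema(staticDF.schema)
  .json(s"$resourcesPath/*.json")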
In line 2, we just read the first JSON file in the folder to get the schema, which we then use for creating the stream in lines 5-7. Notice that we're passing a pattern that matches all the JSON files in the folder pointed to by the resourcesPath string variable.
The next thing we do is hook up our transformation,
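for which a sketch could look as follows; computing the year of birth as the current year minus the age is an assumption about how the original does it:

// keep the name column and turn age into a year of birth
val transformedDF = streamingDF.selectExpr(
  "name",
  "year(current_date()) - age AS birthyear")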

which is a simple SELECT operation that takes the name column and transforms the age column to year of birth.
Then we set this thing running,
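with something along these lines; the processing-time trigger, the console option, and the two-minute timeout are assumptions (Trigger comes from org.apache.spark.sql.streaming), and the snippet is laid out so that the line numbers mentioned below still apply:

val query = transformedDF
  .writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("truncate", "false")
  .format("console")
  .start()

// block here so the stream has time to run
query.awaitTermination(120000)  // two minutes, in milliseconds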

Line 2 is pretty self-explanatory: we're going from a DataFrame to a DataStreamWriter, a new abstraction that will allow the processing to happen. Then, in line 4, we call trigger on the DataStreamWriter to specify the frequency at which we want the stream to process data. Line 6 specifies that we want the output to go to the console.
So far nothing has happened; it is not until line 7 that the stream finally gets started. The awaitTermination() call in line 10 prevents the program from finishing and gives us two minutes to play with it.
As soon as this starts running, we will get the following output in the console,
+----------+---------+
|      name|birthyear|
+----------+---------+
|     Maxim|     1995|
|Aleksander|     1994|
|      Paul|     1973|
+----------+---------+
This is just the transformation applied to our first JSON file. To generate some additional elements in the stream, we now add our second JSON file to that same folder,

and we will get the following,
+-------+---------+
|   name|birthyear|
+-------+---------+
|   Juan|     1995|
| Giulio|     1994|
|Rudolph|     1973|
+-------+---------+
Notice that the data from the first file didn't appear again; this shows the 'streaming' character of the computation. Simple, right? Enjoy!

Thursday, May 4, 2017

Custom Joins in Spark SQL


Spark has become a really valuable tool for running transformations on structured and unstructured data sets. It allows for distributed parallel computations that can be composed from transformations that resemble those available in Scala collections.
In a nutshell, when data is weakly structured we may use Spark's RDDs to process it, but when it has a stronger structure we can leverage DataFrames, Spark SQL's basic abstraction for working with tabular data.
The benefits of this over a traditional RDBMS are well known and understood by the community: the ability to process greater amounts of data, scalability, and the flexibility to connect to many different sources and combine data in a single place.
These benefits also bring a few new problems that are typically overlooked by the documentation. In this article, I will explain custom Spark joins, which can be implemented using User Defined Functions (UDFs), and use that example to highlight an often ignored fact about Spark: the lack of built-in indexes.
The article will use Scala to show code examples, but the knowledge presented can be transferred to other languages with little effort.

UDFs and Custom Joins


Remember SQL joins? Let's refresh our memory by looking at an example,
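something along these lines, say, with hypothetical title and genre columns on the two tables and the query wrapped in Scala, Spark SQL style:

val huluMoviesInNetflixGenres = spark.sql("""
  SELECT DISTINCT h.title
  FROM HuluMovies h
  JOIN NetflixMovies n
    ON h.genre = n.genre
""")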

This is pretty easy to understand: you're trying to get a list of movies on Hulu that belong to the same genre as the ones on Netflix.
But how good is this? Isn't there a better way to compare these items? Suppose that, instead, you want to run the comparison between sets of tags. In this setting, for every record you'll have a pair of tag lists, one belonging to NetflixMovies and the other to HuluMovies.
To make it clear, our goal is to achieve the following,

{'crime police murder', 'crime police murder'} -> match
{'romantic, comedy', 'crime police murder'} -> do not match 

We realize that our join operation as described above falls short of handling these cases. We would also like an operation with a "smoother" behavior, allowing us to specify a certain "degree of similarity" between the two items.

{'comedy police', 'crime police murder'} -> match.
{'crime police', 'crime police murder'} -> also a match, but stronger than the one above.

What if we had a method that could somehow tell us that two sets of tags coming from two movies are similar to each other according to some predefined distance criterion? To make our ideas clearer, let's materialize our wishes in the Scala function below,
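sketched here using the Jaccard similarity of the two tag sets as the distance criterion (the actual metric may well differ; tags are assumed to be whitespace-separated strings):

// fraction of tags shared by the two movies, between 0.0 and 1.0
def tagSimilarity(tagsA: String, tagsB: String): Double = {
  val a = tagsA.toLowerCase.split("\\s+").toSet
  val b = tagsB.toLowerCase.split("\\s+").toSet
  if (a.isEmpty || b.isEmpty) 0.0
  else a.intersect(b).size.toDouble / a.union(b).size
}

With a 0.2 threshold, this particular metric behaves like the examples above: 'comedy police' vs. 'crime police murder' scores 0.25, while 'crime police' vs. 'crime police murder' scores roughly 0.67.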
In this way, we can write our SQL like this,
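After registering the function as a UDF so it can be called from SQL, the query can look roughly like this (the tags column is, again, hypothetical):

spark.udf.register("tag_similarity", tagSimilarity _)

val matchingMovies = spark.sql("""
  SELECT n.title AS netflix_title, h.title AS hulu_title
  FROM NetflixMovies n
  JOIN HuluMovies h
    ON tag_similarity(n.tags, h.tags) > 0.2
""")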
where the 0.2 is a threshold that was chosen empirically.
Now we're finally including the missing movies!

A working code example 
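Putting the pieces together, a self-contained sketch that runs in spark-shell could look like this (the tagSimilarity function defined earlier is assumed to be pasted in first; the table contents are purely illustrative, reusing the tag strings from the examples above):

import spark.implicits._

// two tiny in-memory tables registered as temporary views
Seq(("Movie A", "crime police murder"), ("Movie B", "romantic comedy"))
  .toDF("title", "tags").createOrReplaceTempView("NetflixMovies")
Seq(("Movie C", "crime police"), ("Movie D", "comedy police"), ("Movie E", "space opera"))
  .toDF("title", "tags").createOrReplaceTempView("HuluMovies")

// expose the Scala function to SQL
spark.udf.register("tag_similarity", tagSimilarity _)

spark.sql("""
  SELECT n.title AS netflix_title, h.title AS hulu_title,
         round(tag_similarity(n.tags, h.tags), 2) AS similarity
  FROM NetflixMovies n
  JOIN HuluMovies h
    ON tag_similarity(n.tags, h.tags) > 0.2
""").show()

Because the join condition is an arbitrary UDF, Spark cannot use an equi-join strategy here and ends up comparing every pair of rows, which ties back to the point above about the lack of built-in indexes.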

 

Sunday, July 7, 2013

Parallel programming in Octave

Motivation

Nowadays, it is almost impossible to find yourself working on a single-core CPU. Chip prices have been dropping constantly, and new programming models need to be in place to exploit the available processing power.
I've been using Octave for quite a while now and found myself needing to write parallel code, both to perform time-consuming tasks like network access in parallel and to keep using the full power of Octave at the same time.
Currently, there are two options for going parallel in Octave. One of them is exploiting multiple cores on a single machine through functions like pararrayfun or parcellfun. The second option is to "scale out" the execution to multiple computers with a package like parallel.
The parallel package implements a message-passing programming model, the so-called MPI. On the other hand, functions like pararrayfun or parcellfun have a more functional flavor; we'll focus on them in this article.

Basics

We'll begin with pararrayfun(). The basic signature of this function is as follows,

[o1, o2, ...] = pararrayfun (nproc, fun, a1, a2, ...)

You provide this function with a function handle, fun, that points to the function that does the actual work. You also specify the input parameters to that function, a1, a2, ..., aN. These have to be arrays, all of the same size. Finally, nproc specifies the number of worker processes that will be used in the computation.
The way it works is really simple. Assume, for simplicity, that fun accepts two scalar parameters and that a1 and a2 are two arrays; pararrayfun() will then traverse your arrays, taking one scalar from each at a time, and evaluate your function on that pair. At the end, pararrayfun() returns a concatenation of whatever fun returned.
The parallelism comes into play with nproc>1. If you have, for instance, nproc==4, you will have four processes trying to perform the evaluation of your function in parallel on separate parts of your input data.
The underlying implementation uses fork() for the multiprocessing. This comes with some overhead, so it is better that each chunk of computation, i.e. each evaluation of fun, is heavy enough to make up for the price being paid. To put it simply, suppose your array has 1000 elements; using 4 workers leaves an average of 250 elements to be processed by each. If the load for those 250 elements is not heavy enough, you'll be slowing things down instead of speeding them up.

Example

In this section we go through an example of how to perform a simple computation in parallel with pararrayfun(). The code can be downloaded here.

function c = printArgs(lilist, a)   % prints its first argument and adds a to it
    lilist                          % no semicolon, so the value gets printed
    c = lilist + a;                 % return lilist with every element increased by a
end

function retcode = eh(err)          % error handler, called whenever printArgs() fails
    a = err                         % print the error information
    retcode = zeros(3, 1);          % produce the output printArgs() couldn't create
end

ilist = zeros(1,3);
output = pararrayfun(2, @(a)printArgs(ilist, a), [3,4,5], "ErrorHandler", @eh);

In line 1, we define a function, printArgs(), that prints its first argument and returns a copy of it with each of its elements increased by its second argument, a.
In line 6, we define eh(), which stands for error handler. It will be called whenever printArgs() fails and is responsible for producing the output that printArgs() couldn't create.
Finally, line 12 is where the magic happens. The first parameter tells pararrayfun that we will be using two processes. The second parameter is a function handle. It looks a bit tricky, but we will explain it.
The @(a) creates the handle based on printArgs(). The thing is that pararrayfun is set up here to receive a one-parameter function, while printArgs takes two. What you're telling Octave is "create a function with one parameter, a, by using printArgs() and hard-coding the other parameter to ilist". In this way you get the one-parameter function.
In functional programming jargon, you might say that you're closing over ilist, and the whole @(a)printArgs(ilist, a) expression is called a closure.
On execution, pararrayfun() will call printArgs() with each of the elements in [3,4,5], distributing the elements among the available workers in a first-come, first-served fashion. The code above leads to the following three calls, with at most two of them running at the same time,

printArgs(ilist, 3)
printArgs(ilist, 4)
printArgs(ilist, 5)

Remember that ilist will always be the one you declare in line 11. Run it yourself and check the output!

Tuesday, February 26, 2013

Formant estimation in Matlab/Octave

Formants
In speech processing, the resonant frequencies of an individual's vocal tract are called formants. These frequencies characterize the individual according to age and gender, and can even be used to perform more complex tasks like speaker identification.
Typically, the first two to four of these resonant frequencies are of interest, and they can be identified in a typical frequency plot of a speech signal, as shown below,
The frequency values corresponding to the peaks F1, F2, ..., FN of the red envelope are called the formants. The little peaks below the red line are called partials.

Methods for Estimating Formants

The two most widely used methods for formant estimation are based on LPC coefficients and on cepstral analysis. The basic idea behind both is to obtain a smoothed version of the frequency response and then compute the peaks' locations from that representation. A good reference for LPC and formant estimation can be found in this paper. Two good resources I have used to understand and implement cepstral methods are this one and this one.

Octave Code

In this section I provide an implementation of the method explained in the paper mentioned in the previous section (the LPC method). You can access the file LPCFormants.m from my Dropbox.
A few words on this code: the function assumes that you have already windowed the signal, and it accepts the sampling rate as a parameter. If you have no idea what windows are, you should take a look at short-time analysis of speech in any of the numerous web resources, like here or here.

Monday, May 24, 2010

Introduction

Hi again!

I'm Alberto Andreotti, a computer engineer based in Córdoba, Argentina. This is my computer-engineering-related blog. I'm currently working as a software engineer in Córdoba, where we enjoy an ever-growing software industry.
My interests are embedded systems, distributed systems and HPC, among others.

Alberto.

Creation

Hi,

this is the genesis of my blog. Hope you like the stuff published here.

Alberto.