Nov 25, 2010

A National Security scenario using Hadoop, Map/Reduce and Information Theory

For those familiar with Big Data and Hadoop (otherwise see my article here), I feel Map/Reduce has a pretty steep on-ramp. Part of what is difficult, and not easily explained, is how to decompose your business problems into solutions that can run as a series of Map/Reduce jobs. So what better way to learn this than to explore a particular problem?

Now, for those of you who are not big fans of reading academic papers, bear with me for a minute. Pantel, Philpot and Hovy published a really great paper for the IEEE in December 2005 called Data Alignment and Integration. In this paper they explained how Information Theory can be used to flag suspicious calls in the interests of Homeland/National Security. They put forth an algorithm based on pointwise mutual information (PMI) that shows how this can be done. So let me first explain their approach, and then we can take a look at how to implement it in Map/Reduce.

They start off by saying the following, "Claude Shannon’s classic 1948 article provides a way of measuring the information content of events. This theory of information provides a pointwise mutual information metric quantifying the association strength between two events by measuring the amount of information one event tells us about the other. By applying this theory to our problem, we can identify the most important observations for each entity in a population. Formally, pointwise mutual information quantifies the association strength between two events. It essentially measures the amount of information one event x gives about another event y, where P(x) denotes the probability that x occurs, and P(x, y) the probability that both events occur jointly:

PMI(x, y) = log [ P(x, y) / ( P(x) P(y) ) ]

"Pointwise mutual information is high when x and y occur together more often than they would by chance, which is computed by the probability of x and y occurring independently."

OK. That's all the math we need to get into. I swear. 

So now, if we use the source phone number as one event and the number being called as the second event, this starts to get interesting! Let's assume the following data points to plug into our algorithm: our caller makes 21 calls to Bogota and 1606 calls everywhere else, while all other callers make 227 calls to Bogota. This data comes from the original article, which used Bogota, so if you're from Colombia, this isn't intended to offend.

Now, assuming the total frequency of all calls is 1.32 x 10^12 (something only the NSA would know), we can plug these values into the formula and get the following:

PMI = log [ (21 / 1.32 x 10^12) / ( (1606 / 1.32 x 10^12) x (227 / 1.32 x 10^12) ) ] ≈ 7.88  (using a base-10 log)

When the PMI result (7.88) for a user's calls to a particular city is higher than the average (or a particular threshold), the user gets flagged. The original article goes into far more detail explaining PMI and Information Theory, and I encourage you to obtain it. But the focus of this post is to help you understand how to put this into Map/Reduce, so I need to move on.

If you want to use Map/Reduce and you don't work for Google, then you need to be using Apache Hadoop. Hadoop is perfect for this type of problem: you're dealing with huge datasets of log files (specifically call log files), something Hadoop is really, really good at. In addition, Hadoop is designed to run on commodity hardware, so mission-critical, compute-intensive solutions such as the one we're exploring can run on affordable, widely available machines.

So, our first task is to work backwards. When I look at this algorithm I see that I need a number of variables in order to compute it. For each caller, I need the following variables:

* TOTAL_CALLS_TO_CITY_BY_ORIGINATOR (21)
* TOTAL_OTHER_CALLS (1606)
* TOTAL_CALLS_TO_CITY_BY_OTHERS (227)
* TOTAL_CALLS_BY_EVERYONE (1.32 x 10^12)
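
To make the formula concrete, here is a small sketch (plain Java, not a Hadoop class, and entirely my own code) of how the PMI rank can be computed from these four variables. It assumes a base-10 log and the probability estimates implied above; plugging in 21, 1606, 227 and 1.32 x 10^12 reproduces the 7.88 from the example.

public class Pmi {

    // PMI(x, y) = log10( P(x,y) / (P(x) * P(y)) ), where
    //   P(x,y) = callsToCityByOriginator / totalCalls
    //   P(x)   = otherCallsByOriginator  / totalCalls
    //   P(y)   = callsToCityByOthers     / totalCalls
    public static double rank(long callsToCityByOriginator,
                              long otherCallsByOriginator,
                              long callsToCityByOthers,
                              double totalCalls) {
        double pxy = callsToCityByOriginator / totalCalls;
        double px  = otherCallsByOriginator  / totalCalls;
        double py  = callsToCityByOthers     / totalCalls;
        return Math.log10(pxy / (px * py));
    }

    public static void main(String[] args) {
        // The Bogota example: 21 calls to the city, 1606 other calls,
        // 227 calls to the city by everyone else, 1.32 x 10^12 calls overall.
        System.out.println(rank(21, 1606, 227, 1.32e12)); // prints ~7.88
    }
}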

I'm going to have to run a number of jobs to determine all of these variables for each city a user has called before I can finally run a job that computes the PMI result. This works out to 5 separate, sequential Hadoop jobs, which is a pretty standard pattern when working with Hadoop: each job builds on the output of the job that ran before it. In some cases the key is the Originator; in other cases the Originator and City form a joint key. Either way, the values are augmented as we progress through the jobs, building up the variables we need. The output shown for each job below is a single example record, not the entire output.

Hadoop Job 1 

This job assumes the input schema of your call records looks like the following (taken from Vonage) and that the files have been copied into a directory on HDFS called "calls".

Date          Time      Originator   Destination  Duration  UUID
Dec 20, 2009  03:30 PM  15125557496  27745552715  00:07:00  26440075852

Hadoop jar-based jobs follow this syntax:
                   bin/hadoop jar NameOfJar.jar JobClassName InputDir OutputDir
Command: bin/hadoop jar pmi.jar Step1 calls step1
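
For context, each StepN in these commands is just a driver class whose main() wires the mapper, the reducer and the input/output directories together. A minimal sketch of what Step1's driver might look like (my own code, not from the original post; the Step1Mapper and Step1Reducer classes are sketched under Job 1's output below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Step1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "pmi-step1");
        job.setJarByClass(Step1.class);
        job.setMapperClass(Step1Mapper.class);
        job.setReducerClass(Step1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. "calls"
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. "step1"
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}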

Input Data - All Call Log Files
InputFormat: TextInputFormat (used for all Jobs)
M/R Function: Determine TOTAL_CALLS_TO_CITY_BY_ORIGINATOR

Map: Parse out the Originator and City. Join them as a single Key, with a Value of new LongWritable(1)

Reduce: Sum all the Values to determine the total calls to each city by the user, and write out the Key as it stands

Output:

Key (Originator+City)    Value
1512555991311432         4
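
A minimal sketch of what Step1's map and reduce might look like (my own code, not from the post). I join the Originator and City with a "|" separator so later jobs can split them back apart, and cityCodeFor() is a hypothetical stand-in for whatever prefix lookup maps a destination number to a city:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Step1Mapper.java
public class Step1Mapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // e.g. "Dec 20, 2009 03:30 PM 15125557496 27745552715 00:07:00 26440075852"
        String[] f = line.toString().trim().split("\\s+");
        if (f.length < 9) return;                // skip the header and malformed rows
        String originator  = f[5];
        String destination = f[6];
        String city = cityCodeFor(destination);  // hypothetical prefix lookup
        context.write(new Text(originator + "|" + city), ONE);
    }

    private String cityCodeFor(String destination) {
        // Placeholder: a real job would consult a country/city prefix table.
        return destination.substring(0, 4);
    }
}

// Step1Reducer.java (same imports)
public class Step1Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text originatorCity, Iterable<LongWritable> counts, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable c : counts) total += c.get(); // TOTAL_CALLS_TO_CITY_BY_ORIGINATOR
        context.write(originatorCity, new LongWritable(total));
    }
}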

Hadoop Job 2

Command: bin/hadoop jar pmi.jar Step2 step1 step2

Input Data - Output Directory of Job 1
M/R Function: Split out the Originator as the Key, with the City and Total_Calls_To_City_By_Originator as the Value. No Reducer.

Output:

Originator (Key)    City    Total_Calls_To_City_By_Originator
151255599131        1432    4
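
A sketch of Step2 (my own code). It is map-only, so the driver would call job.setNumReduceTasks(0); it assumes Job 1 wrote lines of the form "originator|city<TAB>count":

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Job 1 output line: "originator|city \t totalCallsToCityByOriginator"
        String[] keyValue = line.toString().split("\t");
        if (keyValue.length != 2) return;
        String[] origCity = keyValue[0].split("\\|");
        // Re-key on the Originator alone; carry the City and its count as the Value.
        context.write(new Text(origCity[0]), new Text(origCity[1] + "\t" + keyValue[1]));
    }
}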

Hadoop Job 3

Command: bin/hadoop jar pmi.jar Step3 step2 step3

Input Data - Output Directory of Job 2
M/R Function: Determine TOTAL_OTHER_CALLS for Originator

Map: Pass Key/Values through as they stand

Reduce: Run through the Values collection to count all the calls to all the cities for an Originator (the Key). Subtract the calls for each city from that total, and augment the value tuple with the new amount as Total_Other_Calls

Output:

Originator     City    Total_Other_Calls    Total_Calls_To_City_By_Originator
15125559131    1432    59                   4
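
A sketch of Step3's reduce (my own code). The mapper for this step would split the Originator off each input line so the values group by caller; the reducer then needs two passes over those values: one to total up all of the caller's calls, and one to emit Total_Other_Calls per city. Since the reducer's Iterable can only be walked once, I buffer the values in a list first:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Step3Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text originator, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> buffered = new ArrayList<String>();
        long totalCallsByOriginator = 0;

        // First pass: buffer each "city \t callsToCity" value and sum all the calls.
        for (Text v : values) {
            String s = v.toString();
            buffered.add(s);
            totalCallsByOriginator += Long.parseLong(s.split("\t")[1]);
        }

        // Second pass: TOTAL_OTHER_CALLS = all the caller's calls minus the calls to this city.
        for (String s : buffered) {
            String[] parts = s.split("\t");
            long callsToCity = Long.parseLong(parts[1]);
            long totalOtherCalls = totalCallsByOriginator - callsToCity;
            context.write(originator, new Text(parts[0] + "\t" + totalOtherCalls + "\t" + callsToCity));
        }
    }
}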

Hadoop Job 4

Command: bin/hadoop jar pmi.jar Step4 step3 step4

Input Data - Output Directory of Job 3
M/R Function: Determine TOTAL_CALLS_TO_CITY_BY_OTHERS

Map: Read in the results from Job 3, parse them out, and make the City the Key with the rest of the record as the Value.

Reduce: Iterate over the values for each Key (City) to determine the total calls for that City. For each value record, subtract the Originator's calls to that city from the total to determine Total_Calls_To_City_By_Others. Write out the Originator as the Key and augment the value tuple with Total_Calls_To_City_By_Others. 

Output:

Originator     City    Total_Other_Calls    Total_Calls_To_City_By_Originator    Total_Calls_To_City_By_Others
15125559131    1432    59                   4                                    1300281
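
A sketch of Step4 (my own code). The mapper flips the record so the City becomes the Key; the reducer totals the city's calls and then, for each record, subtracts the Originator's own calls to get Total_Calls_To_City_By_Others:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Step4Mapper.java
public class Step4Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Job 3 output line: originator \t city \t totalOtherCalls \t callsToCityByOriginator
        String[] f = line.toString().split("\t");
        if (f.length != 4) return;
        // Key on the City; keep the rest of the record as the Value.
        context.write(new Text(f[1]), new Text(f[0] + "\t" + f[2] + "\t" + f[3]));
    }
}

// Step4Reducer.java (same imports, minus Mapper)
public class Step4Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text city, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> buffered = new ArrayList<String>();
        long totalCallsToCity = 0;

        // First pass: total up every caller's calls to this city.
        for (Text v : values) {
            String s = v.toString();
            buffered.add(s);
            totalCallsToCity += Long.parseLong(s.split("\t")[2]);
        }

        // Second pass: everyone else's calls = the city total minus this caller's own calls.
        for (String s : buffered) {
            String[] p = s.split("\t");   // originator, totalOtherCalls, callsToCityByOriginator
            long callsToCityByOthers = totalCallsToCity - Long.parseLong(p[2]);
            context.write(new Text(p[0]),
                    new Text(city + "\t" + p[1] + "\t" + p[2] + "\t" + callsToCityByOthers));
        }
    }
}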

Hadoop Job 5

Command: bin/hadoop jar pmi.jar Step5 step4 step5

Input Data - Output Directory of Job 4
M/R Function: Now that you have all the variables, calculate the PMI Rank for Each City an Originator has called.

Map: Pass Key/Values through as they stand

Reduce: The Key received by the Reducer is the Originator. Each Value in the Values collection contains all the information needed to calculate the PMI Rank for the Originator's calls to that City. Calculate the PMI Rank, then write out the Originator as the Key and the City and PMI Rank as the Value.

Output:

Originator      City    PMI Rank
151255527496    1512    7.88
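
Finally, a sketch of Step5's reduce (my own code). As with Step3, the mapper splits the Originator off each line so it becomes the reduce Key; the reducer then just plugs the accumulated counts into the PMI formula. TOTAL_CALLS_BY_EVERYONE is hard-coded here purely for illustration, and the base-10 log matches the worked example earlier:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Step5Reducer extends Reducer<Text, Text, Text, Text> {
    // TOTAL_CALLS_BY_EVERYONE; in a real job this would come from the Configuration.
    private static final double TOTAL_CALLS = 1.32e12;

    @Override
    protected void reduce(Text originator, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text v : values) {
            // Job 4 output value: city \t totalOtherCalls \t callsToCityByOriginator \t callsToCityByOthers
            String[] f = v.toString().split("\t");
            if (f.length != 4) continue;
            double pxy = Long.parseLong(f[2]) / TOTAL_CALLS;
            double px  = Long.parseLong(f[1]) / TOTAL_CALLS;
            double py  = Long.parseLong(f[3]) / TOTAL_CALLS;
            double pmi = Math.log10(pxy / (px * py));
            // Emit: Originator -> City \t PMI Rank
            context.write(originator, new Text(f[0] + "\t" + String.format("%.2f", pmi)));
        }
    }
}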

