This post is the second in a series on data science. To see the first part, read here.

If you are following along with the source code, see here.

In the first post of this series, we demonstrated how to perform a naive sales forecast for Initech’s CFO using linear regression in Excel and R. However, that framework for forecasting will not scale once the IT department computes the sales data for hundreds of locations and departments. Manually performing sales forecasts in Excel for hundreds of locations and aggregating the results into a report could take weeks of effort. How can we programmatically perform sales forecasts for the CFO at scale?

Hadoop is one of the premier tools for analyzing big data. According to the Hadoop website, “The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.”

Hadoop and MapReduce may seem complicated, but at its core MapReduce is a simple idea. MapReduce refers to the two core components that Hadoop leverages to perform a task. The first component is the Mapper function. The Mapper takes an input data set and converts it into another data set of keys and values; you can think of the Mapper as setting the stage for the Reducer. The second component, the Reducer function, takes the output from a Mapper function as its input and aggregates those key/value pairs into another set of key/value pairs.
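
As a toy illustration of this division of labor (entirely outside Hadoop, with made-up records), mapping records into key/value pairs and then aggregating the values per key might look like this in R:

```r
# Toy input records; in MapReduce each would arrive as a line of input
records <- c("a", "b", "a", "c", "b", "a")

# Map step: turn each record into a key/value pair (key = record, value = 1)
mapped <- lapply(records, function(r) list(key = r, value = 1))

# Reduce step: aggregate the values for each key (here, by summing)
keys <- sapply(mapped, function(kv) kv$key)
vals <- sapply(mapped, function(kv) kv$value)
reduced <- tapply(vals, keys, sum)

print(reduced)
```

In real Hadoop, the map and reduce steps run as separate processes, potentially on different machines, with Hadoop shuffling and sorting the key/value pairs in between.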

Hadoop seems like the perfect tool to let us analyze and perform sales forecasts on the big data set IT is currently working to produce for the Initech CFO. There are a variety of methods to install Hadoop on your machine; one method can be found here.

Once Hadoop is installed, we need to load our sales forecast data into the file system. Hadoop has its own distributed file system called HDFS, which is accessed via Hadoop's hdfs command. The hdfs command allows you to place your data on the distributed file system. Run the following commands (adjusted for your install) to copy our sample data set to HDFS:

hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/hduser
hdfs dfs -mkdir /user/hduser/salesforecastsimple
hdfs dfs -put /Users/Keith/Projects/datascienceblogseries/part2/smalldataset.csv /user/hduser/salesforecastsimple/salesdata-in
hdfs dfs -ls /user/hduser/salesforecastsimple

The first three commands set up your folder structure on HDFS. The next command copies smalldataset.csv to HDFS using the -put switch of the hdfs dfs command. Finally, list the contents of the directory to confirm that everything is configured properly.

Now that HDFS is configured, we can begin running Hadoop jobs to perform our sales forecast. Currently, we only have one file of sales data for testing purposes, but in our next post we will scale this forecast model to hundreds of files and Hadoop will begin to shine as a big data analysis tool.

Hadoop has two execution modes for data scientists to leverage. The first is to create a JAR file (using any JVM-based language) and run a hadoop command to execute it. This is the normal mode of execution, but Hadoop also has a Streaming API that allows MapReduce jobs to be written in any language or shell that can read STDIN and write STDOUT.

For our sales forecasting, we will continue to use the R language to integrate with Hadoop’s Streaming API. For more details on the Streaming API see here.

The Streaming API works slightly differently than the JAR-based execution mode, but the key to integrating with the Streaming API is to adhere to the MapReduce contracts for STDIN and STDOUT. For example, the output of your Mapper must be the key, followed by a tab, the value, and a terminating newline. When looking at the mapping code for our sales forecast, you can see this implemented via the cat function in R.
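
As a minimal sketch of that contract (the key and value strings below are placeholders, not taken from our data set), a record can be emitted from R like so:

```r
# Emit one Streaming API record: key, a tab, the value, then a newline
emit <- function(key, value) {
  cat(key, "\t", value, "\n", sep = "")
}

emit("1$1", "5$100")
# writes the line: 1$1<tab>5$100
```

Anything written to STDOUT in this shape is a valid record as far as Hadoop's shuffle and sort phase is concerned.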

In order to perform our sales forecast for Store 1/Department 1, we need to make our scripts executable by running the following commands:

chmod a+x ./
chmod a+x ./mapper.R
chmod a+x ./reducer.R

The hadoopForecast script will kick off the MapReduce job utilizing our Mapper and Reducer scripts built in R. The output of this script will be the final sales forecast results for Store 1/Department 1 of Initech.

In our example for Initech, we need to build a Mapper function that takes all of the sales data and maps it into a format the Reducer can use to perform the actual sales forecast. Our sample data set is already somewhat organized into keys and values: the store and department can be considered the key, while the sales number can be considered the value. Thus, our Mapper function only needs to do some trivial massaging to get the data into the appropriate format for the Reducer. A natural key is the combination of store number and department number. We also want to keep track of each sale and the day it occurred, so that is a natural fit for the value portion of the mapper. The code for the mapper can be seen in the repository, but ultimately all it does is output the data in the format StoreNumber$DepartmentNumber [tab] DayNumber$SalesAmount [newline].
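
A minimal sketch of such a mapper could look like the following. Note the assumptions here: the column order (store, dept, day, sales) is a guess about smalldataset.csv, and the sample rows are made up; see the repository for the actual mapper.R.

```r
#! /usr/bin/env Rscript

# Build one Streaming API record in the format
# StoreNumber$DepartmentNumber [tab] DayNumber$SalesAmount
mapLine <- function(store, dept, day, sales) {
  paste0(store, "$", dept, "\t", day, "$", sales)
}

# Hypothetical rows standing in for smalldataset.csv; the real script
# would read them from STDIN, e.g. read.csv("stdin", header = FALSE)
sample <- data.frame(store = c(1, 1), dept = c(1, 1),
                     day = c(1, 2), sales = c(100.5, 250))

for (i in 1:nrow(sample)) {
  row <- sample[i, ]
  cat(mapLine(row$store, row$dept, row$day, row$sales), "\n", sep = "")
}
```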

To see what the output looks like before it goes to the reducer, you can run:

cat smalldataset.csv | ./mapper.R

The mapper takes our input data and maps it into key/value pairs that the Hadoop ecosystem will understand and be able to distribute across multiple nodes in parallel.

The more complex function in our sales forecast example is the Reducer. When using the JAR-based Hadoop execution model, Hadoop manages all the keys and files for you, but with the Streaming API your Reducer function must detect when key changes occur and handle them appropriately. Thus, our reducer has some bookkeeping code to track which key we are currently on.

For quick reference, here is our Reducer example script:

#! /usr/bin/env Rscript

library(forecast)

# Fit an exponential smoothing model to the sales vector and
# forecast the upcoming cycles
ForecastNextSale <- function (salesData){
  fit <- ets(salesData)
  forecast(fit)
}

# Initialize variables
mapOut <- read.table("stdin", col.names = c("mapkey", "mapval"))
CurrKey <- as.character(mapOut[1,]$mapkey)
CurrVal <- ""
sales <- numeric()

# Keep track of the key for state tracking
# Forecast sales on key change
for (i in 1:nrow(mapOut)){
  Key <- as.character(mapOut[i,]$mapkey)
  Val <- as.character(mapOut[i,]$mapval)

  DataVal <- unlist(strsplit(Val, "\\$"))
  if (identical(Key, CurrKey)){
    CurrVal <- paste(CurrVal, Val)
    sales <- c(sales, as.numeric(DataVal[2]))
  } else {
    # If we switch keys, go ahead and process the accumulated sales
    # and then reset the counters
    cat(CurrKey, "\n")
    print(ForecastNextSale(sales))
    CurrKey <- Key
    CurrVal <- Val
    sales <- numeric()
    sales <- c(sales, as.numeric(DataVal[2]))
  }
}

# Run the last iteration
cat(CurrKey, "\n")
print(ForecastNextSale(sales))

Hadoop is responsible for sorting the data fed to the reducer by key, so we are guaranteed the correct order of data. The reducer code for our sales forecasting simply keeps track of the sales amounts in a temporary vector until the key changes. Once the key changes, we know that all data for that key has been processed and we can perform the final forecast for that store and department.

Sales forecasting has a number of valid techniques, from linear regression to exponential smoothing to time series analysis. Exponential smoothing is the technique we have chosen for performing the sales forecast for Initech; the details behind exponential smoothing can be found here. However, we do not want to reinvent the wheel, so we have chosen to leverage the R forecast package to perform the heavy lifting. The forecast package allows forecasting to be done quickly and effectively. The ets function has a number of options, but for purposes of demonstration we have chosen the most basic use case: simply pass ets a vector of numbers, and it will fit the model for you. Calling forecast on the fitted model then, by default, generates point forecasts for the next 10 cycles along with 80% and 95% prediction intervals.
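
A minimal sketch of this workflow, with made-up sales figures (it assumes the forecast package is installed), looks like this:

```r
library(forecast)

# Made-up sales history for one store/department
sales <- c(120, 135, 128, 151, 144, 160, 155, 171, 168, 180)

# Fit an exponential smoothing state space model to the raw vector
fit <- ets(sales)

# Produce point forecasts for the next cycles, with 80% and 95%
# prediction intervals included by default
print(forecast(fit))
```

This is exactly the fit-then-forecast pattern our reducer performs for each store/department key.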

In order to run all of our code through Hadoop, run the following command: ./

The following demonstrates the output from our Hadoop job:

hadoop output

Next Steps

Today we have covered a simple example of utilizing data science to give Initech’s CFO a sales forecast for one specific department in one store using the exponential smoothing forecasting method. In our next post, we will have gathered all the sales data from the IT department and will fully leverage Hadoop to help us perform sales forecasts on a big data set at scale. That post will leverage Amazon’s EMR to help perform our sales forecasts quickly and cost effectively.