Map-reduce tutorial

The following tutorial shows how to develop a simple MapReduce application using the Hadoop streaming API and the Python programming language.
 
In the first example of the tutorial, we will start by downloading a "How to become an engineer" book by Frank Doughty (an unstructured text dataset) and copying it to HDFS. Then we will develop, step by step, a MapReduce word count application in the Python programming language. Finally, we will validate the application locally and submit it to the Hadoop cluster using YARN.
 
The second example is more advanced. It goes beyond word counting and follows the typical steps of a text mining or natural language processing task. 
 
 

Getting data

 
To get an example dataset to run on, in the terminal window type: 
 
$ cd 
$ pwd
/home/myusername
$ wget http://www.gutenberg.org/ebooks/44604.txt.utf-8
--2015-05-12 14:06:05--  http://www.gutenberg.org/ebooks/44604.txt.utf-8
Resolving www.gutenberg.org... 152.19.134.47
Connecting to www.gutenberg.org|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: http://www.gutenberg.org/cache/epub/44604/pg44604.txt [following]
--2015-05-12 14:06:06--  http://www.gutenberg.org/cache/epub/44604/pg44604.txt
Reusing existing connection to www.gutenberg.org:80.
HTTP request sent, awaiting response... 200 OK
Length: 129202 (126K) [text/plain]
Saving to: “44604.txt.utf-8”
 
100%[===========================================================>] 129,202      787K/s   in 0.2s    
 
2015-05-12 14:06:06 (787 KB/s) - “44604.txt.utf-8” saved [129202/129202]
To copy data to HDFS from the local file system:
$ hdfs dfs -mkdir /user/myusername/input
$ hdfs dfs -put 44604.txt.utf-8 /user/myusername/input/gutenberg_input.txt
 
$ hdfs dfs -ls /user/myusername/input
-rw-r--r--   3 myusername mygroup     129202 2015-05-12 14:07 /user/myusername/input/gutenberg_input.txt
Note on the HDFS FUSE-mount
 
The CDH5 Hadoop distribution includes a FUSE (Filesystem in Userspace) interface to HDFS. FUSE allows a normal userland application to act as a bridge to a traditional filesystem interface, so regular Linux POSIX commands to copy, remove and list files can be used directly on HDFS.
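For example, assuming the cluster exposes an HDFS FUSE mount at /mnt/hdfs (the actual mount point is site-specific and set up by the administrator), the input data could be listed and copied with plain POSIX commands:
 
$ ls -l /mnt/hdfs/user/myusername/input
$ cp 44604.txt.utf-8 /mnt/hdfs/user/myusername/input/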
 

Hadoop streaming vs. Native Java execution

 
Hadoop provides a streaming API which supports any programming language that can read from standard input (stdin) and write to standard output (stdout). The Hadoop streaming API uses standard Linux streams as the interface between Hadoop and the program: input data is passed via stdin to the map function, which processes it line by line and writes its results to stdout, while input to the reduce function also arrives on stdin (guaranteed by Hadoop to be sorted by key) and the results are written to stdout. Let's walk through a canonical MapReduce word count example: writing a mapper function and a reducer function, running them locally, and then running them on the cluster using the Hadoop streaming jar.
 

Developing a Map-reduce application

 
Each MapReduce job is typically composed of a mapper, an optional combiner and an optional reducer. Hadoop manages the parallel execution, performs the intermediate sorting and coordinates the tasks that execute the mappers and reducers. Hadoop can run MapReduce programs written in various languages, including Java, Python, C++ and R.
 
Mapping step
An example mapper for the word-count task, written in Python, is as follows:
 
#!/usr/bin/env python
import sys
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        print '%s\t%s' % (word, 1)
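Since the mapper only reads from stdin and writes to stdout, it can be tested on its own by piping a line of text through it; every word should come back as a tab-separated (word, 1) pair:
 
$ printf 'the quick brown fox the\n' | python mapper.py
the	1
quick	1
brown	1
fox	1
the	1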

Reducer step
An example reducer for the word-count task, written in Python:

#!/usr/bin/env python
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
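The reducer can likewise be tested in isolation by feeding it a few tab-separated, key-sorted lines of the kind the mapper emits:
 
$ printf 'brown\t1\nfox\t1\nthe\t1\nthe\t1\n' | python reducer.py
brown	1
fox	1
the	2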
 
 
Optional combiner step
In the above example of word counting, a key-value pair is emitted for every word found. Between the mapping and reduction steps, a shuffle-and-sort step runs over the key-value pairs. If the number of intermediate key-value pairs is extremely large, this shuffle step can become very time consuming; its cost can be reduced by performing local aggregation. Namely, a dictionary of word frequencies can be kept per document (here, per input line) so that only distinct word counts per document are emitted:
 
#!/usr/bin/env python
import sys
for line in sys.stdin:
    # for each document (input line) create a dictionary of word counts
    word_cnts = dict()
    line = line.strip()
    words = line.split()
    for word in words:
        word_cnts[word] = word_cnts.get(word, 0) + 1
    # emit key-value pairs only for distinct words per document
    for w, cnt in word_cnts.items():
        print '%s\t%s' % (w, cnt)

 
The program above illustrates the in-mapper combiner pattern. Alternatively, a combiner can be introduced as a separate step that runs after the mapper and before the reducer. One requirement for the combiner is that its input key-value pair format must coincide with its output format.
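Because the word-count reduction is a plain sum (associative and commutative) and the reducer's input and output are both tab-separated word/count pairs, the reducer script itself can also be reused as a separate combiner. A minimal sketch of such a submission (with STREAMING_JAR set as in the job submission section below):
 
$ yarn jar $STREAMING_JAR \
-mapper mapper.py -combiner reducer.py -reducer reducer.py \
-input /user/myusername/input -output /user/myusername/gutenberg_output \
-file mapper.py -file reducer.py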
 

Validating the MapReduce application

 
To make sure the code works correctly, let us first run it locally without Hadoop:
 
$ cat 44604.txt.utf-8 | python mapper.py | sort -k1,1 | python reducer.py

This uses pipes to redirect the output of each step to the input of the next; the sort -k1,1 command plays the role of the shuffle-and-sort phase that Hadoop performs between the map and reduce steps. Notice that the output is sorted alphabetically by key.


MapReduce job submission on a Hadoop cluster

 
To submit the MapReduce application to the cluster using the YARN scheduler, execute the following command:
 
$ export STREAMING_JAR=/usr/lib/hadoop-mapreduce/hadoop-streaming.jar
$ yarn jar $STREAMING_JAR \
-mapper mapper.py -reducer reducer.py \
-input /user/myusername/input -output /user/myusername/gutenberg_output \
-file mapper.py -file reducer.py

Each map task launches the mapper script as a separate process, converts its input into lines and feeds the lines to the stdin of that process. Each line the process writes to stdout is converted back into a key/value pair: by default, all characters up to the first tab character form the key and the rest of the line is the value. This behavior is customizable.
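Once the job finishes, the word counts can be inspected directly from HDFS. The results are written as part files in the output directory (the exact file names depend on the number of reduce tasks):
 
$ hdfs dfs -ls /user/myusername/gutenberg_output
$ hdfs dfs -cat /user/myusername/gutenberg_output/part-00000 | head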


Optional streaming command parameters

 
-combiner, -inputformat, -outputformat, -numReduceTasks, -file
 
-D    Sets Hadoop configuration variables, for example:
      -Dmapred.reduce.tasks=0 specifies a "map-only" job.
      -Dstream.map.output.field.separator=. specifies '.' as the field separator instead of '\t'.
      -Doutput.compression.enabled=true together with
      -Doutput.compression.codec=org.apache.hadoop.io.compress.BZip2Codec enables compressed (BZip2) job output.
 
 
Note that there are no spaces within the "-Dmapred.reduce.tasks=0" string. For the full streaming documentation, see: https://hadoop.apache.org/docs/r1.2.1/streaming.html
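As a sketch, a "map-only" run of the word-count mapper could be submitted as follows; note that the generic -D options must appear before the streaming-specific options, and the output directory name gutenberg_map_only is only an illustration:
 
$ yarn jar $STREAMING_JAR \
-Dmapred.reduce.tasks=0 \
-mapper mapper.py \
-input /user/myusername/input -output /user/myusername/gutenberg_map_only \
-file mapper.py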
 

MapReduce job splitting configuration

 
Choosing the appropriate number of tasks for a MapReduce job can radically change Hadoop's performance. Increasing the number of tasks increases the framework overhead, but improves load balancing and lowers the cost of failures. The number of map tasks for a given job is driven by the number of input splits, which depends on the size of the input data and the HDFS block size. A map task is spawned for each input split, so over the lifetime of a MapReduce job the number of map tasks equals the number of input splits.
 
The mapred.map.tasks parameter is only a hint to the InputFormat about the number of map tasks.
 
The number of reducers can be controlled with the mapred.reduce.tasks parameter (or the -numReduceTasks streaming option). The rules of thumb are: the output of each reducer should be a multiple of the HDFS block size, the reduce task time should be between 5 and 15 minutes, and the job should create the fewest output files possible.
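For example, to request four reduce tasks for the word-count job (the output directory name below is only an illustration), the -numReduceTasks streaming option can be added to the submission command:
 
$ yarn jar $STREAMING_JAR \
-mapper mapper.py -reducer reducer.py -numReduceTasks 4 \
-input /user/myusername/input -output /user/myusername/gutenberg_output_4 \
-file mapper.py -file reducer.py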

See the following page for more details:
http://wiki.apache.org/hadoop/HowManyMapsAndReduces
 

Packaging additional libraries in MapReduce job

 
Let us consider a more advanced example that builds on word counting. A typical text mining or natural language processing task involves the following steps:
 
1) Tokenization
2) Text normalization
3) Stop word removal
4) Stemming
 
Each of these steps can fit into the mapper task as follows:

#!/usr/bin/env python
import sys
import re
import string
import nltk.stem
from nltk.corpus import stopwords

# set of English stop words to filter out
stops = set(stopwords.words('english'))
stemmer = nltk.stem.SnowballStemmer("english")

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split each line on whitespace
    data = line.split()
    for word in data:
        # remove punctuation
        word = word.strip(string.punctuation)
        # skip words containing numbers
        if re.search(r"\d+", word): continue
        # remove stop words
        if word.lower() in stops: continue
        # stem the word
        word = stemmer.stem(word)
        if len(word) < 2: continue
        print '%s\t%s' % (word, 1)
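The modified mapper can again be validated locally before anything is packaged, assuming NLTK and its stop word corpus are installed on the local machine (the first two commands below sketch one way of installing them); the word-count reducer can be reused unchanged:
 
$ pip install nltk
$ python -m nltk.downloader stopwords
$ cat 44604.txt.utf-8 | python mapper.py | sort -k1,1 | python reducer.py | head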

To be able to use external libraries like NLTK on the cluster, we need to package them and submit them with the MapReduce job.
First, download the NLTK source archive from PyPI:
 
$ wget https://pypi.python.org/packages/source/n/nltk/nltk-3.0.2.zip#md5=0fa63e458d9eaa1ceb2d68d603d87ed5

Once the archive has been downloaded, unzip it, change into the extracted directory and package the nltk subdirectory as a .mod file:
 
$ unzip nltk-3.0.2.zip
$ cd nltk-3.0.2
$ zip -r nltk.zip nltk
$ mv nltk.zip ../nltk.mod
 
Modify the import section of your Python mapper script so that it properly loads NLTK from the .mod archive:
 
import zipimport
# load the nltk package from the zipped module shipped with the job
importer = zipimport.zipimporter('nltk.mod')
nltk = importer.load_module('nltk')
# look for NLTK data (such as the stop word corpus) in the task's working directory
nltk.data.path += ["."]
from nltk.corpus import stopwords
stops = set(stopwords.words('english'))

 
Finally, for the stop word corpus shipped with NLTK to be available during execution, add it to the list of files submitted with the job:

$ yarn jar $STREAMING_JAR \
-input myTextFiles -output outputFiles \
-mapper mapper.py -reducer reducer.py \
-file mapper.py -file reducer.py -file nltk.mod -file corpora