Wednesday, June 25, 2014

Shuffle and Sorting in Hadoop





I have been reading about Hadoop and have implemented sample MapReduce programs in Hadoop using Python.


I am confused about the shuffle and sort phase in Hadoop.


My mapper code emits key-value pairs.


Example Mapper Code



#!/usr/bin/env python

import sys
import csv

#----------------------------------------#
# input comes from STDIN (standard input)#
#----------------------------------------#

reader = csv.reader(sys.stdin, delimiter='\t')

for line in reader:
    # Skip the header row and any row too short to hold the timestamp column
    if len(line) < 9 or line[0] == "id":
        continue
    time = line[8]
    #------------------------------------------#
    # Get the authorID and the hour they posted#
    #------------------------------------------#
    print("%s\t%s" % (line[3], time[11:13]))
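To make the slice time[11:13] in the mapper concrete, here is a minimal sketch. It assumes the timestamp column looks like "YYYY-MM-DD HH:MM:SS"; the actual format of the input file is not shown in the question, and both the helper name hour_of and the sample value are hypothetical.

```python
def hour_of(timestamp):
    # Assumption: the timestamp is formatted "YYYY-MM-DD HH:MM:SS...",
    # so characters 11 and 12 hold the two-digit hour.
    return timestamp[11:13]


# Hypothetical sample value, not taken from the real data set.
sample = "2014-06-25 07:42:10"
print(hour_of(sample))  # prints 07
```

Because the hour stays a string here, the leading zero is preserved in the mapper output.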


The output of the mapper would be something like:


user1 01
user2 12
user1 15
user4 16
user1 07
user1 07
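What the reducer script sees for this mapper output can be approximated locally. The sketch below is only a stand-in for the shuffle and sort phase, similar in spirit to piping the mapper output through sort before the reducer: it orders the lines by key on a single machine. The function name shuffle_and_sort is hypothetical, and the real framework also partitions keys across reducers and does not promise any particular order of values within one key.

```python
# Sample lines modeled on the mapper output above (tab-separated).
mapper_output = [
    "user1\t01",
    "user2\t12",
    "user1\t15",
    "user4\t16",
    "user1\t07",
    "user1\t07",
]


def shuffle_and_sort(lines):
    # Order the records by key only. Python's sort is stable, so values keep
    # their mapper order here; Hadoop itself gives no such guarantee.
    return sorted(lines, key=lambda line: line.split("\t", 1)[0])


reducer_input = shuffle_and_sort(mapper_output)
for line in reducer_input:
    print(line)
```

After sorting, all lines for user1 are adjacent, followed by user2 and user4, which is the property the reducer relies on to detect where one author ends and the next begins.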




Reducer code



#!/usr/bin/env python
import sys
from collections import Counter


def print_busiest_hours(authorID, hours):
    #-----------------------------------------------------------------#
    # Identify the hour(s) in which the author posted the most posts  #
    #-----------------------------------------------------------------#
    counter = Counter(hours)
    top = max(counter.values())
    for hour in sorted(counter):
        if counter[hour] == top:
            # %02d keeps the leading zero that int() dropped
            print("%s\t%02d" % (authorID, hour))


thisAuthorID = None
listHour = []

# Input lines arrive grouped by authorID, one "authorID<TAB>hour" per line
for line in sys.stdin:
    data = line.strip().split('\t')
    if len(data) != 2:
        continue
    authorID, hour = data
    hour = int(hour)

    # The key changed: report the previous author and start a new list
    if thisAuthorID is not None and thisAuthorID != authorID:
        print_busiest_hours(thisAuthorID, listHour)
        listHour = []

    thisAuthorID = authorID
    listHour.append(hour)

#----------------------------------------#
# print the result for the last author   #
#----------------------------------------#
if thisAuthorID is not None:
    print_busiest_hours(thisAuthorID, listHour)


The output is the hour in which each user posted the most posts, for example:


user1 07


My question is what happens in the shuffle and sort phase, when the mappers' output is passed as input to the reducer. ***I have read that the reducer accepts one key-value pair for every key.*** If this is true, the input to the reducer should have been user1 [07, 15, 07], but the reducer input was actually a list of pairs sorted by key, one pair per line: user1 07, user1 15, user1 07.


Am I missing something here?
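One way to reconcile the two views, offered as a sketch rather than a definitive answer: in the Java API the reduce() method is called once per key with all of that key's values, whereas a Hadoop Streaming script only reads the sorted pairs line by line from standard input and has to regroup them itself. The snippet below rebuilds the "user1 [07, 15, 07]" view with itertools.groupby; the function name group_by_key and the sample lines are hypothetical, and the grouping only works because the lines already arrive sorted by key.

```python
from itertools import groupby


def group_by_key(lines):
    # Split each "key<TAB>value" line, then merge consecutive lines that
    # share a key. This relies on the input being sorted by key already.
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield key, [value for _, value in group]


# Hypothetical reducer input, in the sorted order described in the question.
sorted_lines = ["user1\t07\n", "user1\t15\n", "user1\t07\n", "user2\t12\n"]
for key, values in group_by_key(sorted_lines):
    print(key, values)
```

In a real streaming reducer, sys.stdin could be passed in place of sorted_lines.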








