I have been reading about Hadoop and have implemented some sample MapReduce (MR) programs in Hadoop using Python. I am confused about the shuffle and sort phase in Hadoop.

My mapper emits key-value pairs.

Example mapper code:
```python
#!/usr/bin/env python
import sys
import csv

#----------------------------------------#
# input comes from STDIN (standard input)#
#----------------------------------------#
reader = csv.reader(sys.stdin, delimiter='\t')
# iterate row by row instead of loading the whole input into a list
for line in reader:
    # skip blank rows and the header row
    if not line or line[0] == "id":
        continue
    time = line[8]
    #------------------------------------------#
    # Get the authorID and the hour they posted#
    #------------------------------------------#
    print("%s\t%s" % (line[3], time[11:13]))
```
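To check the mapper's parsing without a cluster, its per-record logic can be pulled into a plain function. This is a minimal sketch: `map_record` is a hypothetical name, the sample header and record are made up, and it assumes, as the slice `time[11:13]` implies, that column 8 holds a timestamp such as `2012-02-25 08:11:01`, with the two-digit hour at character positions 11 and 12.

```python
import csv
import io


def map_record(fields):
    """Return (author_id, hour) for one record, or None for the header row.

    Assumption: column 3 is the author ID and column 8 is a timestamp
    like "2012-02-25 08:11:01", so characters 11-12 are the hour.
    """
    if fields[0] == "id":
        return None
    added_at = fields[8]
    return fields[3], added_at[11:13]


# Hypothetical input: a header row plus one record with columns 0-8.
sample = (
    "id\tc1\tc2\tauthor_id\tc4\tc5\tc6\tc7\tadded_at\n"
    "1\ta\tb\tuser1\te\tf\tg\th\t2012-02-25 08:11:01\n"
)
for fields in csv.reader(io.StringIO(sample), delimiter="\t"):
    pair = map_record(fields)
    if pair is not None:
        print("%s\t%s" % pair)  # prints "user1<TAB>08"
```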
The output of the mapper looks something like this:
```
user1	01
user2	12
user1	15
user4	16
user1	07
user1	07
```
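The shuffle and sort that sits between the two scripts can be imitated in a few lines to see what reaches the reducer. The sketch below is a simplification, not Hadoop's implementation: `shuffle_and_sort` is a hypothetical helper, it routes each key to a reducer by hashing (`zlib.crc32` stands in for Hadoop's default hash partitioner), and it then sorts each reducer's pairs by key only. All pairs with the same key end up on the same reducer and next to each other, but they remain separate key-value pairs.

```python
import zlib
from collections import defaultdict


def shuffle_and_sort(pairs, num_reducers):
    """Simulate partitioning by key, then sorting each partition by key."""
    partitions = defaultdict(list)
    for key, value in pairs:
        # crc32 is stable across runs, unlike Python's salted hash() on str
        reducer = zlib.crc32(key.encode("utf-8")) % num_reducers
        partitions[reducer].append((key, value))
    # sorted() is stable, so values keep their arrival order within a key;
    # real Hadoop makes no such guarantee about the order of values.
    return {r: sorted(p, key=lambda kv: kv[0]) for r, p in partitions.items()}


mapper_output = [("user1", "01"), ("user2", "12"), ("user1", "15"),
                 ("user4", "16"), ("user1", "07"), ("user1", "07")]

# With a single reducer, everything lands in partition 0, sorted by key.
for key, value in shuffle_and_sort(mapper_output, 1)[0]:
    print("%s\t%s" % (key, value))
```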
Reducer code:
```python
#!/usr/bin/env python
import sys

listHour = []
thisAuthorID = None

for line in sys.stdin:
    data = line.strip().split('\t')
    if len(data) == 2:
        authorID, hour = data
        hour = int(hour)
    else:
        continue
    if (thisAuthorID == authorID) or (thisAuthorID is None):
        listHour.append(hour)
        thisAuthorID = authorID
    else:
        # the key changed, so all hours for the previous author have been seen
        counter = {}
        #-------------------------------------------------#
        #Loop through the list of hours for each authorID-#
        #-------------------------------------------------#
        for k in listHour:
            #------------------------------------#
            #Identify the occurrence of each hour#
            #------------------------------------#
            counter[k] = listHour.count(k)
        #-----------------------------------------------------------------#
        # Identify the hour in which author posted the maximum post       #
        #-----------------------------------------------------------------#
        maxCount = max(counter.values())
        list1 = [key for key, val in counter.items() if val == maxCount]
        for item in list1:
            print("%s\t%s" % (thisAuthorID, item))
        # start collecting hours for the new author
        listHour = [hour]
        thisAuthorID = authorID

#----------------------------------------#
#print the hour in which the user posted #
#----------------------------------------#
# flush the last author (no key change follows it)
if listHour:
    counter = {}
    for k in listHour:
        counter[k] = listHour.count(k)
    maxCount = max(counter.values())
    list1 = [key for key, val in counter.items() if val == maxCount]
    for item in list1:
        print("%s\t%s" % (thisAuthorID, item))
```
The output is the hour in which each user posted the most, for example `user1 07`.
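Because the reducer sees one line per pair, it has to detect where one author's lines end and the next begin, as the code above does. A possible tidier version of the same logic is sketched below, using `collections.Counter` for the counting and a generator so it can be fed any iterable of lines. `most_frequent_hours` and `reduce_stream` are hypothetical names; unlike the original, hours stay strings, so `07` is emitted as `07` rather than `7`, and tied hours come out in sorted order.

```python
from collections import Counter


def most_frequent_hours(hours):
    """Return every hour that ties for the highest count, sorted."""
    counts = Counter(hours)
    top = max(counts.values())
    return sorted(h for h, c in counts.items() if c == top)


def reduce_stream(lines):
    """Yield (author_id, hour) pairs from key-sorted 'author<TAB>hour' lines."""
    current_author = None
    hours = []
    for line in lines:
        fields = line.strip().split("\t")
        if len(fields) != 2:
            continue  # skip malformed lines
        author_id, hour = fields
        if current_author is not None and author_id != current_author:
            # Key changed: the previous author's values are complete.
            for h in most_frequent_hours(hours):
                yield current_author, h
            hours = []
        current_author = author_id
        hours.append(hour)
    if current_author is not None:
        # Flush the last author, which no key change will trigger.
        for h in most_frequent_hours(hours):
            yield current_author, h


sorted_lines = ["user1\t01\n", "user1\t15\n", "user1\t07\n", "user1\t07\n",
                "user2\t12\n", "user4\t16\n"]
for author_id, hour in reduce_stream(sorted_lines):
    print("%s\t%s" % (author_id, hour))
```

In a streaming job the same loop would iterate over `sys.stdin` instead of `sorted_lines`.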
My question is: what happens in the shuffle and sort phase, when the mappers' output becomes the reducer's input? *I have read that the reducer accepts one key-value pair for every key.* If that is true, the input to the reducer should have been `user1 [07, 15, 07]`, but the reducer input was instead a list of lines sorted by key: `user1 07`, `user1 15`, `user1 07`.
Am I missing something here?
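For reference, with Hadoop Streaming (which runs Python mappers and reducers over stdin/stdout) the reducer script is given the sorted `key<TAB>value` lines themselves; the grouped form `user1 [07, 15, 07]` corresponds to what the Java MapReduce API's `reduce` method receives, a key together with an iterable of its values. Since sorting places all lines with the same key next to each other, the grouped view can be rebuilt inside the script. A minimal sketch with `itertools.groupby`, assuming every line contains exactly one tab (`group_sorted_lines` is a hypothetical helper):

```python
from itertools import groupby


def group_sorted_lines(lines):
    """Turn key-sorted 'key<TAB>value' lines into (key, [values]) pairs."""
    pairs = (line.strip().split("\t", 1) for line in lines)
    # groupby only merges *consecutive* items with equal keys,
    # which is why the input must already be sorted by key.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, [value for _, value in group]


reducer_input = ["user1\t07\n", "user1\t15\n", "user1\t07\n"]
for key, values in group_sorted_lines(reducer_input):
    print(key, values)
```

Feeding it unsorted lines would split one key into several groups, which is the reason the framework's sort step matters.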