MapReduce Examples Implementation

MapReduce一些经典实现案例编写(使用mrjob+Python)

  1. MapReduce?
  1. 实现MovieLens数据集中的用户平均评分
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob


class MRratingAvg(MRJob):
    '''
    Compute the average rating given by each user.

    Input lines are CSV records of the form
    ``userId,movieId,rating,timestamp``.
    '''
    def mapper(self, _, line):
        '''
        Emit ``(user_id, rating)`` for every data row.

        Rows that do not have exactly four comma-separated fields (for
        example blank lines) and rows whose first field is not all digits
        (for example the CSV header) are skipped.
        '''
        fields = line.strip().split(',')
        if len(fields) != 4:
            # Unpacking a blank or short row would raise ValueError.
            return
        user_id, _item_id, rating, _timestamp = fields
        if not user_id.isdigit():
            return
        yield user_id, float(rating)

    def reducer(self, user_id, values):
        '''
        Emit ``(user_id, mean_rating)``.

        ``values`` iterates over every rating the mapper emitted for
        ``user_id``, i.e. conceptually (user_id, [rating1, rating2, ...]).
        '''
        # Accumulate on the fly instead of copying all ratings into a list.
        total = 0.0
        count = 0
        for rating in values:
            total += rating
            count += 1
        yield (user_id, total / count)


# Run the MRratingAvg job when this file is executed as a script.
if __name__ == '__main__':
    MRratingAvg.run()
  1. 使用mrjob实现去重任务
#!/usr/bin/python
from mrjob.job import MRJob

class MREliminate(MRJob):
    '''
    Remove duplicate records from input lines made of three
    space-separated fields.
    '''
    def mapper(self, _, lines):
        '''
        Emit the first field as the key and the next two fields as the value.

        Lines with fewer than three space-separated fields (for example
        blank lines) are skipped instead of raising IndexError.
        '''
        fields = lines.strip().split(" ")
        if len(fields) < 3:
            return
        yield fields[0], (fields[1], fields[2])

    def reducer(self, key, values):
        '''
        Emit each distinct value seen for ``key`` exactly once, in
        first-seen order.
        '''
        seen = set()
        for v in values:
            # tuple() makes the value hashable even if it arrives as a list
            # (presumably the case after mrjob's JSON decoding), so a set
            # can replace the original linear list scan.
            pair = tuple(v)
            if pair not in seen:
                seen.add(pair)
                yield key, (pair[0], pair[1])

# Run the MREliminate job when this file is executed as a script.
if __name__ == "__main__":
    MREliminate.run()
  1. 使用MapReduce实现排序任务
from mrjob.job import MRJob


class MRrank(MRJob):
    '''
    Sort ``user_id,score`` records by ascending score.
    '''
    def mapper(self, _, line):
        '''
        Emit every record under the single constant key 'rank' so that all
        records are grouped together for one reducer call.
        '''
        user_id, score = line.split(',')
        yield 'rank', (float(score), user_id)

    def reducer(self, user_id, values):
        '''
        Emit ``(user_id, score)`` pairs ordered by ascending score.

        Despite its name, the ``user_id`` argument is the constant key
        'rank' produced by the mapper; the real user ids travel inside
        ``values`` as ``(score, user_id)`` pairs.
        '''
        # Pairs compare element-wise, so this sorts by score first and
        # breaks ties by user id.
        for score, uid in sorted(values):
            # Yield instead of printing: the Python 2 print statement is a
            # syntax error on Python 3 and bypasses the job's own output.
            yield uid, score


# Run the MRrank job when this file is executed as a script.
if __name__ == '__main__':
    MRrank.run()
  1. 使用MapReduce实现倒排索引 The data set is shown below:
d1,i love shanghai
d2,i am from shanghai now i study in tongji university
d3,i am from lanzhou now i study in lanzhou university of science  and  technolgy
from mrjob.job import MRJob


class MRratingAvg(MRJob):
    '''
    Build an inverted index that maps each word to the documents it
    occurs in.

    Input lines have the form ``doc_id,text``.

    NOTE: the class name is a leftover from the rating-average example; it
    is kept unchanged so existing references to it keep working.
    '''
    def mapper(self, _, line):
        '''
        Emit ``(word, doc_id)`` for every whitespace-separated word of the
        document text.
        '''
        # Split on the first comma only, so commas inside the text do not
        # cut the document short.
        docum, sep, text = line.strip().partition(',')
        if not sep:
            # No comma at all: there is no text to index.
            return
        for word in text.split():
            yield word, docum

    def reducer(self, word, docum):
        '''
        Emit ``(word, [doc_id, ...])``.

        The list holds one entry per occurrence, so a document id is
        repeated when the word appears in that document more than once.
        '''
        yield word, list(docum)


# Run the MRratingAvg job when this file is executed as a script.
if __name__ == '__main__':
    MRratingAvg.run()
"am"	["d2", "d3"]
"and"	["d3"]
"from"	["d2", "d3"]
"i"	["d1", "d2", "d2", "d3", "d3"]
"in"	["d2", "d3"]
"lanzhou"	["d3", "d3"]
"love"	["d1"]
"now"	["d2", "d3"]
"of"	["d3"]
"science"	["d3"]
"shanghai"	["d1", "d2"]
"study"	["d2", "d3"]
"technolgy"	["d3"]
"tongji"	["d2"]
"university"	["d2", "d3"]
  1. MapReduce 实现PageRank特定算法任务 Data set:
A,0.25,B,C,D
B,0.25,A,D
C,0.25,C
D,0.25,B,C

数据集的每一行格式为"节点,当前PageRank值,出链目标节点列表"，即描述了从该节点指向其他节点的边。

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep


class MRPageRank(MRJob):
    '''
    One PageRank update step over lines of the form
    ``node,rank,target1,target2,...``.
    '''

    # Damping factor applied to the rank received through links.
    ALPHA = 0.8
    # Total number of pages in the graph; must match the input data
    # (the bundled sample has four pages). Override in a subclass for
    # other graphs.
    N = 4

    def mapper(self, _, line):
        '''
        Split a node's current rank evenly among its outgoing links and
        emit ``(target, share)`` for each of them.
        '''
        data = line.strip().split(',')
        if len(data) < 2:
            # Blank or truncated line: there is no rank value to parse.
            return
        node = data[0]
        p = float(data[1])
        target = data[2:]
        # Emit a zero contribution for the node itself so that a page
        # without inbound links still reaches the reducer and gets a rank.
        yield node, 0.0
        n = len(target)
        for i in target:
            yield i, p / n

    def reducer(self, id, p):
        '''
        Combine the shares received by page ``id`` into its new rank:
        ``ALPHA * sum(shares) + (1 - ALPHA) / N``.
        '''
        value = sum(float(v) for v in p)
        values = self.ALPHA * value + (1 - self.ALPHA) / self.N
        yield id, values


# Run the MRPageRank job when this file is executed as a script.
if __name__ == '__main__':
    MRPageRank.run()