Slope One: Python and Spark SQL versions of the code


How Slope One works

Slope One is a very simple item-based collaborative filtering algorithm. The basic idea: users A and B have each rated both item1 and item2, so we can compute the average difference between the ratings of the two items. For a new user C who has rated item1 but not item2, we predict C's rating of item2 by taking C's known rating and subtracting that average difference.

A nice property of Slope One is that it still works well when there is little data.

user   item1   item2
A      7       2
B      8       3
C      9       ?

So C's predicted rating for item2 is 9 - ((7-2)+(8-3))/2 = 9 - 5 = 4.

In practice this breaks down into two steps (see the sketch below):

1. Compute the average rating difference between each pair of items; this is the item-to-item deviation.

2. Combine these deviations with the user's historical ratings to produce predicted ratings, and recommend the items with the highest predictions.
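
A minimal sketch of these two steps on the toy table above (plain Python, the variable names are mine):

ratings = {
    'A': {'item1': 7, 'item2': 2},
    'B': {'item1': 8, 'item2': 3},
    'C': {'item1': 9},                      # item2 is unknown
}

# Step 1: average deviation of item1 over item2, using users who rated both.
diffs = [r['item1'] - r['item2'] for r in ratings.values()
         if 'item1' in r and 'item2' in r]
dev = sum(diffs) / float(len(diffs))        # ((7-2)+(8-3))/2 = 5.0

# Step 2: predict C's rating of item2 from C's rating of item1.
prediction = ratings['C']['item1'] - dev
print(prediction)                           # 4.0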

Python version

# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.

class SlopeOne(object):
    def __init__(self):
        # diffs[i][j]: average deviation of item i's rating over item j's;
        # freqs[i][j]: number of users who rated both items.
        self.diffs = {}
        self.freqs = {}

    def predict(self, userprefs):
        # userprefs: {item: rating} for one user; returns predictions for unrated items.
        preds, freqs = {}, {}
        for item, rating in userprefs.items():
            for diffitem, diffratings in self.diffs.items():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                # Weight each candidate item by how many users co-rated the pair.
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        return dict([(item, value / freqs[item])
                     for item, value in preds.items()
                     if item not in userprefs and freqs[item] > 0])

    def update(self, userdata):
        # userdata: {user: {item: rating}}; accumulates pairwise rating differences.
        for ratings in userdata.values():
            for item1, rating1 in ratings.items():
                self.freqs.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.items():
                    self.freqs[item1].setdefault(item2, 0)
                    self.diffs[item1].setdefault(item2, 0.0)
                    self.freqs[item1][item2] += 1
                    self.diffs[item1][item2] += rating1 - rating2
        # Turn the accumulated sums into average deviations.
        for item1, ratings in self.diffs.items():
            for item2 in ratings:
                ratings[item2] /= self.freqs[item1][item2]

if __name__ == '__main__':
    userdata = dict(
        alice=dict(squid=1.0,
                   cuttlefish=0.5,
                   octopus=0.2),
        bob=dict(squid=1.0,
                 octopus=0.5,
                 nautilus=0.2),
        carole=dict(squid=0.2,
                    octopus=1.0,
                    cuttlefish=0.4,
                    nautilus=0.4),
        dave=dict(cuttlefish=0.9,
                  octopus=0.4,
                  nautilus=0.5),
        )
    s = SlopeOne()
    s.update(userdata)
    print(s.predict(dict(squid=0.4)))
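
Working the deviations out by hand for this toy data, the script should print predictions of roughly 0.25 for cuttlefish, 0.23 for octopus and 0.1 for nautilus, i.e. one prediction for each item this user (who has only rated squid=0.4) has not yet rated.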

Spark version

The Spark version mainly relies on the Spark SQL module.
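
The training and test files are assumed to be plain comma-separated uid,mid,rating lines, which is what get_tuple below parses; a few illustrative lines:

1,101,4.0
1,102,3.5
2,101,5.0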

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


conf = SparkConf().setAppName("Slope One")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

training_inputs='/home/cuijian/slope_one_train.txt'
testing_inputs='/home/cuijian/slope_one_test.txt'
def get_tuple(line):
    # Each input line is "uid,mid,rating".
    elems = line.split(',')
    return int(elems[0]), int(elems[1]), float(elems[2])


def main():
    training_in = sc.textFile(training_inputs)
    testing_in = sc.textFile(testing_inputs)

    training_data = training_in.map(get_tuple)
    testing_data = testing_in.map(get_tuple).cache()

    training_df = sqlContext.createDataFrame(training_data, ['uid', 'mid', 'rating'])
    testing_df = sqlContext.createDataFrame(testing_data, ['uid', 'mid', 'rating'])

    training_df.registerTempTable("TrainingTable")
    testing_df.registerTempTable("TestingTable")

    # Pair up every two ratings made by the same user and keep the rating difference.
    joined_user_df = sqlContext.sql("""
    SELECT t1.uid, t1.mid as mid1, t2.mid as mid2, (t1.rating-t2.rating) as rating_diff FROM
    TrainingTable t1
    JOIN
    TrainingTable t2
    ON (t1.uid = t2.uid)
    """)

    joined_user_df.registerTempTable("JoinedUserTable")
    # Average deviation (dev) and co-rating count (c) for every item pair.
    mpair_dev_c_df = sqlContext.sql("""
    SELECT mid1, mid2, sum(rating_diff)/count(rating_diff) as dev, count(rating_diff) as c FROM
    JoinedUserTable
    GROUP BY mid1, mid2
    """)


    mpair_dev_c_df.registerTempTable('mpair_dev_c_dfTable')

    # For each test rating, predict the rating of every paired item:
    # prediction = user's rating on mid1 minus dev(mid1, mid2).
    result = sqlContext.sql("""
    SELECT a.uid, a.mid, b.mid2, a.rating - b.dev as pred FROM
    TestingTable a
    JOIN
    mpair_dev_c_dfTable b
    ON (a.mid = b.mid1)
    """)

    result.show()

    # testing_training_df = sqlContext.sql("""
    # SELECT t1.uid, t1.mid as midj, t2.mid as midi, t1.rating as rating_j, t2.rating as rating_i FROM
    # TestingTable t1
    # JOIN
    # TrainingTable t2
    # ON (t1.uid = t2.uid)
    # """)

    # cond = [testing_training_df.midj == mpair_dev_c_df.mid1, testing_training_df.midi == mpair_dev_c_df.mid2]
    # df = testing_training_df.join(mpair_dev_c_df, cond)

    # df.registerTempTable("AllTable")
    # ps = sqlContext.sql("""
    # SELECT uid, midj, sum((dev+rating_i)*c)/sum(c) as p, rating_j as true_rating FROM
    # AllTable
    # Group By uid, midj, rating_j
    # """)

    # ps.registerTempTable("PTable")
    # rmse = sqlContext.sql("""
    # SELECT sqrt(sum(power(true_rating-p, 2))/count(true_rating)) as RMSE FROM
    # PTable
    # """)
    # rmse.show()

if __name__ == '__main__':
    main()
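
The result DataFrame still holds one row per (test rating, paired item). The commented-out block above shows how to collapse these into the final weighted Slope One prediction for each user u and target item j, p(u, j) = sum_i (dev(j, i) + r(u, i)) * c(j, i) / sum_i c(j, i), and then computes the RMSE against the true test ratings.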

References
http://www.codexiu.cn/spark/blog/13452/
