pyspark als
from pyspark.sql import SparkSession
from functools import reduce  # reduce is not a builtin in Python 3
import math
from os.path import abspath
def cosSim(v1, v2):
    """Cosine similarity between two equal-length feature vectors."""
    numerator = reduce(lambda x, y: x + y, map(lambda d: d[0] * d[1], zip(v1, v2)))
    norm1 = math.sqrt(reduce(lambda m, n: m + n, map(lambda x: math.pow(x, 2), v1)))
    norm2 = math.sqrt(reduce(lambda m, n: m + n, map(lambda x: math.pow(x, 2), v2)))
    return 1.0 * numerator / (norm1 * norm2)
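# Quick sanity check of cosSim (illustrative values only):
# identical vectors score 1.0, orthogonal vectors score 0.0.
assert abs(cosSim([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]) - 1.0) < 1e-9
assert abs(cosSim([1.0, 0.0], [0.0, 1.0]) - 0.0) < 1e-9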
warehouse_location = abspath('spark-warehouse')
spark = (SparkSession.builder
         .appName("pyspark als")
         .config("spark.sql.warehouse.dir", warehouse_location)
         .enableHiveSupport()
         .getOrCreate())
# "table" is a placeholder for the Hive interaction table; the count of interactions
# per (userid, itemid) pair is used as an implicit-feedback score
data = spark.sql("select userid,itemid,count(*) as score from table group by userid,itemid")
from pyspark.mllib.recommendation import ALS, Rating
# Wrap each row as a Rating(user, product, rating) for the mllib ALS trainer
ratings = data.rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
# Implicit-feedback ALS: counts are treated as confidence weights, not explicit ratings
model = ALS.trainImplicit(ratings, rank=10)
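# Only rank is set explicitly; the other hyperparameters fall back to the mllib
# defaults. A more explicit call (illustrative values, not tuned here) would be:
# model = ALS.trainImplicit(ratings, rank=10, iterations=5, lambda_=0.01, alpha=0.01)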
# productFeatures() yields an RDD of (itemid, feature_array) pairs
jf = model.productFeatures()
jf = spark.createDataFrame(jf)
jf = jf.withColumnRenamed("_1", "id").withColumnRenamed("_2", "features")
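# jf now holds one row per item: an item id plus its array<double> latent-feature vector;
# jf.printSchema() and jf.show(5, truncate=False) are handy checks at this point.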
# Item metadata (again, "table" is a placeholder for the actual Hive table)
vvv = spark.sql("select itemid,code,city_code from table")
# Attach the metadata to each item's feature vector
vjf = jf.join(vvv, jf.id == vvv.itemid)
cvjf = vjf.select("itemid", "code", "city_code", "features")
# Second copy of the same DataFrame with prefixed column names for the self-join
jcvjf = cvjf.withColumnRenamed("itemid", "xitemid").withColumnRenamed("code", "xcode").withColumnRenamed("city_code", "xcity_code").withColumnRenamed("features", "xfeatures")
# Pair up items that share the same code and city_code, dropping self-pairs
jjj = cvjf.join(jcvjf, (cvjf.code == jcvjf.xcode) & (cvjf.city_code == jcvjf.xcity_code))
jj = jjj.filter("itemid != xitemid").select("itemid", "xitemid", "features", "xfeatures")
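# Restricting the self-join to matching (code, city_code) keeps the candidate-pair
# count far smaller than a full cross-join over every item pair.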
# Cosine similarity for every candidate pair
sjj = jj.rdd.map(lambda x: (x[0], x[1], cosSim(x[2], x[3])))
# Group by source item and keep the ids of the 10 most similar items
gsj = sjj.groupBy(lambda x: x[0])
sj3 = gsj.map(lambda x: str(x[0]) + ',' + ','.join(map(lambda m: str(m[1]), sorted(list(x[1]), key=lambda n: n[2], reverse=True)[:10])))
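# Each element of sj3 is one CSV line: the source itemid followed by up to 10 of its
# most similar itemids within the same (code, city_code) group.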
# print(sj3.collect())
sj3.saveAsTextFile("/hdfs/py_als_test")
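# Note: saveAsTextFile fails if /hdfs/py_als_test already exists; delete the
# directory or point at a fresh path before re-running.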
spark.stop()
Tags: itemid, map, code, jf, pyspark, spark, als, lambda    Source: https://www.cnblogs.com/kayy/p/11272329.html