如何使用scala或python在apache spark中运行多线程作业?
作者:互联网
我正面临一个与spark并发相关的问题,这阻碍了我在生产中使用它,但我知道有一条出路.我正在尝试使用订单历史记录为700万用户运行Spark ALS.首先,我将获取不同用户的列表,然后对这些用户运行循环以获取建议,这是一个非常缓慢的过程,需要几天时间才能获得所有用户的建议.我尝试用笛卡尔用户和产品一次性获取所有建议,但再次将其提供给elasticsearch我必须为每个用户过滤和排序记录,然后我才能将它提供给elasticsearch以供其他API使用.
所以请建议我一个在这种用例中具有相当可扩展性的解决方案,并在生产中使用实时建议.
这是我在scala中的代码片段,它将让您了解我目前正在接近如何解决问题:
// buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
def recommend_for_user(user: Int): Unit = {
println("Recommendations for User ID: " + user);
// Product IDs which are not bought by user
val candidates = buys_values
.filter(x => x("customer_id").toString.toInt != user)
.map(x => x("product_id").toString.toInt)
.distinct().map((user, _))
// find 30 products with top rating
val recommendations = bestModel.get
.predict(candidates)
.takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))
var i = 1
var ESMap = Map[String, String]()
recommendations.foreach { r =>
ESMap += r.product.toString -> bitem_ids.value(r.product)
}
// push to elasticsearch with user as id
client.execute {
index into "recommendation" / "items" id user fields ESMap
}.await
// remove candidate RDD from memory
candidates.unpersist()
}
// iterate on each user to get recommendations for the user [slow process]
user_ids.foreach(recommend_for_user)
解决方法:
很明显,你的计划中的瓶颈是搜索候选人.鉴于Spark架构,它严重限制了并行化的能力,并通过为每个用户启动Spark作业来增加大量开销.
假设典型情况,大多数时间有700万用户和十亿产品,您可以预测整个产品范围减去用户已经购买的少数产品.至少在我看来,重要的问题是为什么甚至打扰过滤.即使您推荐以前购买的产品,它真的有害吗?
除非你有非常严格的要求,否则我会忽略这个问题并使用MatrixFactorizationModel.recommendProductsForUsers
,它几乎完成所有工作,不包括数据导出.之后,您可以执行批量导出,您很高兴.
现在假设你有一个明确的无重复政策.在假设典型用户仅购买相对较少数量的产品的情况下工作,您可以从为每个用户获取一组产品开始:
val userProdSet = buy_values
.map{case (user, product, _) => (user, product)}
.aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)
接下来,您只需映射userProdSet即可获得预测:
// Number of predictions for each user
val nPred = 30;
userProdSet.map{case (user, prodSet) => {
val recommended = model
// Find recommendations for user
.recommendProducts(_, nPred + prodSet.size))
// Filter to remove already purchased
.filter(rating => !prodSet.contains(rating.product))
// Sort and limit
.sortBy(_.rating)
.reverse
.take(nPred)
(user, recommended)
}}
您可以通过使用可变集合进行聚合并通过广播模型来进一步改进,但这是一个大致的想法.
如果user_ids中的用户数低于整个集合中的用户数(buy_values),则可以简单地过滤userProdSet以仅保留用户的子集.
标签:python,scala,apache-spark,apache-spark-mllib,recommendation-engine 来源: https://codeday.me/bug/20190702/1358707.html