Recently a teammate sitting next to me hit an error like the following while running a model; the most important part is the highlighted assertion failure:
client token: N/A
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 38, hbase10):
java.lang.AssertionError: assertion failed: lapack.dpotrs returned 4.
  at scala.Predef$.assert(Predef.scala:179)
  at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
  at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:439)
  at org.apache.spark.ml.recommendation.ALS$anonfunorgapachespark$m
The problem can be traced to the following location in the Spark source code:
private def checkReturnValue(info: intW, method: String): Unit = {
info.`val` match {
case code if code < 0 =>
throw new IllegalStateException(s"LAPACK.$method returned $code; arg ${-code} is illegal")
case code if code > 0 =>
throw new SingularMatrixException(
s"LAPACK.$method returned $code because A is not positive definite. Is A derived from " +
"a singular matrix (e.g. collinear column values)?")
case _ => // do nothing
}
}
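To see concretely what a positive return code means, here is a minimal, self-contained sketch (my own illustration, not Spark code) that feeds a singular packed matrix to dppsv, the LAPACK routine that CholeskyDecomposition.solve delegates to in the 2.x source. It assumes the netlib-java bindings that Spark itself bundles (com.github.fommil.netlib) are on the classpath; the object name and matrix values are made up for the demo.
import com.github.fommil.netlib.LAPACK
import org.netlib.util.intW

object DppsvReturnCodeDemo {
  def main(args: Array[String]): Unit = {
    val lapack = LAPACK.getInstance()
    // AtA of a 2-column design matrix whose second column is exactly twice the first,
    // stored in packed upper-triangular, column-major order: [a11, a12, a22].
    val ata = Array(1.0, 2.0, 4.0) // singular, hence not positive definite
    val atb = Array(1.0, 2.0)
    val info = new intW(0)
    lapack.dppsv("U", 2, 1, ata, atb, 2, info)
    // A positive value means the leading minor of that order is not positive definite:
    // the branch that throws SingularMatrixException in 2.x and fails the assertion in 1.6.
    println(s"dppsv returned ${info.`val`}") // prints 2 for this matrix
  }
}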
The runtime environment was still Spark 1.6; the code walked through here is from 2.1.1, but the relevant interfaces are essentially the same. The core code is as follows:
def train[ID: ClassTag]( // scalastyle:ignore
ratings: RDD[Rating[ID]],
rank: Int = 10,
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
maxIter: Int = 10,
regParam: Double = 0.1,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
nonnegative: Boolean = false,
intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
checkpointInterval: Int = 10,
seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
require(!ratings.isEmpty(), s"No ratings available from $ratings")
require(intermediateRDDStorageLevel != StorageLevel.NONE,
"ALS is not designed to run without persisting intermediate RDDs.")
val sc = ratings.sparkContext
val userPart = new ALSPartitioner(numUserBlocks)
val itemPart = new ALSPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val (userInBlocks, userOutBlocks) =
makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
// materialize blockRatings and user blocks
userOutBlocks.count()
val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
}
val (itemInBlocks, itemOutBlocks) =
makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
// materialize item blocks
itemOutBlocks.count()
val seedGen = new XORShiftRandom(seed)
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
var previousCheckpointFile: Option[String] = None
val shouldCheckpoint: Int => Boolean = (iter) =>
sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
val deletePreviousCheckpointFile: () => Unit = () =>
previousCheckpointFile.foreach { file =>
try {
val checkpointFile = new Path(file)
checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
} catch {
case e: IOException =>
logWarning(s"Cannot delete checkpoint filefile:", e)
}
}
if (implicitPrefs) {
for (iter <- 1 to maxIter) {
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
val previousItemFactors = itemFactors
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
userLocalIndexEncoder, implicitPrefs, alpha, solver)
previousItemFactors.unpersist()
itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
// TODO: Generalize PeriodicGraphCheckpointer and use it here.
val deps = itemFactors.dependencies
if (shouldCheckpoint(iter)) {
itemFactors.checkpoint() // itemFactors gets materialized in computeFactors
}
val previousUserFactors = userFactors
userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
itemLocalIndexEncoder, implicitPrefs, alpha, solver)
if (shouldCheckpoint(iter)) {
ALS.cleanShuffleDependencies(sc, deps)
deletePreviousCheckpointFile()
previousCheckpointFile = itemFactors.getCheckpointFile
}
previousUserFactors.unpersist()
}
} else {
for (iter <- 0 until maxIter) {
  itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
    userLocalIndexEncoder, solver = solver)
  if (shouldCheckpoint(iter)) {
    val deps = itemFactors.dependencies
    itemFactors.checkpoint()
    itemFactors.count() // checkpoint item factors and cut lineage
    ALS.cleanShuffleDependencies(sc, deps)
    deletePreviousCheckpointFile()
    previousCheckpointFile = itemFactors.getCheckpointFile
  }
  userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
    itemLocalIndexEncoder, solver = solver)
}
}
val userIdAndFactors = userInBlocks
.mapValues(_.srcIds)
.join(userFactors)
.mapPartitions({ items =>
items.flatMap { case (_, (ids, factors)) =>
ids.view.zip(factors)
}
// Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
// and userFactors.
}, preservesPartitioning = true)
.setName("userFactors")
.persist(finalRDDStorageLevel)
val itemIdAndFactors = itemInBlocks
.mapValues(_.srcIds)
.join(itemFactors)
.mapPartitions({ items =>
items.flatMap { case (_, (ids, factors)) =>
ids.view.zip(factors)
}
}, preservesPartitioning = true)
.setName("itemFactors")
.persist(finalRDDStorageLevel)
if (finalRDDStorageLevel != StorageLevel.NONE) {
userIdAndFactors.count()
itemFactors.unpersist()
itemIdAndFactors.count()
userInBlocks.unpersist()
userOutBlocks.unpersist()
itemInBlocks.unpersist()
itemOutBlocks.unpersist()
blockRatings.unpersist()
}
(userIdAndFactors, itemIdAndFactors)
}
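For reference, the parameters of train() map onto the public DataFrame-based API roughly as follows. This is only a usage sketch under my own assumptions: a DataFrame named ratingsDF with columns user, item and rating (the variable and column names are mine, not from the original job).
import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setRank(10)                 // rank
  .setMaxIter(10)              // maxIter
  .setRegParam(0.1)            // regParam
  .setImplicitPrefs(false)     // implicitPrefs
  .setNonnegative(false)       // nonnegative -> CholeskySolver instead of NNLSSolver
  .setCheckpointInterval(10)   // checkpointInterval
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
// fit() eventually ends up in the train() method shown above.
val model = als.fit(ratingsDF)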
For a detailed walkthrough of the train() code above I can recommend a very well written blog post (als源码解析), which was a great help while reading the source to track this problem down. The most important part here is:
val previousItemFactors = itemFactors
itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
  userLocalIndexEncoder, implicitPrefs, alpha, solver)
private def computeFactors[ID](
srcFactorBlocks: RDD[(Int, FactorBlock)],
srcOutBlocks: RDD[(Int, OutBlock)],
dstInBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
regParam: Double,
srcEncoder: LocalIndexEncoder,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)] = {
val numSrcBlocks = srcFactorBlocks.partitions.length
// Handles both implicit and explicit feedback; the default is explicit
val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
case (srcBlockId, (srcOutBlock, srcFactors)) =>
srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =>
(dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
}
}
val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length))
dstInBlocks.join(merged).mapValues {
case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors) =>
val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks)
srcFactors.foreach { case (srcBlockId, factors) =>
sortedSrcFactors(srcBlockId) = factors
}
val dstFactors = new Array[Array[Float]](dstIds.length)
var j = 0
val ls = new NormalEquation(rank)
// Taking the item factor matrix as an example, this loop walks over every item
while (j < dstIds.length) {
ls.reset()
if (implicitPrefs) {
ls.merge(YtY.get)
}
var i = srcPtrs(j)
var numExplicits = 0
// Iterate over every user associated with the current item
while (i < srcPtrs(j + 1)) {
// The encoded source index (block id and local index packed into one Int)
val encoded = srcEncodedIndices(i)
// Decode the block (partition) id from the encoded index
val blockId = srcEncoder.blockId(encoded)
val localIndex = srcEncoder.localIndex(encoded)
val srcFactor = sortedSrcFactors(blockId)(localIndex)
val rating = ratings(i)
if (implicitPrefs) {
// Extension to the original paper to handle rating < 0. confidence is a function
// of |rating| instead so that it is never negative. c1 is confidence - 1.
val c1 = alpha * math.abs(rating)
// For rating <= 0, the corresponding preference is 0. So the second argument of add
// is only there for rating > 0.
if (rating > 0.0) {
numExplicits += 1
}
ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1)
} else {
ls.add(srcFactor, rating)
numExplicits += 1
}
i += 1
}
// Weight lambda by the number of explicit ratings based on the ALS-WR paper.
// Solve the weighted least squares problem here, using the Cholesky solver
dstFactors(j) = solver.solve(ls, numExplicits * regParam)
j += 1
}
dstFactors
}
}
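What ls.add and ls.merge accumulate is the normal equation AᵀA x = Aᵀb for one destination factor. The simplified class below is my own re-implementation for illustration only (not Spark's NormalEquation): ata holds the upper triangle of AᵀA in packed column-major form, atb holds Aᵀb, and each add() applies one weighted rank-1 update, which matches how ls.add(srcFactor, rating) and ls.add(srcFactor, 1.0 + c1, c1) are used above.
// Simplified stand-in for Spark's NormalEquation, for illustration only.
class SimpleNormalEquation(val k: Int) {
  val triK: Int = k * (k + 1) / 2          // entries in the packed upper triangle
  val ata: Array[Double] = new Array(triK) // packed upper-triangular AtA, column-major
  val atb: Array[Double] = new Array(k)    // Atb

  // Adds one observation: ata += c * a * a^T (upper triangle), atb += c * b * a.
  def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
    require(a.length == k)
    var pos = 0
    var j = 0
    while (j < k) {
      var i = 0
      while (i <= j) {
        ata(pos) += c * a(i) * a(j)
        pos += 1
        i += 1
      }
      atb(j) += c * b * a(j)
      j += 1
    }
    this
  }

  def reset(): Unit = {
    java.util.Arrays.fill(ata, 0.0)
    java.util.Arrays.fill(atb, 0.0)
  }
}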
The CholeskySolver code is shown below:
private[recommendation] class CholeskySolver extends LeastSquaresNESolver {
/**
* Solves a least squares problem with L2 regularization:
*
* min norm(A x - b)^2^ + lambda * norm(x)^2^
*
* @param ne a [[NormalEquation]] instance that contains AtA, Atb, and n (number of instances)
* @param lambda regularization constant
* @return the solution x
*/
override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
val k = ne.k
// Add scaled lambda to the diagonals of AtA.
var i = 0
var j = 2
while (i < ne.triK) {
ne.ata(i) += lambda
i += j
j += 1
}
// The actual decomposition is delegated to LAPACK; Spark only wraps the call here
CholeskyDecomposition.solve(ne.ata, ne.atb)
val x = new Array[Float](k)
i = 0
while (i < k) {
x(i) = ne.atb(i).toFloat
i += 1
}
ne.reset()
x
}
}
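The slightly cryptic i += j; j += 1 loop above walks exactly the diagonal entries of AᵀA in packed upper-triangular, column-major storage: the d-th diagonal element sits at index d(d+1)/2 + d, and consecutive diagonal indices differ by 2, 3, 4, ... A tiny standalone check (my own illustration, not Spark code):
// Verify that the indices visited by the regularization loop are the packed-storage diagonals.
val k = 5
val triK = k * (k + 1) / 2
val visited = scala.collection.mutable.ArrayBuffer.empty[Int]
var i = 0
var j = 2
while (i < triK) {
  visited += i // this is where `ne.ata(i) += lambda` lands
  i += j
  j += 1
}
// Diagonal of column d (0-based) in packed upper, column-major layout: d * (d + 1) / 2 + d
val diagonals = (0 until k).map(d => d * (d + 1) / 2 + d)
assert(visited == diagonals) // both are 0, 2, 5, 9, 14
println(visited.mkString(", "))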
For the theory behind the Cholesky computation, see this article: cholesky分解 (Cholesky decomposition).
When this regularized least squares problem is solved, the Cholesky decomposition of the AtA matrix on the left-hand side fails. The root cause here was that the values in this symmetric matrix were too small: Cholesky decomposition requires the matrix to be positive definite, and when that condition is not met the decomposition throws the exception above and aborts the job. So when constructing the ratings, it is best to give the rating values a sensible lower bound; I suggest starting the scale at 1.
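A minimal sketch of that suggestion, assuming an input RDD named rawRatings of mllib Rating objects (the variable name is mine): clamp every rating to a floor of 1 before handing the data to ALS.
import org.apache.spark.mllib.recommendation.Rating

// Hypothetical input: RDD[Rating] built elsewhere from the raw scores.
val ratings = rawRatings.map { r =>
  // Keep every rating at least 1 so the normal-equation matrix does not drift
  // toward a numerically singular matrix because the values are too small.
  Rating(r.user, r.product, math.max(r.rating, 1.0))
}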
PS: Material I found online indicates that if several columns of the data have zero variance, the same failure can occur even when the values are not particularly small. When this happens, the matrix A simply does not have full column rank.
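As a quick sanity check for that failure mode, one can look for items whose observed ratings never vary (zero variance), since such degenerate data is one way A can end up rank-deficient. A hedged sketch, again assuming a DataFrame ratingsDF with columns user, item and rating (names are mine):
import org.apache.spark.sql.functions._

// Items whose ratings are all identical; worth inspecting before training.
val constantItems = ratingsDF
  .groupBy("item")
  .agg(count(lit(1)).alias("n"), variance(col("rating")).alias("ratingVar"))
  .where(col("n") > 1 && col("ratingVar") === 0.0)
constantItems.show()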