数据库
首页 > 数据库> > SparkSQL Catalyst中的TreeNode

SparkSQL Catalyst中的TreeNode

作者:互联网

引言

Scala Product、case类和元组

case 关键字不仅可以推断出val,同时自动增加一些方法,那么增加了那些方法呢?

你定义的case 类会混入scala.Product 特征,它提供了几个关于实例字段的通用方法。例如,对于Person 的实例:

package cn.com.tengen.test.obj
case class Person(name: String,age: Option[Int] = None)
 
object Person extends App{
  val p = Person("Lucky",Option(18))
  println(p.productArity) //元素的个数            输出:2
  println(p.productElement(0))//第1个元素         输出:Lucky
  println(p.productElement(1))//第二个元素        输出:Some(18)
  p.productIterator foreach println//便利         输出:Lucky  Some(18)
}

可以通过Product特征的方法来获取Case类的各个属性以及对应的一些操作(大小size、第n个元素值等)

简介

TreeNode累是Catalyst中和执行计划相关的所有AST,包括表达式Expression、逻辑执行计划LogicalPlan,物理执行计划SparkPlan的基类。

TreeNode继承Scala中的Product类,其目的是可通过Product类中的方法(productArity、productElement、productIterator)来操纵TreeNode实现类的参数,这些实现类一般都是case class。

TreeNode是抽象类,用于被其他抽象或实现类继承,如Expression、LogicalPlan。TreeNode类声明如下:

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product

并且定义了this的别名:

  self: BaseType =>

这里约定TreeNode的泛型参数类型BaseType应是TreeNode[BasType]的子类型,并且继承TreeNode的实现类的类型就是传入的BaseType。比如Aggregate是LogicalPlan的实现类,LogicalPlan继承QueryPlan[LogicalPlan],又有QueryPlan[PlanType <: QueryPlan[PlanType]]继承TreeNode[PlanType],因此Aggregate的self类型就是Aggregate。

通过Product方法获取Case类属性值

如Aggregate类:

case class Aggregate(
      groupingExpressions: Seq[Expression],
      aggregateExpressions: Seq[NamedExpression],
      child: LogicalPlan)
    extends UnaryNode { ... }
abstract class UnaryNode extends LogicalPlan { ... }

在TreeNode源码中:

  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](productArity) // Aggregate的属性size=3
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))  // 遍历Aggregate的每个属性(groupingExpressions,aggregateExpressions,child)
      i += 1
    }
    arr
  }

TreeNode源码mapChildren方法中:

Aggregate类中的child属性会被匹配到(child继承 -> LogicalPlan继承 -> TreeNode )

def mapChildren(f: BaseType => BaseType): BaseType = {
    if (children.nonEmpty) {
      var changed = false
      val newArgs = mapProductIterator {
        case arg: TreeNode[_] if containsChild(arg) =>   // Aggregate类中的child属性会被匹配到(child继承 -> LogicalPlan继承 -> TreeNode )
          val newChild = f(arg.asInstanceOf[BaseType])
          if (!(newChild fastEquals arg)) {
            changed = true
            newChild
          } else {
            arg
          }
        
        ...
        }
    }
}

 

标签:case,TreeNode,BaseType,child,SparkSQL,LogicalPlan,Aggregate,Catalyst
来源: https://www.cnblogs.com/limaosheng/p/16421295.html