A Look at Flink Table Set Operations
Published: 2019-06-22


This article takes a look at the set operations offered by the Flink Table API.

Examples

Union

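    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a, b, c");
    Table result = left.union(right);

  • The union method is similar to SQL UNION: it combines the records of both tables and removes duplicates. Both tables must have the same field types.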

UnionAll

    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a, b, c");
    Table result = left.unionAll(right);

  • The unionAll method is similar to SQL UNION ALL: it combines the records of both tables and keeps duplicates.
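The difference between the two union variants can be sketched in plain Java without any Flink dependency. This is only an illustration of the SQL semantics, not how Flink executes the operation: the class name UnionSketch is hypothetical, rows are modeled as simple strings, and the stable output order is an artifact of the sketch (Flink makes no ordering guarantee here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper that models a table as a List of rows.
final class UnionSketch {

    // UNION ALL: concatenate both inputs and keep every duplicate.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // UNION: the same concatenation followed by duplicate elimination.
    static <T> List<T> union(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("x", "x", "y");
        List<String> right = Arrays.asList("y", "z");
        System.out.println(unionAll(left, right)); // [x, x, y, y, z]
        System.out.println(union(left, right));    // [x, y, z]
    }
}
```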

Intersect

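    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    Table result = left.intersect(right);

  • The intersect method is similar to SQL INTERSECT: it returns the records that exist in both tables, each exactly once. Note that the right table uses different field names (d, e, f); only the field types have to match.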

IntersectAll

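    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    Table result = left.intersectAll(right);

  • The intersectAll method is similar to SQL INTERSECT ALL: a record that occurs in both tables is returned as many times as it occurs in both of them, so duplicates may appear in the result.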
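As a rough illustration of the two intersect variants, the following plain-Java sketch applies the SQL bag semantics to in-memory lists. It assumes rows can be compared with equals; the class name IntersectSketch is hypothetical and the code is not taken from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper that models a table as a List of rows.
final class IntersectSketch {

    // INTERSECT ALL: a row that occurs m times on the left and n times
    // on the right is emitted min(m, n) times.
    static <T> List<T> intersectAll(List<T> left, List<T> right) {
        List<T> remaining = new ArrayList<>(right);
        List<T> out = new ArrayList<>();
        for (T row : left) {
            // remove(Object) consumes one matching right-hand row, if any
            if (remaining.remove(row)) {
                out.add(row);
            }
        }
        return out;
    }

    // INTERSECT: rows present on both sides, each emitted once.
    static <T> List<T> intersect(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(intersectAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("x", "x", "y", "z");
        List<String> right = Arrays.asList("x", "x", "x", "y");
        System.out.println(intersectAll(left, right)); // [x, x, y]
        System.out.println(intersect(left, right));    // [x, y]
    }
}
```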

Minus

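    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a, b, c");
    Table result = left.minus(right);

  • The minus method is similar to SQL EXCEPT: it returns the records of the left table that do not exist in the right table, each exactly once.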

MinusAll

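    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a, b, c");
    Table result = left.minusAll(right);

  • The minusAll method is similar to SQL EXCEPT ALL: a record that occurs n times in the left table and m times in the right table is returned (n - m) times.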
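The two minus variants differ in more than deduplication, which a small plain-Java sketch can make concrete. It models the SQL EXCEPT and EXCEPT ALL semantics on in-memory lists; the class name MinusSketch is hypothetical and nothing here is Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper that models a table as a List of rows.
final class MinusSketch {

    // EXCEPT ALL: a row that occurs n times on the left and m times on
    // the right is emitted max(n - m, 0) times.
    static <T> List<T> minusAll(List<T> left, List<T> right) {
        List<T> remaining = new ArrayList<>(right);
        List<T> out = new ArrayList<>();
        for (T row : left) {
            // each right-hand occurrence cancels exactly one left-hand row
            if (!remaining.remove(row)) {
                out.add(row);
            }
        }
        return out;
    }

    // EXCEPT: left rows that never occur on the right, each emitted once.
    static <T> List<T> minus(List<T> left, List<T> right) {
        Set<T> exclude = new HashSet<>(right);
        Set<T> out = new LinkedHashSet<>();
        for (T row : left) {
            if (!exclude.contains(row)) {
                out.add(row);
            }
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("x", "x", "x", "y", "z");
        List<String> right = Arrays.asList("x", "y", "y");
        System.out.println(minusAll(left, right)); // [x, x, z]
        System.out.println(minus(left, right));    // [z]
    }
}
```

Note that minus is not simply a deduplicated minusAll: in the example above "x" survives minusAll because the left side has more copies, but it is dropped by minus because it occurs on the right at all.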

In

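    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a");

    // using implicit registration
    Table result = left.select("a, b, c").where("a.in(" + right + ")");

    // using explicit registration
    tableEnv.registerTable("RightTable", right);
    Table result2 = left.select("a, b, c").where("a.in(RightTable)");

  • The in method is similar to SQL IN: it is used inside a where clause and keeps the records whose expression value exists in the given table, which must consist of a single column.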
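Conceptually, the in predicate acts as a filter on the left table driven by the values of a single-column right table. The plain-Java sketch below illustrates that idea only; the class InSketch and the method whereFirstColumnIn are hypothetical names, rows are modeled as lists of objects, and the first element stands in for field a.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not part of the Flink API.
final class InSketch {

    // Keeps the left rows whose first field occurs in the single-column right input.
    static List<List<Object>> whereFirstColumnIn(List<List<Object>> left, List<Object> rightColumn) {
        Set<Object> lookup = new HashSet<>(rightColumn);
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : left) {
            if (lookup.contains(row.get(0))) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> left = new ArrayList<>();
        left.add(Arrays.<Object>asList(1, "p"));
        left.add(Arrays.<Object>asList(2, "q"));
        left.add(Arrays.<Object>asList(3, "r"));
        List<Object> right = Arrays.<Object>asList(1, 3, 5);
        System.out.println(whereFirstColumnIn(left, right)); // [[1, p], [3, r]]
    }
}
```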

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {

      //......

      def union(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
        }
        new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
      }

      def unionAll(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
        }
        new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
      }

      def intersect(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException(
            "Only tables from the same TableEnvironment can be intersected.")
        }
        new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
      }

      def intersectAll(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException(
            "Only tables from the same TableEnvironment can be intersected.")
        }
        new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
      }

      def minus(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException("Only tables from the same TableEnvironment can be " +
            "subtracted.")
        }
        new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
          .validate(tableEnv))
      }

      def minusAll(right: Table): Table = {
        // check that right table belongs to the same TableEnvironment
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException("Only tables from the same TableEnvironment can be " +
            "subtracted.")
        }
        new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
          .validate(tableEnv))
      }

      //......
    }

  • Every method first checks that the right table belongs to the same TableEnvironment and throws a ValidationException otherwise. union and unionAll then build a Union node, intersect and intersectAll an Intersect node, and minus and minusAll a Minus node; the only difference within each pair is the value of the all flag. The node is validated before it is wrapped in a new Table.

Union

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output

      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        left.construct(relBuilder)
        right.construct(relBuilder)
        relBuilder.union(all)
      }

      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
          failValidation(s"Union on stream tables is currently not supported.")
        }

        val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
        if (left.output.length != right.output.length) {
          failValidation(s"Union two tables of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
          l.resultType == r.resultType
        }
        if (!sameSchema) {
          failValidation(s"Union two tables of different schema:" +
            s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
            s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
        }
        resolvedUnion
      }
    }

  • Union extends BinaryNode; its construct method builds the union operation through relBuilder.union. Its validate method rejects a non-all union on a StreamTableEnvironment and requires both inputs to have the same number of columns with pairwise equal result types. The output attributes are taken from the left input.
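The schema check performed in validate compares the two inputs position by position and looks only at the column types, never at the names. A minimal plain-Java sketch of that rule follows; the classes SchemaCheckSketch and Column are hypothetical, types are modeled as strings, and the messages are simplified rather than copied from Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the column-count and column-type check.
final class SchemaCheckSketch {

    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns an error description, or Optional.empty() when the schemas are compatible.
    static Optional<String> validate(List<Column> left, List<Column> right) {
        if (left.size() != right.size()) {
            return Optional.of("different column sizes: " + left.size() + " and " + right.size());
        }
        for (int i = 0; i < left.size(); i++) {
            // only the types are compared; names are allowed to differ
            if (!left.get(i).type.equals(right.get(i).type)) {
                return Optional.of("different schema at position " + i);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Column> left = Arrays.asList(new Column("a", "INT"), new Column("b", "STRING"));
        List<Column> renamed = Arrays.asList(new Column("d", "INT"), new Column("e", "STRING"));
        List<Column> shorter = Arrays.asList(new Column("d", "INT"));
        System.out.println(validate(left, renamed).isPresent()); // false
        System.out.println(validate(left, shorter).isPresent()); // true
    }
}
```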

Intersect

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output

      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        left.construct(relBuilder)
        right.construct(relBuilder)
        relBuilder.intersect(all)
      }

      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
          failValidation(s"Intersect on stream tables is currently not supported.")
        }

        val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
        if (left.output.length != right.output.length) {
          failValidation(s"Intersect two tables of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        // allow different column names between tables
        val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
          l.resultType == r.resultType
        }
        if (!sameSchema) {
          failValidation(s"Intersect two tables of different schema:" +
            s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
            s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
        }
        resolvedIntersect
      }
    }

  • Intersect extends BinaryNode; its construct method builds the intersect operation through relBuilder.intersect. Its validate method rejects any use on a StreamTableEnvironment, regardless of the all flag, and applies the same column-count and column-type checks as Union.

Minus

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output

      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        left.construct(relBuilder)
        right.construct(relBuilder)
        relBuilder.minus(all)
      }

      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
          failValidation(s"Minus on stream tables is currently not supported.")
        }

        val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
        if (left.output.length != right.output.length) {
          failValidation(s"Minus two table of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
          l.resultType == r.resultType
        }
        if (!sameSchema) {
          failValidation(s"Minus two table of different schema:" +
            s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
            s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
        }
        resolvedMinus
      }
    }

  • Minus extends BinaryNode; its construct method builds the minus operation through relBuilder.minus. Like Intersect, its validate method rejects any use on a StreamTableEnvironment and applies the same column-count and column-type checks.
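Taken together, the first check in each of the three validate methods shown above means that, in Flink 1.7.0, unionAll is the only one of the six set operations accepted on a stream table. The rule can be condensed into a small plain-Java sketch; the class StreamSupportSketch and the enum SetOp are hypothetical names used only for illustration.

```java
// Hypothetical condensation of the stream-table checks in the validate methods.
final class StreamSupportSketch {

    enum SetOp { UNION, INTERSECT, MINUS }

    // Returns whether the operation passes the environment check.
    static boolean isSupported(SetOp op, boolean all, boolean streamEnv) {
        if (!streamEnv) {
            return true; // batch environments pass this check for all six operations
        }
        // on streams, Union fails only when all == false; Intersect and Minus always fail
        return op == SetOp.UNION && all;
    }

    public static void main(String[] args) {
        System.out.println(isSupported(SetOp.UNION, true, true));     // true
        System.out.println(isSupported(SetOp.UNION, false, true));    // false
        System.out.println(isSupported(SetOp.INTERSECT, true, true)); // false
        System.out.println(isSupported(SetOp.MINUS, false, false));   // true
    }
}
```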

Summary

  • Table offers the set operations union, unionAll, intersect, intersectAll, minus, minusAll, and in (in is used inside a where clause).
  • union and unionAll use Union, intersect and intersectAll use Intersect, and minus and minusAll use Minus.
  • Union, Intersect, and Minus all extend BinaryNode. Their construct methods build the corresponding operation through relBuilder.union, relBuilder.intersect, and relBuilder.minus respectively.

Reposted from: http://hmkta.baihongyu.com/
