Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 87C7018741 for ; Mon, 15 Feb 2016 13:18:20 +0000 (UTC) Received: (qmail 17938 invoked by uid 500); 15 Feb 2016 13:18:20 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 17898 invoked by uid 500); 15 Feb 2016 13:18:20 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 17883 invoked by uid 99); 15 Feb 2016 13:18:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Feb 2016 13:18:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 57057E0A3C; Mon, 15 Feb 2016 13:18:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Mon, 15 Feb 2016 13:18:21 -0000 Message-Id: In-Reply-To: <5bdd5176a5c74bc087243a1c7646d059@git.apache.org> References: <5bdd5176a5c74bc087243a1c7646d059@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-3226] Translate logical joins to physical [FLINK-3226] Translate logical joins to physical This closes #1632 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbedc32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbedc32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbedc32 Branch: refs/heads/tableOnCalcite Commit: abbedc324e67b6701546239b68203f0d89a7e526 Parents: e742826 Author: vasia Authored: Thu Feb 11 
18:04:45 2016 +0100 Committer: vasia Committed: Mon Feb 15 12:44:10 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/codegen/CodeGenerator.scala | 17 +++- .../table/plan/nodes/dataset/DataSetJoin.scala | 26 +++++- .../plan/rules/dataset/DataSetJoinRule.scala | 98 +++++++++++++++++--- .../api/table/runtime/FlatJoinRunner.scala | 51 ++++++++++ .../flink/api/java/table/test/JoinITCase.java | 35 +++++-- .../flink/api/scala/table/test/JoinITCase.scala | 30 ++++-- 6 files changed, 221 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index a4ae4b1..d6a8aaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -32,9 +32,9 @@ import org.apache.flink.api.table.codegen.Indenter.toISC import org.apache.flink.api.table.codegen.OperatorCodeGen._ import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo import org.apache.flink.api.table.typeinfo.RowTypeInfo - import scala.collection.JavaConversions._ import scala.collection.mutable +import org.apache.flink.api.common.functions.FlatJoinFunction /** * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s. 
@@ -148,16 +148,25 @@ class CodeGenerator( if (clazz == classOf[FlatMapFunction[_,_]]) { val inputTypeTerm = boxedTypeTermForTypeInfo(input1) (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", - s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) } // MapFunction else if (clazz == classOf[MapFunction[_,_]]) { val inputTypeTerm = boxedTypeTermForTypeInfo(input1) ("Object map(Object _in1)", - s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) } + // FlatJoinFunction + else if (clazz == classOf[FlatJoinFunction[_,_,_]]) { + val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) + val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( + throw new CodeGenException("Input 2 for FlatJoinFunction should not be null"))) + (s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", + List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", + s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + } else { // TODO more functions throw new CodeGenException("Unsupported Function.") @@ -175,7 +184,7 @@ class CodeGenerator( @Override public ${samHeader._1} { - ${samHeader._2} + ${samHeader._2.mkString("\n")} ${reuseInputUnboxingCode()} $bodyCode } http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index de436be..c32853d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -27,6 +27,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.{TableConfig, Row} +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.table.plan.TypeConverter._ +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ +import org.apache.flink.api.table.plan.TypeConverter /** * Flink RelNode which matches along with JoinOperator and its related operations. @@ -42,7 +50,8 @@ class DataSetJoin( joinKeysRight: Array[Int], joinType: JoinType, joinHint: JoinHint, - func: JoinFunction[Row, Row, Row]) + func: (TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) => + FlatJoinFunction[Any, Any, Any]) extends BiRel(cluster, traitSet, left, right) with DataSetRel { @@ -71,6 +80,19 @@ class DataSetJoin( override def translateToPlan( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - ??? 
+ + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config) + + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + val joinFun = func.apply(config, leftDataSet.getType, rightDataSet.getType, returnType) + leftDataSet.join(rightDataSet).where(joinKeysLeft: _*).equalTo(joinKeysRight: _*) + .`with`(joinFun).asInstanceOf[DataSet[Any]] } + } http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala index 3d2117d..4bb80ca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala @@ -24,6 +24,16 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin} import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import org.apache.flink.api.table.plan.TypeConverter._ +import org.apache.flink.api.table.runtime.FlatJoinRunner +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.common.functions.FlatJoinFunction +import 
org.apache.calcite.rel.core.JoinInfo +import org.apache.flink.api.table.TableException class DataSetJoinRule extends ConverterRule( @@ -39,18 +49,82 @@ class DataSetJoinRule val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) - new DataSetJoin( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - join.toString, - Array[Int](), - Array[Int](), - JoinType.INNER, - null, - null) + // get the equality keys + val joinInfo = join.analyzeCondition + val keyPairs = joinInfo.pairs + + if (keyPairs.isEmpty) { // if no equality keys => not supported + throw new TableException("Joins should have at least one equality condition") + } + else { // at least one equality expression => generate a join function + val conditionType = join.getCondition.getType + val func = getJoinFunction(join, joinInfo) + val leftKeys = ArrayBuffer.empty[Int] + val rightKeys = ArrayBuffer.empty[Int] + + keyPairs.foreach(pair => { + leftKeys.add(pair.source) + rightKeys.add(pair.target)} + ) + + new DataSetJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + join.toString, + leftKeys.toArray, + rightKeys.toArray, + JoinType.INNER, + null, + func) + } + } + + def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo): + ((TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) => + FlatJoinFunction[Any, Any, Any]) = { + + val func = ( + config: TableConfig, + leftInputType: TypeInformation[Any], + rightInputType: TypeInformation[Any], + returnType: TypeInformation[Any]) => { + + val generator = new CodeGenerator(config, leftInputType, Some(rightInputType)) + val conversion = generator.generateConverterResultExpression(returnType) + var body = "" + + if (joinInfo.isEqui) { + // only equality condition + body = s""" + |${conversion.code} + |${generator.collectorTerm}.collect(${conversion.resultTerm}); + 
|""".stripMargin + } + else { + val condition = generator.generateExpression(join.getCondition) + body = s""" + |${condition.code} + |if (${condition.resultTerm}) { + | ${conversion.code} + | ${generator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin + } + val genFunction = generator.generateFunction( + description, + classOf[FlatJoinFunction[Any, Any, Any]], + body, + returnType) + + new FlatJoinRunner[Any, Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + } + func } } http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala new file mode 100644 index 0000000..6e7d099 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class FlatJoinRunner[IN1, IN2, OUT]( + name: String, + code: String, + @transient returnType: TypeInformation[OUT]) + extends RichFlatJoinFunction[IN1, IN2, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: FlatJoinFunction[IN1, IN2, OUT] = null + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating FlatJoinFunction.") + function = clazz.newInstance() + } + + override def join(first: IN1, second: IN2, out: Collector[OUT]): Unit = + function.join(first, second, out) + + override def getProducedType: TypeInformation[OUT] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java index 95213ee..7b8b6ec 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.table.test; import org.apache.flink.api.table.Row; 
import org.apache.flink.api.table.Table; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; @@ -27,11 +28,9 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; @@ -43,7 +42,7 @@ public class JoinITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = NotImplementedError.class) + @Test public void testJoin() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -62,7 +61,7 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testJoinWithFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -81,7 +80,27 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test + public void testJoinWithJoinFilter() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + + Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); + Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); + + Table result = in1.join(in2).where("b === e && a < 6 
&& h < b").select("c, g"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "Hello world, how are you?,Hallo Welt wie\n" + + "I am fine.,Hallo Welt wie\n"; + compareResultAsText(results, expected); + } + + @Test public void testJoinWithMultipleKeys() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -120,9 +139,7 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - // Calcite does not eagerly check the compatibility of compared types - @Ignore - @Test(expected = IllegalArgumentException.class) + @Test(expected = InvalidProgramException.class) public void testJoinWithNonMatchingKeyTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -162,7 +179,7 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testJoinWithAggregation() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/abbedc32/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala index 628613e..5302f30 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala @@ -27,13 
+27,14 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized - import scala.collection.JavaConverters._ +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.table.TableException @RunWith(classOf[Parameterized]) class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[NotImplementedError]) + @Test def testJoin(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) @@ -46,7 +47,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testJoinWithFilter(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) @@ -59,7 +60,21 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test + def testJoinWithJoinFilter(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h) + + val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g) + + val expected = "Hello world, how are you?,Hallo Welt wie\n" + + "I am fine.,Hallo Welt wie\n" + val results = joinT.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = 
CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) @@ -86,9 +101,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - // Calcite does not eagerly check the compatibility of compared types - @Ignore - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[InvalidProgramException]) def testJoinWithNonMatchingKeyTypes(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) @@ -114,7 +127,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testJoinWithAggregation(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) @@ -127,5 +140,4 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) TestBaseUtils.compareResultAsText(results.asJava, expected) } - }