Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E472811D52 for ; Thu, 14 Aug 2014 14:23:39 +0000 (UTC) Received: (qmail 95619 invoked by uid 500); 14 Aug 2014 14:23:38 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 95598 invoked by uid 500); 14 Aug 2014 14:23:38 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 95588 invoked by uid 99); 14 Aug 2014 14:23:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Aug 2014 14:23:38 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 14 Aug 2014 14:23:14 +0000 Received: (qmail 92336 invoked by uid 99); 14 Aug 2014 14:23:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Aug 2014 14:23:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BD5959BEFA5; Thu, 14 Aug 2014 14:23:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.incubator.apache.org Message-Id: <2fc1f8fdc6b04893a014d997fedc8e58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Add Sanity Checks for Join/CoGroup with SolutionSet in Record API Date: Thu, 14 Aug 2014 14:23:10 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master c693f04f4 -> e696ff77d Add Sanity Checks for Join/CoGroup with SolutionSet in Record API We now check whether the key fields specified for the delta iterations match those specified when joining/coGrouping with the solution set. This code is more or less copied from the new Java API. The Scala API uses the Record API underneath so it is also covered. Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e696ff77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e696ff77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e696ff77 Branch: refs/heads/master Commit: e696ff77de190ad8e5d69ceaa513951bf810a939 Parents: c693f04 Author: Aljoscha Krettek Authored: Thu Aug 14 14:20:23 2014 +0200 Committer: Aljoscha Krettek Committed: Thu Aug 14 15:57:03 2014 +0200 ---------------------------------------------------------------------- .../operators/base/DeltaIterationBase.java | 2 +- .../java/record/operators/CoGroupOperator.java | 10 + .../java/record/operators/DeltaIteration.java | 10 + .../api/java/record/operators/JoinOperator.java | 10 + .../scala/DeltaIterationSanityCheckTest.scala | 192 +++++++++++++++++++ 5 files changed, 223 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e696ff77/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java index 8e955b1..a7dab4f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java @@ -282,7 +282,7 @@ public class DeltaIterationBase extends DualInputOperator extends Operator { - private final DeltaIterationBase containingIteration; + protected final DeltaIterationBase containingIteration; public SolutionSetPlaceHolder(DeltaIterationBase container, OperatorInformation operatorInfo) { super(operatorInfo, "Solution Set Place Holder"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e696ff77/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java index c958b84..0dc4acc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java @@ -98,6 +98,16 @@ public class CoGroupOperator extends CoGroupOperatorBasesolution set, can be obtained @@ -78,5 +81,12 @@ public class DeltaIteration extends DeltaIterationBase { public SolutionSetPlaceHolder(DeltaIterationBase container) { super(container, new OperatorInformation(new RecordTypeInfo())); } + + public void checkJoinKeyFields(int[] keyFields) { + int[] ssKeys = containingIteration.getSolutionSetKeyFields(); + if (!Arrays.equals(ssKeys, keyFields)) { + throw new InvalidProgramException("The solution can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ")."); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e696ff77/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java index 0ddb44a..f70de88 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java @@ -88,6 +88,16 @@ public class JoinOperator extends JoinOperatorBase l } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test + def testCorrectJoinWithSolution2 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectJoinWithSolution1 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectJoinWithSolution2 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectJoinWithSolution3 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test + def testCorrectCoGroupWithSolution1 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test + def testCorrectCoGroupWithSolution2 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectCoGroupWithSolution1 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectCoGroupWithSolution2 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } + + @Test(expected = classOf[InvalidProgramException]) + def testIncorrectCoGroupWithSolution3 { + val solutionInput = CollectionDataSource(Array((1, "1"))) + val worksetInput = CollectionDataSource(Array((2, "2"))) + + def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = { + val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() } + (result, ws) + } + val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10) + + val output = iteration.write("/dummy", CsvOutputFormat()) + + val plan = new ScalaPlan(Seq(output)) + } +}