Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1AEF711A88 for ; Wed, 25 Jun 2014 10:26:00 +0000 (UTC) Received: (qmail 52915 invoked by uid 500); 25 Jun 2014 10:25:59 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 52897 invoked by uid 500); 25 Jun 2014 10:25:59 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 52888 invoked by uid 99); 25 Jun 2014 10:25:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jun 2014 10:25:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 25 Jun 2014 10:25:58 +0000 Received: (qmail 50142 invoked by uid 99); 25 Jun 2014 10:25:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jun 2014 10:25:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C54BE93A21A; Wed, 25 Jun 2014 10:25:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: [FLINK-960] Fix CollectionDataSource bug Date: Wed, 25 Jun 2014 10:25:37 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master 3d6cc5f48 -> 515ad3c33 [FLINK-960] Fix CollectionDataSource bug This closes #33. Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/515ad3c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/515ad3c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/515ad3c3 Branch: refs/heads/master Commit: 515ad3c3362af54efc4a36fe90d9b353c1da85c8 Parents: 3d6cc5f Author: Till Rohrmann Authored: Fri Jun 20 18:17:42 2014 +0200 Committer: uce Committed: Wed Jun 25 12:24:57 2014 +0200 ---------------------------------------------------------------------- .../record/operators/CollectionDataSource.java | 13 +++--- stratosphere-scala/pom.xml | 6 +++ .../api/scala/CollectionDataSourceTest.scala | 46 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java ---------------------------------------------------------------------- diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java index adadea2..f4f85d6 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java @@ -136,13 +136,14 @@ public class CollectionDataSource extends GenericDataSourceBase) data[0]); f.setData((Collection) data[0]); } - - Collection tmp = new ArrayList(); - for (Object o : data) { - tmp.add(o); + else { + Collection tmp = new ArrayList(); + for (Object o : data) { + tmp.add(o); + } + checkFormat(tmp); + f.setData(tmp); } - checkFormat(tmp); - f.setData(tmp); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-scala/pom.xml ---------------------------------------------------------------------- diff --git a/stratosphere-scala/pom.xml b/stratosphere-scala/pom.xml index 817a63c..86c1925 100644 --- a/stratosphere-scala/pom.xml +++ b/stratosphere-scala/pom.xml @@ -56,6 +56,12 @@ asm 4.0 + + + org.scalatest + scalatest_2.10 + 2.2.0 + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala b/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala new file mode 100644 index 0000000..3c36ed5 --- /dev/null +++ b/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package eu.stratosphere.api.scala + +import eu.stratosphere.api.java.record.operators.{CollectionDataSource => JCollectionDataSource} +import eu.stratosphere.types.{DoubleValue, Record} +import org.scalatest.junit.AssertionsForJUnit +import org.junit.Assert._ +import org.junit.Test + +class CollectionDataSourceTest extends AssertionsForJUnit { + @Test def testScalaCollectionInput() { + val expected = List(1.0, 2.0, 3.0) + val datasource = CollectionDataSource(expected) + + val javaCDS = datasource.contract.asInstanceOf[JCollectionDataSource] + + val inputFormat = javaCDS.getFormatWrapper.getUserCodeObject() + val splits = inputFormat.createInputSplits(1) + inputFormat.open(splits(0)) + + val record = new Record() + var result = List[Double]() + + while(!inputFormat.reachedEnd()){ + inputFormat.nextRecord(record) + assertTrue(record.getNumFields == 1) + val value = record.getField[DoubleValue](0, classOf[DoubleValue]) + result = value.getValue :: result + } + + assertEquals(expected, result.reverse) + } + +}