flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject git commit: [FLINK-960] Fix CollectionDataSource bug
Date Wed, 25 Jun 2014 10:25:37 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 3d6cc5f48 -> 515ad3c33


[FLINK-960] Fix CollectionDataSource bug

This closes #33.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/515ad3c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/515ad3c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/515ad3c3

Branch: refs/heads/master
Commit: 515ad3c3362af54efc4a36fe90d9b353c1da85c8
Parents: 3d6cc5f
Author: Till Rohrmann <till.rohrmann@gmail.com>
Authored: Fri Jun 20 18:17:42 2014 +0200
Committer: uce <u.celebi@fu-berlin.de>
Committed: Wed Jun 25 12:24:57 2014 +0200

----------------------------------------------------------------------
 .../record/operators/CollectionDataSource.java  | 13 +++---
 stratosphere-scala/pom.xml                      |  6 +++
 .../api/scala/CollectionDataSourceTest.scala    | 46 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java
index adadea2..f4f85d6 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CollectionDataSource.java
@@ -136,13 +136,14 @@ public class CollectionDataSource extends GenericDataSourceBase<Record, GenericI
 			checkFormat((Collection<Object>) data[0]);
 			f.setData((Collection<Object>) data[0]);
 		}
-
-		Collection<Object> tmp = new ArrayList<Object>();
-		for (Object o : data) {
-			tmp.add(o);
+		else {
+			Collection<Object> tmp = new ArrayList<Object>();
+			for (Object o : data) {
+				tmp.add(o);
+			}
+			checkFormat(tmp);
+			f.setData(tmp);
 		}
-		checkFormat(tmp);
-		f.setData(tmp);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-scala/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-scala/pom.xml b/stratosphere-scala/pom.xml
index 817a63c..86c1925 100644
--- a/stratosphere-scala/pom.xml
+++ b/stratosphere-scala/pom.xml
@@ -56,6 +56,12 @@
 			<artifactId>asm</artifactId>
 			<version>4.0</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.scalatest</groupId>
+			<artifactId>scalatest_2.10</artifactId>
+			<version>2.2.0</version>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/515ad3c3/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala b/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala
new file mode 100644
index 0000000..3c36ed5
--- /dev/null
+++ b/stratosphere-scala/src/test/scala/eu/stratosphere/api/scala/CollectionDataSourceTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.scala
+
+import eu.stratosphere.api.java.record.operators.{CollectionDataSource => JCollectionDataSource}
+import eu.stratosphere.types.{DoubleValue, Record}
+import org.scalatest.junit.AssertionsForJUnit
+import org.junit.Assert._
+import org.junit.Test
+
+class CollectionDataSourceTest extends AssertionsForJUnit {
+  @Test def testScalaCollectionInput() {
+    val expected = List(1.0, 2.0, 3.0)
+    val datasource = CollectionDataSource(expected)
+
+    val javaCDS = datasource.contract.asInstanceOf[JCollectionDataSource]
+
+    val inputFormat = javaCDS.getFormatWrapper.getUserCodeObject()
+    val splits = inputFormat.createInputSplits(1)
+    inputFormat.open(splits(0))
+
+    val record = new Record()
+    var result = List[Double]()
+
+    while(!inputFormat.reachedEnd()){
+      inputFormat.nextRecord(record)
+      assertTrue(record.getNumFields == 1)
+      val value = record.getField[DoubleValue](0, classOf[DoubleValue])
+      result = value.getValue :: result
+    }
+
+    assertEquals(expected, result.reverse)
+  }
+
+}


Mime
View raw message