flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] incubator-flink git commit: [FLINK-933] Add primitive input format to read a sequence of primitives
Date Wed, 19 Nov 2014 20:19:42 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master ae505adb8 -> 3ac4df81e


[FLINK-933] Add primitive input format to read a sequence of primitives

This closes #47


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d640b6c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d640b6c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d640b6c5

Branch: refs/heads/master
Commit: d640b6c5a49b3f3cae7c7a62a2a5e87bc4c83fe8
Parents: ae505ad
Author: mingliang <qmlmoon@gmail.com>
Authored: Wed Jun 25 18:54:36 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Nov 19 18:01:06 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    |  36 +++-
 .../flink/api/java/io/PrimitiveInputFormat.java |  79 +++++++++
 .../api/java/io/PrimitiveInputFormatTest.java   | 167 +++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  22 +++
 4 files changed, 303 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 2f9f661..5c85408 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
 import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.io.PrimitiveInputFormat;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.io.TextValueInputFormat;
 import org.apache.flink.api.java.operators.DataSink;
@@ -267,7 +268,40 @@ public abstract class ExecutionEnvironment {
 		format.setSkipInvalidLines(skipInvalidLines);
 		return new DataSource<StringValue>(this, format, new ValueTypeInfo<StringValue>(StringValue.class),
Utils.getCallLocationName());
 	}
-	
+
+	// ----------------------------------- Primitive Input Format ---------------------------------------
+
+	/**
+	 * Creates a DataSet that represents the primitive type produced by reading the given file line wise.
+	 * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
+	 * {@link org.apache.flink.api.java.tuple.Tuple1}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param typeClass The primitive type class to be read.
+	 * @return A DataSet that represents the data read from the given file as primitive type.
+	 */
+	public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X>
typeClass) {
+		Validate.notNull(filePath, "The file path may not be null.");
+
+		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath),
typeClass), TypeExtractor.getForClass(typeClass));
+	}
+
+	/**
+	 * Creates a DataSet that represents the primitive type produced by reading the given file in delimited way.
+	 * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
+	 * {@link org.apache.flink.api.java.tuple.Tuple1}.
+	 *
+	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param delimiter The delimiter of the given file.
+	 * @param typeClass The primitive type class to be read.
+	 * @return A DataSet that represents the data read from the given file as primitive type.
+	 */
+	public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter,
Class<X> typeClass) {
+		Validate.notNull(filePath, "The file path may not be null.");
+
+		return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath),
delimiter, typeClass), TypeExtractor.getForClass(typeClass));
+	}
+
 	// ----------------------------------- CSV Input Format ---------------------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
new file mode 100644
index 0000000..b3648e9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * An input format that reads single field primitive data from a given file. The difference between this and
+ * {@link org.apache.flink.api.java.io.CsvInputFormat} is that it won't go through {@link org.apache.flink.api.java.tuple.Tuple1}.
+ */
+public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<OT> primitiveClass;
+
+	private static final byte CARRIAGE_RETURN = (byte) '\r';
+
+	private static final byte NEW_LINE = (byte) '\n';
+
+	private transient FieldParser<OT> parser;
+
+
+	public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
+		super(filePath);
+		this.primitiveClass = primitiveClass;
+	}
+
+	public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> primitiveClass)
{
+		super(filePath);
+		this.primitiveClass = primitiveClass;
+		this.setDelimiter(delimiter);
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
+		if (parserType == null) {
+			throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not
supported for the primitive input format.");
+		}
+		parser = InstantiationUtil.instantiate(parserType, FieldParser.class);
+	}
+
+	@Override
+	public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
+		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
+			&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+			numBytes -= 1;
+		}
+
+		//Null character as delimiter is used because there's only 1 field to be parsed
+		parser.parseField(bytes, offset, numBytes + offset, '\0', reuse);
+		return parser.getLastResult();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
new file mode 100644
index 0000000..c7b8627
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+public class PrimitiveInputFormatTest {
+
+	private static final Path PATH = new Path("an/ignored/file/");
+
+
+	@Test
+	public void testStringInput() {
+		try {
+			final String fileContent = "abc||def||||";
+			final FileInputSplit split = createInputSplit(fileContent);
+
+			final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH,
"||", String.class);
+
+			final Configuration parameters = new Configuration();
+			format.configure(parameters);
+			format.open(split);
+
+			String result = null;
+
+			result = format.nextRecord(result);
+			assertEquals("abc", result);
+
+			result = format.nextRecord(result);
+			assertEquals("def", result);
+
+			result = format.nextRecord(result);
+			assertEquals("", result);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+
+
+	@Test
+	public void testIntegerInput() throws IOException {
+		try {
+			final String fileContent = "111|222|";
+			final FileInputSplit split = createInputSplit(fileContent);
+
+			final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|",
Integer.class);
+
+			format.configure(new Configuration());
+			format.open(split);
+
+			Integer result = null;
+			result = format.nextRecord(result);
+			assertEquals(Integer.valueOf(111), result);
+
+			result = format.nextRecord(result);
+			assertEquals(Integer.valueOf(222), result);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+	@Test
+	public void testDoubleInputLinewise() throws IOException {
+		try {
+			final String fileContent = "1.21\n2.23\n";
+			final FileInputSplit split = createInputSplit(fileContent);
+
+			final PrimitiveInputFormat<Double> format = new PrimitiveInputFormat<Double>(PATH,
Double.class);
+
+			format.configure(new Configuration());
+			format.open(split);
+
+			Double result = null;
+			result = format.nextRecord(result);
+			assertEquals(Double.valueOf(1.21), result);
+
+			result = format.nextRecord(result);
+			assertEquals(Double.valueOf(2.23), result);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+	@Test
+	public void testRemovingTrailingCR() {
+		try {
+			String first = "First line";
+			String second = "Second line";
+			String fileContent = first + "\r\n" + second + "\r\n";
+			final FileInputSplit split = createInputSplit(fileContent);
+
+			final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH
,String.class);
+
+			format.configure(new Configuration());
+			format.open(split);
+
+			String result = null;
+
+			result = format.nextRecord(result);
+			assertEquals(first, result);
+
+			result = format.nextRecord(result);
+			assertEquals(second, result);
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+	private FileInputSplit createInputSplit(String content) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
+
+		FileWriter wrt = new FileWriter(tempFile);
+		wrt.write(content);
+		wrt.close();
+
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(),
new String[] {"localhost"});
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d640b6c5/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index ee92fd6..cf556c6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -191,6 +191,28 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a DataSet that represents the primitive type produced by reading the
+   * given file in delimited way. This method is similar to [[readCsvFile]] with
+   * single field, but it produces a DataSet not through Tuple.
+   * The type parameter must be used to specify the primitive type.
+   *
+   * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+   *                 "hdfs://host:port/file/path").
+   * @param delimiter The string that separates primitives, defaults to newline.
+   */
+  def readFileOfPrimitives[T : ClassTag : TypeInformation](
+      filePath : String,
+      delimiter : String = "\n") : DataSet[T] = {
+    Validate.notNull(filePath, "File path must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    val datasource = new DataSource[T](
+      javaEnv,
+      new PrimitiveInputFormat(new Path(filePath), delimiter, typeInfo.getTypeClass),
+      typeInfo)
+    wrap(datasource)
+  }
+
+  /**
    * Creates a new DataSource by reading the specified file using the custom
    * [[org.apache.flink.api.common.io.FileInputFormat]].
    */


Mime
View raw message