flink-commits mailing list archives

From fhue...@apache.org
Subject [3/4] flink git commit: [FLINK-1466] Adds HCatInputFormats to read from HCatalog tables.
Date Fri, 20 Feb 2015 15:11:49 GMT
[FLINK-1466] Adds HCatInputFormats to read from HCatalog tables.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6acd2e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6acd2e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6acd2e4

Branch: refs/heads/master
Commit: a6acd2e47c5a8b31e580f66253cad2966e11d3cf
Parents: 3d84970
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Jan 29 10:34:29 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Feb 20 16:10:23 2015 +0100

----------------------------------------------------------------------
 flink-staging/flink-hcatalog/pom.xml            | 186 +++++++++
 .../flink/hcatalog/HCatInputFormatBase.java     | 413 +++++++++++++++++++
 .../flink/hcatalog/java/HCatInputFormat.java    | 140 +++++++
 .../flink/hcatalog/scala/HCatInputFormat.scala  | 210 ++++++++++
 flink-staging/pom.xml                           |   1 +
 5 files changed, 950 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..8762b9c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hcatalog</artifactId>
+	<name>flink-hcatalog</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hive.hcatalog</groupId>
+			<artifactId>hcatalog-core</artifactId>
+			<version>0.12.0</version>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+						<compilerPlugin>
+							<groupId>org.scalamacros</groupId>
+							<artifactId>paradise_${scala.version}</artifactId>
+							<version>${scala.macros.version}</version>
+						</compilerPlugin>
+					</compilerPlugins>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..59a6719
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for primitive type fields
+ * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
+ *
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Configuration configuration;
+
+	private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+	private RecordReader<WritableComparable, HCatRecord> recordReader;
+	private boolean fetched = false;
+	private boolean hasNext;
+
+	protected String[] fieldNames = new String[0];
+	protected HCatSchema outputSchema;
+
+	private TypeInformation<T> resultType;
+
+	public HCatInputFormatBase() { }
+
+	/**
+	 * Creates a HCatInputFormat for the given database and table.
+	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+	 * {@link HCatInputFormatBase#asFlinkTuples()}.
+	 *
+	 * @param database The name of the database to read from.
+	 * @param table The name of the table to read.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase(String database, String table) throws IOException {
+		this(database, table, new Configuration());
+	}
+
+	/**
+	 * Creates a HCatInputFormat for the given database, table, and
+	 * {@link org.apache.hadoop.conf.Configuration}.
+	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+	 * {@link HCatInputFormatBase#asFlinkTuples()}.
+	 *
+	 * @param database The name of the database to read from.
+	 * @param table The name of the table to read.
+	 * @param config The Configuration for the InputFormat.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+		super();
+		this.configuration = config;
+		HadoopUtils.mergeHadoopConf(this.configuration);
+
+		this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+		this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+		// configure output schema of HCatFormat
+		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+		// set type information
+		this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+	}
+
+	/**
+	 * Specifies the fields which are returned by the InputFormat and their order.
+	 *
+	 * @param fields The fields and their order which are returned by the InputFormat.
+	 * @return This InputFormat with specified return fields.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+		// build output schema
+		ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+		for(String field : fields) {
+			fieldSchemas.add(this.outputSchema.get(field));
+		}
+		this.outputSchema = new HCatSchema(fieldSchemas);
+
+		// update output schema configuration
+		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+		return this;
+	}
+
+	/**
+	 * Specifies a SQL-like filter condition on the table's partition columns.
+	 * Filter conditions on non-partition columns are invalid.
+	 * A partition filter can significantly reduce the amount of data to be read.
+	 *
+	 * @param filter A SQL-like filter condition on the table's partition columns.
+	 * @return This InputFormat with specified partition filter.
+	 * @throws java.io.IOException
+	 */
+	public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+		// set filter
+		this.hCatInputFormat.setFilter(filter);
+
+		return this;
+	}
+
+	/**
+	 * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
+	 * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
+	 * At the moment, the following restrictions apply for returning Flink tuples:
+	 *
+	 * <ul>
+	 *     <li>Only primitive type fields can be returned in Flink Tuples
+	 *          (no STRUCT, MAP, ARRAY data types).</li>
+	 *     <li>Only a limited number of fields can be returned as Flink Tuple.</li>
+	 * </ul>
+	 *
+	 * @return This InputFormat.
+	 * @throws org.apache.hive.hcatalog.common.HCatException
+	 */
+	public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+		// build type information
+		int numFields = outputSchema.getFields().size();
+		if(numFields > this.getMaxFlinkTupleSize()) {
+			throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+					" fields can be returned as Flink tuples.");
+		}
+
+		TypeInformation[] fieldTypes = new TypeInformation[numFields];
+		fieldNames = new String[numFields];
+		for (String fieldName : outputSchema.getFieldNames()) {
+			HCatFieldSchema field = outputSchema.get(fieldName);
+
+			int fieldPos = outputSchema.getPosition(fieldName);
+			TypeInformation fieldType = getFieldType(field);
+
+			fieldTypes[fieldPos] = fieldType;
+			fieldNames[fieldPos] = fieldName;
+
+		}
+		this.resultType = new TupleTypeInfo(fieldTypes);
+
+		return this;
+	}
+
+	protected abstract int getMaxFlinkTupleSize();
+
+	private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+		switch(fieldSchema.getType()) {
+			case INT:
+				return BasicTypeInfo.INT_TYPE_INFO;
+			case TINYINT:
+				return BasicTypeInfo.BYTE_TYPE_INFO;
+			case SMALLINT:
+				return BasicTypeInfo.SHORT_TYPE_INFO;
+			case BIGINT:
+				return BasicTypeInfo.LONG_TYPE_INFO;
+			case BOOLEAN:
+				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+			case FLOAT:
+				return BasicTypeInfo.FLOAT_TYPE_INFO;
+			case DOUBLE:
+				return BasicTypeInfo.DOUBLE_TYPE_INFO;
+			case STRING:
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			case BINARY:
+				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+			case ARRAY:
+				throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet.");
+			case MAP:
+				throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet.");
+			case STRUCT:
+				throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet.");
+			default:
+				throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+		}
+	}
+
+	/**
+	 * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+	 *
+	 * @return The Configuration of the HCatInputFormat.
+	 */
+	public Configuration getConfiguration() {
+		return this.configuration;
+	}
+
+	/**
+	 * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
+	 * returned by this InputFormat.
+	 *
+	 * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+	 */
+	public HCatSchema getOutputSchema() {
+		return this.outputSchema;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(org.apache.flink.configuration.Configuration parameters) {
+		// nothing to do
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// no statistics provided at the moment
+		return null;
+	}
+
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		List<InputSplit> splits;
+		try {
+			splits = this.hCatInputFormat.getSplits(jobContext);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get Splits.", e);
+		}
+		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+		for(int i = 0; i < hadoopInputSplits.length; i++){
+			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+		}
+		return hadoopInputSplits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		TaskAttemptContext context = null;
+		try {
+			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		try {
+			this.recordReader = this.hCatInputFormat
+					.createRecordReader(split.getHadoopInputSplit(), context);
+			this.recordReader.initialize(split.getHadoopInputSplit(), context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordReader.", e);
+		} finally {
+			this.fetched = false;
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		return !this.hasNext;
+	}
+
+	private void fetchNext() throws IOException {
+		try {
+			this.hasNext = this.recordReader.nextKeyValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not fetch next KeyValue pair.", e);
+		} finally {
+			this.fetched = true;
+		}
+	}
+
+	@Override
+	public T nextRecord(T record) throws IOException {
+		if(!this.fetched) {
+			// first record
+			fetchNext();
+		}
+		if(!this.hasNext) {
+			return null;
+		}
+		try {
+
+			// get next HCatRecord
+			HCatRecord v = this.recordReader.getCurrentValue();
+			this.fetched = false;
+
+			if(this.fieldNames.length > 0) {
+				// return as Flink tuple
+				return this.buildFlinkTuple(record, v);
+
+			} else {
+				// return as HCatRecord
+				return (T)v;
+			}
+
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get next record.", e);
+		}
+	}
+
+	protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
+
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom de/serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(this.fieldNames.length);
+		for(String fieldName : this.fieldNames) {
+			out.writeUTF(fieldName);
+		}
+		this.configuration.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.fieldNames = new String[in.readInt()];
+		for(int i=0; i<this.fieldNames.length; i++) {
+			this.fieldNames[i] = in.readUTF();
+		}
+
+		Configuration configuration = new Configuration();
+		configuration.readFields(in);
+
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+
+		this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+		this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Result type business
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return this.resultType;
+	}
+
+}
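
For context, a minimal usage sketch (not part of this commit) of the new InputFormat from a Flink Java job, combining the projection and partition-filter methods shown above. The database "mydb", table "mytable", fields "name"/"age", the partition column "region", and the output path are hypothetical placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

public class HCatReadSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical database, table, field, and partition names.
		HCatInputFormat<HCatRecord> hcatInput = new HCatInputFormat<HCatRecord>("mydb", "mytable");

		// Project two fields and prune partitions; both calls mutate and return the format.
		hcatInput.getFields("name", "age")
				.withFilter("region='EU'");

		// The format implements ResultTypeQueryable, so createInput needs no explicit TypeInformation.
		DataSet<HCatRecord> records = env.createInput(hcatInput);

		records.writeAsText("file:///tmp/hcat-records");
		env.execute("HCatalog read sketch");
	}
}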

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..c3c3a1c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 25 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ * @param <T>
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+	private static final long serialVersionUID = 1L;
+
+	public HCatInputFormat() {}
+
+	public HCatInputFormat(String database, String table) throws Exception {
+		super(database, table);
+	}
+
+	public HCatInputFormat(String database, String table, Configuration config) throws Exception {
+		super(database, table, config);
+	}
+
+
+	@Override
+	protected int getMaxFlinkTupleSize() {
+		return 25;
+	}
+
+	@Override
+	protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
+
+		Tuple tuple = (Tuple)t;
+
+		// Extract all fields from HCatRecord
+		for(int i=0; i < this.fieldNames.length; i++) {
+
+			// get field value
+			Object o = record.get(this.fieldNames[i], this.outputSchema);
+
+			// Set field value in Flink tuple.
+			// Partition columns are returned as String and
+			//   need to be converted to original type.
+			switch(this.outputSchema.get(i).getType()) {
+				case INT:
+					if(o instanceof String) {
+						tuple.setField(Integer.parseInt((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case TINYINT:
+					if(o instanceof String) {
+						tuple.setField(Byte.parseByte((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case SMALLINT:
+					if(o instanceof String) {
+						tuple.setField(Short.parseShort((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case BIGINT:
+					if(o instanceof String) {
+						tuple.setField(Long.parseLong((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case BOOLEAN:
+					if(o instanceof String) {
+						tuple.setField(Boolean.parseBoolean((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case FLOAT:
+					if(o instanceof String) {
+						tuple.setField(Float.parseFloat((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case DOUBLE:
+					if(o instanceof String) {
+						tuple.setField(Double.parseDouble((String) o), i);
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				case STRING:
+					tuple.setField(o, i);
+					break;
+				case BINARY:
+					if(o instanceof String) {
+						throw new RuntimeException("Cannot handle partition keys of type BINARY.");
+					} else {
+						tuple.setField(o, i);
+					}
+					break;
+				default:
+					throw new RuntimeException("Invalid Type");
+			}
+		}
+
+		return (T)tuple;
+
+	}
+
+}
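
Similarly, a hedged sketch (not part of this commit) of the Tuple mode provided by this Java subclass: after asFlinkTuples(), records arrive as Flink Tuples whose arity matches the projected fields (at most 25 here). The table, field names, and output path are again hypothetical placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hcatalog.java.HCatInputFormat;

public class HCatTupleSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical table with a STRING field "name" and an INT field "age".
		HCatInputFormat<Tuple2<String, Integer>> hcatInput =
				new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable");

		// Project two primitive-typed fields and switch the return type to Flink Tuples.
		hcatInput.getFields("name", "age")
				.asFlinkTuples();

		DataSet<Tuple2<String, Integer>> pairs = env.createInput(hcatInput);

		pairs.writeAsCsv("file:///tmp/hcat-tuples");
		env.execute("HCatalog tuple sketch");
	}
}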

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..7cc18f0
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or
+ * Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 22 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ */
+class HCatInputFormat[T](
+                        database: String,
+                        table: String,
+                        config: Configuration
+                          ) extends HCatInputFormatBase[T](database, table, config) {
+
+  def this(database: String, table: String) {
+    this(database, table, new Configuration)
+  }
+
+  var vals: Array[Any] = Array[Any]()
+
+  override def configure(parameters: configuration.Configuration): Unit = {
+    super.configure(parameters)
+    vals = new Array[Any](fieldNames.length)
+  }
+
+  override protected def getMaxFlinkTupleSize: Int = 22
+
+  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+    // Extract all fields from HCatRecord
+    var i: Int = 0
+    while (i < this.fieldNames.length) {
+
+        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+        // partition columns are returned as String
+        //   Check and convert to actual type.
+        this.outputSchema.get(i).getType match {
+          case HCatFieldSchema.Type.INT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt
+            }
+            else {
+              vals(i) = o.asInstanceOf[Int]
+            }
+          case HCatFieldSchema.Type.TINYINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toByte
+            }
+            else {
+              vals(i) = o.asInstanceOf[Byte]
+            }
+          case HCatFieldSchema.Type.SMALLINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toInt.toShort
+            }
+            else {
+              vals(i) = o.asInstanceOf[Short]
+            }
+          case HCatFieldSchema.Type.BIGINT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toLong
+            }
+            else {
+              vals(i) = o.asInstanceOf[Long]
+            }
+          case HCatFieldSchema.Type.BOOLEAN =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toBoolean
+            }
+            else {
+              vals(i) = o.asInstanceOf[Boolean]
+            }
+          case HCatFieldSchema.Type.FLOAT =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toFloat
+            }
+            else {
+              vals(i) = o.asInstanceOf[Float]
+            }
+          case HCatFieldSchema.Type.DOUBLE =>
+            if (o.isInstanceOf[String]) {
+              vals(i) = o.asInstanceOf[String].toDouble
+            }
+            else {
+              vals(i) = o.asInstanceOf[Double]
+            }
+          case HCatFieldSchema.Type.STRING =>
+            vals(i) = o
+          case HCatFieldSchema.Type.BINARY =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[Array[Byte]]
+            }
+          case _ =>
+            throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
+              " encountered.")
+        }
+
+        i += 1
+      }
+    createScalaTuple(vals)
+  }
+
+  private def createScalaTuple(vals: Array[Any]): T = {
+
+    this.fieldNames.length match {
+      case 1 =>
+        new Tuple1(vals(0)).asInstanceOf[T]
+      case 2 =>
+        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+      case 3 =>
+        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+      case 4 =>
+        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+      case 5 =>
+        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+      case 6 =>
+        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+      case 7 =>
+        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+      case 8 =>
+        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+          .asInstanceOf[T]
+      case 9 =>
+        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8)).asInstanceOf[T]
+      case 10 =>
+        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9)).asInstanceOf[T]
+      case 11 =>
+        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10)).asInstanceOf[T]
+      case 12 =>
+        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+      case 13 =>
+        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+      case 14 =>
+        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+      case 15 =>
+        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+      case 16 =>
+        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+          .asInstanceOf[T]
+      case 17 =>
+        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16)).asInstanceOf[T]
+      case 18 =>
+        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17)).asInstanceOf[T]
+      case 19 =>
+        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18)).asInstanceOf[T]
+      case 20 =>
+        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+      case 21 =>
+        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+      case 22 =>
+        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+      case _ =>
+        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+
+  }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index ea59e70..1b98cbe 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -41,6 +41,7 @@ under the License.
 		<module>flink-streaming</module>
 		<module>flink-hbase</module>
 		<module>flink-gelly</module>
+		<module>flink-hcatalog</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->

