[FLINK-1466] Adds HCatInputFormats to read from HCatalog tables.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6acd2e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6acd2e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6acd2e4
Branch: refs/heads/master
Commit: a6acd2e47c5a8b31e580f66253cad2966e11d3cf
Parents: 3d84970
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Jan 29 10:34:29 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Feb 20 16:10:23 2015 +0100
----------------------------------------------------------------------
flink-staging/flink-hcatalog/pom.xml | 186 +++++++++
.../flink/hcatalog/HCatInputFormatBase.java | 413 +++++++++++++++++++
.../flink/hcatalog/java/HCatInputFormat.java | 140 +++++++
.../flink/hcatalog/scala/HCatInputFormat.scala | 210 ++++++++++
flink-staging/pom.xml | 1 +
5 files changed, 950 insertions(+)
----------------------------------------------------------------------
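Usage sketch, assuming a table "mytable" in database "mydb" and an HCatalog/Hive
client configuration on the classpath; by default the format returns HCatRecords:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.hcatalog.java.HCatInputFormat;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class ReadHCatTable {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // each element of the DataSet is an HCatRecord unless asFlinkTuples() is called
            DataSet<HCatRecord> records = env.createInput(
                    new HCatInputFormat<HCatRecord>("mydb", "mytable"));

            records.writeAsText("/tmp/hcat-records");
            env.execute("Read HCatalog table");
        }
    }
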
http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..8762b9c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-staging</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-hcatalog</artifactId>
+ <name>flink-hcatalog</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hcatalog-core</artifactId>
+ <version>0.12.0</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ <compilerPlugins>
+ <compilerPlugin>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>paradise_${scala.version}</artifactId>
+ <version>${scala.macros.version}</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..59a6719
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for primitive type fields
+ * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
+ *
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Configuration configuration;
+
+ private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+ private RecordReader<WritableComparable, HCatRecord> recordReader;
+ private boolean fetched = false;
+ private boolean hasNext;
+
+ protected String[] fieldNames = new String[0];
+ protected HCatSchema outputSchema;
+
+ private TypeInformation<T> resultType;
+
+ public HCatInputFormatBase() { }
+
+ /**
+ * Creates a HCatInputFormat for the given database and table.
+ * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+ * {@link HCatInputFormatBase#asFlinkTuples()}.
+ *
+ * @param database The name of the database to read from.
+ * @param table The name of the table to read.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase(String database, String table) throws IOException {
+ this(database, table, new Configuration());
+ }
+
+ /**
+ * Creates a HCatInputFormat for the given database, table, and
+ * {@link org.apache.hadoop.conf.Configuration}.
+ * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+ * {@link HCatInputFormatBase#asFlinkTuples()}.
+ *
+ * @param database The name of the database to read from.
+ * @param table The name of the table to read.
+ * @param config The Configuration for the InputFormat.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+ super();
+ this.configuration = config;
+ HadoopUtils.mergeHadoopConf(this.configuration);
+
+ this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+ this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+ // configure output schema of HCatFormat
+ configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+ // set type information
+ this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+ }
+
+ /**
+ * Specifies the fields which are returned by the InputFormat and their order.
+ *
+ * @param fields The fields and their order which are returned by the InputFormat.
+ * @return This InputFormat with specified return fields.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+ // build output schema
+ ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+ for(String field : fields) {
+ fieldSchemas.add(this.outputSchema.get(field));
+ }
+ this.outputSchema = new HCatSchema(fieldSchemas);
+
+ // update output schema configuration
+ configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+ return this;
+ }
+
+ /**
+ * Specifies a SQL-like filter condition on the table's partition columns.
+ * Filter conditions on non-partition columns are invalid.
+ * A partition filter can significantly reduce the amount of data to be read.
+ *
+ * @param filter A SQL-like filter condition on the table's partition columns.
+ * @return This InputFormat with specified partition filter.
+ * @throws java.io.IOException
+ */
+ public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+ // set filter
+ this.hCatInputFormat.setFilter(filter);
+
+ return this;
+ }
+
+ /**
+ * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
+ * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
+ * At the moment, the following restrictions apply for returning Flink tuples:
+ *
+ * <ul>
+ * <li>Only primitive type fields can be returned in Flink Tuples
+ * (no STRUCT, MAP, ARRAY data types).</li>
+ * <li>Only a limited number of fields can be returned as Flink Tuple.</li>
+ * </ul>
+ *
+ * @return This InputFormat.
+ * @throws org.apache.hive.hcatalog.common.HCatException
+ */
+ public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+ // build type information
+ int numFields = outputSchema.getFields().size();
+ if(numFields > this.getMaxFlinkTupleSize()) {
+ throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+ " fields can be returned as Flink tuples.");
+ }
+
+ TypeInformation[] fieldTypes = new TypeInformation[numFields];
+ fieldNames = new String[numFields];
+ for (String fieldName : outputSchema.getFieldNames()) {
+ HCatFieldSchema field = outputSchema.get(fieldName);
+
+ int fieldPos = outputSchema.getPosition(fieldName);
+ TypeInformation fieldType = getFieldType(field);
+
+ fieldTypes[fieldPos] = fieldType;
+ fieldNames[fieldPos] = fieldName;
+
+ }
+ this.resultType = new TupleTypeInfo(fieldTypes);
+
+ return this;
+ }
+
+ protected abstract int getMaxFlinkTupleSize();
+
+ private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+ switch(fieldSchema.getType()) {
+ case INT:
+ return BasicTypeInfo.INT_TYPE_INFO;
+ case TINYINT:
+ return BasicTypeInfo.BYTE_TYPE_INFO;
+ case SMALLINT:
+ return BasicTypeInfo.SHORT_TYPE_INFO;
+ case BIGINT:
+ return BasicTypeInfo.LONG_TYPE_INFO;
+ case BOOLEAN:
+ return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ case FLOAT:
+ return BasicTypeInfo.FLOAT_TYPE_INFO;
+ case DOUBLE:
+ return BasicTypeInfo.DOUBLE_TYPE_INFO;
+ case STRING:
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ case BINARY:
+ return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+ case ARRAY:
+ throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples,
yet.");
+ case MAP:
+ throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet.");
+ case STRUCT:
+ throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet.");
+ default:
+ throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
+ }
+ }
+
+ /**
+ * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+ *
+ * @return The Configuration of the HCatInputFormat.
+ */
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ /**
+ * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
+ * returned by this InputFormat.
+ *
+ * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+ */
+ public HCatSchema getOutputSchema() {
+ return this.outputSchema;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // InputFormat
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void configure(org.apache.flink.configuration.Configuration parameters) {
+ // nothing to do
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+ // no statistics provided at the moment
+ return null;
+ }
+
+ @Override
+ public HadoopInputSplit[] createInputSplits(int minNumSplits)
+ throws IOException {
+ configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+ JobContext jobContext = null;
+ try {
+ jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ List<InputSplit> splits;
+ try {
+ splits = this.hCatInputFormat.getSplits(jobContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not get Splits.", e);
+ }
+ HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+ for(int i = 0; i < hadoopInputSplits.length; i++){
+ hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+ }
+ return hadoopInputSplits;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+ return new LocatableInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void open(HadoopInputSplit split) throws IOException {
+ TaskAttemptContext context = null;
+ try {
+ context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ this.recordReader = this.hCatInputFormat
+ .createRecordReader(split.getHadoopInputSplit(), context);
+ this.recordReader.initialize(split.getHadoopInputSplit(), context);
+ } catch (InterruptedException e) {
+ throw new IOException("Could not create RecordReader.", e);
+ } finally {
+ this.fetched = false;
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if(!this.fetched) {
+ fetchNext();
+ }
+ return !this.hasNext;
+ }
+
+ private void fetchNext() throws IOException {
+ try {
+ this.hasNext = this.recordReader.nextKeyValue();
+ } catch (InterruptedException e) {
+ throw new IOException("Could not fetch next KeyValue pair.", e);
+ } finally {
+ this.fetched = true;
+ }
+ }
+
+ @Override
+ public T nextRecord(T record) throws IOException {
+ if(!this.fetched) {
+ // first record
+ fetchNext();
+ }
+ if(!this.hasNext) {
+ return null;
+ }
+ try {
+
+ // get next HCatRecord
+ HCatRecord v = this.recordReader.getCurrentValue();
+ this.fetched = false;
+
+ if(this.fieldNames.length > 0) {
+ // return as Flink tuple
+ return this.buildFlinkTuple(record, v);
+
+ } else {
+ // return as HCatRecord
+ return (T)v;
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException("Could not get next record.", e);
+ }
+ }
+
+ protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom de/serialization methods
+ // --------------------------------------------------------------------------------------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeInt(this.fieldNames.length);
+ for(String fieldName : this.fieldNames) {
+ out.writeUTF(fieldName);
+ }
+ this.configuration.write(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ this.fieldNames = new String[in.readInt()];
+ for(int i=0; i<this.fieldNames.length; i++) {
+ this.fieldNames[i] = in.readUTF();
+ }
+
+ Configuration configuration = new Configuration();
+ configuration.readFields(in);
+
+ if(this.configuration == null) {
+ this.configuration = configuration;
+ }
+
+ this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+ this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Result type business
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return this.resultType;
+ }
+
+}
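The projection and partition-filter methods of HCatInputFormatBase can be chained
before the format is handed to the environment. A sketch, assuming columns "name"
(STRING) and "age" (INT) and a partition column "region" (all made up for
illustration); after projection, fields can be read positionally from each record:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.hcatalog.java.HCatInputFormat;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class FilteredHCatRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            HCatInputFormat<HCatRecord> format =
                    new HCatInputFormat<HCatRecord>("mydb", "mytable");
            format.getFields("name", "age")         // select and order the returned fields
                  .withFilter("region = 'emea'");   // read only matching partitions

            DataSet<String> names = env.createInput(format)
                    .map(new MapFunction<HCatRecord, String>() {
                        @Override
                        public String map(HCatRecord record) throws Exception {
                            // position 0 is "name" in the projected output schema
                            return (String) record.get(0);
                        }
                    });

            names.writeAsText("/tmp/hcat-names");
            env.execute("Filtered HCatalog read");
        }
    }
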
http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..c3c3a1c
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 25 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ * @param <T>
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+ private static final long serialVersionUID = 1L;
+
+ public HCatInputFormat() {}
+
+ public HCatInputFormat(String database, String table) throws Exception {
+ super(database, table);
+ }
+
+ public HCatInputFormat(String database, String table, Configuration config) throws Exception {
+ super(database, table, config);
+ }
+
+
+ @Override
+ protected int getMaxFlinkTupleSize() {
+ return 25;
+ }
+
+ @Override
+ protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
+
+ Tuple tuple = (Tuple)t;
+
+ // Extract all fields from HCatRecord
+ for(int i=0; i < this.fieldNames.length; i++) {
+
+ // get field value
+ Object o = record.get(this.fieldNames[i], this.outputSchema);
+
+ // Set field value in Flink tuple.
+ // Partition columns are returned as String and
+ // need to be converted to original type.
+ switch(this.outputSchema.get(i).getType()) {
+ case INT:
+ if(o instanceof String) {
+ tuple.setField(Integer.parseInt((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case TINYINT:
+ if(o instanceof String) {
+ tuple.setField(Byte.parseByte((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case SMALLINT:
+ if(o instanceof String) {
+ tuple.setField(Short.parseShort((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case BIGINT:
+ if(o instanceof String) {
+ tuple.setField(Long.parseLong((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case BOOLEAN:
+ if(o instanceof String) {
+ tuple.setField(Boolean.parseBoolean((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case FLOAT:
+ if(o instanceof String) {
+ tuple.setField(Float.parseFloat((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case DOUBLE:
+ if(o instanceof String) {
+ tuple.setField(Double.parseDouble((String) o), i);
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ case STRING:
+ tuple.setField(o, i);
+ break;
+ case BINARY:
+ if(o instanceof String) {
+ throw new RuntimeException("Cannot handle partition keys of type BINARY.");
+ } else {
+ tuple.setField(o, i);
+ }
+ break;
+ default:
+ throw new RuntimeException("Invalid Type");
+ }
+ }
+
+ return (T)tuple;
+
+ }
+
+}
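With asFlinkTuples(), the same read produces typed Flink tuples (up to 25 fields for
the Java variant); partition columns are converted from String to their declared type
in buildFlinkTuple() above. A sketch with the same made-up columns, so the projected
schema maps to Tuple2<String, Integer>:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hcatalog.java.HCatInputFormat;

    public class TupleHCatRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            HCatInputFormat<Tuple2<String, Integer>> format =
                    new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable");
            format.getFields("name", "age")   // project first, so the tuple type matches
                  .asFlinkTuples();           // return Flink tuples instead of HCatRecords

            DataSet<String> lines = env.createInput(format)
                    .map(new MapFunction<Tuple2<String, Integer>, String>() {
                        @Override
                        public String map(Tuple2<String, Integer> t) {
                            return t.f0 + " is " + t.f1 + " years old";
                        }
                    });

            lines.writeAsText("/tmp/hcat-tuples");
            env.execute("HCatalog read as Flink tuples");
        }
    }
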
http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..7cc18f0
--- /dev/null
+++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or
+ * Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for up to 22 fields of primitive types
+ * (no STRUCT, ARRAY, or MAP data types).
+ *
+ */
+class HCatInputFormat[T](
+ database: String,
+ table: String,
+ config: Configuration
+ ) extends HCatInputFormatBase[T](database, table, config) {
+
+ def this(database: String, table: String) {
+ this(database, table, new Configuration)
+ }
+
+ var vals: Array[Any] = Array[Any]()
+
+ override def configure(parameters: configuration.Configuration): Unit = {
+ super.configure(parameters)
+ vals = new Array[Any](fieldNames.length)
+ }
+
+ override protected def getMaxFlinkTupleSize: Int = 22
+
+ override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+ // Extract all fields from HCatRecord
+ var i: Int = 0
+ while (i < this.fieldNames.length) {
+
+ val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+ // partition columns are returned as String
+ // Check and convert to actual type.
+ this.outputSchema.get(i).getType match {
+ case HCatFieldSchema.Type.INT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt
+ }
+ else {
+ vals(i) = o.asInstanceOf[Int]
+ }
+ case HCatFieldSchema.Type.TINYINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt.toByte
+ }
+ else {
+ vals(i) = o.asInstanceOf[Byte]
+ }
+ case HCatFieldSchema.Type.SMALLINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toInt.toShort
+ }
+ else {
+ vals(i) = o.asInstanceOf[Short]
+ }
+ case HCatFieldSchema.Type.BIGINT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toLong
+ }
+ else {
+ vals(i) = o.asInstanceOf[Long]
+ }
+ case HCatFieldSchema.Type.BOOLEAN =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toBoolean
+ }
+ else {
+ vals(i) = o.asInstanceOf[Boolean]
+ }
+ case HCatFieldSchema.Type.FLOAT =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toFloat
+ }
+ else {
+ vals(i) = o.asInstanceOf[Float]
+ }
+ case HCatFieldSchema.Type.DOUBLE =>
+ if (o.isInstanceOf[String]) {
+ vals(i) = o.asInstanceOf[String].toDouble
+ }
+ else {
+ vals(i) = o.asInstanceOf[Double]
+ }
+ case HCatFieldSchema.Type.STRING =>
+ vals(i) = o
+ case HCatFieldSchema.Type.BINARY =>
+ if (o.isInstanceOf[String]) {
+ throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+ }
+ else {
+ vals(i) = o.asInstanceOf[Array[Byte]]
+ }
+ case _ =>
+ throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType
+
+ " encountered.")
+ }
+
+ i += 1
+ }
+ createScalaTuple(vals)
+ }
+
+ private def createScalaTuple(vals: Array[Any]): T = {
+
+ this.fieldNames.length match {
+ case 1 =>
+ new Tuple1(vals(0)).asInstanceOf[T]
+ case 2 =>
+ new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+ case 3 =>
+ new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+ case 4 =>
+ new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+ case 5 =>
+ new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+ case 6 =>
+ new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
+ case 7 =>
+ new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
+ case 8 =>
+ new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
+ .asInstanceOf[T]
+ case 9 =>
+ new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8)).asInstanceOf[T]
+ case 10 =>
+ new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9)).asInstanceOf[T]
+ case 11 =>
+ new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10)).asInstanceOf[T]
+ case 12 =>
+ new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+ case 13 =>
+ new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+ case 14 =>
+ new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
+ case 15 =>
+ new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
+ case 16 =>
+ new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
+ .asInstanceOf[T]
+ case 17 =>
+ new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16)).asInstanceOf[T]
+ case 18 =>
+ new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17)).asInstanceOf[T]
+ case 19 =>
+ new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18)).asInstanceOf[T]
+ case 20 =>
+ new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+ case 21 =>
+ new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+ case 22 =>
+ new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
+ vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
+ vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
+ case _ =>
+ throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a6acd2e4/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index ea59e70..1b98cbe 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -41,6 +41,7 @@ under the License.
<module>flink-streaming</module>
<module>flink-hbase</module>
<module>flink-gelly</module>
+ <module>flink-hcatalog</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->