flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1466) Add InputFormat to read HCatalog tables
Date Tue, 17 Feb 2015 17:26:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324499#comment-14324499 ]

ASF GitHub Bot commented on FLINK-1466:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/411#discussion_r24833839
  
    --- Diff: flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---
    @@ -0,0 +1,413 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hcatalog;
    +
    +import org.apache.flink.api.common.io.InputFormat;
    +import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
    +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.WritableComparable;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hive.hcatalog.common.HCatException;
    +import org.apache.hive.hcatalog.common.HCatUtil;
    +import org.apache.hive.hcatalog.data.DefaultHCatRecord;
    +import org.apache.hive.hcatalog.data.HCatRecord;
    +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
    +import org.apache.hive.hcatalog.data.schema.HCatSchema;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * An InputFormat to read from HCatalog tables.
    + * The InputFormat supports projection (selection and order of fields) and partition filters.
    + *
    + * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
    + * Flink Tuples are only supported for primitive type fields
    + * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
    + *
    + * @param <T>
    + */
    +public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private Configuration configuration;
    +
    +	private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
    +	private RecordReader<WritableComparable, HCatRecord> recordReader;
    +	private boolean fetched = false;
    +	private boolean hasNext;
    +
    +	protected String[] fieldNames = new String[0];
    +	protected HCatSchema outputSchema;
    +
    +	private TypeInformation<T> resultType;
    +
    +	public HCatInputFormatBase() { }
    +
    +	/**
    +	 * Creates a HCatInputFormat for the given database and table.
    +	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
    +	 * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
    +	 * {@link HCatInputFormatBase#asFlinkTuples()}.
    +	 *
    +	 * @param database The name of the database to read from.
    +	 * @param table The name of the table to read.
    +	 * @throws java.io.IOException
    +	 */
    +	public HCatInputFormatBase(String database, String table) throws IOException {
    +		this(database, table, new Configuration());
    +	}
    +
    +	/**
    +	 * Creates a HCatInputFormat for the given database, table, and
    +	 * {@link org.apache.hadoop.conf.Configuration}.
    +	 * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
    +	 * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
    +	 * {@link HCatInputFormatBase#asFlinkTuples()}.
    +	 *
    +	 * @param database The name of the database to read from.
    +	 * @param table The name of the table to read.
    +	 * @param config The Configuration for the InputFormat.
    +	 * @throws java.io.IOException
    +	 */
    +	public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
    +		super();
    +		this.configuration = config;
    +		HadoopUtils.mergeHadoopConf(this.configuration);
    +
    +		this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
    +		this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
    +
    +		// configure output schema of HCatFormat
    +		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
    +		// set type information
    +		this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
    +	}
    +
    +	/**
    +	 * Specifies the fields which are returned by the InputFormat and their order.
    +	 *
    +	 * @param fields The fields and their order which are returned by the InputFormat.
    +	 * @return This InputFormat with specified return fields.
    +	 * @throws java.io.IOException
    +	 */
    +	public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
    +
    +		// build output schema
    +		ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
    +		for(String field : fields) {
    +			fieldSchemas.add(this.outputSchema.get(field));
    +		}
    +		this.outputSchema = new HCatSchema(fieldSchemas);
    +
    +		// update output schema configuration
    +		configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
    +
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies a SQL-like filter condition on the table's partition columns.
    +	 * Filter conditions on non-partition columns are invalid.
    +	 * A partition filter can significantly reduce the amount of data to be read.
    +	 *
    +	 * @param filter A SQL-like filter condition on the table's partition columns.
    +	 * @return This InputFormat with specified partition filter.
    +	 * @throws java.io.IOException
    +	 */
    +	public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
    +
    +		// set filter
    +		this.hCatInputFormat.setFilter(filter);
    +
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
    +	 * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
    +	 * At the moment, the following restrictions apply for returning Flink tuples:
    +	 *
    +	 * <ul>
    +	 *     <li>Only primitive type fields can be returned in Flink Tuples
    +	 *          (no STRUCT, MAP, ARRAY data types).</li>
    +	 *     <li>Only a limited number of fields can be returned as Flink Tuple.</li>
    +	 * </ul>
    +	 *
    +	 * @return This InputFormat.
    +	 * @throws org.apache.hive.hcatalog.common.HCatException
    +	 */
    +	public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
    +
    +		// build type information
    +		int numFields = outputSchema.getFields().size();
    +		if(numFields > this.getMaxFlinkTupleSize()) {
    +			throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
    +					" fields can be returned as Flink tuples.");
    +		}
    +
    +		TypeInformation[] fieldTypes = new TypeInformation[numFields];
    +		fieldNames = new String[numFields];
    +		for (String fieldName : outputSchema.getFieldNames()) {
    +			HCatFieldSchema field = outputSchema.get(fieldName);
    +
    +			int fieldPos = outputSchema.getPosition(fieldName);
    +			TypeInformation fieldType = getFieldType(field);
    +
    +			fieldTypes[fieldPos] = fieldType;
    +			fieldNames[fieldPos] = fieldName;
    +
    +		}
    +		this.resultType = new TupleTypeInfo(fieldTypes);
    +
    +		return this;
    +	}
    +
    +	protected abstract int getMaxFlinkTupleSize();
    +
    +	private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
    +
    +		switch(fieldSchema.getType()) {
    +			case INT:
    +				return BasicTypeInfo.INT_TYPE_INFO;
    +			case TINYINT:
    +				return BasicTypeInfo.BYTE_TYPE_INFO;
    +			case SMALLINT:
    +				return BasicTypeInfo.SHORT_TYPE_INFO;
    +			case BIGINT:
    +				return BasicTypeInfo.LONG_TYPE_INFO;
    +			case BOOLEAN:
    +				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
    +			case FLOAT:
    +				return BasicTypeInfo.FLOAT_TYPE_INFO;
    +			case DOUBLE:
    +				return BasicTypeInfo.DOUBLE_TYPE_INFO;
    +			case STRING:
    +				return BasicTypeInfo.STRING_TYPE_INFO;
    +			case BINARY:
    +				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    +			case ARRAY:
    +				throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet.");
    --- End diff --
    
    Why can't we use genericTypes here for arrays, hashmaps, lists (see https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput)?
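    
    A minimal sketch of what such a generic-type fallback could look like, assuming Flink's org.apache.flink.api.java.typeutils.GenericTypeInfo is used for the complex HCatalog types. This illustrates the reviewer's suggestion only; it is not part of the diff above:
    
        // Hypothetical fallback for the getFieldType() switch above (requires imports of
        // GenericTypeInfo, java.util.List, and java.util.Map). HCatRecord exposes ARRAY
        // fields as java.util.List, MAP fields as java.util.Map, and STRUCT fields as
        // java.util.List, so the values are handled by Flink's generic serializer.
        case ARRAY:
            return new GenericTypeInfo(List.class);
        case MAP:
            return new GenericTypeInfo(Map.class);
        case STRUCT:
            return new GenericTypeInfo(List.class);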

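    For context, a minimal usage sketch of the InputFormat described in the Javadoc of the diff above. It assumes a concrete Java subclass named HCatInputFormat; the database, table, field, and partition names are likewise hypothetical:
    
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.hcatalog.HCatInputFormatBase;
    
        // Hypothetical usage, to be run inside a main() that declares 'throws Exception'.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
        // Project two primitive-typed fields, push a partition filter down to HCatalog,
        // and return the records as Flink Tuples instead of HCatRecords.
        HCatInputFormatBase<Tuple2<Integer, String>> hcatFormat =
                new HCatInputFormat<Tuple2<Integer, String>>("mydb", "mytable")
                        .getFields("id", "name")
                        .withFilter("region = \"EU\"")
                        .asFlinkTuples();
    
        DataSet<Tuple2<Integer, String>> records = env.createInput(hcatFormat);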

> Add InputFormat to read HCatalog tables
> ---------------------------------------
>
>                 Key: FLINK-1466
>                 URL: https://issues.apache.org/jira/browse/FLINK-1466
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> HCatalog is a metadata repository and InputFormat to make Hive tables accessible to other frameworks such as Pig.
> Adding support for HCatalog would give access to Hive managed data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
