flink-commits mailing list archives

From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-2168] Add HBaseTableSource for batch tables.
Date Wed, 15 Feb 2017 09:41:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master d88c938cf -> 87d09342b


[FLINK-2168] Add HBaseTableSource for batch tables.

This closes #3149.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58d4513a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58d4513a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58d4513a

Branch: refs/heads/master
Commit: 58d4513a9913f023284aac1b93a279bf4fd8f094
Parents: d88c938
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Wed Jan 18 12:27:23 2017 +0530
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Feb 15 10:24:13 2017 +0100

----------------------------------------------------------------------
 flink-connectors/flink-hbase/pom.xml            |  16 ++
 .../addons/hbase/AbstractTableInputFormat.java  | 260 +++++++++++++++++++
 .../flink/addons/hbase/HBaseTableSchema.java    | 133 ++++++++++
 .../flink/addons/hbase/HBaseTableSource.java    | 101 +++++++
 .../hbase/HBaseTableSourceInputFormat.java      | 144 ++++++++++
 .../flink/addons/hbase/TableInputFormat.java    | 204 +--------------
 .../hbase/HBaseTestingClusterAutostarter.java   |  18 +-
 .../addons/hbase/TableInputFormatITCase.java    |   6 +-
 .../addons/hbase/example/HBaseReadExample.java  |   2 +-
 .../hbase/example/HBaseTableSourceITCase.java   | 196 ++++++++++++++
 10 files changed, 870 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
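
For illustration (not part of the commit), a minimal sketch of how the new table source can be used with the batch Table API. Table "test", family "f1", and qualifiers "q1".."q3" are example names taken from the integration test added below:

    // Sketch only: register the HBaseTableSource and query it via SQL on the batch Table API.
    import org.apache.flink.addons.hbase.HBaseTableSource;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class HBaseTableSourceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());

            Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
            HBaseTableSource hbaseTable = new HBaseTableSource(conf, "test");
            hbaseTable.addColumn("f1", "q1", Integer.class);
            hbaseTable.addColumn("f1", "q2", String.class);
            hbaseTable.addColumn("f1", "q3", Long.class);

            tableEnv.registerTableSource("test", hbaseTable);
            // each column family becomes a nested row, so columns are addressed as test.f1.q1 etc.
            Table result = tableEnv.sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");
            DataSet<Row> rows = tableEnv.toDataSet(result, Row.class);
            rows.print();
        }
    }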


http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index c6c5cc6..14e2b59 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -82,6 +82,16 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-hadoop2</artifactId>
@@ -236,6 +246,12 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<profiles>

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
new file mode 100644
index 0000000..d7f6ce9
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables.
+ */
+public abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInputSplit> {
+
+	protected static Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);
+	/** helper variable to decide whether the input is exhausted or not */
+	protected boolean endReached = false;
+
+	protected transient HTable table = null;
+	protected transient Scan scan = null;
+
+	/** HBase iterator wrapper */
+	protected ResultScanner resultScanner = null;
+
+	protected byte[] lastRow;
+	protected int scannedRows;
+	/**
+	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
+	 * @return The appropriate instance of Scan for this use case.
+	 */
+	protected abstract Scan getScanner();
+
+	/**
+	 * Returns the name of the table to be read.
+	 * Only a single table name is possible per instance of a TableInputFormat derivative.
+	 * @return The name of the table.
+	 */
+	protected abstract String getTableName();
+
+	/**
+	 * The output from HBase is always an instance of {@link Result}.
+	 * This method copies the data from the Result instance into the required {@link T}.
+	 * @param r The Result instance from HBase that needs to be converted
+	 * @return The appropriate instance of {@link T} that contains the needed information.
+	 */
+	protected abstract T mapResultToType(Result r);
+
+	/**
+	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
+	 * These are opened here because they are needed in {@link #createInputSplits(int)},
+	 * which is called before the openInputFormat method.
+	 * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
+	 *
+	 * @param parameters The configuration that is to be used
+	 * @see Configuration
+	 */
+	public abstract void configure(Configuration parameters);
+
+	@Override
+	public void open(TableInputSplit split) throws IOException {
+		if (table == null) {
+			throw new IOException("The HBase table has not been opened!");
+		}
+		if (scan == null) {
+			throw new IOException("getScanner returned null");
+		}
+		if (split == null) {
+			throw new IOException("Input split is null!");
+		}
+
+		logSplitInfo("opening", split);
+		scan.setStartRow(split.getStartRow());
+		lastRow = split.getEndRow();
+		scan.setStopRow(lastRow);
+
+		resultScanner = table.getScanner(scan);
+		endReached = false;
+		scannedRows = 0;
+	}
+
+	public T nextRecord(T reuse) throws IOException {
+		if (resultScanner == null) {
+			throw new IOException("No table result scanner provided!");
+		}
+		try {
+			Result res = resultScanner.next();
+			if (res != null) {
+				scannedRows++;
+				lastRow = res.getRow();
+				return mapResultToType(res);
+			}
+		} catch (Exception e) {
+			resultScanner.close();
+			//workaround for timeout on scan
+			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
+			scan.setStartRow(lastRow);
+			resultScanner = table.getScanner(scan);
+			Result res = resultScanner.next();
+			if (res != null) {
+				scannedRows++;
+				lastRow = res.getRow();
+				return mapResultToType(res);
+			}
+		}
+
+		endReached = true;
+		return null;
+	}
+
+	private void logSplitInfo(String action, TableInputSplit split) {
+		int splitId = split.getSplitNumber();
+		String splitStart = Bytes.toString(split.getStartRow());
+		String splitEnd = Bytes.toString(split.getEndRow());
+		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+		String[] hostnames = split.getHostnames();
+		LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
+	}
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return endReached;
+	}
+
+	@Override
+	public void close() throws IOException {
+		LOG.info("Closing split (scanned {} rows)", scannedRows);
+		lastRow = null;
+		try {
+			if (resultScanner != null) {
+				resultScanner.close();
+			}
+		} finally {
+			resultScanner = null;
+		}
+	}
+
+	@Override
+	public void closeInputFormat() throws IOException {
+		try {
+			if (table != null) {
+				table.close();
+			}
+		} finally {
+			table = null;
+		}
+	}
+
+	@Override
+	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+		if (table == null) {
+			throw new IOException("The HBase table has not been opened!");
+		}
+		if (scan == null) {
+			throw new IOException("getScanner returned null");
+		}
+
+		//Gets the starting and ending row keys for every region in the currently open table
+		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+			throw new IOException("Expecting at least one region.");
+		}
+		final byte[] startRow = scan.getStartRow();
+		final byte[] stopRow = scan.getStopRow();
+		final boolean scanWithNoLowerBound = startRow.length == 0;
+		final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
+		for (int i = 0; i < keys.getFirst().length; i++) {
+			final byte[] startKey = keys.getFirst()[i];
+			final byte[] endKey = keys.getSecond()[i];
+			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
+			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
+			if (!includeRegionInSplit(startKey, endKey)) {
+				continue;
+			}
+			//Finds the region on which the given row is being served
+			final String[] hosts = new String[]{regionLocation};
+
+			// determine if the region contains keys used by the scan
+			boolean isLastRegion = endKey.length == 0;
+			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
+				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+
+				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
+					&& !isLastRegion ? endKey : stopRow;
+				int id = splits.size();
+				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
+				splits.add(split);
+			}
+		}
+		LOG.info("Created " + splits.size() + " splits");
+		for (TableInputSplit split : splits) {
+			logSplitInfo("created", split);
+		}
+		return splits.toArray(new TableInputSplit[0]);
+	}
+
+	/**
+	 * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
+	 * <p>
+	 * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job
+	 * (and hence from the InputSplits), given the start and end keys of that region. <br>
+	 * It is useful when we need to remember the last-processed top record and continuously revisit the
+	 * [last, current) interval for M-R processing. In addition to reducing the number of InputSplits, it also
+	 * reduces the load on the region server, due to the ordering of the keys. <br>
+	 * <br>
+	 * Note: It is possible that <code>endKey.length() == 0</code> for the last (most recent) region. <br>
+	 * Override this method if you want to exclude regions from the M-R job in bulk. By default, no region is excluded
+	 * (i.e. all regions are included).
+	 *
+	 * @param startKey Start key of the region
+	 * @param endKey   End key of the region
+	 * @return true, if this region needs to be included as part of the input (default).
+	 */
+	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+		return true;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+		return null;
+	}
+
+}
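
The new abstract class factors the split handling and the scan loop out of TableInputFormat and leaves four extension points: configure(), getScanner(), getTableName() and mapResultToType(). A hypothetical minimal subclass, for illustration only (the class name and table name are made up; imports match those of AbstractTableInputFormat, plus HBaseConfiguration and java.io.IOException):

    // Illustration only: a minimal AbstractTableInputFormat subclass that emits row keys as Strings.
    public class RowKeyInputFormat extends AbstractTableInputFormat<String> {

        @Override
        public void configure(Configuration parameters) {
            // the inherited table/scan fields must be set up here, before createInputSplits() is called
            try {
                table = new HTable(HBaseConfiguration.create(), getTableName());
            } catch (IOException e) {
                throw new RuntimeException("Could not open HBase table " + getTableName(), e);
            }
            scan = getScanner();
        }

        @Override
        protected Scan getScanner() {
            return new Scan(); // full scan; narrow it with addColumn()/setStartRow() as needed
        }

        @Override
        protected String getTableName() {
            return "example-table"; // example name
        }

        @Override
        protected String mapResultToType(Result r) {
            return Bytes.toString(r.getRow());
        }
    }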

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
new file mode 100644
index 0000000..d3c9fb8
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.util.Map;
+import java.sql.Date;
+import java.util.TreeMap;
+
+/**
+ * Helps to specify the schema of an HBase table.
+ */
+public class HBaseTableSchema implements Serializable {
+
+	// A Map with key as column family.
+	// Guarantees natural ordering
+	private final Map<String, Map<String, TypeInformation<?>>> familyMap =
+		new TreeMap<>();
+
+	// Allowed types. This may change.
+	private static ImmutableCollection<Class<?>> CLASS_TYPES = ImmutableList.<Class<?>>of(
+		Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class, Time.class, byte[].class
+	);
+
+	/**
+	 * Allows specifying the family and qualifier name, along with the data type of the qualifier, for an HBase table.
+	 *
+	 * @param family    the family name
+	 * @param qualifier the qualifier name
+	 * @param clazz     the data type of the qualifier
+	 */
+	void addColumn(String family, String qualifier, Class<?> clazz) {
+		Preconditions.checkNotNull(family, "family name");
+		Preconditions.checkNotNull(qualifier, "qualifier name");
+		Preconditions.checkNotNull(clazz, "class type");
+		Map<String, TypeInformation<?>> map = this.familyMap.get(family);
+		if (map == null) {
+			map = new TreeMap<>();
+		}
+		if (!CLASS_TYPES.contains(clazz)) {
+			// reject types that cannot be deserialized by this schema
+			throw new IllegalArgumentException("Unsupported class type found " + clazz + ". Better to use byte[].class and deserialize using user-defined scalar functions.");
+		}
+		map.put(qualifier, TypeExtractor.getForClass(clazz));
+		familyMap.put(family, map);
+	}
+
+	String[] getFamilyNames() {
+		return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
+	}
+
+	String[] getQualifierNames(String family) {
+		Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
+		String[] qualifierNames = new String[colDetails.size()];
+		int i = 0;
+		for (String qualifier: colDetails.keySet()) {
+			qualifierNames[i] = qualifier;
+			i++;
+		}
+		return qualifierNames;
+	}
+
+	TypeInformation<?>[] getQualifierTypes(String family) {
+		Map<String, TypeInformation<?>> colDetails = familyMap.get(family);
+		TypeInformation<?>[] typeInformations = new TypeInformation[colDetails.size()];
+		int i = 0;
+		for (TypeInformation<?> typeInfo : colDetails.values()) {
+			typeInformations[i] = typeInfo;
+			i++;
+		}
+		return typeInformations;
+	}
+
+	Map<String, TypeInformation<?>> getFamilyInfo(String family) {
+		return familyMap.get(family);
+	}
+
+	Object deserialize(byte[] value, TypeInformation<?> typeInfo) {
+		if (typeInfo.isBasicType()) {
+			if (typeInfo.getTypeClass() == Integer.class) {
+				return Bytes.toInt(value);
+			} else if (typeInfo.getTypeClass() == Short.class) {
+				return Bytes.toShort(value);
+			} else if (typeInfo.getTypeClass() == Float.class) {
+				return Bytes.toFloat(value);
+			} else if (typeInfo.getTypeClass() == Long.class) {
+				return Bytes.toLong(value);
+			} else if (typeInfo.getTypeClass() == String.class) {
+				return Bytes.toString(value);
+			} else if (typeInfo.getTypeClass() == Byte.class) {
+				return value[0];
+			} else if (typeInfo.getTypeClass() == Boolean.class) {
+				return Bytes.toBoolean(value);
+			} else if (typeInfo.getTypeClass() == Double.class) {
+				return Bytes.toDouble(value);
+			} else if (typeInfo.getTypeClass() == BigInteger.class) {
+				return new BigInteger(value);
+			} else if (typeInfo.getTypeClass() == BigDecimal.class) {
+				return Bytes.toBigDecimal(value);
+			} else if (typeInfo.getTypeClass() == Date.class) {
+				return new Date(Bytes.toLong(value));
+			} else if (typeInfo.getTypeClass() == Time.class) {
+				return new Time(Bytes.toLong(value));
+			}
+		}
+		return value;
+	}
+}
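
HBaseTableSchema keeps the qualifiers of each family in TreeMaps, so family and qualifier names (and their types) are always returned in sorted order. A short usage sketch; note that the methods are package-private, so this only works from within org.apache.flink.addons.hbase, as HBaseTableSource and the input format do:

    // Illustration only: assemble a schema and read the type information back.
    HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumn("f1", "q2", String.class);
    schema.addColumn("f1", "q1", Integer.class);

    String[] families = schema.getFamilyNames();                  // ["f1"]
    String[] qualifiers = schema.getQualifierNames("f1");         // ["q1", "q2"] (sorted)
    TypeInformation<?>[] types = schema.getQualifierTypes("f1");  // [Integer, String] type info
    Object v = schema.deserialize(Bytes.toBytes(100), types[0]);  // -> Integer 100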

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
new file mode 100644
index 0000000..c484bd7
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * Creates a table source that helps to scan data from an HBase table.
+ * The table name is passed during {@link HBaseTableSource} construction, along with the required HBase configuration.
+ * Use {@link #addColumn(String, String, Class)} to specify the family name, the qualifier, and the type of the qualifier that needs to be scanned.
+ * This supports a nested schema, e.g. a column family 'Person' with two qualifiers 'Name' (type String) and 'Unique_Id' (type Integer)
+ * is represented as Row<Person : Row<Name : String, Unique_Id : Integer>>.
+ */
+public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+	private Configuration conf;
+	private String tableName;
+	private HBaseTableSchema schema;
+
+	/**
+	 * Creates an HBaseTableSource from the given HBase configuration and table name.
+	 *
+	 * @param conf      the HBase configuration
+	 * @param tableName the name of the HBase table to read
+	 */
+	public HBaseTableSource(Configuration conf, String tableName) {
+		this.conf = conf;
+		this.tableName = Preconditions.checkNotNull(tableName, "Table name");
+		this.schema = new HBaseTableSchema();
+	}
+
+	/**
+	 * Allows specifying the family and qualifier name, along with the data type of the qualifier, for an HBase table.
+	 *
+	 * @param family    the family name
+	 * @param qualifier the qualifier name
+	 * @param clazz     the data type of the qualifier
+	 */
+	public void addColumn(String family, String qualifier, Class<?> clazz) {
+		this.schema.addColumn(family, qualifier, clazz);
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		String[] famNames = schema.getFamilyNames();
+		TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
+		int i = 0;
+		for (String family : famNames) {
+			typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
+			i++;
+		}
+		RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
+		return rowInfo;
+	}
+
+	@Override
+	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+		return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, schema), getReturnType());
+	}
+
+	@Override
+	public ProjectableTableSource<Row> projectFields(int[] fields) {
+		String[] famNames = schema.getFamilyNames();
+		HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName);
+		// Extract the family from the given fields
+		for(int field : fields) {
+			String family = famNames[field];
+			Map<String, TypeInformation<?>> familyInfo = schema.getFamilyInfo(family);
+			for(String qualifier : familyInfo.keySet()) {
+				// create the newSchema
+				newTableSource.addColumn(family, qualifier, familyInfo.get(qualifier).getTypeClass());
+			}
+		}
+		return newTableSource;
+	}
+}
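
Note that projectFields() operates on indexes of the top-level row, i.e. on whole column families in sorted family order, not on individual qualifiers. A short sketch with example names (conf stands for an HBase Configuration):

    // Illustration only: projecting field index 1 keeps the second family ("f2") with all of its qualifiers.
    HBaseTableSource source = new HBaseTableSource(conf, "test1");
    source.addColumn("f1", "q1", Integer.class);
    source.addColumn("f2", "q2", String.class);

    HBaseTableSource projected = (HBaseTableSource) source.projectFields(new int[]{1});
    // projected.getReturnType() is now Row(f2: Row(q2: String))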

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
new file mode 100644
index 0000000..6c4d7da
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}.
+ */
+public class HBaseTableSourceInputFormat extends AbstractTableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+	private String tableName;
+	private transient Connection conn;
+	private transient org.apache.hadoop.conf.Configuration conf;
+	private HBaseTableSchema schema;
+
+	public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+		this.tableName = tableName;
+		this.conf = conf;
+		this.schema = schema;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		LOG.info("Initializing HBaseConfiguration");
+		connectToTable();
+		if(table != null) {
+			scan = getScanner();
+		}
+	}
+
+	@Override
+	protected Scan getScanner() {
+		Scan scan = new Scan();
+		for(String family : schema.getFamilyNames()) {
+			for(String qualifierName : schema.getQualifierNames(family)) {
+				scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifierName));
+			}
+		}
+		return scan;
+	}
+
+	@Override
+	public String getTableName() {
+		return tableName;
+	}
+
+	@Override
+	protected Row mapResultToType(Result res) {
+		List<Object> values = new ArrayList<Object>();
+		int i = 0;
+		String[] familyNames = schema.getFamilyNames();
+		Object[] rows = new Object[familyNames.length];
+		for(String family : familyNames) {
+			Map<String, TypeInformation<?>> infos = schema.getFamilyInfo(family);
+			for(String qualifier : infos.keySet()) {
+				byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
+				if(value != null) {
+					values.add(schema.deserialize(value, infos.get(qualifier)));
+				} else {
+					values.add(null);
+				}
+			}
+			rows[i] = Row.of(values.toArray(new Object[values.size()]));
+			values.clear();
+			i++;
+		}
+		return Row.of(rows);
+	}
+
+	private void connectToTable() {
+		//use files found in the classpath
+		if(this.conf == null) {
+			this.conf = HBaseConfiguration.create();
+		}
+		try {
+			conn = ConnectionFactory.createConnection(conf);
+		} catch(IOException ioe) {
+			LOG.error("Exception while creating connection to hbase cluster", ioe);
+			return;
+		}
+		try {
+			table = (HTable)conn.getTable(TableName.valueOf(tableName));
+		} catch(TableNotFoundException tnfe) {
+			LOG.error("The table " + tableName + " not found ", tnfe);
+		} catch(IOException ioe) {
+			LOG.error("Exception while connecting to the table "+tableName+ " ", ioe);
+		}
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		// split the fieldNames
+		String[] famNames = schema.getFamilyNames();
+		TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
+		int i = 0;
+		for (String family : famNames) {
+			typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
+			i++;
+		}
+		RowTypeInfo rowInfo = new RowTypeInfo(typeInfos, famNames);
+		return rowInfo;
+	}
+}
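
mapResultToType() builds one inner Row per declared column family and fills missing cells with null. For instance, for the first row of the two-column-family integration test added below (only family f1 populated), the emitted record would look roughly like this:

    // Illustration only: shape of the Row emitted when only family f1 has values.
    Row f1 = Row.of(100, "strvalue", 19991L);
    Row f2 = Row.of(null, null, null);
    Row produced = Row.of(f1, f2);  // what mapResultToType(Result) returns for that HBase row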

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 35b0a7c..467652a 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -18,47 +18,21 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables.
  */
-public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<T, TableInputSplit> {
+public abstract class TableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
-
-	/** helper variable to decide whether the input is exhausted or not */
-	private boolean endReached = false;
-
-	protected transient HTable table = null;
-	protected transient Scan scan = null;
-
-	/** HBase iterator wrapper */
-	private ResultScanner resultScanner = null;
-
-	private byte[] lastRow;
-	private int scannedRows;
-
 	/**
 	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
 	 * @return The appropriate instance of Scan for this usecase.
@@ -78,7 +52,7 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 	 * @param r The Result instance from HBase that needs to be converted
 	 * @return The approriate instance of {@link Tuple} that contains the needed information.
 	 */
-	protected abstract T mapResultToTuple(Result r);
+	protected abstract T mapResultToType(Result r);
 
 	/**
 	 * Creates a {@link Scan} object and opens the {@link HTable} connection.
@@ -112,178 +86,4 @@ public abstract class TableInputFormat<T extends Tuple> extends RichInputFormat<
 		}
 		return null;
 	}
-
-	@Override
-	public void open(TableInputSplit split) throws IOException {
-		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
-		}
-		if (scan == null) {
-			throw new IOException("getScanner returned null");
-		}
-		if (split == null) {
-			throw new IOException("Input split is null!");
-		}
-
-		logSplitInfo("opening", split);
-		scan.setStartRow(split.getStartRow());
-		lastRow = split.getEndRow();
-		scan.setStopRow(lastRow);
-
-		resultScanner = table.getScanner(scan);
-		endReached = false;
-		scannedRows = 0;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return endReached;
-	}
-
-	@Override
-	public T nextRecord(T reuse) throws IOException {
-		if (resultScanner == null) {
-			throw new IOException("No table result scanner provided!");
-		}
-		try {
-			Result res = resultScanner.next();
-			if (res != null) {
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		} catch (Exception e) {
-			resultScanner.close();
-			//workaround for timeout on scan
-			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
-			scan.setStartRow(lastRow);
-			resultScanner = table.getScanner(scan);
-			Result res = resultScanner.next();
-			if (res != null) {
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		}
-
-		endReached = true;
-		return null;
-	}
-
-	@Override
-	public void close() throws IOException {
-		LOG.info("Closing split (scanned {} rows)", scannedRows);
-		lastRow = null;
-		try {
-			if (resultScanner != null) {
-				resultScanner.close();
-			}
-		} finally {
-			resultScanner = null;
-		}
-	}
-
-	@Override
-	public void closeInputFormat() throws IOException {
-		try {
-			if (table != null) {
-				table.close();
-			}
-		} finally {
-			table = null;
-		}
-	}
-
-	@Override
-	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-		if (table == null) {
-			throw new IOException("The HBase table has not been opened!");
-		}
-		if (scan == null) {
-			throw new IOException("getScanner returned null");
-		}
-
-		//Gets the starting and ending row keys for every region in the currently open table
-		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-			throw new IOException("Expecting at least one region.");
-		}
-		final byte[] startRow = scan.getStartRow();
-		final byte[] stopRow = scan.getStopRow();
-		final boolean scanWithNoLowerBound = startRow.length == 0;
-		final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
-		for (int i = 0; i < keys.getFirst().length; i++) {
-			final byte[] startKey = keys.getFirst()[i];
-			final byte[] endKey = keys.getSecond()[i];
-			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
-			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
-			if (!includeRegionInSplit(startKey, endKey)) {
-				continue;
-			}
-			//Finds the region on which the given row is being served
-			final String[] hosts = new String[]{regionLocation};
-
-			// determine if regions contains keys used by the scan
-			boolean isLastRegion = endKey.length == 0;
-			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
-				(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
-
-				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
-				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
-					&& !isLastRegion ? endKey : stopRow;
-				int id = splits.size();
-				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
-				splits.add(split);
-			}
-		}
-		LOG.info("Created " + splits.size() + " splits");
-		for (TableInputSplit split : splits) {
-			logSplitInfo("created", split);
-		}
-		return splits.toArray(new TableInputSplit[0]);
-	}
-
-	private void logSplitInfo(String action, TableInputSplit split) {
-		int splitId = split.getSplitNumber();
-		String splitStart = Bytes.toString(split.getStartRow());
-		String splitEnd = Bytes.toString(split.getEndRow());
-		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
-		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
-		String[] hostnames = split.getHostnames();
-		LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
-	}
-
-	/**
-	 * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
-	 * <p>
-	 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
-	 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
-	 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R
-	 * processing, continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due
-	 * to the ordering of the keys. <br>
-	 * <br>
-	 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. <br>
-	 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded(
-	 * i.e. all regions are included).
-	 *
-	 * @param startKey Start key of the region
-	 * @param endKey   End key of the region
-	 * @return true, if this region needs to be included as part of the input (default).
-	 */
-	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
-		return true;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return null;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index 3d9f672..e9ab303 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -87,13 +87,17 @@ public class HBaseTestingClusterAutostarter implements Serializable {
 
 	private static boolean alreadyRegisteredTestCluster = false;
 
-	protected static void createTable(TableName tableName, byte[] columnFamilyName, byte[][] splitKeys) {
+	private static Configuration conf;
+
+	protected static void createTable(TableName tableName, byte[][] columnFamilyName, byte[][] splitKeys) {
 		LOG.info("HBase minicluster: Creating table " + tableName.getNameAsString());
 
 		assertNotNull("HBaseAdmin is not initialized successfully.", admin);
 		HTableDescriptor desc = new HTableDescriptor(tableName);
-		HColumnDescriptor colDef = new HColumnDescriptor(columnFamilyName);
-		desc.addFamily(colDef);
+		for(byte[] fam : columnFamilyName) {
+			HColumnDescriptor colDef = new HColumnDescriptor(fam);
+			desc.addFamily(colDef);
+		}
 
 		try {
 			admin.createTable(desc, splitKeys);
@@ -125,7 +129,7 @@ public class HBaseTestingClusterAutostarter implements Serializable {
 		}
 	}
 
-	private static void initialize(Configuration conf) {
+	private static Configuration initialize(Configuration conf) {
 		conf = HBaseConfiguration.create(conf);
 		conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
 		try {
@@ -137,6 +141,7 @@ public class HBaseTestingClusterAutostarter implements Serializable {
 		} catch (IOException e) {
 			assertNull("IOException", e);
 		}
+		return conf;
 	}
 
 	@BeforeClass
@@ -154,7 +159,7 @@ public class HBaseTestingClusterAutostarter implements Serializable {
 		// Make sure the zookeeper quorum value contains the right port number (varies per run).
 		TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
 
-		initialize(TEST_UTIL.getConfiguration());
+		conf = initialize(TEST_UTIL.getConfiguration());
 		LOG.info("HBase minicluster: Running");
 	}
 
@@ -186,6 +191,9 @@ public class HBaseTestingClusterAutostarter implements Serializable {
 		alreadyRegisteredTestCluster = true;
 	}
 
+	public static Configuration getConf() {
+		return conf;
+	}
 	private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
 		hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
 		// Create the hbase-site.xml file for this run.

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
index 3dddd88..57ccafd 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
@@ -57,7 +57,9 @@ public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
 	public void createTestTable() throws IOException {
 		TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
 		byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
-		createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+		byte[][] famNames = new byte[1][];
+		famNames[0] = TEST_TABLE_FAMILY_NAME;
+		createTable(tableName, famNames, splitKeys);
 		HTable table = openTable(tableName);
 
 		for (String rowId : ROW_IDS) {
@@ -83,7 +85,7 @@ public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
 		}
 
 		@Override
-		protected Tuple1<String> mapResultToTuple(Result r) {
+		protected Tuple1<String> mapResultToType(Result r) {
 			return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
index dccf876..5377791 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -63,7 +63,7 @@ public class HBaseReadExample {
 				private Tuple2<String, String> reuse = new Tuple2<String, String>();
 				
 				@Override
-				protected Tuple2<String, String> mapResultToTuple(Result r) {
+				protected Tuple2<String, String> mapResultToType(Result r) {
 					String key = Bytes.toString(r.getRow());
 					String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
 					reuse.setField(key, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/58d4513a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
new file mode 100644
index 0000000..8882d5c
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseTableSourceITCase.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.addons.hbase.HBaseTableSource;
+import org.apache.flink.addons.hbase.HBaseTestingClusterAutostarter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class HBaseTableSourceITCase extends HBaseTestingClusterAutostarter {
+
+	private static final byte[] ROW_1 = Bytes.toBytes("row1");
+	private static final byte[] ROW_2 = Bytes.toBytes("row2");
+	private static final byte[] ROW_3 = Bytes.toBytes("row3");
+	private static final byte[] F_1 = Bytes.toBytes("f1");
+	private static final byte[] F_2 = Bytes.toBytes("f2");
+	private static final byte[] Q_1 = Bytes.toBytes("q1");
+	private static final byte[] Q_2 = Bytes.toBytes("q2");
+	private static final byte[] Q_3 = Bytes.toBytes("q3");
+
+	@BeforeClass
+	public static void activateHBaseCluster(){
+		registerHBaseMiniClusterInClasspath();
+	}
+
+	@Test
+	public void testHBaseTableSourceWithSingleColumnFamily() throws Exception {
+		// create a table with a single region
+		TableName tableName = TableName.valueOf("test");
+		// no split keys
+		byte[][] famNames = new byte[1][];
+		famNames[0] = F_1;
+		createTable(tableName, famNames, null);
+		// get the htable instance
+		HTable table = openTable(tableName);
+		List<Put> puts = new ArrayList<Put>();
+		// add some data
+		Put put = new Put(ROW_1);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+		//2nd qual is String
+		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+		// 3rd qual is long
+		put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+		puts.add(put);
+
+		put = new Put(ROW_2);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_1, Q_1, Bytes.toBytes(101));
+		//2nd qual is String
+		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue1"));
+		// 3rd qual is long
+		put.addColumn(F_1, Q_3, Bytes.toBytes(19992L));
+		puts.add(put);
+
+		put = new Put(ROW_3);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_1, Q_1, Bytes.toBytes(102));
+		//2nd qual is String
+		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
+		// 3rd qual is long
+		put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
+		puts.add(put);
+		// add the mutations to the table
+		table.put(puts);
+		table.close();
+		// preparation is done
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		// create an HBaseTableSource that scans the whole table
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
+		tableEnv.registerTableSource("test", hbaseTable);
+		Table result = tableEnv
+			.sql("SELECT test.f1.q1, test.f1.q2, test.f1.q3 FROM test");
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "100,strvalue,19991\n" +
+			"101,strvalue1,19992\n" +
+			"102,strvalue2,19993\n";
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testHBaseTableSourceWithTwoColumnFamily() throws Exception {
+		// create a table with a single region
+		TableName tableName = TableName.valueOf("test1");
+		// no split keys
+		byte[][] famNames = new byte[2][];
+		famNames[0] = F_1;
+		famNames[1] = F_2;
+		createTable(tableName, famNames, null);
+		// get the htable instance
+		HTable table = openTable(tableName);
+		List<Put> puts = new ArrayList<Put>();
+		// add some data
+		Put put = new Put(ROW_1);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_1, Q_1, Bytes.toBytes(100));
+		//2nd qual is String
+		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue"));
+		// 3rd qual is long
+		put.addColumn(F_1, Q_3, Bytes.toBytes(19991L));
+		puts.add(put);
+
+		put = new Put(ROW_2);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_2, Q_1, Bytes.toBytes(201));
+		//2nd qual is String
+		put.addColumn(F_2, Q_2, Bytes.toBytes("newvalue1"));
+		// 3rd qual is long
+		put.addColumn(F_2, Q_3, Bytes.toBytes(29992L));
+		puts.add(put);
+
+		put = new Put(ROW_3);
+		// add 3 qualifiers per row
+		//1st qual is integer
+		put.addColumn(F_1, Q_1, Bytes.toBytes(102));
+		//2nd qual is String
+		put.addColumn(F_1, Q_2, Bytes.toBytes("strvalue2"));
+		// 3rd qual is long
+		put.addColumn(F_1, Q_3, Bytes.toBytes(19993L));
+		puts.add(put);
+		// add the mutations to the table
+		table.put(puts);
+		table.close();
+		// preparation is done
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+		// create an HBaseTableSource that scans the whole table
+		HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), tableName.getNameAsString());
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_1), Integer.class);
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_2), String.class);
+		hbaseTable.addColumn(Bytes.toString(F_1), Bytes.toString(Q_3), Long.class);
+		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_1), Integer.class);
+		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_2), String.class);
+		hbaseTable.addColumn(Bytes.toString(F_2), Bytes.toString(Q_3), Long.class);
+		tableEnv.registerTableSource("test1", hbaseTable);
+		Table result = tableEnv
+			.sql("SELECT test1.f1.q1, test1.f1.q2, test1.f1.q3, test1.f2.q1, test1.f2.q2, test1.f2.q3 FROM test1");
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "100,strvalue,19991,null,null,null\n" +
+			"null,null,null,201,newvalue1,29992\n" +
+			"102,strvalue2,19993,null,null,null\n";
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+}

