hawq-commits mailing list archives

From shiv...@apache.org
Subject [13/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
Date Tue, 03 Nov 2015 00:36:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/ProfilesConfTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/ProfilesConfTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/ProfilesConfTest.java
new file mode 100644
index 0000000..8c09ef5
--- /dev/null
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/ProfilesConfTest.java
@@ -0,0 +1,174 @@
+package org.apache.hawq.pxf.api.utilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+
+import static org.apache.hawq.pxf.api.utilities.ProfileConfException.MessageFormat.NO_PROFILE_DEF;
+import static org.apache.hawq.pxf.api.utilities.ProfileConfException.MessageFormat.PROFILES_FILE_NOT_FOUND;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Base test class for all ProfilesConf tests.
+ * Each test case is encapsulated in its own test class to force reloading of the ProfilesConf enum singleton.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ProfilesConf.class, Log.class, LogFactory.class, ClassLoader.class})
+public class ProfilesConfTest {
+    static ClassLoader classLoader;
+    static Log log;
+    String mandatoryFileName = "mandatory.xml";
+    String optionalFileName = "optional.xml";
+    File mandatoryFile;
+    File optionalFile;
+
+    @Rule
+    public TemporaryFolder testFolder = new TemporaryFolder();
+
+    @Before
+    public void setUp() throws Exception {
+        mandatoryFile = testFolder.newFile(mandatoryFileName);
+        optionalFile = testFolder.newFile(optionalFileName);
+        PowerMockito.mockStatic(LogFactory.class);
+        log = mock(Log.class);
+        when(LogFactory.getLog(ProfilesConf.class)).thenReturn(log);
+        classLoader = mock(ClassLoader.class);
+        PowerMockito.stub(PowerMockito.method(ProfilesConf.class, "getClassLoader")).toReturn(classLoader);
+    }
+
+    void writeFile(File file, String content) throws IOException {
+        Files.write(file.toPath(), content.getBytes());
+    }
+}
+
+class ProfilesConfTestDefinedProfile extends ProfilesConfTest {
+    @Test
+    public void definedProfile() throws Exception {
+        writeFile(mandatoryFile, "<profiles><profile><name>HBase</name><plugins><plugin1>X</plugin1><plugin2>XX</plugin2></plugins></profile></profiles>");
+        writeFile(optionalFile, "<profiles><profile><name>Hive</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+
+        Map<String, String> hbaseProfile = ProfilesConf.getProfilePluginsMap("HBase");
+        assertEquals(2, hbaseProfile.keySet().size());
+        assertEquals(hbaseProfile.get("X-GP-PLUGIN1"), "X");
+        assertEquals(hbaseProfile.get("X-GP-PLUGIN2"), "XX");
+
+        Map<String, String> hiveProfile = ProfilesConf.getProfilePluginsMap("hIVe");// case insensitive profile name
+        assertEquals(1, hiveProfile.keySet().size());
+        assertEquals(hiveProfile.get("X-GP-PLUGIN1"), "Y");
+
+        Mockito.verify(log).info("PXF profiles loaded: [HBase, Hive]");
+    }
+}
+
+class ProfilesConfTestUndefinedProfile extends ProfilesConfTest {
+    @Test
+    public void undefinedProfile() throws Exception {
+        writeFile(mandatoryFile, "<profiles><profile><name>HBase</name><plugins><plugin1>X</plugin1></plugins></profile></profiles>");
+        writeFile(optionalFile, "<profiles><profile><name>Hive</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+        try {
+            ProfilesConf.getProfilePluginsMap("UndefinedProfile");
+            fail("undefined profile should have thrown exception");
+        } catch (ProfileConfException pce) {
+            assertEquals(pce.getMessage(), String.format(NO_PROFILE_DEF.getFormat(), "UndefinedProfile", "pxf-profiles.xml"));
+        }
+    }
+}
+
+class ProfilesConfTestDuplicateProfileDefinition extends ProfilesConfTest {
+    @Test
+    public void duplicateProfileDefinition() throws Exception {
+        writeFile(mandatoryFile, "<profiles><profile><name>HBase</name><plugins><plugin1>Y</plugin1><plugin1>YY</plugin1></plugins></profile><profile><name>HBase</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        writeFile(optionalFile, "<profiles><profile><name>Hive</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+        ProfilesConf.getProfilePluginsMap("HBase");
+        Mockito.verify(log).warn("Duplicate profile definition found in " + mandatoryFileName + " for: HBase");
+    }
+}
+
+class ProfilesConfTestOverrideProfile extends ProfilesConfTest {
+    @Test
+    public void overrideProfile() throws Exception {
+        writeFile(mandatoryFile, "<profiles><profile><name>HBase</name><plugins><plugin1>X</plugin1></plugins></profile></profiles>");
+        writeFile(optionalFile, "<profiles><profile><name>HBase</name><plugins><plugin1>Y</plugin1><plugin2>YY</plugin2></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+        Map profile = ProfilesConf.getProfilePluginsMap("HBase");
+        assertEquals(2, profile.keySet().size());
+        assertEquals(profile.get("X-GP-PLUGIN1"), "Y");
+        assertEquals(profile.get("X-GP-PLUGIN2"), "YY");
+    }
+}
+
+class ProfilesConfTestEmptyProfileFile extends ProfilesConfTest {
+    @Test
+    public void emptyProfileFile() throws Exception {
+        writeFile(mandatoryFile, "<profiles/>");
+        writeFile(optionalFile, "<profiles><profile><name>HBase</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+        ProfilesConf.getProfilePluginsMap("HBase");
+        Mockito.verify(log).warn("Profile file: " + mandatoryFileName + " is empty");
+    }
+}
+
+class ProfilesConfTestMalformedProfileFile extends ProfilesConfTest {
+    @Test
+    public void malformedProfileFile() throws Exception {
+        writeFile(mandatoryFile, "I'm a malford x.m.l@#$#<%");
+        writeFile(optionalFile, "<profiles><profile><name>HBase</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(optionalFile.toURI().toURL());
+        try {
+            ProfilesConf.getProfilePluginsMap("HBase");
+            fail("malformed profile file should have thrown exception");
+        } catch (ExceptionInInitializerError pce) {
+            assertTrue(pce.getCause().getMessage().contains(mandatoryFileName + " could not be loaded: org.xml.sax.SAXParseException"));
+        }
+    }
+}
+
+class ProfilesConfTestMissingMandatoryProfileFile extends ProfilesConfTest {
+    @Test
+    public void missingMandatoryProfileFile() throws Exception {
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(null);
+        try {
+            ProfilesConf.getProfilePluginsMap("HBase");
+            fail("missing mandatory profile file should have thrown exception");
+        } catch (ExceptionInInitializerError pce) {
+            Mockito.verify(log).warn("pxf-profiles-default.xml not found in the classpath");
+            assertEquals(pce.getCause().getMessage(), String.format(PROFILES_FILE_NOT_FOUND.getFormat(), "pxf-profiles-default.xml"));
+        }
+    }
+}
+
+class ProfilesConfTestMissingOptionalProfileFile extends ProfilesConfTest {
+    @Test
+    public void missingOptionalProfileFile() throws Exception {
+        writeFile(mandatoryFile, "<profiles><profile><name>HBase</name><plugins><plugin1>Y</plugin1></plugins></profile></profiles>");
+        when(classLoader.getResource("pxf-profiles-default.xml")).thenReturn(mandatoryFile.toURI().toURL());
+        when(classLoader.getResource("pxf-profiles.xml")).thenReturn(null);
+        Map<String, String> hbaseProfile = ProfilesConf.getProfilePluginsMap("HBase");
+        assertEquals("Y", hbaseProfile.get("X-GP-PLUGIN1"));
+        Mockito.verify(log).warn("pxf-profiles.xml not found in the classpath");
+    }
+}
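
The tests above exercise ProfilesConf against inline profile XML. As a rough usage sketch (assuming pxf-profiles-default.xml and pxf-profiles.xml are on the classpath with the content shown in definedProfile above):

    // Returns the plugins of the "HBase" profile; keys are the plugin element names
    // upper-cased and prefixed with "X-GP-", which is what the assertions above expect.
    Map<String, String> plugins = ProfilesConf.getProfilePluginsMap("HBase");
    String plugin1 = plugins.get("X-GP-PLUGIN1"); // "X"
    String plugin2 = plugins.get("X-GP-PLUGIN2"); // "XX"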

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseAccessor.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseAccessor.java
deleted file mode 100644
index d7d49df..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseAccessor.java
+++ /dev/null
@@ -1,250 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseUtilities;
-
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Accessor for HBase.
- * This class is responsible for opening the HBase table requested and
- * for iterating over its relevant fragments (regions) to return the relevant table's rows.
- * <p>
- * The table is divided into several splits. Each accessor instance is assigned a single split.
- * For each region, a Scan object is used to describe the requested rows.
- * <p>
- * The class supports filters using the {@link HBaseFilterBuilder}.
- * Regions can be filtered out according to input from {@link HBaseFilterBuilder}.
- */
-public class HBaseAccessor extends Plugin implements ReadAccessor {
-    private HBaseTupleDescription tupleDescription;
-    private Connection connection;
-    private Table table;
-    private SplitBoundary split;
-    private Scan scanDetails;
-    private ResultScanner currentScanner;
-    private byte[] scanStartKey;
-    private byte[] scanEndKey;
-
-    /**
-     * The class represents a single split of a table
-     * i.e. a start key and an end key
-     */
-    private class SplitBoundary {
-        protected byte[] startKey;
-        protected byte[] endKey;
-
-        SplitBoundary(byte[] first, byte[] second) {
-            startKey = first;
-            endKey = second;
-        }
-
-        byte[] startKey() {
-            return startKey;
-        }
-
-        byte[] endKey() {
-            return endKey;
-        }
-    }
-
-    /**
-     * Constructs {@link HBaseTupleDescription} based on HAWQ table description and
-     * initializes the scan start and end keys of the HBase table to default values.
-     *
-     * @param input query information, contains HBase table name and filter
-     */
-    public HBaseAccessor(InputData input) {
-        super(input);
-
-        tupleDescription = new HBaseTupleDescription(input);
-        split = null;
-        scanStartKey = HConstants.EMPTY_START_ROW;
-        scanEndKey = HConstants.EMPTY_END_ROW;
-    }
-
-    /**
-     * Opens the HBase table.
-     *
-     * @return true if the current fragment (split) is
-     * available for reading and is included in the filter
-     */
-    @Override
-    public boolean openForRead() throws Exception {
-        openTable();
-        createScanner();
-        addTableSplit();
-
-        return openCurrentRegion();
-    }
-
-    /**
-     * Closes the HBase table.
-     */
-    @Override
-    public void closeForRead() throws Exception {
-        table.close();
-        HBaseUtilities.closeConnection(null, connection);
-    }
-
-    /**
-     * Returns the next row in the HBase table, null if end of fragment.
-     */
-    @Override
-    public OneRow readNextObject() throws IOException {
-        Result result;
-
-        // while currentScanner can't return a new result
-        while ((result = currentScanner.next()) == null) {
-            currentScanner.close(); // close it
-            return null; // no more rows on the split
-        }
-
-        return new OneRow(null, result);
-    }
-
-    /**
-     * Load hbase table object using ConnectionFactory
-     */
-    private void openTable() throws IOException {
-        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
-        table = connection.getTable(TableName.valueOf(inputData.getDataSource()));
-    }
-
-    /**
-     * Creates a {@link SplitBoundary} of the table split
-     * this accessor instance is assigned to scan.
-     * The table split is constructed from the fragment metadata
-     * passed in {@link InputData#getFragmentMetadata()}.
-     * <p>
-     * The function verifies the split is within user supplied range.
-     * <p>
-     * It is assumed that |startKeys| == |endKeys|.
-     * This assumption is made throughout HBase's code as well.
-     */
-    private void addTableSplit() {
-
-        byte[] serializedMetadata = inputData.getFragmentMetadata();
-        if (serializedMetadata == null) {
-            throw new IllegalArgumentException("Missing fragment metadata information");
-        }
-        try {
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedMetadata);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
-            byte[] startKey = (byte[]) objectStream.readObject();
-            byte[] endKey = (byte[]) objectStream.readObject();
-
-            if (withinScanRange(startKey, endKey)) {
-            	split = new SplitBoundary(startKey, endKey);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException("Exception while reading expected fragment metadata", e);
-        }
-    }
-
-    /**
-     * Returns true if given start/end key pair is within the scan range.
-     */
-    private boolean withinScanRange(byte[] startKey, byte[] endKey) {
-
-    	// startKey <= scanStartKey
-        if (Bytes.compareTo(startKey, scanStartKey) <= 0) {
-        	// endKey == table's end or endKey >= scanStartKey
-            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
-                    Bytes.compareTo(endKey, scanStartKey) >= 0) {
-                return true;
-            }
-        } else { // startKey > scanStartKey
-        	// scanEndKey == table's end or startKey <= scanEndKey
-            if (Bytes.equals(scanEndKey, HConstants.EMPTY_END_ROW) ||
-                    Bytes.compareTo(startKey, scanEndKey) <= 0) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Creates the Scan object used to describe the query
-     * requested from HBase.
-     * As the row key column always gets returned, no need to ask for it.
-     */
-    private void createScanner() throws Exception {
-        scanDetails = new Scan();
-        // Return only one version (latest)
-        scanDetails.setMaxVersions(1);
-
-        addColumns();
-        addFilters();
-    }
-
-    /**
-     * Opens the region of the fragment to be scanned.
-     * Updates the Scan object to retrieve only rows from that region.
-     */
-    private boolean openCurrentRegion() throws IOException {
-        if (split == null) {
-            return false;
-        }
-
-        scanDetails.setStartRow(split.startKey());
-        scanDetails.setStopRow(split.endKey());
-
-        currentScanner = table.getScanner(scanDetails);
-        return true;
-    }
-
-    /**
-     * Adds the table tuple description to {@link #scanDetails},
-     * so only these fields will be returned.
-     */
-    private void addColumns() {
-        for (int i = 0; i < tupleDescription.columns(); ++i) {
-            HBaseColumnDescriptor column = tupleDescription.getColumn(i);
-            if (!column.isKeyColumn()) // Row keys are returned anyway
-            {
-                scanDetails.addColumn(column.columnFamilyBytes(), column.qualifierBytes());
-            }
-        }
-    }
-
-    /**
-     * Uses {@link HBaseFilterBuilder} to translate a filter string into a
-     * HBase {@link Filter} object. The result is added as a filter to the
-     * Scan object.
-     * <p>
-     * Uses row key ranges to limit split count.
-     */
-    private void addFilters() throws Exception {
-        if (!inputData.hasFilter()) {
-            return;
-        }
-
-        HBaseFilterBuilder eval = new HBaseFilterBuilder(tupleDescription);
-        Filter filter = eval.getFilterObject(inputData.getFilterString());
-        scanDetails.setFilter(filter);
-
-        scanStartKey = eval.startKey();
-        scanEndKey = eval.endKey();
-    }
-}
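
A hedged sketch of how a caller would drive the ReadAccessor contract implemented above (the InputData instance and the surrounding PXF bridge wiring are assumed and are not part of this diff; exception handling omitted):

    HBaseAccessor accessor = new HBaseAccessor(inputData);
    if (accessor.openForRead()) {               // false when the split falls outside the filter's key range
        OneRow row;
        while ((row = accessor.readNextObject()) != null) {
            // hand each row to a resolver, e.g. HBaseResolver.getFields(row)
        }
    }
    accessor.closeForRead();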

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseDataFragmenter.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseDataFragmenter.java
deleted file mode 100644
index 9ffdcc5..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseDataFragmenter.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.Fragment;
-import com.pivotal.pxf.api.Fragmenter;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseLookupTable;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseUtilities;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Fragmenter class for HBase data resources.
- *
- * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
- * an input data path (an HBase table name in this case) into a list of regions
- * that belong to this table.
- *
- * This class also puts HBase lookup table information for the given
- * table (if exists) in each fragment's user data field.
- */
-public class HBaseDataFragmenter extends Fragmenter {
-
-    private static final Configuration hbaseConfiguration = HBaseUtilities.initHBaseConfiguration();
-    private Admin hbaseAdmin;
-    private Connection connection;
-
-    public HBaseDataFragmenter(InputData inConf) {
-        super(inConf);
-    }
-
-    /**
-     * Returns a list of fragments containing all of the
-     * HBase table's data.
-     * Lookup table information with mapping between
-     * field names in HAWQ table and HBase table will be
-     * returned as user data.
-     *
-     * @return a list of fragments
-     */
-    @Override
-    public List<Fragment> getFragments() throws Exception {
-
-        // check that Zookeeper and HBase master are available
-        HBaseAdmin.checkHBaseAvailable(hbaseConfiguration);
-        connection = ConnectionFactory.createConnection(hbaseConfiguration);
-        hbaseAdmin = connection.getAdmin();
-        if (!HBaseUtilities.isTableAvailable(hbaseAdmin, inputData.getDataSource())) {
-            HBaseUtilities.closeConnection(hbaseAdmin, connection);
-            throw new TableNotFoundException(inputData.getDataSource());
-        }
-
-        byte[] userData = prepareUserData();
-        addTableFragments(userData);
-
-        HBaseUtilities.closeConnection(hbaseAdmin, connection);
-
-        return fragments;
-    }
-
-    /**
-     * Serializes lookup table mapping into byte array.
-     *
-     * @return serialized lookup table mapping
-     * @throws IOException when connection to lookup table fails
-     * or serialization fails
-     */
-    private byte[] prepareUserData() throws Exception {
-        HBaseLookupTable lookupTable = new HBaseLookupTable(hbaseConfiguration);
-        Map<String, byte[]> mappings = lookupTable.getMappings(inputData.getDataSource());
-        lookupTable.close();
-
-        if (mappings != null) {
-            return serializeMap(mappings);
-        }
-
-        return null;
-    }
-
-    /**
-     * Serializes fragment metadata information
-     * (region start and end keys) into byte array.
-     *
-     * @param region region to be serialized
-     * @return serialized metadata information
-     * @throws IOException when serialization fails
-     */
-    private byte[] prepareFragmentMetadata(HRegionInfo region) throws IOException {
-
-        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
-        objectStream.writeObject(region.getStartKey());
-        objectStream.writeObject(region.getEndKey());
-
-        return byteArrayStream.toByteArray();
-    }
-
-    private void addTableFragments(byte[] userData) throws IOException {
-        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(inputData.getDataSource()));
-        List <HRegionLocation> locations = regionLocator.getAllRegionLocations();
-
-        for (HRegionLocation location : locations) {
-            addFragment(location, userData);
-        }
-
-        regionLocator.close();
-    }
-
-    private void addFragment(HRegionLocation location,
-            byte[] userData) throws IOException {
-        ServerName serverInfo = location.getServerName();
-        String[] hosts = new String[] {serverInfo.getHostname()};
-        HRegionInfo region = location.getRegionInfo();
-        byte[] fragmentMetadata = prepareFragmentMetadata(region);
-        Fragment fragment = new Fragment(inputData.getDataSource(), hosts, fragmentMetadata, userData);
-        fragments.add(fragment);
-    }
-
-    private byte[] serializeMap(Map<String, byte[]> tableMappings) throws IOException {
-        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
-        objectStream.writeObject(tableMappings);
-
-        return byteArrayStream.toByteArray();
-    }
-}
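
The fragment metadata produced by prepareFragmentMetadata() above is read back by HBaseAccessor.addTableSplit() (also in this commit); both ends must use the same object order. A minimal round-trip sketch (region is the HRegionInfo handled in addFragment(); flush() is added here for completeness):

    // fragmenter side: region start key, then end key
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bytes);
    out.writeObject(region.getStartKey());
    out.writeObject(region.getEndKey());
    out.flush();
    byte[] metadata = bytes.toByteArray();

    // accessor side: read in the same order
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(metadata));
    byte[] startKey = (byte[]) in.readObject();
    byte[] endKey = (byte[]) in.readObject();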

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
deleted file mode 100644
index 82b920b..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
+++ /dev/null
@@ -1,260 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.FilterParser;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseIntegerComparator;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.pivotal.pxf.api.io.DataType.TEXT;
-
-/**
- * This is the implementation of {@code FilterParser.FilterBuilder} for HBase.
- * <p>
- * The class uses the filter parser code to build a filter object,
- * either simple (single {@link Filter} class) or a compound ({@link FilterList})
- * for {@link HBaseAccessor} to use for its scan.
- * <p>
- * This is done before the scan starts. It is not a scan time operation.
- * <p>
- * HBase row key column is a special case.
- * If the user defined row key column as TEXT and used {@code <,>,<=,>=,=} operators
- * the startkey ({@code >/>=}) and the endkey ({@code </<=}) are stored in addition to
- * the created filter.
- * This is an addition on top of regular filters and does not replace
- * any logic in HBase filter objects.
- */
-public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
-    private Map<FilterParser.Operation, CompareFilter.CompareOp> operatorsMap;
-    private byte[] startKey;
-    private byte[] endKey;
-    private HBaseTupleDescription tupleDescription;
-
-    public HBaseFilterBuilder(HBaseTupleDescription tupleDescription) {
-        initOperatorsMap();
-        startKey = HConstants.EMPTY_START_ROW;
-        endKey = HConstants.EMPTY_END_ROW;
-        this.tupleDescription = tupleDescription;
-    }
-
-    /**
-     * Translates a filterString into a HBase {@link Filter} object.
-     *
-     * @param filterString filter string
-     * @return filter object
-     * @throws Exception if parsing failed
-     */
-    public Filter getFilterObject(String filterString) throws Exception {
-        FilterParser parser = new FilterParser(this);
-        Object result = parser.parse(filterString);
-
-        if (!(result instanceof Filter)) {
-            throw new Exception("String " + filterString + " resolved to no filter");
-        }
-
-        return (Filter) result;
-    }
-
-    /**
-     * Returns the startKey for scanning the HBase table.
-     * If the user specified a {@code > / >=} operation
-     * on a textual row key column, this value will be returned.
-     * Otherwise, the start of table.
-     *
-     * @return start key for scanning HBase table
-     */
-    public byte[] startKey() {
-        return startKey;
-    }
-
-    /**
-     * Returns the endKey for scanning the HBase table.
-     * If the user specified a {@code < / <=} operation
-     * on a textual row key column, this value will be returned.
-     * Otherwise, the end of table.
-     *
-     * @return end key for scanning HBase table
-     */
-    public byte[] endKey() {
-        return endKey;
-    }
-
-    /**
-     * Builds a filter from the input operands and operation.
-     * Two kinds of operations are handled:
-     * <ol>
-     * <li>Simple operation between {@code FilterParser.Constant} and {@code FilterParser.ColumnIndex}.
-     *    Supported operations are {@code <, >, <=, >=, =, !=}. </li>
-     * <li>Compound operations between {@link Filter} objects.
-     *    The only supported operation is {@code AND}. </li>
-     * </ol>
-     * <p>
-     * This function is called by {@link FilterParser},
-     * each time the parser comes across an operator.
-     */
-    @Override
-    public Object build(FilterParser.Operation opId,
-                        Object leftOperand,
-                        Object rightOperand) throws Exception {
-        if (leftOperand instanceof Filter) {
-            if (opId != FilterParser.Operation.HDOP_AND ||
-                    !(rightOperand instanceof Filter)) {
-                throw new Exception("Only AND is allowed between compound expressions");
-            }
-
-            return handleCompoundOperations((Filter) leftOperand, (Filter) rightOperand);
-        }
-
-        if (!(rightOperand instanceof FilterParser.Constant)) {
-            throw new Exception("expressions of column-op-column are not supported");
-        }
-
-        // Assume column is on the left
-        return handleSimpleOperations(opId,
-                (FilterParser.ColumnIndex) leftOperand,
-                (FilterParser.Constant) rightOperand);
-    }
-
-    /**
-     * Initializes the {@link #operatorsMap} with appropriate values.
-     */
-    private void initOperatorsMap() {
-        operatorsMap = new HashMap<FilterParser.Operation, CompareFilter.CompareOp>();
-        operatorsMap.put(FilterParser.Operation.HDOP_LT, CompareFilter.CompareOp.LESS); // "<"
-        operatorsMap.put(FilterParser.Operation.HDOP_GT, CompareFilter.CompareOp.GREATER); // ">"
-        operatorsMap.put(FilterParser.Operation.HDOP_LE, CompareFilter.CompareOp.LESS_OR_EQUAL); // "<="
-        operatorsMap.put(FilterParser.Operation.HDOP_GE, CompareFilter.CompareOp.GREATER_OR_EQUAL); // ">="
-        operatorsMap.put(FilterParser.Operation.HDOP_EQ, CompareFilter.CompareOp.EQUAL); // "="
-        operatorsMap.put(FilterParser.Operation.HDOP_NE, CompareFilter.CompareOp.NOT_EQUAL); // "!="
-    }
-
-    /**
-     * Handles simple column-operator-constant expressions.
-     * Creates a special filter in the case the column is the row key column.
-     */
-    private Filter handleSimpleOperations(FilterParser.Operation opId,
-                                          FilterParser.ColumnIndex column,
-                                          FilterParser.Constant constant) throws Exception {
-        HBaseColumnDescriptor hbaseColumn = tupleDescription.getColumn(column.index());
-        ByteArrayComparable comparator = getComparator(hbaseColumn.columnTypeCode(),
-                constant.constant());
-
-        /**
-         * If row key is of type TEXT, allow filter in start/stop row key API in
-         * HBaseAccessor/Scan object.
-         */
-        if (textualRowKey(hbaseColumn)) {
-            storeStartEndKeys(opId, constant.constant());
-        }
-
-        if (hbaseColumn.isKeyColumn()) {
-            return new RowFilter(operatorsMap.get(opId), comparator);
-        }
-
-        return new SingleColumnValueFilter(hbaseColumn.columnFamilyBytes(),
-                hbaseColumn.qualifierBytes(),
-                operatorsMap.get(opId),
-                comparator);
-    }
-
-    /**
-     * Resolves the column's type to a comparator class to be used.
-     * Currently, supported types are TEXT and INTEGER types.
-     */
-    private ByteArrayComparable getComparator(int type, Object data) throws Exception {
-        ByteArrayComparable result;
-        switch (DataType.get(type)) {
-            case TEXT:
-                result = new BinaryComparator(Bytes.toBytes((String) data));
-                break;
-            case SMALLINT:
-            case INTEGER:
-            case BIGINT:
-                result = new HBaseIntegerComparator((Long) data);
-                break;
-            default:
-                throw new Exception("unsupported column type for filtering " + type);
-        }
-
-        return result;
-    }
-
-    /**
-     * Handles operation between already calculated expressions.
-     * Currently only {@code AND}, in the future {@code OR} can be added.
-     * <p>
-     * Four cases here:
-     * <ol>
-     * <li>Both are simple filters.</li>
-     * <li>Left is a FilterList and right is a filter.</li>
-     * <li>Left is a filter and right is a FilterList.</li>
-     * <li>Both are FilterLists.</li>
-     * </ol>
-     * <p>
-     * Currently, only cases 1 and 2 can occur, since no parentheses are used.
-     */
-    private Filter handleCompoundOperations(Filter left, Filter right) {
-        FilterList result;
-
-        if (left instanceof FilterList) {
-            result = (FilterList) left;
-            result.addFilter(right);
-
-            return result;
-        }
-
-        result = new FilterList(FilterList.Operator.MUST_PASS_ALL, new Filter[] {left, right});
-
-        return result;
-    }
-
-    /**
-     * Returns true if column is of type TEXT and is a row key column.
-     */
-    private boolean textualRowKey(HBaseColumnDescriptor column) {
-        return column.isKeyColumn() &&
-                column.columnTypeCode() == TEXT.getOID();
-    }
-
-    /**
-     * Sets startKey/endKey and their inclusiveness
-     * according to the operation op.
-     * <p>
-     * TODO allow only one assignment to start/end key.
-     * Currently, multiple calls to this function might change
-     * previous assignments.
-     */
-    private void storeStartEndKeys(FilterParser.Operation op, Object data) {
-        String key = (String) data;
-
-        // Adding a zero byte to endkey, makes it inclusive
-        // Adding a zero byte to startkey, makes it exclusive
-        byte[] zeroByte = new byte[1];
-        zeroByte[0] = 0;
-
-        switch (op) {
-            case HDOP_LT:
-                endKey = Bytes.toBytes(key);
-                break;
-            case HDOP_GT:
-                startKey = Bytes.add(Bytes.toBytes(key), zeroByte);
-                break;
-            case HDOP_LE:
-                endKey = Bytes.add(Bytes.toBytes(key), zeroByte);
-                break;
-            case HDOP_GE:
-                startKey = Bytes.toBytes(key);
-                break;
-            case HDOP_EQ:
-                startKey = Bytes.toBytes(key);
-                endKey = Bytes.add(Bytes.toBytes(key), zeroByte);
-                break;
-        }
-    }
-}
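
A small sketch of the zero-byte adjustment in storeStartEndKeys() above: HBase scans the half-open range [startRow, stopRow), so appending a zero byte makes an end key inclusive and a start key exclusive (the row key value is illustrative):

    byte[] zero = new byte[] {0};
    byte[] gtStart = Bytes.add(Bytes.toBytes("row5"), zero); // "> 'row5'"  -> exclusive start key
    byte[] geStart = Bytes.toBytes("row5");                  // ">= 'row5'" -> inclusive start key
    byte[] ltEnd   = Bytes.toBytes("row5");                  // "< 'row5'"  -> exclusive end key
    byte[] leEnd   = Bytes.add(Bytes.toBytes("row5"), zero); // "<= 'row5'" -> inclusive end key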

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
deleted file mode 100644
index 0b4e364..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.*;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.sql.Timestamp;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Record resolver for HBase.
- *
- * The class is responsible for converting rows from HBase scans (returned as {@link Result} objects)
- * into a List of {@link OneField} objects.
- * This also includes converting each HBase column's value into its HAWQ-assigned type.
- *
- * Currently, the class assumes all HBase values are stored as strings, encoded with {@code Bytes}.
- */
-public class HBaseResolver extends Plugin implements ReadResolver {
-    private HBaseTupleDescription tupleDescription;
-
-    /**
-     * Constructs a resolver and initializes the table's tuple description.
-     *
-     * @param input query information, contains HBase table name and filter
-     */
-    public HBaseResolver(InputData input) {
-        super(input);
-        tupleDescription = new HBaseTupleDescription(input);
-    }
-
-    /**
-     * Splits an HBase {@link Result} object into a list of {@link OneField},
-     * based on the table's tuple description.
-     * Each field is converted from HBase bytes into its column description type.
-     *
-     * @return list of fields
-     */
-    @Override
-    public List<OneField> getFields(OneRow onerow) throws Exception {
-        Result result = (Result) onerow.getData();
-        LinkedList<OneField> fields = new LinkedList<OneField>();
-
-        for (int i = 0; i < tupleDescription.columns(); ++i) {
-            HBaseColumnDescriptor column = tupleDescription.getColumn(i);
-            byte[] value;
-
-            if (column.isKeyColumn()) // if the row key column is requested
-            {
-                value = result.getRow(); // just return the row key
-            } else // else, return column value
-            {
-                value = getColumnValue(result, column);
-            }
-
-            OneField oneField = new OneField();
-            oneField.type = column.columnTypeCode();
-            oneField.val = convertToJavaObject(oneField.type, column.columnTypeName(), value);
-            fields.add(oneField);
-        }
-        return fields;
-    }
-
-    /**
-     * Converts given byte array value to the matching java object, according to
-     * the given type code.
-     *
-     * @param typeCode ColumnDescriptor type id
-     * @param typeName type name. Used for error messages
-     * @param val value to be converted
-     * @return value converted to matching object type
-     * @throws Exception when conversion fails or type code is not supported
-     */
-    Object convertToJavaObject(int typeCode, String typeName, byte[] val) throws Exception {
-        if (val == null) {
-            return null;
-        }
-        try {
-            switch (DataType.get(typeCode)) {
-                case TEXT:
-                case VARCHAR:
-                case BPCHAR:
-                    return Bytes.toString(val);
-
-                case INTEGER:
-                    return Integer.parseInt(Bytes.toString(val));
-
-                case BIGINT:
-                    return Long.parseLong(Bytes.toString(val));
-
-                case SMALLINT:
-                    return Short.parseShort(Bytes.toString(val));
-
-                case REAL:
-                    return Float.parseFloat(Bytes.toString(val));
-
-                case FLOAT8:
-                    return Double.parseDouble(Bytes.toString(val));
-
-                case BYTEA:
-                    return val;
-
-                case BOOLEAN:
-                    return Boolean.valueOf(Bytes.toString(val));
-
-                case NUMERIC:
-                    return Bytes.toString(val);
-
-                case TIMESTAMP:
-                    return Timestamp.valueOf(Bytes.toString(val));
-
-                default:
-                    throw new UnsupportedTypeException("Unsupported data type " + typeName);
-            }
-        } catch (NumberFormatException e) {
-            throw new BadRecordException("Error converting value '" + Bytes.toString(val) + "' " +
-                    "to type " + typeName + ". " +
-                    "(original error: " + e.getMessage() + ")");
-        }
-    }
-
-    /**
-     * Returns the value of a column from a Result object.
-     *
-     * @param result HBase table row
-     * @param column HBase column to be retrieved
-     * @return HBase column value
-     */
-    byte[] getColumnValue(Result result, HBaseColumnDescriptor column) {
-        // if column does not contain a value, return null
-        if (!result.containsColumn(column.columnFamilyBytes(),
-                column.qualifierBytes())) {
-            return null;
-        }
-
-        // else, get the latest version of the requested column
-        Cell cell = result.getColumnLatestCell(column.columnFamilyBytes(), column.qualifierBytes());
-        return CellUtil.cloneValue(cell);
-    }
-}
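
A hedged sketch of resolving one scanned row into HAWQ fields (inputData and the HBase Result are assumed to come from the HBaseAccessor removed above):

    HBaseResolver resolver = new HBaseResolver(inputData);
    List<OneField> fields = resolver.getFields(new OneRow(null, result));
    // each OneField.val holds the column value converted from the string-encoded HBase bytes
    // into the HAWQ-assigned Java type, e.g. Integer for an INTEGER column;
    // unparsable numeric values surface as BadRecordException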

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
deleted file mode 100644
index cf5a897..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.pivotal.pxf.plugins.hbase.utilities;
-
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.Arrays;
-
-/**
- * {@link ColumnDescriptor} for HBase columns.
- */
-public class HBaseColumnDescriptor extends ColumnDescriptor {
-    byte[] columnFamily;
-    byte[] qualifier;
-
-    /**
-     * Constructs a column descriptor using the given copy's column name.
-     *
-     * @param copy column descriptor to be copied
-     */
-    public HBaseColumnDescriptor(ColumnDescriptor copy) {
-        this(copy, copy.columnName().getBytes());
-    }
-
-    /**
-     * Constructs an HBase column descriptor from a generic column descriptor and an HBase column name.
-     * <p>
-     * The column name must be in either of the following forms:
-     * <ol>
-     * <li>columnfamily:qualifier - standard HBase column.</li>
-     * <li>recordkey - Row key column (case insensitive).</li>
-     * </ol>
-     * <p>
-     * For recordkey, no HBase name is created.
-     *
-     * @param copy column descriptor
-     * @param newColumnName HBase column name - can be different than the given column descriptor name.
-     */
-    public HBaseColumnDescriptor(ColumnDescriptor copy, byte[] newColumnName) {
-        super(copy);
-
-        if (isKeyColumn()) {
-            return;
-        }
-
-        int seperatorIndex = getSeparatorIndex(newColumnName);
-
-        columnFamily = Arrays.copyOfRange(newColumnName, 0, seperatorIndex);
-        qualifier = Arrays.copyOfRange(newColumnName, seperatorIndex + 1, newColumnName.length);
-    }
-
-    /**
-     * Returns the family column name.
-     * (E.g. "cf1:q2" will return "cf1")
-     *
-     * @return family column name
-     */
-    public byte[] columnFamilyBytes() {
-        return columnFamily;
-    }
-
-    /**
-     * Returns the qualifier column name.
-     * (E.g. "cf1:q2" will return "q2")
-     *
-     * @return qualifier column name
-     */
-    public byte[] qualifierBytes() {
-        return qualifier;
-    }
-
-    private int getSeparatorIndex(byte[] columnName) {
-        for (int i = 0; i < columnName.length; ++i) {
-            if (columnName[i] == ':') {
-                return i;
-            }
-        }
-
-        throw new IllegalArgumentException("Illegal HBase column name " +
-                Bytes.toString(columnName) +
-                ", missing :");
-    }
-}
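
A brief sketch of the column name forms this descriptor accepts (hawqColumn is an assumed, already-built ColumnDescriptor):

    // standard HBase column name: "columnfamily:qualifier"
    HBaseColumnDescriptor col = new HBaseColumnDescriptor(hawqColumn, "cf1:q2".getBytes());
    col.columnFamilyBytes(); // "cf1"
    col.qualifierBytes();    // "q2"
    // for a row key column (isKeyColumn() == true) no family/qualifier is parsed,
    // so a name such as "recordkey" is accepted as-is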

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
deleted file mode 100644
index a935534..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.pivotal.pxf.plugins.hbase.utilities;
-
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.SubstringComparator;
-import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * This is a Filter comparator for HBase. It is external to the PXF HBase code.
- * <p>
- * To use with HBase it must reside in the classpath of every region server.
- * <p>
- * It converts a value into {@link Long} before comparing.
- * The filter is suitable for any integer numeric comparison, i.e. integer, bigint, smallint.
- * <p>
- * According to HBase 0.96 requirements, this must be serialized using Protocol Buffers
- * ({@link #toByteArray()} and {@link #parseFrom(byte[])} methods).
- * <p>
- * A reference can be found in {@link SubstringComparator}.
- */
-public class HBaseIntegerComparator extends ByteArrayComparable {
-	private Long val;
-
-
-	public HBaseIntegerComparator(Long inVal) {
-		super(Bytes.toBytes(inVal));
-		this.val = inVal;
-	}
-
-	/**
-	 * The comparison function. Currently uses {@link Long#parseLong(String)}.
-	 *
-	 * @return 0 if equal;
-	 *         a value less than 0 if row value is less than filter value;
-	 *         and a value greater than 0 if the row value is greater than the filter value.
-	 */
-	@Override
-	public int compareTo(byte[] value, int offset, int length) {
-		/**
-		 * Fix for HD-2610: query fails when recordkey is integer.
-		 */
-		if (length == 0)
-			return 1; // empty line, can't compare.
-
-		/**
-		 * TODO optimize by parsing the bytes directly.
-		 * Maybe we can even determine if it is an int or a string encoded.
-		 */
-		String valueAsString = new String(value, offset, length);
-		Long valueAsLong = Long.parseLong(valueAsString);
-		return val.compareTo(valueAsLong);
-	}
-
-	/**
-	 * Returns the comparator serialized using Protocol Buffers.
-	 *
-	 * @return serialized comparator
-	 */
-	@Override
-	public byte[] toByteArray() {
-		ComparatorProtos.ByteArrayComparable.Builder builder = ComparatorProtos.ByteArrayComparable.newBuilder();
-		builder.setValue(ByteString.copyFrom(getValue()));
-		return builder.build().toByteArray();
-	}
-
-	/**
-	 * Hides ("overrides") a static method in {@link ByteArrayComparable}.
-	 * This method will be called in deserialization.
-	 *
-	 * @param pbBytes
-	 *            A pb serialized instance
-	 * @return An instance of {@link HBaseIntegerComparator} made from
-	 *         <code>bytes</code>
-	 * @throws DeserializationException if deserialization of bytes to Protocol Buffers failed
-	 * @see #toByteArray
-	 */
-	public static ByteArrayComparable parseFrom(final byte[] pbBytes)
-			throws DeserializationException {
-		ComparatorProtos.ByteArrayComparable proto;
-		try {
-			proto = ComparatorProtos.ByteArrayComparable.parseFrom(pbBytes);
-		} catch (InvalidProtocolBufferException e) {
-			throw new DeserializationException(e);
-		}
-		return new HBaseIntegerComparator(Bytes.toLong(proto.getValue()
-				.toByteArray()));
-	}
-}
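
A hedged usage sketch matching how HBaseFilterBuilder (above) plugs this comparator into a standard HBase filter for integer columns (family and qualifier names are illustrative):

    ByteArrayComparable comparator = new HBaseIntegerComparator(42L);
    Filter filter = new SingleColumnValueFilter(Bytes.toBytes("cf1"), Bytes.toBytes("q2"),
            CompareFilter.CompareOp.EQUAL, comparator);
    // for the comparison to run server-side, this class must be present on every
    // region server's classpath, as the class comment notes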

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
deleted file mode 100644
index 459e97c..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package com.pivotal.pxf.plugins.hbase.utilities;
-
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * HBaseLookupTable will load a table's lookup information
- * from the HBase pxflookup table if it exists.<br>
- * This table holds mappings between HAWQ column names (key) and HBase column names (value).<br>
- * E.g. for an HBase table "hbase_table", mappings between HAWQ column names and HBase column names,
- * when <code>"hawq1"</code> is mapped to <code>"cf1:hbase1"</code> and
- * <code>"hawq2"</code> is mapped to <code>"cf1:hbase2"</code>, will be:<br>
- * <pre>
- * 	ROW                     COLUMN+CELL
- *  hbase_table             column=mapping:hawq1, value=cf1:hbase1
- *  hbase_table             column=mapping:hawq2, value=cf1:hbase2
- * </pre>
- *
- * Data is returned as a map of string and byte array from {@link #getMappings(String)}.
- * <p>
- * Once created, {@link #close()} MUST be called to cleanup resources.
- */
-public class HBaseLookupTable implements Closeable {
-    private static final String LOOKUPTABLENAME = "pxflookup";
-    private static final byte[] LOOKUPCOLUMNFAMILY = Bytes.toBytes("mapping");
-
-    private static final Log LOG = LogFactory.getLog(HBaseLookupTable.class);
-
-    private Connection connection;
-    private Configuration hbaseConfiguration;
-    private Admin admin;
-    private Map<byte[], byte[]> rawTableMapping;
-    private Table lookupTable;
-
-    /**
-     * Constructs a connector to HBase lookup table.
-     * Requires calling {@link #close()} to close {@link HBaseAdmin} instance.
-     *
-     * @param conf HBase configuration
-     * @throws IOException when initializing HBaseAdmin fails
-     */
-    public HBaseLookupTable(Configuration conf) throws Exception {
-        hbaseConfiguration = conf;
-        connection = ConnectionFactory.createConnection(hbaseConfiguration);
-        admin = connection.getAdmin();
-        ClusterStatus cs = admin.getClusterStatus();
-        LOG.debug("HBase cluster has " + cs.getServersSize() + " region servers " +
-                "(" + cs.getDeadServers() + " dead)");
-    }
-
-    /**
-     * Returns mappings for given table name between its HAWQ column names and
-     * HBase column names.
-     * If lookup table doesn't exist or no mappings for the table exist, returns null.
-     * <p>
-     * All HAWQ column names are returned in lower case.
-     *
-     * @param tableName HBase table name
-     * @return mappings between HAWQ column names and HBase column names
-     * @throws IOException when HBase operations fail
-     */
-    public Map<String, byte[]> getMappings(String tableName) throws IOException {
-        if (!lookupTableValid()) {
-            return null;
-        }
-
-        loadTableMappings(tableName);
-
-        if (tableHasNoMappings()) {
-            return null;
-        }
-
-        return lowerCaseMappings();
-    }
-
-    /**
-     * Closes HBase resources. Must be called after initializing this class.
-     */
-    @Override
-    public void close() throws IOException {
-        admin.close();
-    }
-
-    /**
-     * Returns true if {@link #LOOKUPTABLENAME} is available and enabled.
-     *
-     * @return whether lookup table is valid
-     */
-    private boolean lookupTableValid() throws IOException {
-        return (HBaseUtilities.isTableAvailable(admin, LOOKUPTABLENAME) &&
-                lookupHasCorrectStructure());
-    }
-
-    /**
-     * Returns true if {@link #LOOKUPTABLENAME} has {@value #LOOKUPCOLUMNFAMILY} family.
-     *
-     * @return whether lookup has expected column family name
-     */
-    private boolean lookupHasCorrectStructure() throws IOException {
-        HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(LOOKUPTABLENAME));
-        return htd.hasFamily(LOOKUPCOLUMNFAMILY);
-    }
-
-    /**
-     * Loads table name mappings from {@link #LOOKUPTABLENAME} lookup table.
-     *
-     * @param tableName table name
-     */
-    private void loadTableMappings(String tableName) throws IOException {
-        openLookupTable();
-        loadMappingMap(tableName);
-        closeLookupTable();
-    }
-
-    /**
-     * Returns true if lookup table has no relevant mappings.
-     * Should be called after {@link #loadMappingMap(String)}.
-     */
-    private boolean tableHasNoMappings() {
-        return MapUtils.isEmpty(rawTableMapping);
-    }
-
-    /**
-     * Returns a map of mappings between HAWQ and HBase column names,
-     * with the HAWQ column values in lower case.
-     */
-    private Map<String, byte[]> lowerCaseMappings() {
-        Map<String, byte[]> lowCaseKeys = new HashMap<String, byte[]>();
-        for (Map.Entry<byte[], byte[]> entry : rawTableMapping.entrySet()) {
-            lowCaseKeys.put(lowerCase(entry.getKey()),
-                    entry.getValue());
-        }
-
-        return lowCaseKeys;
-    }
-
-    /**
-     * Load hbase table object using ConnectionFactory
-     */
-    private void openLookupTable() throws IOException {
-        lookupTable = connection.getTable(TableName.valueOf(LOOKUPTABLENAME));
-    }
-
-    /**
-     * Loads mappings for given table name from the lookup table {@link #LOOKUPTABLENAME}.
-     * The table name should be in the row key, and the family name should be {@link #LOOKUPCOLUMNFAMILY}.
-     *
-     * @param tableName HBase table name
-     * @throws IOException when HBase operations fail
-     */
-    private void loadMappingMap(String tableName) throws IOException {
-        Get lookupRow = new Get(Bytes.toBytes(tableName));
-        lookupRow.setMaxVersions(1);
-        lookupRow.addFamily(LOOKUPCOLUMNFAMILY);
-        Result row;
-
-        row = lookupTable.get(lookupRow);
-        rawTableMapping = row.getFamilyMap(LOOKUPCOLUMNFAMILY);
-        LOG.debug("lookup table mapping for " + tableName +
-                " has " + (rawTableMapping == null ? 0 : rawTableMapping.size()) + " entries");
-    }
-
-    private void closeLookupTable() throws IOException {
-        lookupTable.close();
-        HBaseUtilities.closeConnection(admin, connection);
-    }
-
-    private String lowerCase(byte[] key) {
-        return Bytes.toString(key).toLowerCase();
-    }
-}
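
A hedged sketch of populating the "pxflookup" table read above, using standard HBase 1.x client calls (the Connection is assumed; the row key is the HBase table name and the family is "mapping", matching the example in the class comment):

    Table lookup = connection.getTable(TableName.valueOf("pxflookup"));
    Put put = new Put(Bytes.toBytes("hbase_table"));
    put.addColumn(Bytes.toBytes("mapping"),      // lookup column family
                  Bytes.toBytes("hawq1"),        // HAWQ column name (qualifier)
                  Bytes.toBytes("cf1:hbase1"));  // mapped HBase column (value)
    lookup.put(put);
    lookup.close();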

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
deleted file mode 100644
index 74f3ec3..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.pivotal.pxf.plugins.hbase.utilities;
-
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The class extends the tuple description provided by {@link InputData}
- * for usage of {@link HBaseColumnDescriptor}.
- * <p>
- * This class also loads lookup table sent (optionally) by the
- * fragmenter.
- */
-public class HBaseTupleDescription {
-    private Map<String, byte[]> tableMapping;
-    private List<HBaseColumnDescriptor> tupleDescription;
-    private InputData conf;
-
-    /**
-     * Constructs tuple description of the HBase table.
-     *
-     * @param conf data containing table tuple description
-     */
-    public HBaseTupleDescription(InputData conf) {
-        this.conf = conf;
-        parseHBaseTupleDescription();
-    }
-
-    /**
-     * Returns the number of fields.
-     *
-     * @return number of fields
-     */
-    public int columns() {
-        return tupleDescription.size();
-    }
-
-    /**
-     * Returns the column description of the column at the given index.
-     *
-     * @param index column index to be returned
-     * @return column description
-     */
-    public HBaseColumnDescriptor getColumn(int index) {
-        return tupleDescription.get(index);
-    }
-
-    private void parseHBaseTupleDescription() {
-        tupleDescription = new ArrayList<HBaseColumnDescriptor>();
-        loadUserData();
-        createTupleDescription();
-    }
-
-    /**
-     * Loads user information sent by the fragmenter.
-     * The data optionally contains table mappings from the lookup table,
-     * between field names in the HAWQ table and in the HBase table.
-     */
-    @SuppressWarnings("unchecked")
-    private void loadUserData() {
-        try {
-            byte[] serializedTableMappings = conf.getFragmentUserData();
-
-            // No userdata means no mappings for our table in lookup table
-            if (serializedTableMappings == null) {
-                return;
-            }
-
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedTableMappings);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-            tableMapping = (Map<String, byte[]>) objectStream.readObject();
-        } catch (Exception e) {
-            throw new RuntimeException("Exception while reading expected user data from HBase's fragmenter", e);
-        }
-    }
-
-    private void createTupleDescription() {
-        for (int i = 0; i < conf.getColumns(); ++i) {
-            ColumnDescriptor column = conf.getColumn(i);
-            tupleDescription.add(getHBaseColumn(column));
-        }
-    }
-
-    /**
-     * Returns the {@link HBaseColumnDescriptor} for the given column.
-     * If the column has a lookup table mapping, the HBase column name is used.
-     *
-     * @param column HAWQ column description
-     * @return matching HBase column description
-     */
-    private HBaseColumnDescriptor getHBaseColumn(ColumnDescriptor column) {
-        if (!column.isKeyColumn() && hasMapping(column)) {
-            return new HBaseColumnDescriptor(column, getMapping(column));
-        }
-        return new HBaseColumnDescriptor(column);
-    }
-
-    /**
-     * Returns true if there is a mapping for the given column name.
-     */
-    private boolean hasMapping(ColumnDescriptor column) {
-        return tableMapping != null &&
-                tableMapping.containsKey(column.columnName().toLowerCase());
-    }
-
-    /**
-     * Returns the HBase name mapping for the given column name.
-     *
-     * @param column HAWQ column description
-     * @return HBase name for the column
-     */
-    private byte[] getMapping(ColumnDescriptor column) {
-        return tableMapping.get(column.columnName().toLowerCase());
-    }
-}
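
As a minimal round-trip sketch of the user-data format loadUserData() above expects — a java.io-serialized Map<String, byte[]> produced by the fragmenter. The column and mapping names are illustrative only:

    import java.io.*;
    import java.util.HashMap;
    import java.util.Map;

    public class MappingRoundTrip {
        public static void main(String[] args) throws Exception {
            Map<String, byte[]> mappings = new HashMap<>();
            mappings.put("item_name", "cf1:name".getBytes());

            // serialize, as the fragmenter would
            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytesOut);
            out.writeObject(mappings);
            out.flush();
            byte[] userData = bytesOut.toByteArray();

            // deserialize, as loadUserData() does
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(userData));
            @SuppressWarnings("unchecked")
            Map<String, byte[]> restored = (Map<String, byte[]>) in.readObject();
            System.out.println(new String(restored.get("item_name"))); // cf1:name
        }
    }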

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
deleted file mode 100644
index b338c02..0000000
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.pivotal.pxf.plugins.hbase.utilities;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-
-public class HBaseUtilities {
-
-    /**
-     * Initializes HBase configuration.
-     * The following parameter is overridden:
-     *
-     * hbase.client.retries.number = 3
-     *  - limits the number of connection retries to HBase before failing.
-     *
-     * @return HBase configuration
-     */
-    public static Configuration initHBaseConfiguration() {
-        Configuration conf = HBaseConfiguration.create();
-        conf.set("hbase.client.retries.number", "3");
-        return conf;
-    }
-
-    /**
-     * Returns whether the given table exists and is enabled.
-     *
-     * @param hbaseAdmin HBase admin, must be initialized
-     * @param tableName table name
-     * @return true if the table exists and is enabled
-     * @throws IOException if a remote or network exception occurs when connecting to HBase
-     */
-    public static boolean isTableAvailable(Admin hbaseAdmin, String tableName) throws IOException {
-        TableName name = TableName.valueOf(tableName);
-        return hbaseAdmin.isTableAvailable(name) &&
-                hbaseAdmin.isTableEnabled(name);
-    }
-
-    /**
-     * Closes HBase admin and connection if they are open.
-     *
-     * @param hbaseAdmin HBase admin
-     * @param hbaseConnection HBase connection
-     * @throws IOException if an I/O error occurs when connecting to HBase
-     */
-    public static void closeConnection(Admin hbaseAdmin, Connection hbaseConnection) throws IOException {
-        if (hbaseAdmin != null) {
-            hbaseAdmin.close();
-        }
-        if (hbaseConnection != null) {
-            hbaseConnection.close();
-        }
-    }
-}
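
A hedged usage sketch of the utilities above (the table name is an assumption; the same calls apply to the relocated org.apache.hawq.pxf version of the class, and this mirrors how the fragmenter further down drives them):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableNotFoundException;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    static void verifyTable(String tableName) throws IOException {
        Configuration conf = HBaseUtilities.initHBaseConfiguration();
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        try {
            if (!HBaseUtilities.isTableAvailable(admin, tableName)) {
                throw new TableNotFoundException(tableName);
            }
            // ... work with the table ...
        } finally {
            HBaseUtilities.closeConnection(admin, connection);
        }
    }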

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessor.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessor.java
new file mode 100644
index 0000000..9d38cae
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessor.java
@@ -0,0 +1,250 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Accessor for HBase.
+ * This class is responsible for opening the requested HBase table and
+ * for iterating over its relevant fragments (regions) to return the table's rows.
+ * <p>
+ * The table is divided into several splits. Each accessor instance is assigned a single split.
+ * For each region, a Scan object is used to describe the requested rows.
+ * <p>
+ * The class supports filters using the {@link HBaseFilterBuilder}.
+ * Regions can be filtered out according to input from {@link HBaseFilterBuilder}.
+ */
+public class HBaseAccessor extends Plugin implements ReadAccessor {
+    private HBaseTupleDescription tupleDescription;
+    private Connection connection;
+    private Table table;
+    private SplitBoundary split;
+    private Scan scanDetails;
+    private ResultScanner currentScanner;
+    private byte[] scanStartKey;
+    private byte[] scanEndKey;
+
+    /**
+     * Represents a single split of a table,
+     * i.e. a start key and an end key.
+     */
+    private class SplitBoundary {
+        protected byte[] startKey;
+        protected byte[] endKey;
+
+        SplitBoundary(byte[] first, byte[] second) {
+            startKey = first;
+            endKey = second;
+        }
+
+        byte[] startKey() {
+            return startKey;
+        }
+
+        byte[] endKey() {
+            return endKey;
+        }
+    }
+
+    /**
+     * Constructs {@link HBaseTupleDescription} based on HAWQ table description and
+     * initializes the scan start and end keys of the HBase table to default values.
+     *
+     * @param input query information, contains HBase table name and filter
+     */
+    public HBaseAccessor(InputData input) {
+        super(input);
+
+        tupleDescription = new HBaseTupleDescription(input);
+        split = null;
+        scanStartKey = HConstants.EMPTY_START_ROW;
+        scanEndKey = HConstants.EMPTY_END_ROW;
+    }
+
+    /**
+     * Opens the HBase table.
+     *
+     * @return true if the current fragment (split) is
+     * available for reading and is included in the filter range
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        openTable();
+        createScanner();
+        addTableSplit();
+
+        return openCurrentRegion();
+    }
+
+    /**
+     * Closes the HBase table.
+     */
+    @Override
+    public void closeForRead() throws Exception {
+        table.close();
+        HBaseUtilities.closeConnection(null, connection);
+    }
+
+    /**
+     * Returns the next row in the HBase table, or null when the end of the fragment is reached.
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        Result result;
+
+        // scanner returned no result: the split is exhausted
+        while ((result = currentScanner.next()) == null) {
+            currentScanner.close(); // close it
+            return null; // no more rows on the split
+        }
+
+        return new OneRow(null, result);
+    }
+
+    /**
+     * Loads the HBase table object using ConnectionFactory.
+     */
+    private void openTable() throws IOException {
+        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
+        table = connection.getTable(TableName.valueOf(inputData.getDataSource()));
+    }
+
+    /**
+     * Creates a {@link SplitBoundary} of the table split
+     * this accessor instance is assigned to scan.
+     * The table split is constructed from the fragment metadata
+     * passed in {@link InputData#getFragmentMetadata()}.
+     * <p>
+     * The function verifies that the split is within the user-supplied scan range.
+     * <p>
+     * It is assumed that |startKeys| == |endKeys|;
+     * HBase's own code makes the same assumption.
+     */
+    private void addTableSplit() {
+
+        byte[] serializedMetadata = inputData.getFragmentMetadata();
+        if (serializedMetadata == null) {
+            throw new IllegalArgumentException("Missing fragment metadata information");
+        }
+        try {
+            ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedMetadata);
+            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
+
+            byte[] startKey = (byte[]) objectStream.readObject();
+            byte[] endKey = (byte[]) objectStream.readObject();
+
+            if (withinScanRange(startKey, endKey)) {
+                split = new SplitBoundary(startKey, endKey);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while reading expected fragment metadata", e);
+        }
+    }
+
+    /**
+     * Returns true if given start/end key pair is within the scan range.
+     */
+    private boolean withinScanRange(byte[] startKey, byte[] endKey) {
+
+        // startKey <= scanStartKey
+        if (Bytes.compareTo(startKey, scanStartKey) <= 0) {
+            // endKey == table's end or endKey >= scanStartKey
+            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+                    Bytes.compareTo(endKey, scanStartKey) >= 0) {
+                return true;
+            }
+        } else { // startKey > scanStartKey
+            // scanEndKey == table's end or startKey <= scanEndKey
+            if (Bytes.equals(scanEndKey, HConstants.EMPTY_END_ROW) ||
+                    Bytes.compareTo(startKey, scanEndKey) <= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Creates the Scan object used to describe the query
+     * requested from HBase.
+     * Since the row key column is always returned, there is no need to request it explicitly.
+     */
+    private void createScanner() throws Exception {
+        scanDetails = new Scan();
+        // Return only one version (latest)
+        scanDetails.setMaxVersions(1);
+
+        addColumns();
+        addFilters();
+    }
+
+    /**
+     * Opens the region of the fragment to be scanned.
+     * Updates the Scan object to retrieve only rows from that region.
+     */
+    private boolean openCurrentRegion() throws IOException {
+        if (split == null) {
+            return false;
+        }
+
+        scanDetails.setStartRow(split.startKey());
+        scanDetails.setStopRow(split.endKey());
+
+        currentScanner = table.getScanner(scanDetails);
+        return true;
+    }
+
+    /**
+     * Adds the table tuple description to {@link #scanDetails},
+     * so only these fields will be returned.
+     */
+    private void addColumns() {
+        for (int i = 0; i < tupleDescription.columns(); ++i) {
+            HBaseColumnDescriptor column = tupleDescription.getColumn(i);
+            if (!column.isKeyColumn()) // row keys are returned anyway
+            {
+                scanDetails.addColumn(column.columnFamilyBytes(), column.qualifierBytes());
+            }
+        }
+    }
+
+    /**
+     * Uses {@link HBaseFilterBuilder} to translate a filter string into a
+     * HBase {@link Filter} object. The result is added as a filter to the
+     * Scan object.
+     * <p>
+     * Uses row key ranges to limit split count.
+     */
+    private void addFilters() throws Exception {
+        if (!inputData.hasFilter()) {
+            return;
+        }
+
+        HBaseFilterBuilder eval = new HBaseFilterBuilder(tupleDescription);
+        Filter filter = eval.getFilterObject(inputData.getFilterString());
+        scanDetails.setFilter(filter);
+
+        scanStartKey = eval.startKey();
+        scanEndKey = eval.endKey();
+    }
+}
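
A hedged sketch of the accessor's read life cycle as a caller might drive it. Constructing InputData is PXF-framework work and is omitted, the getData() accessor on OneRow is assumed, and imports match those in the file above:

    // input is assumed to carry the table name, tuple description,
    // fragment metadata and (optionally) a filter string
    static void scanFragment(InputData input) throws Exception {
        HBaseAccessor accessor = new HBaseAccessor(input);
        try {
            if (accessor.openForRead()) { // false when the split falls outside the row-key range
                OneRow row;
                while ((row = accessor.readNextObject()) != null) {
                    Result result = (Result) row.getData(); // raw HBase Result for the resolver
                    // ... resolve fields from result ...
                }
            }
        } finally {
            accessor.closeForRead();
        }
    }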

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
new file mode 100644
index 0000000..36ae60c
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
@@ -0,0 +1,133 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseLookupTable;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fragmenter class for HBase data resources.
+ *
+ * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
+ * an input data path (an HBase table name in this case) into a list of regions
+ * that belong to this table.
+ *
+ * This class also puts HBase lookup table information for the given
+ * table (if it exists) in each fragment's user data field.
+ */
+public class HBaseDataFragmenter extends Fragmenter {
+
+    private static final Configuration hbaseConfiguration = HBaseUtilities.initHBaseConfiguration();
+    private Admin hbaseAdmin;
+    private Connection connection;
+
+    public HBaseDataFragmenter(InputData inConf) {
+        super(inConf);
+    }
+
+    /**
+     * Returns a list of fragments containing all of the
+     * HBase table's data.
+     * Lookup table information, mapping field names in the
+     * HAWQ table to field names in the HBase table, is
+     * returned as user data.
+     *
+     * @return a list of fragments
+     */
+    @Override
+    public List<Fragment> getFragments() throws Exception {
+
+        // check that Zookeeper and HBase master are available
+        HBaseAdmin.checkHBaseAvailable(hbaseConfiguration);
+        connection = ConnectionFactory.createConnection(hbaseConfiguration);
+        hbaseAdmin = connection.getAdmin();
+        if (!HBaseUtilities.isTableAvailable(hbaseAdmin, inputData.getDataSource())) {
+            HBaseUtilities.closeConnection(hbaseAdmin, connection);
+            throw new TableNotFoundException(inputData.getDataSource());
+        }
+
+        byte[] userData = prepareUserData();
+        addTableFragments(userData);
+
+        HBaseUtilities.closeConnection(hbaseAdmin, connection);
+
+        return fragments;
+    }
+
+    /**
+     * Serializes lookup table mapping into byte array.
+     *
+     * @return serialized lookup table mapping
+     * @throws IOException when connection to lookup table fails
+     * or serialization fails
+     */
+    private byte[] prepareUserData() throws Exception {
+        HBaseLookupTable lookupTable = new HBaseLookupTable(hbaseConfiguration);
+        Map<String, byte[]> mappings = lookupTable.getMappings(inputData.getDataSource());
+        lookupTable.close();
+
+        if (mappings != null) {
+            return serializeMap(mappings);
+        }
+
+        return null;
+    }
+
+    /**
+     * Serializes fragment metadata information
+     * (region start and end keys) into byte array.
+     *
+     * @param region region to be serialized
+     * @return serialized metadata information
+     * @throws IOException when serialization fails
+     */
+    private byte[] prepareFragmentMetadata(HRegionInfo region) throws IOException {
+
+        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+        objectStream.writeObject(region.getStartKey());
+        objectStream.writeObject(region.getEndKey());
+
+        return byteArrayStream.toByteArray();
+    }
+
+    private void addTableFragments(byte[] userData) throws IOException {
+        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(inputData.getDataSource()));
+        List <HRegionLocation> locations = regionLocator.getAllRegionLocations();
+
+        for (HRegionLocation location : locations) {
+            addFragment(location, userData);
+        }
+
+        regionLocator.close();
+    }
+
+    private void addFragment(HRegionLocation location,
+            byte[] userData) throws IOException {
+        ServerName serverInfo = location.getServerName();
+        String[] hosts = new String[] {serverInfo.getHostname()};
+        HRegionInfo region = location.getRegionInfo();
+        byte[] fragmentMetadata = prepareFragmentMetadata(region);
+        Fragment fragment = new Fragment(inputData.getDataSource(), hosts, fragmentMetadata, userData);
+        fragments.add(fragment);
+    }
+
+    private byte[] serializeMap(Map<String, byte[]> tableMappings) throws IOException {
+        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+        objectStream.writeObject(tableMappings);
+
+        return byteArrayStream.toByteArray();
+    }
+}
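
As a minimal sketch of the fragment metadata format prepareFragmentMetadata() produces and HBaseAccessor.addTableSplit() consumes — two java.io-serialized byte[] values, region start key first, end key second. The key values below are made up:

    import java.io.*;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FragmentMetadataRoundTrip {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytesOut);
            out.writeObject(Bytes.toBytes("a"));   // region start key (assumed)
            out.writeObject(Bytes.toBytes("m"));   // region end key (assumed)
            out.flush();
            byte[] metadata = bytesOut.toByteArray();

            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(metadata));
            byte[] startKey = (byte[]) in.readObject();
            byte[] endKey = (byte[]) in.readObject();
            System.out.println(Bytes.toString(startKey) + " - " + Bytes.toString(endKey));
        }
    }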

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseFilterBuilder.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseFilterBuilder.java
new file mode 100644
index 0000000..6c750fa
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseFilterBuilder.java
@@ -0,0 +1,260 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseIntegerComparator;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+
+/**
+ * This is the implementation of {@code FilterParser.FilterBuilder} for HBase.
+ * <p>
+ * The class uses the filter parser code to build a filter object,
+ * either simple (single {@link Filter} class) or a compound ({@link FilterList})
+ * for {@link HBaseAccessor} to use for its scan.
+ * <p>
+ * This is done before the scan starts. It is not a scan time operation.
+ * <p>
+ * HBase row key column is a special case.
+ * If the user defined the row key column as TEXT and used {@code <,>,<=,>=,=} operators,
+ * the startkey ({@code >/>=}) and the endkey ({@code </<=}) are stored in addition to
+ * the created filter.
+ * This is an addition on top of regular filters and does not replace
+ * any logic in HBase filter objects.
+ */
+public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
+    private Map<FilterParser.Operation, CompareFilter.CompareOp> operatorsMap;
+    private byte[] startKey;
+    private byte[] endKey;
+    private HBaseTupleDescription tupleDescription;
+
+    public HBaseFilterBuilder(HBaseTupleDescription tupleDescription) {
+        initOperatorsMap();
+        startKey = HConstants.EMPTY_START_ROW;
+        endKey = HConstants.EMPTY_END_ROW;
+        this.tupleDescription = tupleDescription;
+    }
+
+    /**
+     * Translates a filterString into a HBase {@link Filter} object.
+     *
+     * @param filterString filter string
+     * @return filter object
+     * @throws Exception if parsing failed
+     */
+    public Filter getFilterObject(String filterString) throws Exception {
+        FilterParser parser = new FilterParser(this);
+        Object result = parser.parse(filterString);
+
+        if (!(result instanceof Filter)) {
+            throw new Exception("String " + filterString + " resolved to no filter");
+        }
+
+        return (Filter) result;
+    }
+
+    /**
+     * Returns the startKey for scanning the HBase table.
+     * If the user specified a {@code > / >=} operation
+     * on a textual row key column, this value will be returned.
+     * Otherwise, the start of table.
+     *
+     * @return start key for scanning HBase table
+     */
+    public byte[] startKey() {
+        return startKey;
+    }
+
+    /**
+     * Returns the endKey for scanning the HBase table.
+     * If the user specified a {@code < / <=} operation
+     * on a textual row key column, this value will be returned.
+     * Otherwise, the end of table.
+     *
+     * @return end key for scanning HBase table
+     */
+    public byte[] endKey() {
+        return endKey;
+    }
+
+    /**
+     * Builds a filter from the input operands and operation.
+     * Two kinds of operations are handled:
+     * <ol>
+     * <li>Simple operation between {@code FilterParser.Constant} and {@code FilterParser.ColumnIndex}.
+ *    Supported operations are {@code <, >, <=, >=, =, !=}. </li>
+     * <li>Compound operations between {@link Filter} objects.
+     *    The only supported operation is {@code AND}. </li>
+     * </ol>
+     * <p>
+     * This function is called by {@link FilterParser},
+     * each time the parser comes across an operator.
+     */
+    @Override
+    public Object build(FilterParser.Operation opId,
+                        Object leftOperand,
+                        Object rightOperand) throws Exception {
+        if (leftOperand instanceof Filter) {
+            if (opId != FilterParser.Operation.HDOP_AND ||
+                    !(rightOperand instanceof Filter)) {
+                throw new Exception("Only AND is allowed between compound expressions");
+            }
+
+            return handleCompoundOperations((Filter) leftOperand, (Filter) rightOperand);
+        }
+
+        if (!(rightOperand instanceof FilterParser.Constant)) {
+            throw new Exception("expressions of column-op-column are not supported");
+        }
+
+        // Assume column is on the left
+        return handleSimpleOperations(opId,
+                (FilterParser.ColumnIndex) leftOperand,
+                (FilterParser.Constant) rightOperand);
+    }
+
+    /**
+     * Initializes the {@link #operatorsMap} with appropriate values.
+     */
+    private void initOperatorsMap() {
+        operatorsMap = new HashMap<FilterParser.Operation, CompareFilter.CompareOp>();
+        operatorsMap.put(FilterParser.Operation.HDOP_LT, CompareFilter.CompareOp.LESS); // "<"
+        operatorsMap.put(FilterParser.Operation.HDOP_GT, CompareFilter.CompareOp.GREATER); // ">"
+        operatorsMap.put(FilterParser.Operation.HDOP_LE, CompareFilter.CompareOp.LESS_OR_EQUAL); // "<="
+        operatorsMap.put(FilterParser.Operation.HDOP_GE, CompareFilter.CompareOp.GREATER_OR_EQUAL); // ">="
+        operatorsMap.put(FilterParser.Operation.HDOP_EQ, CompareFilter.CompareOp.EQUAL); // "="
+        operatorsMap.put(FilterParser.Operation.HDOP_NE, CompareFilter.CompareOp.NOT_EQUAL); // "!="
+    }
+
+    /**
+     * Handles simple column-operator-constant expressions.
+     * Creates a special filter in the case the column is the row key column.
+     */
+    private Filter handleSimpleOperations(FilterParser.Operation opId,
+                                          FilterParser.ColumnIndex column,
+                                          FilterParser.Constant constant) throws Exception {
+        HBaseColumnDescriptor hbaseColumn = tupleDescription.getColumn(column.index());
+        ByteArrayComparable comparator = getComparator(hbaseColumn.columnTypeCode(),
+                constant.constant());
+
+        /*
+         * If the row key is of type TEXT, also express the filter through the
+         * start/stop row keys used by HBaseAccessor's Scan object.
+         */
+        if (textualRowKey(hbaseColumn)) {
+            storeStartEndKeys(opId, constant.constant());
+        }
+
+        if (hbaseColumn.isKeyColumn()) {
+            return new RowFilter(operatorsMap.get(opId), comparator);
+        }
+
+        return new SingleColumnValueFilter(hbaseColumn.columnFamilyBytes(),
+                hbaseColumn.qualifierBytes(),
+                operatorsMap.get(opId),
+                comparator);
+    }
+
+    /**
+     * Resolves the column's type to a comparator class to be used.
+     * Currently, the supported types are TEXT and the integer types (SMALLINT, INTEGER, BIGINT).
+     */
+    private ByteArrayComparable getComparator(int type, Object data) throws Exception {
+        ByteArrayComparable result;
+        switch (DataType.get(type)) {
+            case TEXT:
+                result = new BinaryComparator(Bytes.toBytes((String) data));
+                break;
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                result = new HBaseIntegerComparator((Long) data);
+                break;
+            default:
+                throw new Exception("unsupported column type for filtering " + type);
+        }
+
+        return result;
+    }
+
+    /**
+     * Handles operation between already calculated expressions.
+     * Currently only {@code AND}, in the future {@code OR} can be added.
+     * <p>
+     * Four cases here:
+     * <ol>
+     * <li>Both are simple filters.</li>
+     * <li>Left is a FilterList and right is a filter.</li>
+     * <li>Left is a filter and right is a FilterList.</li>
+     * <li>Both are FilterLists.</li>
+     * </ol>
+     * <p>
+     * Currently, only cases 1 and 2 can occur, since no parentheses are used.
+     */
+    private Filter handleCompoundOperations(Filter left, Filter right) {
+        FilterList result;
+
+        if (left instanceof FilterList) {
+            result = (FilterList) left;
+            result.addFilter(right);
+
+            return result;
+        }
+
+        result = new FilterList(FilterList.Operator.MUST_PASS_ALL, new Filter[] {left, right});
+
+        return result;
+    }
+
+    /**
+     * Returns true if column is of type TEXT and is a row key column.
+     */
+    private boolean textualRowKey(HBaseColumnDescriptor column) {
+        return column.isKeyColumn() &&
+                column.columnTypeCode() == TEXT.getOID();
+    }
+
+    /**
+     * Sets startKey/endKey and their inclusiveness
+     * according to the operation op.
+     * <p>
+     * TODO allow only one assignment to start/end key.
+     * Currently, multiple calls to this function might change
+     * previous assignments.
+     */
+    private void storeStartEndKeys(FilterParser.Operation op, Object data) {
+        String key = (String) data;
+
+        // Adding a zero byte to the end key makes it inclusive;
+        // adding a zero byte to the start key makes it exclusive.
+        byte[] zeroByte = new byte[1];
+        zeroByte[0] = 0;
+
+        switch (op) {
+            case HDOP_LT:
+                endKey = Bytes.toBytes(key);
+                break;
+            case HDOP_GT:
+                startKey = Bytes.add(Bytes.toBytes(key), zeroByte);
+                break;
+            case HDOP_LE:
+                endKey = Bytes.add(Bytes.toBytes(key), zeroByte);
+                break;
+            case HDOP_GE:
+                startKey = Bytes.toBytes(key);
+                break;
+            case HDOP_EQ:
+                startKey = Bytes.toBytes(key);
+                endKey = Bytes.add(Bytes.toBytes(key), zeroByte);
+                break;
+        }
+    }
+}
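
A worked illustration of the zero-byte trick used in storeStartEndKeys(); the key value is made up. HBase treats the stop row of a scan as exclusive, so for a "rowkey <= 'f'" predicate the end key becomes 'f' followed by 0x00, which keeps the row 'f' itself inside the scan:

    import org.apache.hadoop.hbase.util.Bytes;

    public class InclusiveStopKeySketch {
        public static void main(String[] args) {
            byte[] key = Bytes.toBytes("f");
            byte[] inclusiveStop = Bytes.add(key, new byte[]{0});
            // "f" sorts strictly before "f\x00", so a scan whose (exclusive) stop row
            // is inclusiveStop still returns the row whose key is exactly "f"
            System.out.println(Bytes.compareTo(key, inclusiveStop) < 0); // true
        }
    }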

