hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject [2/2] incubator-hawq git commit: [#140722851] Applied code-review feedback.
Date Tue, 28 Mar 2017 00:12:11 GMT
[#140722851] Applied code-review feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/478f0a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/478f0a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/478f0a89

Branch: refs/heads/140722851
Commit: 478f0a89163554a835bf3963b4a7e2df3341fe34
Parents: 73a2048
Author: Oleksandr Diachenko <odiachenko@pivotal.io>
Authored: Mon Mar 27 17:11:47 2017 -0700
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Mon Mar 27 17:11:47 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/StatsAccessor.java  | 40 ++++++++++++++++++
 .../pxf/api/utilities/EnumAggregationType.java  |  2 +-
 .../pxf/api/utilities/FragmentMetadata.java     | 17 +++++++-
 .../hawq/pxf/api/utilities/Utilities.java       | 44 ++++++++++++++++++--
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  | 42 ++++++++++---------
 .../org/apache/hawq/pxf/service/AggBridge.java  |  5 +++
 .../pxf/service/utilities/ProtocolData.java     |  1 +
 7 files changed, 127 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
new file mode 100644
index 0000000..724448b
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.api;
+
+import org.apache.hawq.pxf.api.OneRow;
+
+/**
+ * Interface of accessor which can leverage statistic information for aggregate queries
+ *
+ */
+public interface StatsAccessor {
+
+    /**
+     * Method which reads needed statistics for current split
+     */
+    public void retrieveStats() throws Exception;
+
+    /**
+     * Returns next tuple based on statistics information without actual reading of data
+     */
+    public OneRow emitAggObject();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
index ee38f18..69716c6 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java
@@ -27,7 +27,7 @@ public enum EnumAggregationType {
     private String aggOperationCode;
     private boolean optimizationSupported;
 
-    EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
+    private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) {
         this.aggOperationCode = aggOperationCode;
         this.optimizationSupported = optimizationSupported;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
index 7a82065..7f266ec 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java
@@ -19,9 +19,12 @@
 
 package org.apache.hawq.pxf.api.utilities;
 
+/**
+ * Class which holds metadata of a file split and locality information.
+ *
+ */
 public class FragmentMetadata {
 
-
     private long start;
     private long end;
     private String[] hosts;
@@ -32,6 +35,10 @@ public class FragmentMetadata {
         this.hosts = hosts;
     }
 
+    /**
+     * 
+     * @return position in bytes where given data fragment starts
+     */
     public long getStart() {
         return start;
     }
@@ -40,6 +47,10 @@ public class FragmentMetadata {
         this.start = start;
     }
 
+    /**
+     * 
+     * @return position in bytes where given data fragment ends
+     */
     public long getEnd() {
         return end;
     }
@@ -48,6 +59,10 @@ public class FragmentMetadata {
         this.end = end;
     }
 
+    /**
+     * 
+     * @return all hosts which has given data fragment
+     */
     public String[] getHosts() {
         return hosts;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index f59a07a..f948888 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -23,6 +23,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.StatsAccessor;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -156,6 +158,14 @@ public class Utilities {
         return input.replaceAll("[^a-zA-Z0-9_:/-]", ".");
     }
 
+    /**
+     * Parses input data and returns fragment metadata.
+     * 
+     * @param inputData input data which has protocol information
+     * @return fragment metadata
+     * @throws IllegalArgumentException if fragment metadata information wasn't found in input data
+     * @throws Exception
+     */
     public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
         byte[] serializedLocation = inputData.getFragmentMetadata();
         if (serializedLocation == null) {
@@ -166,9 +176,18 @@ public class Utilities {
             long start = objectStream.readLong();
             long end = objectStream.readLong();
             String[] hosts = (String[]) objectStream.readObject();
-            LOG.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("parsed file split: path ");
+                sb.append(inputData.getDataSource());
+                sb.append(", start ");
+                sb.append(start);
+                sb.append(", end ");
+                sb.append(end);
+                sb.append(", hosts ");
+                sb.append(ArrayUtils.toString(hosts));
+                LOG.debug(sb.toString());
+            }
             FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts);
             return fragmentMetadata;
         } catch (Exception e) {
@@ -176,4 +195,23 @@ public class Utilities {
             throw e;
         }
     }
+
+
+    /**
+     * Determines whether accessor should use statistics to optimize reading results
+     * 
+     * @param accessor accessor instance
+     * @param inputData input data which has protocol information
+     * @return true if this accessor should use statistic information
+     */
+    public static boolean useStats(ReadAccessor accessor, InputData inputData) {
+        if (accessor instanceof StatsAccessor) {
+            if (inputData != null && !inputData.hasFilter()
+                    && inputData.getAggType() != null
+                    && inputData.getAggType().isOptimizationSupported()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 0e2fc2a..24b355c 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -22,7 +22,6 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -31,16 +30,14 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.EnumAggregationType;
 import org.apache.hawq.pxf.api.utilities.FragmentMetadata;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
-import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
 import org.apache.hadoop.mapred.*;
 
 import java.io.IOException;
@@ -56,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_
  * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
  * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
  */
-public class HiveORCAccessor extends HiveAccessor {
+public class HiveORCAccessor extends HiveAccessor implements StatsAccessor {
 
     private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class);
 
@@ -81,18 +78,7 @@ public class HiveORCAccessor extends HiveAccessor {
         HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
         initPartitionFields(hiveUserData.getPartitionKeys());
         filterInFragmenter = hiveUserData.isFilterInFragmenter();
-
-        if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) {
-            useStats = true;
-        }
-    }
-
-    private void retrieveStats() throws Exception {
-        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
-        /* We are using file-level stats therefore if file has multiple splits,
-         * it's enough to return count for a first split in file*/
-        if (fragmentMetadata.getStart() == 0)
-            this.count = this.orcReader.getNumberOfRows();
+        useStats = Utilities.useStats(this, inputData);
     }
 
     @Override
@@ -104,7 +90,7 @@ public class HiveORCAccessor extends HiveAccessor {
             }
             retrieveStats();
             objectsEmitted = 0;
-            return true;
+            return super.openForRead();
         } else {
             addColumns();
             addFilters();
@@ -259,7 +245,25 @@ public class HiveORCAccessor extends HiveAccessor {
             return super.readNextObject();
     }
 
-    private OneRow emitAggObject() {
+    /**
+     * Fetches file-level statistics from an ORC file.
+     */
+    @Override
+    public void retrieveStats() throws Exception {
+        FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData);
+        /*
+         * We are using file-level stats therefore if file has multiple splits,
+         * it's enough to return count for a first split in file
+         */
+        if (fragmentMetadata.getStart() == 0)
+            this.count = this.orcReader.getNumberOfRows();
+    }
+
+    /**
+     * Emits tuple without reading from disk, currently supports COUNT
+     */
+    @Override
+    public OneRow emitAggObject() {
         OneRow row = null;
         switch (inputData.getAggType()) {
             case COUNT:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
index d274864..12f44e2 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java
@@ -32,9 +32,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.collections.map.LRUMap;
 
+/**
+ * Bridge class optimized for aggregate queries.
+ *
+ */
 public class AggBridge extends ReadBridge implements Bridge {
 
     private static final Log LOG = LogFactory.getLog(AggBridge.class);
+    /* Avoid resolving rows with the same key twice */
     private LRUMap resolvedFieldsCache;
 
     public AggBridge(ProtocolData protData) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 6f21068..0de356b 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -117,6 +117,7 @@ public class ProtocolData extends InputData {
         // Store alignment for global use as a system property
         System.setProperty("greenplum.alignment", getProperty("ALIGNMENT"));
 
+        //Get aggregation operation
         String aggTypeOperationName = getOptionalProperty("AGG-TYPE");
 
         this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));


Mime
View raw message