drill-commits mailing list archives

From jacq...@apache.org
Subject [1/7] drill git commit: DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.
Date Sun, 19 Apr 2015 01:03:36 GMT
Repository: drill
Updated Branches:
  refs/heads/master 238399de5 -> 9ec257efb


DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.
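[Editor's note: a minimal, hypothetical Java sketch of why this change matters; it is not part of the commit. Hive exposes the per-split row count as the string property "numRows", and for very large tables that value can exceed Integer.MAX_VALUE, so parsing it with Integer.valueOf throws NumberFormatException (or would overflow), while parsing it as a long matches the long rowCount field used below. The demo class name and the literal value are illustrative only.]

// Hypothetical standalone demo; mirrors the parsing change in HiveScan.splitInput().
public class NumRowsParseDemo {
  public static void main(String[] args) {
    // A row count larger than Integer.MAX_VALUE (2,147,483,647), as the Hive
    // "numRows" property might report for a very large table or partition.
    final String numRowsProp = "3000000000";

    // Old behaviour: Integer.valueOf cannot represent the value and throws.
    try {
      final int numRows = Integer.valueOf(numRowsProp);
      System.out.println("parsed as int: " + numRows);
    } catch (NumberFormatException e) {
      System.out.println("int parse failed: " + e.getMessage());
    }

    // New behaviour (DRILL-2813): parse as long, matching the long rowCount field.
    final long numRows = Long.valueOf(numRowsProp);
    if (numRows > 0) { // hive-0.13+ reports -1 when no statistics are available
      System.out.println("parsed as long: " + numRows);
    }
  }
}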


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf155464
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf155464
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf155464

Branch: refs/heads/master
Commit: cf1554641c07359e91a2836d245b0b07438d0cd5
Parents: 238399d
Author: Jacques Nadeau <jacques@apache.org>
Authored: Thu Apr 16 09:08:00 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sat Apr 18 09:11:11 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/hive/HiveScan.java  | 98 ++++++++++----------
 1 file changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/cf155464/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index b96fda4..92635a8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,10 @@ public class HiveScan extends AbstractGroupScan {
   private long rowCount = 0;
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
-                  @JsonProperty("storage-plugin") String storagePluginName,
-                  @JsonProperty("columns") List<SchemaPath> columns,
-                  @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+  public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+                  @JsonProperty("storage-plugin") final String storagePluginName,
+                  @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +106,7 @@ public class HiveScan extends AbstractGroupScan {
     endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
@@ -115,7 +115,7 @@ public class HiveScan extends AbstractGroupScan {
     this.storagePluginName = storagePlugin.getName();
   }
 
-  private HiveScan(HiveScan that) {
+  private HiveScan(final HiveScan that) {
     this.columns = that.columns;
     this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
@@ -133,14 +133,14 @@ public class HiveScan extends AbstractGroupScan {
 
   private void getSplits() throws ExecutionSetupException {
     try {
-      List<Partition> partitions = hiveReadEntry.getPartitions();
-      Table table = hiveReadEntry.getTable();
+      final List<Partition> partitions = hiveReadEntry.getPartitions();
+      final Table table = hiveReadEntry.getTable();
       if (partitions == null || partitions.size() == 0) {
-        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        final Properties properties = MetaStoreUtils.getTableMetadata(table);
         splitInput(properties, table.getSd(), null);
       } else {
-        for (Partition partition : partitions) {
-          Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+        for (final Partition partition : partitions) {
+          final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
           splitInput(properties, partition.getSd(), partition);
         }
       }
@@ -150,27 +150,27 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   /* Split the input given in StorageDescriptor */
-  private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+  private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
       throws ReflectiveOperationException, IOException {
-    JobConf job = new JobConf();
-    for (Object obj : properties.keySet()) {
+    final JobConf job = new JobConf();
+    for (final Object obj : properties.keySet()) {
       job.set((String) obj, (String) properties.get(obj));
     }
-    for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+    for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
       job.set(entry.getKey(), entry.getValue());
     }
     InputFormat<?, ?> format = (InputFormat<?, ?>)
         Class.forName(sd.getInputFormat()).getConstructor().newInstance();
     job.setInputFormat(format.getClass());
-    Path path = new Path(sd.getLocation());
-    FileSystem fs = path.getFileSystem(job);
+    final Path path = new Path(sd.getLocation());
+    final FileSystem fs = path.getFileSystem(job);
 
     // Use new JobConf that has FS configuration
-    JobConf jobWithFsConf = new JobConf(fs.getConf());
+    final JobConf jobWithFsConf = new JobConf(fs.getConf());
     if (fs.exists(path)) {
       FileInputFormat.addInputPath(jobWithFsConf, path);
       format = jobWithFsConf.getInputFormat();
-      for (InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+      for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
         inputSplits.add(split);
         partitionMap.put(split, partition);
       }
@@ -178,7 +178,7 @@ public class HiveScan extends AbstractGroupScan {
     final String numRowsProp = properties.getProperty("numRows");
     logger.trace("HiveScan num rows property = {}", numRowsProp);
     if (numRowsProp != null) {
-      final int numRows = Integer.valueOf(numRowsProp);
+      final long numRows = Long.valueOf(numRowsProp);
       // starting from hive-0.13, when no statistics are available, this property is set to -1
       // it's important to note that the value returned by hive may not be up to date
       if (numRows > 0) {
@@ -188,33 +188,33 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   @Override
-  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+  public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
     mappings = Lists.newArrayList();
     for (int i = 0; i < endpoints.size(); i++) {
       mappings.add(new ArrayList<InputSplit>());
     }
-    int count = endpoints.size();
+    final int count = endpoints.size();
     for (int i = 0; i < inputSplits.size(); i++) {
       mappings.get(i % count).add(inputSplits.get(i));
     }
   }
 
-  public static String serializeInputSplit(InputSplit split) throws IOException {
-    ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
+  public static String serializeInputSplit(final InputSplit split) throws IOException {
+    final ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
     split.write(byteArrayOutputStream);
-    String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+    final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
     logger.debug("Encoded split string for split {} : {}", split, encoded);
     return encoded;
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+  public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
-      List<InputSplit> splits = mappings.get(minorFragmentId);
+      final List<InputSplit> splits = mappings.get(minorFragmentId);
       List<HivePartition> parts = Lists.newArrayList();
-      List<String> encodedInputSplits = Lists.newArrayList();
-      List<String> splitTypes = Lists.newArrayList();
-      for (InputSplit split : splits) {
+      final List<String> encodedInputSplits = Lists.newArrayList();
+      final List<String> splitTypes = Lists.newArrayList();
+      for (final InputSplit split : splits) {
         HivePartition partition = null;
         if (partitionMap.get(split) != null) {
           partition = new HivePartition(partitionMap.get(split));
@@ -226,7 +226,7 @@ public class HiveScan extends AbstractGroupScan {
       if (parts.contains(null)) {
         parts = null;
       }
-      HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
+      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
       return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
@@ -240,22 +240,22 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-    for (DrillbitEndpoint endpoint : endpoints) {
+    final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+    for (final DrillbitEndpoint endpoint : endpoints) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+    final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
         totalSize += Math.max(1, split.getLength());
       }
-      for (InputSplit split : inputSplits) {
-        float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
-        for (String loc : split.getLocations()) {
+      for (final InputSplit split : inputSplits) {
+        final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+        for (final String loc : split.getLocations()) {
           logger.debug("split location: {}", loc);
-          DrillbitEndpoint endpoint = endpointMap.get(loc);
+          final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
               affinityMap.get(endpoint).addAffinity(affinity);
@@ -265,13 +265,13 @@ public class HiveScan extends AbstractGroupScan {
           }
         }
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (DrillbitEndpoint ep : affinityMap.keySet()) {
+    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
       Preconditions.checkNotNull(ep);
     }
-    for (EndpointAffinity a : affinityMap.values()) {
+    for (final EndpointAffinity a : affinityMap.values()) {
       Preconditions.checkNotNull(a.getEndpoint());
     }
     return Lists.newArrayList(affinityMap.values());
@@ -281,7 +281,7 @@ public class HiveScan extends AbstractGroupScan {
   public ScanStats getScanStats() {
     try {
       long data =0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
           data += split.getLength();
       }
 
@@ -292,13 +292,13 @@ public class HiveScan extends AbstractGroupScan {
       }
       logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
       return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
     return new HiveScan(this);
   }
 
@@ -316,20 +316,20 @@ public class HiveScan extends AbstractGroupScan {
   }
 
   @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    HiveScan newScan = new HiveScan(this);
+  public GroupScan clone(final List<SchemaPath> columns) {
+    final HiveScan newScan = new HiveScan(this);
     newScan.columns = columns;
     return newScan;
   }
 
   @Override
-  public boolean canPushdownProjects(List<SchemaPath> columns) {
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
     return true;
   }
 
   // Return true if the current table is partitioned false otherwise
   public boolean supportsPartitionFilterPushdown() {
-    List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+    final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
     if (partitionKeys == null || partitionKeys.size() == 0) {
       return false;
     }

