hive-commits mailing list archives

From ekoif...@apache.org
Subject hive git commit: HIVE-14993 make WriteEntity distinguish writeType (Eugene Koifman, reviewed by Wei Zheng)
Date Sat, 22 Oct 2016 18:49:57 GMT
Repository: hive
Updated Branches:
  refs/heads/master 6cca9911b -> 394fc47da


HIVE-14993 make WriteEntity distinguish writeType (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/394fc47d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/394fc47d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/394fc47d

Branch: refs/heads/master
Commit: 394fc47daf1c9e6c2b5406b8f0a57163a2678315
Parents: 6cca991
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Sat Oct 22 11:49:41 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Sat Oct 22 11:49:41 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 66 +++++++++++++++-----
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 34 +++++++---
 .../org/apache/hadoop/hive/ql/hooks/Entity.java | 13 ++--
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |  2 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java       |  6 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 15 +++--
 6 files changed, 93 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
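
In short, WriteEntity#equals() now also compares the WriteType, while DDLTask de-duplicates its outputs by name only through the new addIfAbsentByName() helper (MoveTask reuses the static overload). Below is a minimal, self-contained sketch of that distinction, for illustration only and not part of the commit; SimpleWriteEntity is a hypothetical stand-in, since the real WriteEntity is built from metastore-backed Table/Partition objects.

import java.util.LinkedHashSet;
import java.util.Set;

// Simplified stand-in for org.apache.hadoop.hive.ql.hooks.WriteEntity;
// only the name and the WriteType matter for this illustration.
class SimpleWriteEntity {
  enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE, DDL_NO_LOCK, DDL_EXCLUSIVE }

  final String name;
  final WriteType writeType;

  SimpleWriteEntity(String name, WriteType writeType) {
    this.name = name;
    this.writeType = writeType;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SimpleWriteEntity)) {
      return false;
    }
    SimpleWriteEntity other = (SimpleWriteEntity) o;
    // After HIVE-14993: same name but different WriteType => no longer equal.
    return name.equalsIgnoreCase(other.name) && writeType == other.writeType;
  }

  @Override
  public int hashCode() {
    return name.toLowerCase().hashCode();
  }

  // Mirrors DDLTask.addIfAbsentByName(): de-duplicate by name only,
  // keeping whichever WriteType was registered first.
  static boolean addIfAbsentByName(SimpleWriteEntity candidate, Set<SimpleWriteEntity> outputs) {
    for (SimpleWriteEntity existing : outputs) {
      if (existing.name.equalsIgnoreCase(candidate.name)) {
        return false;
      }
    }
    return outputs.add(candidate);
  }

  public static void main(String[] args) {
    Set<SimpleWriteEntity> outputs = new LinkedHashSet<>();
    SimpleWriteEntity insert = new SimpleWriteEntity("default@t", WriteType.INSERT);
    SimpleWriteEntity ddl = new SimpleWriteEntity("default@t", WriteType.DDL_NO_LOCK);

    outputs.add(insert);
    // Before this patch equals() ignored WriteType, so adding 'ddl' was a no-op;
    // now the Set keeps both unless the caller de-duplicates by name.
    System.out.println(outputs.add(ddl));                // true: distinct because of WriteType
    System.out.println(addIfAbsentByName(ddl, outputs)); // false: an entity with that name exists
  }
}
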


http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acf570f..cfece77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -928,7 +928,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         );
    if (HiveUtils.getIndexHandler(conf, crtIndex.getIndexTypeHandlerClass()).usesIndexTable()) {
           Table indexTable = db.getTable(indexTableName);
-          work.getOutputs().add(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
+          addIfAbsentByName(new WriteEntity(indexTable, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
@@ -1024,7 +1024,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
     List<Partition> parts = db.createPartitions(addPartitionDesc);
     for (Partition part : parts) {
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+      addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT));
     }
     return 0;
   }
@@ -1058,7 +1058,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false);
     work.getInputs().add(new ReadEntity(oldPart));
     // We've already obtained a lock on the table, don't lock the partition too
-    work.getOutputs().add(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
 
@@ -1150,7 +1150,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
     work.getInputs().add(new ReadEntity(tbl));
     // We've already locked the table as the input, don't relock it as the output.
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
 
     return 0;
   }
@@ -1176,7 +1176,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         throw new HiveException("Uable to update table");
       }
       work.getInputs().add(new ReadEntity(tbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     } else {
       Partition part = db.getPartition(tbl, touchDesc.getPartSpec(), false);
       if (part == null) {
@@ -1188,7 +1188,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         throw new HiveException(e);
       }
       work.getInputs().add(new ReadEntity(part));
-      work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
@@ -3388,14 +3388,46 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     if (allPartitions != null ) {
       for (Partition tmpPart: allPartitions) {
         work.getInputs().add(new ReadEntity(tmpPart));
-        work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL_NO_LOCK));
       }
     } else {
       work.getInputs().add(new ReadEntity(oldTbl));
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
+  /**
+   * There are many places where "duplicate" Read/WriteEntity objects are added.  The way this was
+   * initially implemented, the duplicate just replaced the previous object.
+   * (work.getOutputs() is a Set and WriteEntity#equals() relies on name)
+   * This may be benign for ReadEntity and perhaps was benign for WriteEntity before WriteType was
+   * added. Now that WriteEntity has a WriteType it replaces it with one with possibly different
+   * {@link org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType}.  It's hard to imagine
+   * how this is desirable.
+   *
+   * As of HIVE-14993, WriteEntity with different WriteType must be considered different.
+   * So a WriteEntity created in DDLTask causes extra output in golden files, but only because
+   * DDLTask sets a different WriteType for the same Entity.
+   *
+   * In the spirit of bug-for-bug compatibility, this method ensures we only add new
+   * WriteEntity if it's really new.
+   *
+   * @return {@code true} if item was added
+   */
+  static boolean addIfAbsentByName(WriteEntity newWriteEntity, Set<WriteEntity> outputs) {
+    for(WriteEntity writeEntity : outputs) {
+      if(writeEntity.getName().equalsIgnoreCase(newWriteEntity.getName())) {
+        LOG.debug("Ignoring request to add " + newWriteEntity.toStringDetail() + " because
" +
+          writeEntity.toStringDetail() + " is present");
+        return false;
+      }
+    }
+    outputs.add(newWriteEntity);
+    return true;
+  }
+  private boolean addIfAbsentByName(WriteEntity newWriteEntity) {
+    return addIfAbsentByName(newWriteEntity, work.getOutputs());
+  }
 
   private boolean isSchemaEvolutionEnabled(Table tbl) {
     boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
@@ -3807,7 +3839,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     for (Partition partition : droppedParts) {
       console.printInfo("Dropped the partition " + partition.getName());
       // We have already locked the table, don't lock the partitions.
-      work.getOutputs().add(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
     };
   }
 
@@ -3900,7 +3932,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     db.dropTable(dropTbl.getTableName(), dropTbl.getIfPurge());
     if (tbl != null) {
       // We have already locked the table in DDLSemanticAnalyzer, don't do it again here
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
   }
 
@@ -4067,7 +4099,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         );
       }
     }
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
 
@@ -4215,7 +4247,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
     // create the table
     db.createTable(tbl, crtTbl.getIfNotExists());
-    work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+    addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     return 0;
   }
 
@@ -4258,10 +4290,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         } catch (InvalidOperationException e) {
           throw new HiveException(e);
         }
-        work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
+        addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_NO_LOCK));
       } else {
         // This is a replace, so we need an exclusive lock
-        work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
+        addIfAbsentByName(new WriteEntity(oldview, WriteEntity.WriteType.DDL_EXCLUSIVE));
       }
     } else {
       // create new view
@@ -4310,7 +4342,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
 
       db.createTable(tbl, crtView.getIfNotExists());
-      work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
+      addIfAbsentByName(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK));
     }
     return 0;
   }
@@ -4385,10 +4417,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       // Reuse the partition specs from dest partition since they should be the same
       work.getInputs().add(new ReadEntity(new Partition(sourceTable, partition.getSpec(), null)));
 
-      work.getOutputs().add(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
+      addIfAbsentByName(new WriteEntity(new Partition(sourceTable, partition.getSpec(), null),
           WriteEntity.WriteType.DELETE));
 
-      work.getOutputs().add(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
+      addIfAbsentByName(new WriteEntity(new Partition(destTable, partition.getSpec(), null),
           WriteEntity.WriteType.INSERT));
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index ec21cd6..8265af4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -362,9 +362,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
               work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
               hasFollowingStatsTask());
           if (work.getOutputs() != null) {
-            work.getOutputs().add(new WriteEntity(table,
-                (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                WriteEntity.WriteType.INSERT)));
+            DDLTask.addIfAbsentByName(new WriteEntity(table,
+              getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
           }
         } else {
           LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
@@ -467,10 +466,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
               }
 
               WriteEntity enty = new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                      WriteEntity.WriteType.INSERT));
+                getWriteType(tbd, work.getLoadTableWork().getWriteType()));
               if (work.getOutputs() != null) {
-                work.getOutputs().add(enty);
+                DDLTask.addIfAbsentByName(enty, work.getOutputs());
               }
              // Need to update the queryPlan's output as well so that post-exec hook get executed.
              // This is only needed for dynamic partitioning since for SP the the WriteEntity is
@@ -515,9 +513,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             dc = new DataContainer(table.getTTable(), partn.getTPartition());
             // add this partition to post-execution hook
             if (work.getOutputs() != null) {
-              work.getOutputs().add(new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-                      : WriteEntity.WriteType.INSERT)));
+              DDLTask.addIfAbsentByName(new WriteEntity(partn,
+                getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
             }
          }
         }
@@ -552,7 +549,24 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       return (1);
     }
   }
-
+  /**
+   * Make sure we create the WriteEntity with the right WriteType. This is (at this point) only
+   * for consistency since LockManager (which is the only thing that pays attention to WriteType)
+   * has done its job before the query ran.
+   */
+  WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
+    if(tbd.getReplace()) {
+      return WriteEntity.WriteType.INSERT_OVERWRITE;
+    }
+    switch (operation) {
+      case DELETE:
+        return WriteEntity.WriteType.DELETE;
+      case UPDATE:
+        return WriteEntity.WriteType.UPDATE;
+      default:
+        return WriteEntity.WriteType.INSERT;
+    }
+  }
   private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
     return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
         .isSkewedStoredAsDir();

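For reference, the precedence implemented by the new MoveTask getWriteType(LoadTableDesc, AcidUtils.Operation) helper above: an insert-overwrite (tbd.getReplace()) always yields INSERT_OVERWRITE, otherwise the ACID operation decides between DELETE, UPDATE, and INSERT. Below is a small sketch with hypothetical stand-in enums (constructing a real LoadTableDesc needs a full query plan, so this is an illustration, not the Hive API).

// Hypothetical stand-ins; the real signature is
// MoveTask.getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation).
enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
enum WriteType { INSERT, INSERT_OVERWRITE, UPDATE, DELETE }

class WriteTypeMappingSketch {
  static WriteType getWriteType(boolean replace, Operation operation) {
    if (replace) {
      return WriteType.INSERT_OVERWRITE;    // INSERT OVERWRITE takes precedence
    }
    switch (operation) {
      case DELETE: return WriteType.DELETE;
      case UPDATE: return WriteType.UPDATE;
      default:     return WriteType.INSERT; // INSERT and NOT_ACID both map to INSERT
    }
  }

  public static void main(String[] args) {
    System.out.println(getWriteType(true, Operation.DELETE));    // INSERT_OVERWRITE
    System.out.println(getWriteType(false, Operation.UPDATE));   // UPDATE
    System.out.println(getWriteType(false, Operation.NOT_ACID)); // INSERT
  }
}
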
http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
index 174b5a8..0842066 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java
@@ -79,7 +79,7 @@ public class Entity implements Serializable {
    * This is derived from t and p, but we need to serialize this field to make
    * sure Entity.hashCode() does not need to recursively read into t and p.
    */
-  private String name;
+  private final String name;
 
   /**
    * Whether the output is complete or not. For eg, for dynamic partitions, the
@@ -99,10 +99,6 @@ public class Entity implements Serializable {
     return name;
   }
 
-  public void setName(String name) {
-    this.name = name;
-  }
-
   public Database getDatabase() {
     return database;
   }
@@ -162,6 +158,7 @@ public class Entity implements Serializable {
    * Only used by serialization.
    */
   public Entity() {
+    name = null;
   }
 
   /**
@@ -326,7 +323,7 @@ public class Entity implements Serializable {
    */
   @Override
   public String toString() {
-    return name;
+    return getName();
   }
 
   private String computeName() {
@@ -360,7 +357,7 @@ public class Entity implements Serializable {
 
     if (o instanceof Entity) {
       Entity ore = (Entity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }
@@ -371,7 +368,7 @@ public class Entity implements Serializable {
    */
   @Override
   public int hashCode() {
-    return toString().hashCode();
+    return getName().hashCode();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index fccb243..3d7de69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -148,7 +148,7 @@ public class ReadEntity extends Entity implements Serializable {
 
     if (o instanceof ReadEntity) {
       ReadEntity ore = (ReadEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName()));
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 2194a6d..9e18638 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -168,12 +168,16 @@ public class WriteEntity extends Entity implements Serializable {
 
     if (o instanceof WriteEntity) {
       WriteEntity ore = (WriteEntity) o;
-      return (toString().equalsIgnoreCase(ore.toString()));
+      return (getName().equalsIgnoreCase(ore.getName())) && this.writeType == ore.writeType;
     } else {
       return false;
     }
   }
 
+  public String toStringDetail() {
+    return "WriteEntity(" + toString() + ") Type=" + getType() + " WriteType=" + getWriteType();
+  }
+
   public boolean isTempURI() {
     return isTempURI;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/394fc47d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9d58193..9db8a22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6673,7 +6673,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 new DummyPartition(dest_tab, dest_tab.getDbName()
                     + "@" + dest_tab.getTableName() + "@" + ppath,
                     partSpec);
-            output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
+            output = new WriteEntity(p, getWriteType(), false);
             outputs.add(output);
           } catch (HiveException e) {
             throw new SemanticException(e.getMessage(), e);
@@ -6746,9 +6746,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
-      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
-          WriteEntity.WriteType.INSERT_OVERWRITE :
-          WriteEntity.WriteType.INSERT)))) {
+
+      if (!outputs.add(new WriteEntity(dest_part,
+        determineWriteType(ltd, dest_tab.isNonNative())))) {
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
@@ -13034,8 +13034,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-        WriteEntity.WriteType.INSERT);
+    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
+  }
+  private WriteEntity.WriteType getWriteType() {
+    return updating() ? WriteEntity.WriteType.UPDATE :
+      (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
   }
 
   private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {

