hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vgumas...@apache.org
Subject hive git commit: HIVE-15333: Add a FetchTask to REPL DUMP plan for reading dump uri, last repl id as ResultSet (Vaibhav Gumashta reviewed by Sushanth Sowmyan, Thejas Nair)
Date Fri, 09 Dec 2016 08:29:53 GMT
Repository: hive
Updated Branches:
  refs/heads/master 0ed01fdf8 -> 24f48f124


HIVE-15333: Add a FetchTask to REPL DUMP plan for reading dump uri, last repl id as ResultSet
(Vaibhav Gumashta reviewed by Sushanth Sowmyan, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24f48f12
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24f48f12
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24f48f12

Branch: refs/heads/master
Commit: 24f48f12431dfa647f48ba9311676265c71c941f
Parents: 0ed01fd
Author: Vaibhav Gumashta <vgumashta@hortonworks.com>
Authored: Fri Dec 9 00:29:13 2016 -0800
Committer: Vaibhav Gumashta <vgumashta@hortonworks.com>
Committed: Fri Dec 9 00:29:13 2016 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       |  3 +-
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 35 +++++++++
 .../hive/ql/parse/BaseSemanticAnalyzer.java     | 35 ++++++++-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      | 28 -------
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 80 ++++++++++----------
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 27 +++----
 .../clientnegative/authorization_import.q.out   |  2 +-
 .../exim_00_unsupported_schema.q.out            |  2 +-
 .../apache/hadoop/fs/ProxyLocalFileSystem.java  | 19 +++--
 9 files changed, 135 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 95db9e8..9b7014b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -286,7 +286,8 @@ public class TestReplicationScenarios {
         throw new RuntimeException(e);
       }
     }
-    return (lastResults.get(rowNum).split("\\001"))[colNum];
+    // Split around the 'tab' character
+    return (lastResults.get(rowNum).split("\\t"))[colNum];
   }
 
   private void verifyResults(String[] data) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index c84570b..3d4057b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -28,10 +28,12 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
@@ -1286,4 +1288,37 @@ public class TestJdbcWithMiniHS2 {
     }
     assertTrue("Rows returned from describe function", numRows > 0);
   }
+
+  @Test
+  public void testReplDumpResultSet() throws Exception {
+    String tid =
+        TestJdbcWithMiniHS2.class.getCanonicalName().toLowerCase().replace('.', '_') + "_"
+            + System.currentTimeMillis();
+    String testPathName = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+    Path testPath = new Path(testPathName);
+    FileSystem fs = testPath.getFileSystem(new HiveConf());
+    Statement stmt = conDefault.createStatement();
+    try {
+      stmt.execute("set hive.repl.rootdir = " + testPathName);
+      ResultSet rs = stmt.executeQuery("repl dump " + testDbName);
+      ResultSetMetaData rsMeta = rs.getMetaData();
+      assertEquals(2, rsMeta.getColumnCount());
+      int numRows = 0;
+      while (rs.next()) {
+        numRows++;
+        URI uri = new URI(rs.getString(1));
+        int notificationId = rs.getInt(2);
+        assertNotNull(uri);
+        assertEquals(testPath.toUri().getScheme(), uri.getScheme());
+        assertEquals(testPath.toUri().getAuthority(), uri.getAuthority());
+        // In test setup, we append '/next' to hive.repl.rootdir and use that as the dump location
+        assertEquals(testPath.toUri().getPath() + "/next", uri.getPath());
+        assertNotNull(notificationId);
+      }
+      assertEquals(1, numRows);
+    } finally {
+      // Clean up
+      fs.delete(testPath, true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 7b63c52..3e749eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.antlr.runtime.tree.Tree;
@@ -53,10 +54,12 @@ import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,16 +70,20 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +115,7 @@ public abstract class BaseSemanticAnalyzer {
    * back and set it once we actually start running the query.
    */
   protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
-  
+
   // whether any ACID table is involved in a query
   protected boolean acidInQuery;
 
@@ -756,7 +763,7 @@ public abstract class BaseSemanticAnalyzer {
     String[] qualifiedTabName = getQualifiedTableName((ASTNode) parent.getChild(0));
     // The ANTLR grammar looks like :
     // 1.  KW_CONSTRAINT idfr=identifier KW_FOREIGN KW_KEY fkCols=columnParenthesesList
-    // KW_REFERENCES tabName=tableName parCols=columnParenthesesList 
+    // KW_REFERENCES tabName=tableName parCols=columnParenthesesList
     // enableSpec=enableSpecification validateSpec=validateSpecification relySpec=relySpecification
     // -> ^(TOK_FOREIGN_KEY $idfr $fkCols $tabName $parCols $relySpec $enableSpec $validateSpec)
     // when the user specifies the constraint name (i.e. child.getChildCount() == 11)
@@ -1324,7 +1331,7 @@ public abstract class BaseSemanticAnalyzer {
   public Set<FileSinkDesc> getAcidFileSinks() {
     return acidFileSinks;
   }
-  
+
   public boolean hasAcidInQuery() {
     return acidInQuery;
   }
@@ -1744,7 +1751,29 @@ public abstract class BaseSemanticAnalyzer {
   public HashSet<WriteEntity> getAllOutputs() {
     return outputs;
   }
+
   public QueryState getQueryState() {
     return queryState;
   }
+
+  /**
+   * Create a FetchTask for a given schema.
+   *
+   * @param schema string
+   */
+  protected FetchTask createFetchTask(String schema) {
+    Properties prop = new Properties();
+    // Sets delimiter to tab (ascii 9)
+    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, Integer.toString(Utilities.tabCode));
+    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
+    String[] colTypes = schema.split("#");
+    prop.setProperty("columns", colTypes[0]);
+    prop.setProperty("columns.types", colTypes[1]);
+    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
+    FetchWork fetch =
+        new FetchWork(ctx.getResFile(), new TableDesc(TextInputFormat.class,
+            IgnoreKeyTextOutputFormat.class, prop), -1);
+    fetch.setSerializationNullFormat(" ");
+    return (FetchTask) TaskFactory.get(fetch, conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index c7389a8..3f58130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType;
 import org.apache.hadoop.hive.ql.index.HiveIndex;
 import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -72,7 +70,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo;
 import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory;
 import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl;
@@ -104,7 +101,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -139,7 +135,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -149,7 +144,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.FileNotFoundException;
@@ -169,7 +163,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DATABASELOCATION;
@@ -1938,27 +1931,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   }
 
-  /**
-   * Create a FetchTask for a given thrift ddl schema.
-   *
-   * @param schema
-   *          thrift ddl
-   */
-  private FetchTask createFetchTask(String schema) {
-    Properties prop = new Properties();
-
-    prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
-    prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
-    String[] colTypes = schema.split("#");
-    prop.setProperty("columns", colTypes[0]);
-    prop.setProperty("columns.types", colTypes[1]);
-    prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
-    FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc(
-        TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1);
-    fetch.setSerializationNullFormat(" ");
-    return (FetchTask) TaskFactory.get(fetch, conf);
-  }
-
   private void validateDatabase(String databaseName) throws SemanticException {
     try {
       if (!db.databaseExists(databaseName)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index a0d492d..6e9602f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -154,40 +154,36 @@ public class EximUtil {
       String scheme = uri.getScheme();
       String authority = uri.getAuthority();
       String path = uri.getPath();
+      FileSystem fs = FileSystem.get(uri, conf);
+
       LOG.info("Path before norm :" + path);
       // generate absolute path relative to home directory
       if (!path.startsWith("/")) {
         if (testMode) {
-          path = (new Path(System.getProperty("test.tmp.dir"),
-              path)).toUri().getPath();
-        } else {
-          path = (new Path(new Path("/user/" + System.getProperty("user.name")),
-              path)).toUri().getPath();
-        }
-      }
-      // set correct scheme and authority
-      if (StringUtils.isEmpty(scheme)) {
-        if (testMode) {
-          scheme = "pfile";
+          path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
         } else {
-          scheme = "hdfs";
+          path =
+              (new Path(new Path("/user/" + System.getProperty("user.name")), path)).toUri()
+                  .getPath();
         }
       }
 
-      // if scheme is specified but not authority then use the default
-      // authority
+      // Get scheme from FileSystem
+      scheme = fs.getScheme();
+
+      // if scheme is specified but not authority then use the default authority
       if (StringUtils.isEmpty(authority)) {
         URI defaultURI = FileSystem.get(conf).getUri();
         authority = defaultURI.getAuthority();
       }
 
       LOG.info("Scheme:" + scheme + ", authority:" + authority + ", path:" + path);
-      Collection<String> eximSchemes = conf.getStringCollection(
-          HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
+      Collection<String> eximSchemes =
+          conf.getStringCollection(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
       if (!eximSchemes.contains(scheme)) {
         throw new SemanticException(
-            ErrorMsg.INVALID_PATH.getMsg(
-                "only the following file systems accepted for export/import : "
+            ErrorMsg.INVALID_PATH
+                .getMsg("only the following file systems accepted for export/import : "
                     + conf.get(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname)));
       }
 
@@ -197,7 +193,7 @@ public class EximUtil {
         throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
       }
     } catch (IOException e) {
-      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg() + ": " + e.getMessage(), e);
     }
   }
 
@@ -210,29 +206,31 @@ public class EximUtil {
     }
   }
 
-  public static String relativeToAbsolutePath(HiveConf conf, String location) throws SemanticException {
-    boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
-    if (testMode) {
-      URI uri = new Path(location).toUri();
-      String scheme = uri.getScheme();
-      String authority = uri.getAuthority();
-      String path = uri.getPath();
-      if (!path.startsWith("/")) {
-          path = (new Path(System.getProperty("test.tmp.dir"),
-              path)).toUri().getPath();
-      }
-      if (StringUtils.isEmpty(scheme)) {
-          scheme = "pfile";
-      }
-      try {
-        uri = new URI(scheme, authority, path, null, null);
-      } catch (URISyntaxException e) {
-        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+  public static String relativeToAbsolutePath(HiveConf conf, String location)
+      throws SemanticException {
+    try {
+      boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
+      if (testMode) {
+        URI uri = new Path(location).toUri();
+        FileSystem fs = FileSystem.get(uri, conf);
+        String scheme = fs.getScheme();
+        String authority = uri.getAuthority();
+        String path = uri.getPath();
+        if (!path.startsWith("/")) {
+          path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
+        }
+        try {
+          uri = new URI(scheme, authority, path, null, null);
+        } catch (URISyntaxException e) {
+          throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+        }
+        return uri.toString();
+      } else {
+        // no-op for non-test mode for now
+        return location;
       }
-      return uri.toString();
-    } else {
-      //no-op for non-test mode for now
-      return location;
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.IO_ERROR.getMsg() + ": " + e.getMessage(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8007c4e..6fff98d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -74,6 +74,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private String path;
 
  private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
+  private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -154,6 +155,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
     Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata");
+    String lastReplId;
     try {
       if (eventFrom == null){
         // bootstrap case
@@ -192,8 +194,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
         }
         LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
-        prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), bootDumpEndReplId),
-            "dump_dir,last_repl_id#string,string");
+        // Set the correct last repl id to return to the user
+        lastReplId = bootDumpEndReplId;
       } else {
         // get list of events matching dbPattern & tblPattern
         // go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -231,9 +233,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
         List<String> vals;
         writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
-        prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(eventTo)),
-            "dump_dir,last_repl_id#string,string");
+        // Set the correct last repl id to return to the user
+        lastReplId = String.valueOf(eventTo);
       }
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), lastReplId), dumpSchema);
+      setFetchTask(createFetchTask(dumpSchema));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       LOG.warn("Error during analyzeReplDump",e);
@@ -681,14 +685,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     for (String s : values) {
       LOG.debug("    > " + s);
     }
-
     ctx.setResFile(ctx.getLocalTmpPath());
-    // FIXME : this should not accessible by the user if we write to it from the frontend.
-    // Thus, we should Desc/Work this, otherwise there is a security issue here.
-    // Note: if we don't call ctx.setResFile, we get a NPE from the following code section
-    // If we do call it, then FetchWork thinks that the "table" here winds up thinking that
-    // this is a partitioned dir, which does not work. Thus, this does not work.
-
     writeOutput(values,ctx.getResFile());
   }
 
@@ -700,16 +697,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       outStream = fs.create(outputFile);
       outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
       for (int i = 1; i < values.size(); i++) {
-        outStream.write(Utilities.ctrlaCode);
+        outStream.write(Utilities.tabCode);
         outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
       }
       outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
-      throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      throw new SemanticException(e);
     } finally {
-      IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
-                                      // replace with this
+      IOUtils.closeStream(outStream);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/test/results/clientnegative/authorization_import.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/authorization_import.q.out b/ql/src/test/results/clientnegative/authorization_import.q.out
index 9972a8a..30a2be3 100644
--- a/ql/src/test/results/clientnegative/authorization_import.q.out
+++ b/ql/src/test/results/clientnegative/authorization_import.q.out
@@ -45,4 +45,4 @@ PREHOOK: query: set role public
 PREHOOK: type: SHOW_ROLES
 POSTHOOK: query: set role public
 POSTHOOK: type: SHOW_ROLES
-FAILED: HiveAccessControlException Permission denied: Principal [name=hive_test_user, type=USER] does not have following privileges for operation IMPORT [[OBJECT OWNERSHIP] on Object [type=DATABASE, name=importer]]
+#### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 0caa42a..dbcf6f4 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
 #### A masked pattern was here ####
 POSTHOOK: Output: default@exim_department
 #### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file
+FAILED: SemanticException [Error 10320]: Error while peforming IO operation : No FileSystem for scheme: nosuchschema

http://git-wip-us.apache.org/repos/asf/hive/blob/24f48f12/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
index 228a972..bd97521 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
@@ -24,19 +24,23 @@ import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims;
 
 /****************************************************************
  * A Proxy for LocalFileSystem
  *
- * Serves uri's corresponding to 'pfile:///' namespace with using
- * a LocalFileSystem
+ * As an example, it serves uri's corresponding to:
+ * 'pfile:///' namespace with using a LocalFileSystem
  *****************************************************************/
 
 public class ProxyLocalFileSystem extends FilterFileSystem {
 
   protected LocalFileSystem localFs;
 
+  /**
+   * URI scheme
+   */
+  private String scheme;
+
   public ProxyLocalFileSystem() {
     localFs = new LocalFileSystem();
   }
@@ -50,7 +54,7 @@ public class ProxyLocalFileSystem extends FilterFileSystem {
     // create a proxy for the local filesystem
     // the scheme/authority serving as the proxy is derived
     // from the supplied URI
-    String scheme = name.getScheme();
+    this.scheme = name.getScheme();
     String nameUriString = name.toString();
     if (Shell.WINDOWS) {
       // Replace the encoded backward slash with forward slash
@@ -62,11 +66,16 @@ public class ProxyLocalFileSystem extends FilterFileSystem {
     }
 
     String authority = name.getAuthority() != null ? name.getAuthority() : "";
-    String proxyUriString = nameUriString + "://" + authority + "/";
+    String proxyUriString = scheme + "://" + authority + "/";
 
     fs = ShimLoader.getHadoopShims().createProxyFileSystem(
         localFs, URI.create(proxyUriString));
 
     fs.initialize(name, conf);
   }
+
+  @Override
+  public String getScheme() {
+    return scheme;
+  }
 }


Mime
View raw message