hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From the...@apache.org
Subject [1/4] hive git commit: HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)
Date Tue, 07 Feb 2017 20:12:34 GMT
Repository: hive
Updated Branches:
  refs/heads/master f6cdbc879 -> 3e01ef326


http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
new file mode 100644
index 0000000..76ce24a
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java
@@ -0,0 +1,38 @@
+package org.apache.hive.service.cli;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+
+import java.util.List;
+
+public class JobProgressUpdate {
+  public final double progressedPercentage;
+  public final String footerSummary;
+  public final long startTimeMillis;
+  private final List<String> headers;
+  private final List<List<String>> rows;
+  public final String status;
+
+
+  JobProgressUpdate(ProgressMonitor monitor) {
+    this(monitor.headers(), monitor.rows(), monitor.footerSummary(), monitor.progressedPercentage(),
+        monitor.startTime(), monitor.executionStatus());
+  }
+
+  private JobProgressUpdate(List<String> headers, List<List<String>> rows,
String footerSummary,
+      double progressedPercentage, long startTimeMillis, String status) {
+    this.progressedPercentage = progressedPercentage;
+    this.footerSummary = footerSummary;
+    this.startTimeMillis = startTimeMillis;
+    this.headers = headers;
+    this.rows = rows;
+    this.status = status;
+  }
+
+  public List<String> headers() {
+    return headers;
+  }
+
+  public List<List<String>> rows() {
+    return rows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/OperationStatus.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
index b0a26e3..317585f 100644
--- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java
+++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
@@ -30,6 +30,7 @@ public class OperationStatus {
   private final long operationCompleted;
   private final boolean hasResultSet;
   private final HiveSQLException operationException;
+  private JobProgressUpdate jobProgressUpdate;
 
   public OperationStatus(OperationState state, String taskStatus, long operationStarted,
long operationCompleted, boolean hasResultSet, HiveSQLException operationException) {
     this.state = state;
@@ -64,4 +65,11 @@ public class OperationStatus {
     return operationException;
   }
 
+  void setJobProgressUpdate(JobProgressUpdate jobProgressUpdate){
+    this.jobProgressUpdate = jobProgressUpdate;
+  }
+
+  public JobProgressUpdate jobProgressUpdate(){
+    return jobProgressUpdate;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..29a5f66
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java
@@ -0,0 +1,19 @@
+package org.apache.hive.service.cli;
+
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+
+/**
+ * This defines the mapping between the internal execution status of various engines and
the
+ * generic states that the progress monitor cares about. Theses are specified by TJobExecutionStatus
+ */
+public interface ProgressMonitorStatusMapper {
+
+  ProgressMonitorStatusMapper DEFAULT = new ProgressMonitorStatusMapper() {
+    @Override
+    public TJobExecutionStatus forStatus(String status) {
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+  };
+
+  TJobExecutionStatus forStatus(String status);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..88fbd22
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java
@@ -0,0 +1,32 @@
+package org.apache.hive.service.cli;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+
+public class TezProgressMonitorStatusMapper implements ProgressMonitorStatusMapper {
+
+  /**
+   * These states are taken form DAGStatus.State, could not use that here directly as it
was
+   * optional dependency and did not want to include it just for the enum.
+   */
+  enum TezStatus {
+    SUBMITTED, INITING, RUNNING, SUCCEEDED, KILLED, FAILED, ERROR
+
+  }
+
+  @Override
+  public TJobExecutionStatus forStatus(String status) {
+    if (StringUtils.isEmpty(status)) {
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+      TezStatus tezStatus = TezStatus.valueOf(status);
+      switch (tezStatus) {
+      case SUBMITTED:
+      case INITING:
+      case RUNNING:
+        return TJobExecutionStatus.IN_PROGRESS;
+      default:
+        return TJobExecutionStatus.COMPLETE;
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index 85b82b6..0e76c91 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import javax.security.sasl.SaslException;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.PlainSaslHelper;
 import org.apache.hive.service.cli.CLIServiceClient;
@@ -191,8 +190,8 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler
{
     }
 
     @Override
-    public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException
{
-      return cliService.getOperationStatus(opHandle);
+    public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate)
throws HiveSQLException {
+      return cliService.getOperationStatus(opHandle, getProgressUpdate);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 2938338..e09d9fe 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -24,7 +24,6 @@ import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.login.LoginException;
 
@@ -33,24 +32,25 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.ServiceUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
 import org.apache.hive.service.cli.CLIService;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.FetchType;
 import org.apache.hive.service.cli.GetInfoType;
 import org.apache.hive.service.cli.GetInfoValue;
 import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.JobProgressUpdate;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.TezProgressMonitorStatusMapper;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
@@ -93,6 +93,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
 import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
 import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
@@ -629,15 +630,30 @@ public abstract class ThriftCLIService extends AbstractService implements
TCLISe
   @Override
   public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException
{
     TGetOperationStatusResp resp = new TGetOperationStatusResp();
+    OperationHandle operationHandle = new OperationHandle(req.getOperationHandle());
     try {
-      OperationStatus operationStatus = cliService.getOperationStatus(
-          new OperationHandle(req.getOperationHandle()));
+      OperationStatus operationStatus =
+          cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate());
       resp.setOperationState(operationStatus.getState().toTOperationState());
       HiveSQLException opException = operationStatus.getOperationException();
       resp.setTaskStatus(operationStatus.getTaskStatus());
       resp.setOperationStarted(operationStatus.getOperationStarted());
       resp.setOperationCompleted(operationStatus.getOperationCompleted());
       resp.setHasResultSet(operationStatus.getHasResultSet());
+      JobProgressUpdate progressUpdate = operationStatus.jobProgressUpdate();
+      ProgressMonitorStatusMapper mapper = ProgressMonitorStatusMapper.DEFAULT;
+      if ("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+        mapper = new TezProgressMonitorStatusMapper();
+      }
+
+      resp.setProgressUpdateResponse(new TProgressUpdateResp(
+          progressUpdate.headers(),
+          progressUpdate.rows(),
+          progressUpdate.progressedPercentage,
+          mapper.forStatus(progressUpdate.status),
+          progressUpdate.footerSummary,
+          progressUpdate.startTimeMillis
+      ));
       if (opException != null) {
         resp.setSqlState(opException.getSQLState());
         resp.setErrorCode(opException.getErrorCode());
@@ -746,7 +762,7 @@ public abstract class ThriftCLIService extends AbstractService implements
TCLISe
 	}
     return resp;
   }
-  
+
   @Override
   public abstract void run();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 9805641..617bc40 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -367,9 +367,10 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
    * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle)
    */
   @Override
-  public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException
{
+  public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate)
throws HiveSQLException {
     try {
       TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle());
+      req.setGetProgressUpdate(getProgressUpdate);
       TGetOperationStatusResp resp = cliService.GetOperationStatus(req);
       // Checks the status of the RPC call, throws an exception in case of error
       checkStatus(resp.getStatus());

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index f325dbc..bc6648e 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -173,11 +173,11 @@ public abstract class CLIServiceTest {
     queryString = "SELECT ID+1 FROM TEST_EXEC";
     opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
 
-    OperationStatus opStatus = client.getOperationStatus(opHandle);
+    OperationStatus opStatus = client.getOperationStatus(opHandle, false);
     checkOperationTimes(opHandle, opStatus);
     // Expect query to be completed now
     assertEquals("Query should be finished",
-        OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+        OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
     client.closeOperation(opHandle);
 
     // Cleanup
@@ -273,10 +273,10 @@ public abstract class CLIServiceTest {
     System.out.println("Cancelling " + opHandle);
     client.cancelOperation(opHandle);
 
-    OperationStatus operationStatus = client.getOperationStatus(opHandle);
+    OperationStatus operationStatus = client.getOperationStatus(opHandle, false);
     checkOperationTimes(opHandle, operationStatus);
 
-    state = client.getOperationStatus(opHandle).getState();
+    state = client.getOperationStatus(opHandle, false).getState();
     System.out.println(opHandle + " after cancelling, state= " + state);
     assertEquals("Query should be cancelled", OperationState.CANCELED, state);
 
@@ -545,7 +545,7 @@ public abstract class CLIServiceTest {
       }
       longPollingStart = System.currentTimeMillis();
       System.out.println("Long polling starts at: " + longPollingStart);
-      opStatus = client.getOperationStatus(opHandle);
+      opStatus = client.getOperationStatus(opHandle, false);
       state = opStatus.getState();
       longPollingEnd = System.currentTimeMillis();
       System.out.println("Long polling ends at: " + longPollingEnd);
@@ -568,7 +568,7 @@ public abstract class CLIServiceTest {
         assertTrue(longPollingTimeDelta - 0.9*expectedTimeout > 0);
       }
     }
-    assertEquals(expectedState, client.getOperationStatus(opHandle).getState());
+    assertEquals(expectedState, client.getOperationStatus(opHandle, false).getState());
     client.closeOperation(opHandle);
     return opStatus;
   }
@@ -606,7 +606,7 @@ public abstract class CLIServiceTest {
     assertNotNull(opHandle);
     // query should pass and create the table
     assertEquals("Query should be finished",
-        OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+        OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
     client.closeOperation(opHandle);
 
     // select from  the new table should pass
@@ -615,7 +615,7 @@ public abstract class CLIServiceTest {
     assertNotNull(opHandle);
     // query should pass and create the table
     assertEquals("Query should be finished",
-        OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
+        OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState());
     client.closeOperation(opHandle);
 
     // the settings in conf overlay should not be part of session config
@@ -653,7 +653,7 @@ public abstract class CLIServiceTest {
     OperationStatus status = null;
     int count = 0;
     while (true) {
-      status = client.getOperationStatus(ophandle);
+      status = client.getOperationStatus(ophandle, false);
       checkOperationTimes(ophandle, status);
       OperationState state = status.getState();
       System.out.println("Polling: " + ophandle + " count=" + (++count)

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 2855bb3..79953c4 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -206,7 +206,7 @@ public class TestRetryingThriftCLIServiceClient {
       // operations will be lost once owning session is closed.
       for (OperationHandle op: new OperationHandle[]{op1, op2}) {
         try {
-          client.getOperationStatus(op);
+          client.getOperationStatus(op, false);
           fail("Should have failed.");
         } catch (HiveSQLException ignored) {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
index abb1ecf..4c59fca 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java
@@ -181,7 +181,7 @@ public abstract class ThriftCLIServiceTest {
     OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
     assertNotNull(opHandle);
 
-    OperationStatus opStatus = client.getOperationStatus(opHandle);
+    OperationStatus opStatus = client.getOperationStatus(opHandle, false);
     assertNotNull(opStatus);
 
     OperationState state = opStatus.getState();
@@ -241,7 +241,7 @@ public abstract class ThriftCLIServiceTest {
         System.out.println("Polling timed out");
         break;
       }
-      opStatus = client.getOperationStatus(opHandle);
+      opStatus = client.getOperationStatus(opHandle, false);
       assertNotNull(opStatus);
       state = opStatus.getState();
       System.out.println("Current state: " + state);
@@ -264,7 +264,7 @@ public abstract class ThriftCLIServiceTest {
     System.out.println("Will attempt to execute: " + queryString);
     opHandle = client.executeStatementAsync(sessHandle, queryString, opConf);
     assertNotNull(opHandle);
-    opStatus = client.getOperationStatus(opHandle);
+    opStatus = client.getOperationStatus(opHandle, false);
     assertNotNull(opStatus);
     isQueryRunning = true;
     pollTimeout = System.currentTimeMillis() + 100000;
@@ -283,7 +283,7 @@ public abstract class ThriftCLIServiceTest {
         isQueryRunning = false;
       }
       Thread.sleep(1000);
-      opStatus = client.getOperationStatus(opHandle);
+      opStatus = client.getOperationStatus(opHandle, false);
     }
     // Expect query to return an error state
     assertEquals("Operation should be in error state",

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
index a5c8d62..6fec947 100644
--- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
+++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java
@@ -202,7 +202,7 @@ public class ThriftCliServiceTestWithCookie {
     OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf);
     assertNotNull(opHandle);
 
-    OperationStatus opStatus = client.getOperationStatus(opHandle);
+    OperationStatus opStatus = client.getOperationStatus(opHandle, false);
     assertNotNull(opStatus);
 
     OperationState state = opStatus.getState();


Mime
View raw message