ambari-commits mailing list archives

From dbhowm...@apache.org
Subject [2/2] ambari git commit: AMBARI-17192. Enable cancel of currently long running job. (dipayanb)
Date Wed, 15 Jun 2016 06:09:11 GMT
AMBARI-17192. Enable cancel of currently long running job. (dipayanb)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/60057378
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/60057378
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/60057378

Branch: refs/heads/branch-2.4
Commit: 6005737844c73bd3cd661af34b4a58cc85acedd4
Parents: 22cc2d5
Author: Dipayan Bhowmick <dipayan.bhowmick@gmail.com>
Authored: Wed Jun 8 12:22:38 2016 +0530
Committer: Dipayan Bhowmick <dipayan.bhowmick@gmail.com>
Committed: Wed Jun 15 11:38:49 2016 +0530

----------------------------------------------------------------------
 .../ambari/view/hive2/ConnectionDelegate.java   |  13 +-
 .../view/hive2/HiveJdbcConnectionDelegate.java  |  90 ++---
 .../view/hive2/actor/AsyncJdbcConnector.java    | 193 ----------
 .../view/hive2/actor/AsyncQueryExecutor.java    |  92 -----
 .../view/hive2/actor/GetResultHolder.java       |  47 ---
 .../ambari/view/hive2/actor/JdbcConnector.java  | 352 +++++++++++++++----
 .../ambari/view/hive2/actor/LogAggregator.java  |  31 +-
 .../view/hive2/actor/OperationController.java   | 146 +++-----
 .../view/hive2/actor/ResultSetIterator.java     |  75 +---
 .../view/hive2/actor/StatementExecutor.java     | 147 ++++++++
 .../view/hive2/actor/SyncJdbcConnector.java     | 174 ---------
 .../view/hive2/actor/YarnAtsGUIDFetcher.java    |  69 ++++
 .../ambari/view/hive2/actor/YarnAtsParser.java  |  32 --
 .../view/hive2/actor/message/AdvanceCursor.java |  32 --
 .../hive2/actor/message/AssignResultSet.java    |  48 ---
 .../hive2/actor/message/AssignStatement.java    |  46 ---
 .../view/hive2/actor/message/AsyncJob.java      |  52 ---
 .../view/hive2/actor/message/Connect.java       |  22 +-
 .../ambari/view/hive2/actor/message/DDLJob.java |  73 ----
 .../actor/message/GetColumnMetadataJob.java     |   1 +
 .../actor/message/JobExecutionCompleted.java    |  21 --
 .../hive2/actor/message/ResultInformation.java  |  83 +++++
 .../hive2/actor/message/ResultNotReady.java     |  40 +++
 .../view/hive2/actor/message/ResultReady.java   |  27 +-
 .../view/hive2/actor/message/RunStatement.java  |  73 ++++
 .../hive2/actor/message/SQLStatementJob.java    |  65 ++++
 .../actor/message/StartLogAggregation.java      |  15 +-
 .../view/hive2/actor/message/SyncJob.java       |  27 --
 .../view/hive2/actor/message/job/CancelJob.java |  40 +++
 .../actor/message/job/ExecuteNextStatement.java |  22 ++
 .../actor/message/job/UpdateYarnAtsGuid.java    |  38 ++
 .../view/hive2/client/AsyncJobRunner.java       |  15 +-
 .../view/hive2/client/AsyncJobRunnerImpl.java   | 101 +++---
 .../view/hive2/client/ConnectionConfig.java     |   5 +
 .../view/hive2/client/DDLDelegatorImpl.java     |   8 +-
 .../ambari/view/hive2/internal/Either.java      |  59 ++--
 .../view/hive2/resources/jobs/Aggregator.java   |   7 -
 .../view/hive2/resources/jobs/JobService.java   |   8 +-
 .../jobs/ResultsPaginationController.java       |   6 +-
 .../jobs/viewJobs/JobControllerImpl.java        |  13 +-
 .../utils/ResultFetchFormattedException.java    |  27 ++
 .../utils/ResultNotReadyFormattedException.java |  27 ++
 .../ui/hive-web/app/controllers/index.js        |  14 +-
 .../ui/hive-web/app/controllers/upload-table.js |   4 +-
 .../ui/hive-web/app/services/job-progress.js    |   9 +
 .../ambari/view/hive2/AsyncQueriesTest.java     | 124 -------
 .../ambari/view/hive2/InactivityTest.java       | 109 ------
 .../apache/ambari/view/hive2/Mocksupport.java   |  94 -----
 .../ambari/view/hive2/SyncQueriesTest.java      | 141 --------
 49 files changed, 1218 insertions(+), 1739 deletions(-)
----------------------------------------------------------------------
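
In short, the commit replaces the per-mode connector actors (AsyncJdbcConnector, SyncJdbcConnector) with a single JdbcConnector that queues SQL statements, delegates execution to a new StatementExecutor, and understands a new CancelJob message. Cancellation flows OperationController -> busy JdbcConnector -> ConnectionDelegate.cancel() -> HiveStatement.cancel(), after which the interrupted execution is recorded as JOB_STATE_CANCELED rather than JOB_STATE_ERROR. A hedged caller-side sketch (the CancelJob constructor arguments are assumed from the getters used in OperationController; the real client path goes through JobService and AsyncJobRunnerImpl):

    import akka.actor.ActorRef;
    import org.apache.ambari.view.hive2.actor.message.job.CancelJob;

    public class CancelSketch {
      // Illustrative only: ask the OperationController to cancel a running job.
      // The controller forwards the message to the busy JdbcConnector for
      // (username, jobId), which invokes HiveStatement.cancel() on the live statement.
      public static void cancel(ActorRef operationController, String jobId, String username) {
        // Constructor signature assumed; CancelJob exposes getJobId() and getUsername().
        operationController.tell(new CancelJob(jobId, username), ActorRef.noSender());
      }
    }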


http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
index 918dc68..bb1fde8 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java
@@ -19,10 +19,7 @@
 package org.apache.ambari.view.hive2;
 
 import com.google.common.base.Optional;
-import org.apache.ambari.view.hive2.actor.message.DDLJob;
 import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
-import org.apache.ambari.view.hive2.actor.message.HiveJob;
-import org.apache.ambari.view.hive2.internal.HiveResult;
 import org.apache.hive.jdbc.HiveConnection;
 import org.apache.hive.jdbc.HiveStatement;
 
@@ -30,11 +27,11 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 
 public interface ConnectionDelegate {
-  Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException;
-  Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException;
-  Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException;
-  Optional<ResultSet> getCurrentResultSet();
-  Optional<HiveStatement> getCurrentStatement();
+  HiveStatement createStatement(HiveConnection connection) throws SQLException;
+  Optional<ResultSet> execute(String statement) throws SQLException;
+  Optional<ResultSet> execute(HiveConnection connection, String statement) throws SQLException;
+  ResultSet getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException;
+  void cancel() throws SQLException;
   void closeResultSet();
   void closeStatement();
 }
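
The interface now exposes the statement lifecycle directly instead of job-shaped entry points. A minimal usage sketch, assuming the call order implied by HiveJdbcConnectionDelegate below (execute(String) requires a prior createStatement, and cancel() is a no-op when no statement is active):

    import com.google.common.base.Optional;
    import org.apache.ambari.view.hive2.ConnectionDelegate;
    import org.apache.hive.jdbc.HiveConnection;
    import org.apache.hive.jdbc.HiveStatement;

    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class DelegateSketch {
      // Illustrative call order for the reworked delegate; not taken from this diff.
      static void run(ConnectionDelegate delegate, HiveConnection connection) throws SQLException {
        HiveStatement statement = delegate.createStatement(connection); // becomes the "current" statement
        Optional<ResultSet> rows = delegate.execute("SELECT 1");        // absent for statements without a result set
        // Another thread may call delegate.cancel() here to abort the running statement.
        delegate.closeResultSet();
        delegate.closeStatement();
      }
    }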

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
index e8d3333..bd2b9ba 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java
@@ -19,11 +19,7 @@
 package org.apache.ambari.view.hive2;
 
 import com.google.common.base.Optional;
-import org.apache.ambari.view.hive2.actor.message.DDLJob;
 import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
-import org.apache.ambari.view.hive2.actor.message.HiveJob;
-import org.apache.ambari.view.hive2.actor.message.job.Result;
-import org.apache.ambari.view.hive2.internal.HiveResult;
 import org.apache.hive.jdbc.HiveConnection;
 import org.apache.hive.jdbc.HiveStatement;
 
@@ -36,85 +32,51 @@ public class HiveJdbcConnectionDelegate implements ConnectionDelegate {
 
   private ResultSet currentResultSet;
   private HiveStatement currentStatement;
-  private String atsGuid;
 
   @Override
-  public Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException {
-
-    try {
-      Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
-      currentStatement = (HiveStatement) statement;
+  public HiveStatement createStatement(HiveConnection connection) throws SQLException {
+    Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+    currentStatement = (HiveStatement) statement;
+    return currentStatement;
+  }
 
-      for (String syncStatement : job.getSyncStatements()) {
-        // we don't care about the result
-        // fail all if one fails
-        statement.execute(syncStatement);
-      }
+  @Override
+  public Optional<ResultSet> execute(String statement) throws SQLException {
+    if (currentStatement == null) {
+      throw new SQLException("Statement not created. Cannot execute Hive queries");
+    }
 
-      HiveStatement hiveStatement = (HiveStatement) statement;
-      boolean result = hiveStatement.executeAsync(job.getAsyncStatement());
-      atsGuid = hiveStatement.getYarnATSGuid();
-      if (result) {
-        // query has a result set
-        ResultSet resultSet = hiveStatement.getResultSet();
-        currentResultSet = resultSet;
-        Optional<ResultSet> resultSetOptional = Optional.of(resultSet);
-        return resultSetOptional;
+    boolean hasResultSet = currentStatement.execute(statement);
 
-      }
+    if (hasResultSet) {
+      ResultSet resultSet = currentStatement.getResultSet();
+      currentResultSet = resultSet;
+      return Optional.of(resultSet);
+    } else {
       return Optional.absent();
-
-    } catch (SQLException e) {
-      // Close the statement on any error
-      currentStatement.close();
-      throw e;
     }
   }
 
   @Override
-  public Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException {
-    try {
-      Statement statement = connection.createStatement();
-      currentStatement = (HiveStatement) statement;
-
-      boolean hasResultSet = false;
-      for (String syncStatement : job.getStatements()) {
-        // we don't care about the result
-        // fail all if one fails
-        hasResultSet = statement.execute(syncStatement);
-      }
-
-      if (hasResultSet) {
-        ResultSet resultSet = statement.getResultSet();
-        //HiveResult result = new HiveResult(resultSet);
-        return Optional.of(resultSet);
-      } else {
-        return Optional.absent();
-      }
-    } catch (SQLException e) {
-      // Close the statement on any error
-      currentStatement.close();
-      throw e;
-    }
+  public Optional<ResultSet> execute(HiveConnection connection, String sqlStatement) throws SQLException {
+    createStatement(connection);
+    return execute(sqlStatement);
   }
 
 
   @Override
-  public Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException {
+  public ResultSet getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException {
     DatabaseMetaData metaData = connection.getMetaData();
     ResultSet resultSet = metaData.getColumns("", job.getSchemaPattern(), job.getTablePattern(), job.getColumnPattern());
     currentResultSet = resultSet;
-    return Optional.of(resultSet);
+    return resultSet;
   }
 
   @Override
-  public Optional<ResultSet> getCurrentResultSet() {
-    return Optional.fromNullable(currentResultSet);
-  }
-
-  @Override
-  public Optional<HiveStatement> getCurrentStatement() {
-    return Optional.fromNullable(currentStatement);
+  public void cancel() throws SQLException {
+    if (currentStatement != null) {
+      currentStatement.cancel();
+    }
   }
 
   @Override
@@ -130,7 +92,7 @@ public class HiveJdbcConnectionDelegate implements ConnectionDelegate {
   }
 
   @Override
-  public void closeStatement()  {
+  public void closeStatement() {
     try {
       if (currentStatement != null) {
         currentStatement.close();
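
The new cancel() leans on the standard JDBC contract that Statement.cancel() may be invoked from a thread other than the one blocked in execute(). A generic sketch of that pattern (not from this commit):

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class JdbcCancelPattern {
      // A second thread cancels while the first blocks in execute(); the blocked
      // call then fails with a SQLException once the engine aborts the query.
      static void runAndCancel(Connection connection, String sql) throws SQLException {
        final Statement statement = connection.createStatement();
        new Thread(new Runnable() {
          public void run() {
            try { statement.cancel(); } catch (SQLException ignored) { }
          }
        }).start();
        statement.execute(sql); // throws when the cancel takes effect
      }
    }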

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
deleted file mode 100644
index 9a5992c..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import com.google.common.base.Optional;
-import org.apache.ambari.view.ViewContext;
-import org.apache.ambari.view.hive2.ConnectionDelegate;
-import org.apache.ambari.view.hive2.actor.message.AsyncJob;
-import org.apache.ambari.view.hive2.actor.message.HiveMessage;
-import org.apache.ambari.view.hive2.actor.message.RegisterActor;
-import org.apache.ambari.view.hive2.actor.message.ResultReady;
-import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
-import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
-import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck;
-import org.apache.ambari.view.hive2.internal.Either;
-import org.apache.ambari.view.hive2.persistence.Storage;
-import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
-import org.apache.ambari.view.utils.hdfs.HdfsApi;
-import org.apache.hive.jdbc.HiveConnection;
-import org.apache.hive.jdbc.HiveStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
-
-public class AsyncJdbcConnector extends JdbcConnector {
-
-  private final Logger LOG = LoggerFactory.getLogger(getClass());
-
-  private ActorRef logAggregator = null;
-  private ActorRef asyncQueryExecutor = null;
-  private ActorRef resultSetActor = null;
-
-
-  public AsyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
-    super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
-  }
-
-  @Override
-  protected void handleJobMessage(HiveMessage message) {
-    Object job = message.getMessage();
-    if (job instanceof AsyncJob) {
-      LOG.debug("Executing async job " + message.toString());
-      execute((AsyncJob) job);
-    }
-  }
-
-  @Override
-  protected boolean isAsync() {
-    return true;
-  }
-
-  @Override
-  protected void cleanUpChildren() {
-    if(logAggregator != null && !logAggregator.isTerminated()) {
-      LOG.debug("Sending poison pill to log aggregator");
-      logAggregator.tell(PoisonPill.getInstance(), self());
-    }
-
-    if(asyncQueryExecutor != null && !asyncQueryExecutor.isTerminated()) {
-      LOG.debug("Sending poison pill to Async Query Executor");
-      asyncQueryExecutor.tell(PoisonPill.getInstance(), self());
-    }
-
-    if(resultSetActor != null && !resultSetActor.isTerminated()) {
-      LOG.debug("Sending poison pill to Resultset Actor");
-      resultSetActor.tell(PoisonPill.getInstance(), self());
-    }
-  }
-
-  @Override
-  protected void notifyFailure() {
-    AsyncExecutionFailed failure = new AsyncExecutionFailed(jobId,username,"Cannot connect to hive");
-    parent.tell(failure, self());
-  }
-
-  private void execute(AsyncJob message) {
-    this.executing = true;
-    this.jobId = message.getJobId();
-    updateJobStatus(jobId,Job.JOB_STATE_INITIALIZED);
-    if (connectable == null) {
-      notifyAndCleanUp();
-      return;
-    }
-
-    Optional<HiveConnection> connectionOptional = connectable.getConnection();
-    if (!connectionOptional.isPresent()) {
-      notifyAndCleanUp();
-      return;
-    }
-
-    try {
-      Optional<ResultSet> resultSetOptional = connectionDelegate.execute(connectionOptional.get(), message);
-      Optional<HiveStatement> currentStatement = connectionDelegate.getCurrentStatement();
-      // There should be a result set, which either has a result set, or an empty value
-      // for operations which do not return anything
-
-      logAggregator = getContext().actorOf(
-        Props.create(LogAggregator.class, system, hdfsApi, currentStatement.get(), message.getLogFile())
-        .withDispatcher("akka.actor.misc-dispatcher"),   message.getJobId() + ":" +"-logAggregator"
-      );
-      deathWatch.tell(new RegisterActor(logAggregator),self());
-
-      updateGuidInJob(jobId, currentStatement.get());
-      updateJobStatus(jobId,Job.JOB_STATE_RUNNING);
-
-      if (resultSetOptional.isPresent()) {
-        // Start a result set aggregator on the same context, a notice to the parent will kill all these as well
-        // tell the result holder to assign the result set for further operations
-        resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
-          resultSetOptional.get(),storage).withDispatcher("akka.actor.result-dispatcher"),
-          "ResultSetActor:ResultSetIterator:JobId:"+ jobId );
-        deathWatch.tell(new RegisterActor(resultSetActor),self());
-        parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>left(resultSetActor)), self());
-
-        // Start a actor to query ATS
-      } else {
-        // Case when this is an Update/query with no results
-        // Wait for operation to complete and add results;
-
-        ActorRef asyncQueryExecutor = getContext().actorOf(
-                Props.create(AsyncQueryExecutor.class, parent, currentStatement.get(),storage,jobId,username)
-                  .withDispatcher("akka.actor.result-dispatcher"),
-                 message.getJobId() + "-asyncQueryExecutor");
-        deathWatch.tell(new RegisterActor(asyncQueryExecutor),self());
-        parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>right(asyncQueryExecutor)), self());
-
-      }
-      // Start a actor to query log
-      logAggregator.tell(new StartLogAggregation(), self());
-
-
-    } catch (SQLException e) {
-      // update the error on the log
-      AsyncExecutionFailed failure = new AsyncExecutionFailed(message.getJobId(),username,
-              e.getMessage(), e);
-      updateJobStatus(jobId,Job.JOB_STATE_ERROR);
-      parent.tell(failure, self());
-      // Update the operation controller to write an error on the right side
-      // make sure we can stop the connector
-      executing = false;
-      LOG.error("Caught SQL excpetion for job-"+message,e);
-
-    }
-
-    // Start Inactivity timer to close the statement
-    this.inactivityScheduler = system.scheduler().schedule(
-      Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
-      this.self(), new InactivityCheck(), system.dispatcher(), null);
-  }
-
-  private void notifyAndCleanUp() {
-    updateJobStatus(jobId, Job.JOB_STATE_ERROR);
-    notifyFailure();
-    cleanUp();
-  }
-
-  private void updateJobStatus(String jobId, String jobState) {
-    JobImpl job = null;
-    try {
-      job = storage.load(JobImpl.class, jobId);
-    } catch (ItemNotFound itemNotFound) {
-      itemNotFound.printStackTrace();
-    }
-    job.setStatus(jobState);
-    storage.store(JobImpl.class, job);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
deleted file mode 100644
index 2a1dbb3..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor;
-
-import akka.actor.ActorRef;
-import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
-import org.apache.ambari.view.hive2.actor.message.HiveMessage;
-import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
-import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
-import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
-import org.apache.ambari.view.hive2.internal.AsyncExecutionSuccess;
-import org.apache.ambari.view.hive2.persistence.Storage;
-import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-
-public class AsyncQueryExecutor extends HiveActor {
-  private final Logger LOG = LoggerFactory.getLogger(getClass());
-
-  private Statement statement;
-  private final Storage storage;
-  private final String jobId;
-  private final ActorRef parent;
-  private final String userName;
-
-  public AsyncQueryExecutor(ActorRef parent, Statement statement, Storage storage, String jobId,String userName) {
-    this.statement = statement;
-    this.storage = storage;
-    this.jobId = jobId;
-    this.parent = parent;
-    this.userName = userName;
-  }
-
-  @Override
-  public void handleMessage(HiveMessage hiveMessage) {
-    Object message = hiveMessage.getMessage();
-
-    if (message instanceof ExecuteQuery) {
-      executeQuery();
-    }
-
-  }
-
-  private void executeQuery() {
-    JobImpl job = null;
-    try {
-      job = storage.load(JobImpl.class, jobId);
-      statement.getUpdateCount();
-      LOG.info("Job execution successful. Setting status in db.");
-      job.setStatus(Job.JOB_STATE_FINISHED);
-      storage.store(JobImpl.class, job);
-      sender().tell(new AsyncExecutionSuccess(), self());
-
-    } catch (SQLException e) {
-      job.setStatus(Job.JOB_STATE_ERROR);
-      sender().tell(new AsyncExecutionFailed(jobId,userName, e.getMessage(), e), self());
-      storage.store(JobImpl.class, job);
-    } catch (ItemNotFound itemNotFound) {
-      sender().tell(new AsyncExecutionFailed(jobId,userName, "Cannot load job", itemNotFound), self());
-    } finally {
-      // We can clean up this connection here
-      parent.tell(new CleanUp(), self());
-    }
-
-  }
-
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
deleted file mode 100644
index c2ee5c7..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor;
-
-public class GetResultHolder {
-
-    private String jobId;
-    private String userName;
-
-    public GetResultHolder(String jobId, String userName) {
-        this.jobId = jobId;
-        this.userName = userName;
-    }
-
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    @Override
-    public String toString() {
-        return "GetResultHolder{" +
-                "jobId='" + jobId + '\'' +
-                ", userName='" + userName + '\'' +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
index 7769dde..1894739 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -19,15 +19,28 @@
 package org.apache.ambari.view.hive2.actor;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Cancellable;
 import akka.actor.PoisonPill;
+import akka.actor.Props;
 import com.google.common.base.Optional;
 import org.apache.ambari.view.ViewContext;
 import org.apache.ambari.view.hive2.ConnectionDelegate;
 import org.apache.ambari.view.hive2.actor.message.Connect;
+import org.apache.ambari.view.hive2.actor.message.FetchError;
+import org.apache.ambari.view.hive2.actor.message.FetchResult;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveJob;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
-import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
+import org.apache.ambari.view.hive2.actor.message.ResultInformation;
+import org.apache.ambari.view.hive2.actor.message.ResultNotReady;
+import org.apache.ambari.view.hive2.actor.message.RunStatement;
+import org.apache.ambari.view.hive2.actor.message.SQLStatementJob;
+import org.apache.ambari.view.hive2.actor.message.job.CancelJob;
+import org.apache.ambari.view.hive2.actor.message.job.ExecuteNextStatement;
+import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.Failure;
+import org.apache.ambari.view.hive2.actor.message.job.NoResult;
+import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
@@ -38,22 +51,28 @@ import org.apache.ambari.view.hive2.internal.Connectable;
 import org.apache.ambari.view.hive2.internal.ConnectionException;
 import org.apache.ambari.view.hive2.persistence.Storage;
 import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
 import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
 import org.apache.ambari.view.hive2.utils.HiveActorConfiguration;
 import org.apache.ambari.view.utils.hdfs.HdfsApi;
-import org.apache.hive.jdbc.HiveStatement;
+import org.apache.hive.jdbc.HiveConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 
 /**
  * Wraps one Jdbc connection per user, per instance. This is used to delegate execute the statements and
- * creates child actors to delegate the resultset extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation
+ * creates child actors to delegate the ResultSet extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation
  */
-public abstract class JdbcConnector extends HiveActor {
+public class JdbcConnector extends HiveActor {
 
   private final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -67,9 +86,7 @@ public abstract class JdbcConnector extends HiveActor {
    */
   private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000;
 
-  protected final ViewContext viewContext;
-  protected final ActorSystem system;
-  protected final Storage storage;
+  private final Storage storage;
 
   /**
    * Keeps track of the timestamp when the last activity has happened. This is
@@ -81,47 +98,57 @@ public abstract class JdbcConnector extends HiveActor {
   /**
    * Akka scheduler to tick at an interval to deal with inactivity of this actor
    */
-  protected Cancellable inactivityScheduler;
+  private Cancellable inactivityScheduler;
 
   /**
    * Akka scheduler to tick at an interval to deal with the inactivity after which
-   * the actor should be killed and connectable should be released
+   * the actor should be killed and connection should be released
    */
-  protected Cancellable terminateActorScheduler;
+  private Cancellable terminateActorScheduler;
 
-  protected Connectable connectable = null;
-  protected final ActorRef deathWatch;
-  protected final ConnectionDelegate connectionDelegate;
-  protected final ActorRef parent;
-  protected final HdfsApi hdfsApi;
+  private Connectable connectable = null;
+  private final ActorRef deathWatch;
+  private final ConnectionDelegate connectionDelegate;
+  private final ActorRef parent;
+  private ActorRef statementExecutor = null;
+  private final HdfsApi hdfsApi;
 
   /**
    * true if the actor is currently executing any job.
    */
-  protected boolean executing = false;
+  private boolean executing = false;
+  private HiveJob.Type executionType = HiveJob.Type.SYNC;
 
   /**
-   * true if the currently executing job is async job.
+   * Returns the timeout configurations.
    */
-  private boolean async = true;
+  private final HiveActorConfiguration actorConfiguration;
+  private String username;
+  private Optional<String> jobId = Optional.absent();
+  private Optional<String> logFile = Optional.absent();
+  private int statementsCount = 0;
+
+  private ActorRef commandSender = null;
+
+  private ActorRef resultSetIterator = null;
+  private boolean isFailure = false;
+  private Failure failure = null;
+  private boolean isCancelCalled = false;
 
   /**
-   * Returns the timeout configurations.
+   * For every execution, this will hold the statements that are left to execute
    */
-  private final HiveActorConfiguration actorConfiguration;
-  protected String username;
-  protected String jobId;
+  private Queue<String> statementQueue = new ArrayDeque<>();
 
-  public JdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent, ActorRef deathWatch,
+  public JdbcConnector(ViewContext viewContext, ActorRef parent, ActorRef deathWatch, HdfsApi hdfsApi,
                        ConnectionDelegate connectionDelegate, Storage storage) {
-    this.viewContext = viewContext;
     this.hdfsApi = hdfsApi;
-    this.system = system;
     this.parent = parent;
     this.deathWatch = deathWatch;
     this.connectionDelegate = connectionDelegate;
     this.storage = storage;
     this.lastActivityTimestamp = System.currentTimeMillis();
+    resultSetIterator = null;
     actorConfiguration = new HiveActorConfiguration(viewContext);
   }
 
@@ -136,8 +163,6 @@ public abstract class JdbcConnector extends HiveActor {
       keepAlive();
     } else if (message instanceof CleanUp) {
       cleanUp();
-    } else if (message instanceof JobExecutionCompleted) {
-      jobExecutionCompleted();
     } else {
       handleNonLifecycleMessage(hiveMessage);
     }
@@ -148,19 +173,205 @@ public abstract class JdbcConnector extends HiveActor {
     keepAlive();
     if (message instanceof Connect) {
       connect((Connect) message);
+    } else if (message instanceof SQLStatementJob) {
+      runStatementJob((SQLStatementJob) message);
+    } else if (message instanceof GetColumnMetadataJob) {
+      runGetMetaData((GetColumnMetadataJob) message);
+    } else if (message instanceof ExecuteNextStatement) {
+      executeNextStatement();
+    } else if (message instanceof ResultInformation) {
+      gotResultBack((ResultInformation) message);
+    } else if (message instanceof CancelJob) {
+      cancelJob((CancelJob) message);
+    } else if (message instanceof FetchResult) {
+      fetchResult((FetchResult) message);
+    } else if (message instanceof FetchError) {
+      fetchError((FetchError) message);
+    } else {
+      unhandled(message);
+    }
+  }
+
+  private void fetchError(FetchError message) {
+    if (isFailure) {
+      sender().tell(Optional.of(failure), self());
+      return;
+    }
+    sender().tell(Optional.absent(), self());
+  }
+
+  private void fetchResult(FetchResult message) {
+    if (isFailure) {
+      sender().tell(failure, self());
+      return;
+    }
+
+    if (executing) {
+      sender().tell(new ResultNotReady(jobId.get(), username), self());
+      return;
+    }
+    sender().tell(Optional.fromNullable(resultSetIterator), self());
+  }
+
+  private void cancelJob(CancelJob message) {
+    if (!executing || connectionDelegate == null) {
+      LOG.error("Cannot cancel job for user as currently the job is not running or started. JobId: {}", message.getJobId());
+      return;
+    }
+    LOG.info("Cancelling job for user. JobId: {}, user: {}", message.getJobId(), username);
+    try {
+      isCancelCalled = true;
+      connectionDelegate.cancel();
+    } catch (SQLException e) {
+      LOG.error("Failed to cancel job. JobId: {}. {}", message.getJobId(), e);
+    }
+  }
+
+  private void gotResultBack(ResultInformation message) {
+    Optional<Failure> failureOptional = message.getFailure();
+    if (failureOptional.isPresent()) {
+      Failure failure = failureOptional.get();
+      processFailure(failure);
+      return;
+    }
+    if (statementQueue.size() == 0) {
+      // This is the last resultSet
+      processResult(message.getResultSet());
     } else {
-      handleJobMessage(hiveMessage);
+      self().tell(new ExecuteNextStatement(), self());
+    }
+  }
+
+  private void processCancel() {
+    executing = false;
+    if (isAsync() && jobId.isPresent()) {
+      LOG.error("Job canceled by user for JobId: {}", jobId.get());
+      updateJobStatus(jobId.get(), Job.JOB_STATE_CANCELED);
     }
+  }
 
+  private void processFailure(Failure failure) {
+    executing = false;
+    isFailure = true;
+    this.failure = failure;
+    if (isAsync() && jobId.isPresent()) {
+      if(isCancelCalled) {
+        processCancel();
+        return;
+      }
+      updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+    } else {
+      // Send for sync execution
+      commandSender.tell(new ExecutionFailed(failure.getMessage(), failure.getError()), self());
+      cleanUpWithTermination();
+    }
   }
 
-  protected abstract void handleJobMessage(HiveMessage message);
+  private void processResult(Optional<ResultSet> resultSetOptional) {
+    executing = false;
 
-  protected abstract boolean isAsync();
+    if (isAsync() && jobId.isPresent()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_FINISHED);
+    }
 
-  protected abstract void notifyFailure();
+    if (resultSetOptional.isPresent()) {
+      ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
+        resultSetOptional.get(), isAsync()).withDispatcher("akka.actor.result-dispatcher"),
+        "ResultSetIterator:" + UUID.randomUUID().toString());
+      resultSetIterator = resultSetActor;
+      if (!isAsync()) {
+        commandSender.tell(new ResultSetHolder(resultSetActor), self());
+      }
+    } else {
+      resultSetIterator = null;
+      if (!isAsync()) {
+        commandSender.tell(new NoResult(), self());
+      }
+    }
+  }
 
-  protected abstract void cleanUpChildren();
+  private void executeNextStatement() {
+    if (statementQueue.isEmpty()) {
+      jobExecutionCompleted();
+      return;
+    }
+
+    int index = statementsCount - statementQueue.size();
+    String statement = statementQueue.poll();
+    if (statementExecutor == null) {
+      statementExecutor = getStatementExecutor();
+    }
+
+    if (isAsync()) {
+      statementExecutor.tell(new RunStatement(index, statement, jobId.get(), true, logFile.get(), true), self());
+    } else {
+      statementExecutor.tell(new RunStatement(index, statement), self());
+    }
+  }
+
+  private void runStatementJob(SQLStatementJob message) {
+    executing = true;
+    jobId = message.getJobId();
+    logFile = message.getLogFile();
+    executionType = message.getType();
+    commandSender = getSender();
+
+    if (!checkConnection()) return;
+
+    for (String statement : message.getStatements()) {
+      statementQueue.add(statement);
+    }
+    statementsCount = statementQueue.size();
+
+    if (isAsync() && jobId.isPresent()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_RUNNING);
+      startInactivityScheduler();
+    }
+    self().tell(new ExecuteNextStatement(), self());
+  }
+
+  public boolean checkConnection() {
+    if (connectable == null) {
+      notifyConnectFailure();
+      return false;
+    }
+
+    Optional<HiveConnection> connectionOptional = connectable.getConnection();
+    if (!connectionOptional.isPresent()) {
+      notifyConnectFailure();
+      return false;
+    }
+    return true;
+  }
+
+  private void runGetMetaData(GetColumnMetadataJob message) {
+    if (!checkConnection()) return;
+    executing = true;
+    executionType = message.getType();
+    commandSender = getSender();
+    statementExecutor = getStatementExecutor();
+    statementExecutor.tell(message, self());
+  }
+
+  private ActorRef getStatementExecutor() {
+    return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate).withDispatcher("akka.actor.result-dispatcher"), "StatementExecutor");
+  }
+
+  private boolean isAsync() {
+    return executionType == HiveJob.Type.ASYNC;
+  }
+
+  private void notifyConnectFailure() {
+    executing = false;
+    isFailure = true;
+    this.failure = new Failure("Cannot connect to hive", new SQLException("Cannot connect to hive"));
+    if (isAsync()) {
+      updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR);
+    } else {
+      sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
+      cleanUpWithTermination();
+    }
+  }
 
   private void keepAlive() {
     lastActivityTimestamp = System.currentTimeMillis();
@@ -173,16 +384,14 @@ public abstract class JdbcConnector extends HiveActor {
     this.executing = false;
   }
 
-  protected Optional<String> getJobId() {
-    return Optional.fromNullable(jobId);
-  }
-
   protected Optional<String> getUsername() {
     return Optional.fromNullable(username);
   }
 
   private void connect(Connect message) {
-    this.username = message.getUsername();
+    username = message.getUsername();
+    jobId = message.getJobId();
+    executionType = message.getType();
     // check the connectable
     if (connectable == null) {
       connectable = message.getConnectable();
@@ -195,30 +404,25 @@ public abstract class JdbcConnector extends HiveActor {
     } catch (ConnectionException e) {
       // set up job failure
       // notify parent about job failure
-      this.notifyFailure();
-      cleanUp();
+      notifyConnectFailure();
       return;
     }
-
-    this.terminateActorScheduler = system.scheduler().schedule(
-      Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
-      this.getSelf(), new TerminateInactivityCheck(), system.dispatcher(), null);
-
+    startTerminateInactivityScheduler();
   }
 
-  protected void updateGuidInJob(String jobId, HiveStatement statement) {
-    String yarnAtsGuid = statement.getYarnATSGuid();
+  private void updateJobStatus(String jobid, String status) {
     try {
-      JobImpl job = storage.load(JobImpl.class, jobId);
-      job.setGuid(yarnAtsGuid);
+      JobImpl job = storage.load(JobImpl.class, jobid);
+      job.setStatus(status);
       storage.store(JobImpl.class, job);
     } catch (ItemNotFound itemNotFound) {
-      // Cannot do anything if the job is not present
+      // Cannot do anything
     }
   }
 
+
   private void checkInactivity() {
-    LOG.info("Inactivity check, executing status: {}", executing);
+    LOG.debug("Inactivity check, executing status: {}", executing);
     if (executing) {
       keepAlive();
       return;
@@ -233,11 +437,11 @@ public abstract class JdbcConnector extends HiveActor {
   private void checkTerminationInactivity() {
     if (!isAsync()) {
       // Should not terminate if job is sync. Will terminate after the job is finished.
-      stopTeminateInactivityScheduler();
+      stopTerminateInactivityScheduler();
       return;
     }
 
-    LOG.info("Termination check, executing status: {}", executing);
+    LOG.debug("Termination check, executing status: {}", executing);
     if (executing) {
       keepAlive();
       return;
@@ -249,27 +453,26 @@ public abstract class JdbcConnector extends HiveActor {
     }
   }
 
-  protected void cleanUp() {
-    if(jobId != null) {
-      LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId);
+  private void cleanUp() {
+    if (jobId.isPresent()) {
+      LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId.get());
     } else {
       LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name());
     }
     this.executing = false;
     cleanUpStatementAndResultSet();
-    cleanUpChildren();
     stopInactivityScheduler();
-    parent.tell(new FreeConnector(username, jobId, isAsync()), self());
+    parent.tell(new FreeConnector(username, jobId.orNull(), isAsync()), self());
   }
 
-  protected void cleanUpWithTermination() {
-    LOG.debug("{} :: Cleaning up resources with inactivity for Sync execution.", self().path().name());
+  private void cleanUpWithTermination() {
+    this.executing = false;
+    LOG.debug("{} :: Cleaning up resources with inactivity for execution.", self().path().name());
     cleanUpStatementAndResultSet();
 
-    cleanUpChildren();
     stopInactivityScheduler();
-    stopTeminateInactivityScheduler();
-    parent.tell(new DestroyConnector(username, jobId, isAsync()), this.self());
+    stopTerminateInactivityScheduler();
+    parent.tell(new DestroyConnector(username, jobId.orNull(), isAsync()), this.self());
     self().tell(PoisonPill.getInstance(), ActorRef.noSender());
   }
 
@@ -279,12 +482,27 @@ public abstract class JdbcConnector extends HiveActor {
     connectionDelegate.closeResultSet();
   }
 
-  private void stopTeminateInactivityScheduler() {
+  private void startTerminateInactivityScheduler() {
+    this.terminateActorScheduler = getContext().system().scheduler().schedule(
+      Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS),
+      this.getSelf(), new TerminateInactivityCheck(), getContext().dispatcher(), null);
+  }
+
+  private void stopTerminateInactivityScheduler() {
     if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) {
       terminateActorScheduler.cancel();
     }
   }
 
+  private void startInactivityScheduler() {
+    if (inactivityScheduler != null) {
+      inactivityScheduler.cancel();
+    }
+    inactivityScheduler = getContext().system().scheduler().schedule(
+      Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS),
+      this.self(), new InactivityCheck(), getContext().dispatcher(), null);
+  }
+
   private void stopInactivityScheduler() {
     if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) {
       inactivityScheduler.cancel();
@@ -294,12 +512,10 @@ public abstract class JdbcConnector extends HiveActor {
   @Override
   public void postStop() throws Exception {
     stopInactivityScheduler();
-    stopTeminateInactivityScheduler();
+    stopTerminateInactivityScheduler();
 
     if (connectable.isOpen()) {
       connectable.disconnect();
     }
   }
-
-
-}
+}
\ No newline at end of file
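
The connector is now also the single endpoint clients poll: FetchResult answers with ResultNotReady while a statement is executing, with the recorded Failure if execution failed (or was cancelled), and otherwise with an Optional ActorRef to a ResultSetIterator. A hedged ask-pattern sketch (the FetchResult constructor arguments are assumed from the getters used in OperationController; the real plumbing lives in AsyncJobRunnerImpl):

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import org.apache.ambari.view.hive2.actor.message.FetchResult;
    import org.apache.ambari.view.hive2.actor.message.ResultNotReady;
    import org.apache.ambari.view.hive2.actor.message.job.Failure;
    import scala.concurrent.Await;
    import scala.concurrent.duration.Duration;

    public class PollSketch {
      // Illustrative single poll against a JdbcConnector actor.
      static Object pollOnce(ActorRef connector, String jobId, String username) throws Exception {
        Timeout timeout = new Timeout(Duration.create(5, "seconds"));
        Object reply = Await.result(
            Patterns.ask(connector, new FetchResult(jobId, username), timeout), timeout.duration());
        if (reply instanceof ResultNotReady) return null;  // still executing; retry later
        if (reply instanceof Failure) return reply;        // failed, or cancelled by the user
        return reply;  // Optional<ActorRef> of a ResultSetIterator; absent when there is no result set
      }
    }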

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
index 284345d..889611a 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java
@@ -49,13 +49,13 @@ public class LogAggregator extends HiveActor {
   private final HdfsApi hdfsApi;
   private final HiveStatement statement;
   private final String logFile;
-  private final ActorSystem system;
 
   private Cancellable moreLogsScheduler;
   private ActorRef parent;
+  private boolean hasStartedFetching = false;
+  private boolean shouldFetchMore = true;
 
-  public LogAggregator(ActorSystem system, HdfsApi hdfsApi, HiveStatement statement, String logFile) {
-    this.system = system;
+  public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) {
     this.hdfsApi = hdfsApi;
     this.statement = statement;
     this.logFile = logFile;
@@ -82,25 +82,36 @@ public class LogAggregator extends HiveActor {
 
   private void start() {
     parent = this.getSender();
-    this.moreLogsScheduler = system.scheduler().schedule(
+    hasStartedFetching = false;
+    shouldFetchMore = true;
+    if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) {
+      moreLogsScheduler.cancel();
+    }
+    this.moreLogsScheduler = getContext().system().scheduler().schedule(
       Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS),
-      this.getSelf(), new GetMoreLogs(), system.dispatcher(), null);
+      this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null);
   }
 
   private void getMoreLogs() throws SQLException, HdfsApiException {
-    if (statement.hasMoreLogs()) {
-      List<String> logs = statement.getQueryLog();
+    List<String> logs = statement.getQueryLog();
+    if (logs.size() > 0 && shouldFetchMore) {
       String allLogs = Joiner.on("\n").skipNulls().join(logs);
       HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs);
+      if(!statement.hasMoreLogs()) {
+        shouldFetchMore = false;
+      }
     } else {
-      moreLogsScheduler.cancel();
-      parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+      // Cancel the timer only when log fetching has been started
+      if(!shouldFetchMore) {
+        moreLogsScheduler.cancel();
+        parent.tell(new LogAggregationFinished(), ActorRef.noSender());
+      }
     }
   }
 
   @Override
   public void postStop() throws Exception {
-    if(moreLogsScheduler != null && !moreLogsScheduler.isCancelled()){
+    if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) {
       moreLogsScheduler.cancel();
     }
 

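With the ActorSystem constructor argument gone, the aggregator schedules through its own actor context, and it now drains any trailing query log after hasMoreLogs() turns false instead of stopping immediately. A sketch of wiring it up after this change (actor name and dispatcher are illustrative, and the no-argument StartLogAggregation form is assumed from the previous usage):

    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedActorContext;
    import org.apache.ambari.view.hive2.actor.LogAggregator;
    import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
    import org.apache.ambari.view.utils.hdfs.HdfsApi;
    import org.apache.hive.jdbc.HiveStatement;

    public class LogAggregatorWiring {
      // Matches the new LogAggregator(HdfsApi, HiveStatement, String) constructor.
      static ActorRef start(UntypedActorContext context, ActorRef self,
                            HdfsApi hdfsApi, HiveStatement statement, String logFile) {
        ActorRef aggregator = context.actorOf(
            Props.create(LogAggregator.class, hdfsApi, statement, logFile)
                .withDispatcher("akka.actor.misc-dispatcher"), "logAggregator");
        aggregator.tell(new StartLogAggregation(), self); // assumed no-arg message, as before
        return aggregator;
      }
    }
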
http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
index ac62cf7..0681d55 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
@@ -24,23 +24,22 @@ import akka.actor.Props;
 import com.google.common.base.Optional;
 import org.apache.ambari.view.ViewContext;
 import org.apache.ambari.view.hive2.ConnectionDelegate;
-import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
-import org.apache.ambari.view.hive2.actor.message.AsyncJob;
 import org.apache.ambari.view.hive2.actor.message.Connect;
 import org.apache.ambari.view.hive2.actor.message.ExecuteJob;
-import org.apache.ambari.view.hive2.actor.message.ExecuteQuery;
 import org.apache.ambari.view.hive2.actor.message.FetchError;
 import org.apache.ambari.view.hive2.actor.message.FetchResult;
 import org.apache.ambari.view.hive2.actor.message.HiveJob;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
 import org.apache.ambari.view.hive2.actor.message.JobRejected;
 import org.apache.ambari.view.hive2.actor.message.RegisterActor;
+import org.apache.ambari.view.hive2.actor.message.ResultNotReady;
 import org.apache.ambari.view.hive2.actor.message.ResultReady;
+import org.apache.ambari.view.hive2.actor.message.SQLStatementJob;
 import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed;
+import org.apache.ambari.view.hive2.actor.message.job.CancelJob;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
 import org.apache.ambari.view.hive2.internal.ContextSupplier;
-import org.apache.ambari.view.hive2.internal.Either;
 import org.apache.ambari.view.hive2.persistence.Storage;
 import org.apache.ambari.view.hive2.utils.LoggingOutputStream;
 import org.apache.ambari.view.utils.hdfs.HdfsApi;
@@ -87,7 +86,7 @@ public class OperationController extends HiveActor {
   /**
    * Store the connection per user/per job which are currently working.
    */
-  private final Map<String, Map<String, ActorRefResultContainer>> asyncBusyConnections;
+  private final Map<String, Map<String, ActorRef>> asyncBusyConnections;
 
   /**
    * Store the connection per user which will be used to execute sync jobs
@@ -118,22 +117,14 @@ public class OperationController extends HiveActor {
     if (message instanceof ExecuteJob) {
       ExecuteJob job = (ExecuteJob) message;
       if (job.getJob().getType() == HiveJob.Type.ASYNC) {
-        sendJob(job.getConnect(), (AsyncJob) job.getJob());
+        sendJob(job.getConnect(), (SQLStatementJob) job.getJob());
       } else if (job.getJob().getType() == HiveJob.Type.SYNC) {
         sendSyncJob(job.getConnect(), job.getJob());
       }
     }
 
-    if (message instanceof ResultReady) {
-      updateResultContainer((ResultReady) message);
-    }
-
-    if(message instanceof AsyncExecutionFailed){
-      updateResultContainerWithError((AsyncExecutionFailed) message);
-    }
-
-    if (message instanceof GetResultHolder) {
-      getResultHolder((GetResultHolder) message);
+    if (message instanceof CancelJob) {
+      cancelJob((CancelJob) message);
     }
 
     if (message instanceof FetchResult) {
@@ -153,72 +144,38 @@ public class OperationController extends HiveActor {
     }
   }
 
-  private void fetchError(FetchError message) {
+  private void cancelJob(CancelJob message) {
     String jobId = message.getJobId();
     String username = message.getUsername();
-    ActorRefResultContainer container = asyncBusyConnections.get(username).get(jobId);
-    if(container.hasError){
-      sender().tell(Optional.of(container.error), self());
-      return;
-    }
-    sender().tell(Optional.absent(), self());
-  }
-
-  private void updateResultContainerWithError(AsyncExecutionFailed message) {
-    String userName = message.getUsername();
-    String jobId = message.getJobId();
-    ActorRefResultContainer container = asyncBusyConnections.get(userName).get(jobId);
-    container.hasError = true;
-    container.error = message;
-  }
-
-  private void getResultHolder(GetResultHolder message) {
-    String userName = message.getUserName();
-    String jobId = message.getJobId();
-    if(asyncBusyConnections.containsKey(userName) && asyncBusyConnections.get(userName).containsKey(jobId))
-      sender().tell(asyncBusyConnections.get(userName).get(jobId).result, self());
-    else {
-      Either<ActorRef, AsyncExecutionFailed> right = Either.right(new AsyncExecutionFailed(message.getJobId(),userName, "Could not find the job, maybe the pool expired"));
-      sender().tell(right, self());
+    ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
+    if (actorRef != null) {
+      actorRef.tell(message, sender());
+    } else {
+      LOG.error("Failed to find a running job. Cannot cancel jobId: {}.", message.getJobId());
     }
   }
 
-  private void updateResultContainer(ResultReady message) {
-    // set up result actor in container
+  private void fetchError(FetchError message) {
     String jobId = message.getJobId();
     String username = message.getUsername();
-    Either<ActorRef, ActorRef> result = message.getResult();
-    asyncBusyConnections.get(username).get(jobId).result = result;
-    // start processing
-    if(message.getResult().isRight()){
-      // Query with no result sets to be returned
-      // execute right away
-      result.getRight().tell(new ExecuteQuery(),self());
+    ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
+    if(actorRef != null) {
+      actorRef.tell(message, sender());
     }
-    if(result.isLeft()){
-      // There is a result set to be processed
-      result.getLeft().tell(new AdvanceCursor(message.getJobId()),self());
-    }
-
   }
 
   private void fetchResultActorRef(FetchResult message) {
-    //Gets an Either actorRef,result implementation
-    // and send back to the caller
     String username = message.getUsername();
     String jobId = message.getJobId();
-    ActorRefResultContainer container = asyncBusyConnections.get(username).get(jobId);
-    if(container.hasError){
-      sender().tell(container.error,self());
-      return;
+    ActorRef actorRef = asyncBusyConnections.get(username).get(jobId);
+    if (actorRef != null) {
+      actorRef.tell(message, sender());
     }
-    Either<ActorRef, ActorRef> result = container.result;
-    sender().tell(result,self());
   }
 
-  private void sendJob(Connect connect, AsyncJob job) {
+  private void sendJob(Connect connect, SQLStatementJob job) {
     String username = job.getUsername();
-    String jobId = job.getJobId();
+    String jobId = job.getJobId().get();
     ActorRef subActor = null;
     // Check if there is available actors to process this
     subActor = getActorRefFromAsyncPool(username);
@@ -232,25 +189,24 @@ public class OperationController extends HiveActor {
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(AsyncJdbcConnector.class, viewContext, hdfsApi, system, self(),
-          deathWatch, connectionSupplier.get(viewContext),
+        Props.create(JdbcConnector.class, viewContext, self(),
+          deathWatch, hdfsApi, connectionSupplier.get(viewContext),
           storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-         "jobId:" + jobId + ":-asyncjdbcConnector");
-      deathWatch.tell(new RegisterActor(subActor),self());
-
+        "jobId:" + jobId + ":asyncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
     }
 
     if (asyncBusyConnections.containsKey(username)) {
-      Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username);
+      Map<String, ActorRef> actors = asyncBusyConnections.get(username);
       if (!actors.containsKey(jobId)) {
-        actors.put(jobId, new ActorRefResultContainer(subActor));
+        actors.put(jobId, subActor);
       } else {
        // Reject this, as a connection with the same jobId is already in progress.
         sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
       }
     } else {
-      Map<String, ActorRefResultContainer> actors = new HashMap<>();
-      actors.put(jobId, new ActorRefResultContainer(subActor));
+      Map<String, ActorRef> actors = new HashMap<>();
+      actors.put(jobId, subActor);
       asyncBusyConnections.put(username, actors);
     }
 
@@ -290,19 +246,18 @@ public class OperationController extends HiveActor {
 
     if (subActor == null) {
       Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext);
-      if(!hdfsApiOptional.isPresent()){
-          sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
-          return;
-        }
+      if (!hdfsApiOptional.isPresent()) {
+        sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
+        return;
+      }
       HdfsApi hdfsApi = hdfsApiOptional.get();
 
       subActor = system.actorOf(
-        Props.create(SyncJdbcConnector.class, viewContext, hdfsApi, system, self(),
-          deathWatch, connectionSupplier.get(viewContext),
+        Props.create(JdbcConnector.class, viewContext, self(),
+          deathWatch, hdfsApi,  connectionSupplier.get(viewContext),
           storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
-          UUID.randomUUID().toString() + ":SyncjdbcConnector" );
-      deathWatch.tell(new RegisterActor(subActor),self());
-
+        UUID.randomUUID().toString() + ":SyncjdbcConnector");
+      deathWatch.tell(new RegisterActor(subActor), self());
     }
 
     if (syncBusyConnections.containsKey(username)) {
@@ -315,7 +270,7 @@ public class OperationController extends HiveActor {
     }
 
     // Termination requires that the ref is known in case of sync jobs
-    subActor.tell(connect, self());
+    subActor.tell(connect, sender());
     subActor.tell(job, sender());
   }
 
@@ -333,7 +288,7 @@ public class OperationController extends HiveActor {
   }
 
   private void freeConnector(FreeConnector message) {
-    LOG.info("About to free connector for job {} and user {}",message.getJobId(),message.getUsername());
+    LOG.info("About to free connector for job {} and user {}", message.getJobId(), message.getUsername());
     ActorRef sender = getSender();
     if (message.isForAsync()) {
       Optional<ActorRef> refOptional = removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
@@ -354,8 +309,8 @@ public class OperationController extends HiveActor {
   }
 
   private void logMaps() {
-    LOG.info("Pool status");
-    LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.INFO);
+    LOG.debug("Pool status");
+    LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.DEBUG);
     MapUtils.debugPrint(new PrintStream(out), "Busy Async connections", asyncBusyConnections);
     MapUtils.debugPrint(new PrintStream(out), "Available Async connections", asyncAvailableConnections);
     MapUtils.debugPrint(new PrintStream(out), "Busy Sync connections", syncBusyConnections);
@@ -378,9 +333,9 @@ public class OperationController extends HiveActor {
   private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
     ActorRef ref = null;
     if (asyncBusyConnections.containsKey(username)) {
-      Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username);
+      Map<String, ActorRef> actors = asyncBusyConnections.get(username);
       if (actors.containsKey(jobId)) {
-        ref = actors.get(jobId).actorRef;
+        ref = actors.get(jobId);
         actors.remove(jobId);
       }
     }
@@ -420,19 +375,6 @@ public class OperationController extends HiveActor {
     actors.remove(sender);
   }
 
-  private static class ActorRefResultContainer {
-
-    ActorRef actorRef;
-    boolean hasError = false;
-    Either<ActorRef, ActorRef> result = Either.none();
-    AsyncExecutionFailed error;
-
-    public ActorRefResultContainer(ActorRef actorRef) {
-      this.actorRef = actorRef;
-    }
-  }
-
-
 }
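
For readers following the hunks above: the controller no longer caches per-job result state. It keeps a plain username -> jobId -> ActorRef map and forwards job-scoped messages (cancel, fetch) to the owning connector, passing along the original sender so replies bypass the controller. A minimal sketch of that routing idiom, with a stand-in message type since the real message classes live elsewhere in this commit:

  import akka.actor.ActorRef;
  import akka.actor.UntypedActor;

  import java.util.HashMap;
  import java.util.Map;

  // Sketch of the lookup-and-forward routing adopted by OperationController.
  public class JobRouterSketch extends UntypedActor {

    // Stand-in for job-scoped messages such as CancelJob or FetchResult.
    public static class JobScopedMessage {
      public final String username;
      public final String jobId;
      public JobScopedMessage(String username, String jobId) {
        this.username = username;
        this.jobId = jobId;
      }
    }

    // username -> (jobId -> connector actor), as in the refactored controller
    private final Map<String, Map<String, ActorRef>> asyncBusyConnections = new HashMap<>();

    @Override
    public void onReceive(Object message) {
      if (message instanceof JobScopedMessage) {
        JobScopedMessage m = (JobScopedMessage) message;
        Map<String, ActorRef> jobs = asyncBusyConnections.get(m.username);
        ActorRef connector = (jobs == null) ? null : jobs.get(m.jobId);
        if (connector != null) {
          // sender() (not self()) is forwarded, so the connector's reply
          // goes straight back to the original caller.
          connector.tell(message, sender());
        }
      }
    }
  }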
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
index e883768..afab6c9 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java
@@ -21,23 +21,18 @@ package org.apache.ambari.view.hive2.actor;
 import akka.actor.ActorRef;
 import com.google.common.collect.Lists;
 import org.apache.ambari.view.hive2.actor.message.CursorReset;
-import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted;
-import org.apache.ambari.view.hive2.actor.message.ResetCursor;
-import org.apache.ambari.view.hive2.client.ColumnDescription;
-import org.apache.ambari.view.hive2.client.ColumnDescriptionShort;
-import org.apache.ambari.view.hive2.client.Row;
-import org.apache.ambari.view.hive2.persistence.Storage;
-import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
-import org.apache.ambari.view.hive2.actor.message.AdvanceCursor;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.ResetCursor;
 import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
 import org.apache.ambari.view.hive2.actor.message.job.Next;
 import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems;
 import org.apache.ambari.view.hive2.actor.message.job.Result;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive;
+import org.apache.ambari.view.hive2.client.ColumnDescription;
+import org.apache.ambari.view.hive2.client.ColumnDescriptionShort;
+import org.apache.ambari.view.hive2.client.Row;
+import org.apache.ambari.view.hive2.persistence.Storage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,33 +53,26 @@ public class ResultSetIterator extends HiveActor {
 
   private List<ColumnDescription> columnDescriptions;
   private int columnCount;
-  private Storage storage;
   boolean async = false;
-  private boolean jobCompleteMessageSent = false;
-
-
   private boolean metaDataFetched = false;
 
-  public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize) {
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize, boolean isAsync) {
     this.parent = parent;
     this.resultSet = resultSet;
     this.batchSize = batchSize;
+    this.async = isAsync;
   }
 
-
-  public ResultSetIterator(ActorRef parent, ResultSet resultSet, Storage storage) {
-    this(parent, resultSet);
-    this.storage = storage;
-    this.async = true;
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
+    this(parent, resultSet, DEFAULT_BATCH_SIZE, true);
   }
 
-  public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
-    this(parent, resultSet, DEFAULT_BATCH_SIZE);
+  public ResultSetIterator(ActorRef parent, ResultSet resultSet, boolean isAsync) {
+    this(parent, resultSet, DEFAULT_BATCH_SIZE, isAsync);
   }
 
   @Override
   void handleMessage(HiveMessage hiveMessage) {
-    LOG.info("Result set Iterator wil handle message {}", hiveMessage);
     sendKeepAlive();
     Object message = hiveMessage.getMessage();
     if (message instanceof Next) {
@@ -97,39 +85,6 @@ public class ResultSetIterator extends HiveActor {
     if (message instanceof KeepAlive) {
       sendKeepAlive();
     }
-    if (message instanceof AdvanceCursor) {
-      AdvanceCursor moveCursor = (AdvanceCursor) message;
-      advanceCursor(moveCursor);
-    }
-
-  }
-
-  private void advanceCursor(AdvanceCursor moveCursor) {
-    String jobid = moveCursor.getJob();
-    try {
-      // Block here so that we can update the job status
-      resultSet.next();
-      // Resetting the resultset as it needs to fetch from the beginning when the result is asked for.
-      resultSet.beforeFirst();
-      LOG.info("Job execution successful. Setting status in db.");
-      updateJobStatus(jobid, Job.JOB_STATE_FINISHED);
-      sendJobCompleteMessageIfNotDone();
-    } catch (SQLException e) {
-      LOG.error("Failed to reset the cursor after advancing. Setting error state in db.", e);
-      updateJobStatus(jobid, Job.JOB_STATE_ERROR);
-      sender().tell(new FetchFailed("Failed to reset the cursor after advancing", e), self());
-      cleanUpResources();
-    }
-  }
-
-  private void updateJobStatus(String jobid, String status) {
-    try {
-      JobImpl job = storage.load(JobImpl.class, jobid);
-      job.setStatus(status);
-      storage.store(JobImpl.class, job);
-    } catch (ItemNotFound itemNotFound) {
-      // Cannot do anything
-    }
   }
 
   private void resetResultSet() {
@@ -164,7 +119,6 @@ public class ResultSetIterator extends HiveActor {
       while (resultSet.next() && index < batchSize) {
         index++;
         rows.add(getRowFromResultSet(resultSet));
-        sendJobCompleteMessageIfNotDone();
       }
 
       if (index == 0) {
@@ -185,13 +139,6 @@ public class ResultSetIterator extends HiveActor {
     }
   }
 
-  private void sendJobCompleteMessageIfNotDone() {
-    if (!jobCompleteMessageSent) {
-      jobCompleteMessageSent = true;
-      parent.tell(new JobExecutionCompleted(), self());
-    }
-  }
-
   private void cleanUpResources() {
     parent.tell(new CleanUp(), self());
   }
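
The constructor changes above drop the Storage-based overload and make the async/sync mode an explicit flag, so callers state it directly instead of implying it through overload choice. A hedged sketch of how a parent actor might spawn the iterator after this change (the dispatcher name is the one used for this actor elsewhere in the commit):

  import akka.actor.ActorRef;
  import akka.actor.Props;
  import akka.actor.UntypedActorContext;

  import java.sql.ResultSet;

  // Sketch: spawning ResultSetIterator with the new explicit isAsync flag.
  public final class IteratorSpawnSketch {
    private IteratorSpawnSketch() {}

    static ActorRef spawn(UntypedActorContext context, ActorRef parent,
                          ResultSet resultSet, boolean isAsync) {
      // The three-argument overload added here falls back to
      // ResultSetIterator's DEFAULT_BATCH_SIZE.
      return context.actorOf(
        Props.create(ResultSetIterator.class, parent, resultSet, isAsync)
          .withDispatcher("akka.actor.result-dispatcher"));
    }
  }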

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
new file mode 100644
index 0000000..c60f28b
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.hive2.ConnectionDelegate;
+import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.ResultInformation;
+import org.apache.ambari.view.hive2.actor.message.RunStatement;
+import org.apache.ambari.view.hive2.actor.message.StartLogAggregation;
+import org.apache.ambari.view.hive2.actor.message.job.Failure;
+import org.apache.ambari.view.hive2.actor.message.job.UpdateYarnAtsGuid;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.hive.jdbc.HiveConnection;
+import org.apache.hive.jdbc.HiveStatement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+/**
+ * Executes a single statement and returns the ResultSet if the statement generates one.
+ * Also starts the LogAggregator and YarnAtsGUIDFetcher children when they are required.
+ */
+public class StatementExecutor extends HiveActor {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+  private final HdfsApi hdfsApi;
+  private final HiveConnection connection;
+  protected final Storage storage;
+  private final ConnectionDelegate connectionDelegate;
+  private ActorRef logAggregator;
+  private ActorRef guidFetcher;
+
+
+  public StatementExecutor(HdfsApi hdfsApi, Storage storage, HiveConnection connection, ConnectionDelegate connectionDelegate) {
+    this.hdfsApi = hdfsApi;
+    this.storage = storage;
+    this.connection = connection;
+    this.connectionDelegate = connectionDelegate;
+  }
+
+  @Override
+  void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if (message instanceof RunStatement) {
+      runStatement((RunStatement) message);
+    } else if (message instanceof GetColumnMetadataJob) {
+      getColumnMetaData((GetColumnMetadataJob) message);
+    }
+  }
+
+  private void runStatement(RunStatement message) {
+    try {
+      HiveStatement statement = connectionDelegate.createStatement(connection);
+      if (message.shouldStartLogAggregation()) {
+        startLogAggregation(statement, message.getStatement(), message.getLogFile().get());
+      }
+
+      if (message.shouldStartGUIDFetch() && message.getJobId().isPresent()) {
+        startGUIDFetch(statement, message.getJobId().get());
+      }
+      Optional<ResultSet> resultSetOptional = connectionDelegate.execute(message.getStatement());
+
+      if (resultSetOptional.isPresent()) {
+        sender().tell(new ResultInformation(message.getId(), resultSetOptional.get()), self());
+      } else {
+        sender().tell(new ResultInformation(message.getId()), self());
+      }
+    } catch (SQLException e) {
+      LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e);
+      sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self());
+    } finally {
+      stopLogAggregation();
+      stopGUIDFetch();
+    }
+  }
+
+  private void startGUIDFetch(HiveStatement statement, String jobId) {
+    if (guidFetcher == null) {
+      guidFetcher = getContext().actorOf(Props.create(YarnAtsGUIDFetcher.class, storage)
+        .withDispatcher("akka.actor.misc-dispatcher"), "YarnAtsGUIDFetcher:" + UUID.randomUUID().toString());
+    }
+    guidFetcher.tell(new UpdateYarnAtsGuid(statement, jobId), self());
+  }
+
+  private void stopGUIDFetch() {
+    if (guidFetcher != null) {
+      getContext().stop(guidFetcher);
+    }
+    guidFetcher = null;
+  }
+
+  private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) {
+    if (logAggregator == null) {
+      logAggregator = getContext().actorOf(
+        Props.create(LogAggregator.class, hdfsApi, statement, logFile)
+          .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString());
+    }
+    logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf());
+  }
+
+  private void stopLogAggregation() {
+    if (logAggregator != null) {
+      getContext().stop(logAggregator);
+    }
+    logAggregator = null;
+  }
+
+
+  private void getColumnMetaData(GetColumnMetadataJob message) {
+    try {
+      ResultSet resultSet = connectionDelegate.getColumnMetadata(connection, message);
+      sender().tell(new ResultInformation(-1, resultSet), self());
+    } catch (SQLException e) {
+      LOG.error("Failed to get column metadata for databasePattern: {}, tablePattern: {}, ColumnPattern {}. {}",
+        message.getSchemaPattern(), message.getTablePattern(), message.getColumnPattern(), e);
+      sender().tell(new ResultInformation(-1,
+        new Failure("Failed to get column metadata for databasePattern: " + message.getSchemaPattern() +
+          ", tablePattern: " + message.getTablePattern() + ", ColumnPattern: " + message.getColumnPattern(), e)), self());
+    }
+  }
+}
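
One idiom in StatementExecutor worth noting: the helper children (LogAggregator, YarnAtsGUIDFetcher) are created lazily on first use and unconditionally stopped in the finally block, so every statement run gets fresh helpers and none outlive the statement, even on failure. A distilled sketch of that pattern with a placeholder child:

  import akka.actor.ActorRef;
  import akka.actor.Props;
  import akka.actor.UntypedActor;

  // Distilled sketch of the lazy-start / stop-in-finally helper pattern
  // used by StatementExecutor for its log aggregator and GUID fetcher.
  public class HelperLifecycleSketch extends UntypedActor {

    // Placeholder for a helper such as LogAggregator.
    public static class Helper extends UntypedActor {
      @Override
      public void onReceive(Object message) { /* side work */ }
    }

    private ActorRef helper;

    @Override
    public void onReceive(Object message) {
      try {
        startHelper(message);
        // ... execute the statement; the helper works alongside it ...
      } finally {
        stopHelper(); // always stopped, even when execution fails
      }
    }

    private void startHelper(Object message) {
      if (helper == null) { // lazy creation, one helper per run
        helper = getContext().actorOf(Props.create(Helper.class)
          .withDispatcher("akka.actor.misc-dispatcher"));
      }
      helper.tell(message, self());
    }

    private void stopHelper() {
      if (helper != null) {
        getContext().stop(helper);
      }
      helper = null;
    }
  }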

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
deleted file mode 100644
index a0b6eae..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import com.google.common.base.Optional;
-import org.apache.ambari.view.ViewContext;
-import org.apache.ambari.view.hive2.actor.message.RegisterActor;
-import org.apache.ambari.view.hive2.persistence.Storage;
-import org.apache.ambari.view.hive2.ConnectionDelegate;
-import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob;
-import org.apache.ambari.view.hive2.actor.message.HiveMessage;
-import org.apache.ambari.view.hive2.actor.message.SyncJob;
-import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
-import org.apache.ambari.view.hive2.actor.message.job.NoResult;
-import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
-import org.apache.ambari.view.utils.hdfs.HdfsApi;
-import org.apache.hive.jdbc.HiveConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-public class SyncJdbcConnector extends JdbcConnector {
-
-  private final Logger LOG = LoggerFactory.getLogger(getClass());
-  private ActorRef resultSetActor = null;
-
-  public SyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) {
-    super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage);
-  }
-
-  @Override
-  protected void handleJobMessage(HiveMessage message) {
-    Object job = message.getMessage();
-    if(job instanceof SyncJob) {
-      execute((SyncJob) job);
-    } else if (job instanceof GetColumnMetadataJob) {
-      getColumnMetaData((GetColumnMetadataJob) job);
-    }
-  }
-
-  @Override
-  protected boolean isAsync() {
-    return false;
-  }
-
-  @Override
-  protected void cleanUpChildren() {
-    if(resultSetActor != null && !resultSetActor.isTerminated()) {
-      LOG.debug("Sending poison pill to log aggregator");
-      resultSetActor.tell(PoisonPill.getInstance(), self());
-    }
-  }
-
-  @Override
-  protected void notifyFailure() {
-    sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender());
-  }
-
-  protected void execute(final SyncJob job) {
-    this.executing = true;
-    executeJob(new Operation<SyncJob>() {
-      @Override
-      SyncJob getJob() {
-        return job;
-      }
-
-      @Override
-      Optional<ResultSet> call(HiveConnection connection) throws SQLException {
-        return connectionDelegate.executeSync(connection, job);
-      }
-
-      @Override
-      String notConnectedErrorMessage() {
-        return "Cannot execute sync job for user: " + job.getUsername() + ". Not connected to Hive";
-      }
-
-      @Override
-      String executionFailedErrorMessage() {
-        return "Failed to execute Jdbc Statement";
-      }
-    });
-  }
-
-
-  private void getColumnMetaData(final GetColumnMetadataJob job) {
-    executeJob(new Operation<GetColumnMetadataJob>() {
-
-      @Override
-      GetColumnMetadataJob getJob() {
-        return job;
-      }
-
-      @Override
-      Optional<ResultSet> call(HiveConnection connection) throws SQLException {
-        return connectionDelegate.getColumnMetadata(connection, job);
-      }
-
-      @Override
-      String notConnectedErrorMessage() {
-        return String.format("Cannot get column metadata for user: %s, schema: %s, table: %s, column: %s" +
-            ". Not connected to Hive", job.getUsername(), job.getSchemaPattern(), job.getTablePattern(),
-          job.getColumnPattern());
-      }
-
-      @Override
-      String executionFailedErrorMessage() {
-        return "Failed to execute Jdbc Statement";
-      }
-    });
-  }
-
-  private void executeJob(Operation operation) {
-    ActorRef sender = this.getSender();
-    String errorMessage = operation.notConnectedErrorMessage();
-    if (connectable == null) {
-      sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
-      cleanUp();
-      return;
-    }
-
-    Optional<HiveConnection> connectionOptional = connectable.getConnection();
-    if (!connectionOptional.isPresent()) {
-      sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender());
-      cleanUp();
-      return;
-    }
-
-    try {
-      Optional<ResultSet> resultSetOptional = operation.call(connectionOptional.get());
-      if(resultSetOptional.isPresent()) {
-        ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(),
-          resultSetOptional.get()).withDispatcher("akka.actor.result-dispatcher"));
-        deathWatch.tell(new RegisterActor(resultSetActor),self());
-        sender.tell(new ResultSetHolder(resultSetActor), self());
-      } else {
-        sender.tell(new NoResult(), self());
-        cleanUp();
-      }
-    } catch (SQLException e) {
-      LOG.error(operation.executionFailedErrorMessage(), e);
-      sender.tell(new ExecutionFailed(operation.executionFailedErrorMessage(), e), self());
-      cleanUp();
-    }
-  }
-
-  private abstract class Operation<T> {
-    abstract T getJob();
-    abstract Optional<ResultSet> call(HiveConnection connection) throws SQLException;
-    abstract String notConnectedErrorMessage();
-    abstract String executionFailedErrorMessage();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
new file mode 100644
index 0000000..bd70421
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor;
+
+import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.UpdateYarnAtsGuid;
+import org.apache.ambari.view.hive2.persistence.Storage;
+import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
+import org.apache.hive.jdbc.HiveStatement;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodically polls the HiveStatement for the YARN ATS GUID of the running job and updates the database once the GUID is available.
+ */
+public class YarnAtsGUIDFetcher extends HiveActor {
+
+  private final Storage storage;
+
+  public YarnAtsGUIDFetcher(Storage storage) {
+    this.storage = storage;
+  }
+
+  @Override
+  public void handleMessage(HiveMessage hiveMessage) {
+    Object message = hiveMessage.getMessage();
+    if(message instanceof UpdateYarnAtsGuid) {
+      updateGuid((UpdateYarnAtsGuid) message);
+    }
+  }
+
+  private void updateGuid(UpdateYarnAtsGuid message) {
+    HiveStatement statement = message.getStatement();
+    String jobId = message.getJobId();
+    String yarnAtsGuid = statement.getYarnATSGuid();
+
+    // If ATS GUID is not yet generated, we will retry after 1 second
+    if(yarnAtsGuid == null) {
+      getContext().system().scheduler()
+        .scheduleOnce(Duration.create(1, TimeUnit.SECONDS), getSelf(), message, getContext().dispatcher(), null);
+    } else {
+      try {
+        JobImpl job = storage.load(JobImpl.class, jobId);
+        job.setGuid(yarnAtsGuid);
+        storage.store(JobImpl.class, job);
+      } catch (ItemNotFound itemNotFound) {
+        // Cannot do anything if the job is not present
+      }
+    }
+  }
+}
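
The retry loop above is the standard Akka alternative to sleeping a thread: when the GUID is not ready, the actor reschedules the same message to itself and returns immediately. A stripped-down sketch of the idiom, with the fetch itself stubbed out:

  import akka.actor.UntypedActor;
  import scala.concurrent.duration.Duration;

  import java.util.concurrent.TimeUnit;

  // Stripped-down sketch of YarnAtsGUIDFetcher's poll-and-reschedule idiom.
  public class PollingSketch extends UntypedActor {

    @Override
    public void onReceive(Object message) {
      String value = tryFetch();
      if (value == null) {
        // Not ready yet: redeliver this message to ourselves in 1 second
        // instead of blocking the dispatcher thread.
        getContext().system().scheduler().scheduleOnce(
          Duration.create(1, TimeUnit.SECONDS), getSelf(), message,
          getContext().dispatcher(), null);
      } else {
        // Ready: persist or propagate the value
        // (the real actor stores it on the JobImpl).
      }
    }

    private String tryFetch() {
      // Placeholder; the real actor asks HiveStatement#getYarnATSGuid().
      return null;
    }
  }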

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
deleted file mode 100644
index 0f918ad..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor;
-
-import akka.actor.UntypedActor;
-import org.apache.ambari.view.hive2.actor.message.HiveMessage;
-
-/**
- * Queries YARN/ATS time to time to fetch the status of the ExecuteJob and updates database
- */
-public class YarnAtsParser extends HiveActor {
-  @Override
-  public void handleMessage(HiveMessage hiveMessage) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
deleted file mode 100644
index c3e6c04..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor.message;
-
-public class AdvanceCursor {
-
-    private String job;
-
-    public AdvanceCursor(String job) {
-        this.job = job;
-    }
-
-    public String getJob() {
-        return job;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
deleted file mode 100644
index fd1f26f..0000000
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive2.actor.message;
-
-import com.google.common.base.Optional;
-
-import java.sql.ResultSet;
-
-public class AssignResultSet {
-
-    private Optional<ResultSet> resultSet;
-
-
-    public AssignResultSet(Optional<ResultSet> resultSet) {
-        this.resultSet = resultSet;
-
-    }
-
-
-    public ResultSet getResultSet() {
-        return resultSet.orNull();
-    }
-
-
-    @Override
-    public String toString() {
-        return "ExtractResultSet{" +
-                "resultSet=" + resultSet +
-                '}';
-    }
-
-}

