ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbhowm...@apache.org
Subject ambari git commit: AMBARI-18042. Part 2. Query execution takes a long time in hive view version1.5.0. (dipayanb)
Date Tue, 16 Aug 2016 18:23:47 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 4c02ad328 -> cf5788e1d


AMBARI-18042. Part 2. Query execution takes a long time in hive view version1.5.0. (dipayanb)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cf5788e1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cf5788e1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cf5788e1

Branch: refs/heads/branch-2.4
Commit: cf5788e1d2ceb1f9fe9a7136ea2b68b3a5c92176
Parents: 4c02ad3
Author: Dipayan Bhowmick <dipayan.bhowmick@gmail.com>
Authored: Tue Aug 16 17:33:48 2016 +0530
Committer: Dipayan Bhowmick <dipayan.bhowmick@gmail.com>
Committed: Tue Aug 16 23:53:10 2016 +0530

----------------------------------------------------------------------
 .../ambari/view/hive2/actor/JdbcConnector.java  | 88 ++++++++++++++++++--
 .../view/hive2/actor/OperationController.java   | 15 ++++
 .../view/hive2/actor/StatementExecutor.java     |  9 +-
 .../view/hive2/actor/YarnAtsGUIDFetcher.java    | 21 ++---
 .../actor/message/job/SaveDagInformation.java   | 52 ++++++++++++
 .../hive2/actor/message/job/SaveGuidToDB.java   | 46 ++++++++++
 .../actor/message/job/UpdateYarnAtsGuid.java    |  8 +-
 .../view/hive2/resources/jobs/Aggregator.java   | 40 +++++++--
 .../view/hive2/resources/jobs/JobService.java   |  4 +-
 9 files changed, 245 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
index 0fcb800..6cdf336 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java
@@ -41,6 +41,8 @@ import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed;
 import org.apache.ambari.view.hive2.actor.message.job.Failure;
 import org.apache.ambari.view.hive2.actor.message.job.NoResult;
 import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder;
+import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive2.actor.message.job.SaveGuidToDB;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
@@ -189,6 +191,10 @@ public class JdbcConnector extends HiveActor {
       fetchResult((FetchResult) message);
     } else if (message instanceof FetchError) {
       fetchError((FetchError) message);
+    } else if (message instanceof SaveGuidToDB) {
+      saveGuid((SaveGuidToDB) message);
+    } else if (message instanceof SaveDagInformation) {
+      saveDagInformation((SaveDagInformation) message);
     } else {
       unhandled(message);
     }
@@ -418,16 +424,50 @@ public class JdbcConnector extends HiveActor {
     startTerminateInactivityScheduler();
   }
 
-  private void updateJobStatus(String jobid, String status) {
-    try {
-      JobImpl job = storage.load(JobImpl.class, jobid);
-      job.setStatus(status);
-      job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
-      storage.store(JobImpl.class, job);
-      LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
-    } catch (ItemNotFound itemNotFound) {
-      // Cannot do anything
+  private void updateJobStatus(String jobid, final String status) {
+    new JobSaver(jobid) {
+      @Override
+      protected void update(JobImpl job) {
+        job.setStatus(status);
+        job.setDuration(getUpdatedDuration(job.getDateSubmitted()));
+      }
+    }.save();
+    LOG.info("Stored job status for Job id: {} as '{}'", jobid, status);
+  }
+
+  private void saveGuid(final SaveGuidToDB message) {
+    new JobSaver(message.getJobId()) {
+      @Override
+      protected void update(JobImpl job) {
+        job.setGuid(message.getGuid());
+      }
+    }.save();
+    LOG.info("Stored GUID for Job id: {} as '{}'", message.getJobId(), message.getGuid());
+  }
+
+  private void saveDagInformation(final SaveDagInformation message) {
+    if(message.getDagId() == null &&
+        message.getDagName() == null &&
+        message.getApplicationId() == null) {
+      LOG.error("Cannot save Dag Information for job Id: {} as all the properties are null.", message.getJobId());
+      return;
     }
+    new JobSaver(message.getJobId()) {
+
+      @Override
+      protected void update(JobImpl job) {
+        if (message.getApplicationId() != null) {
+          job.setApplicationId(message.getApplicationId());
+        }
+        if (message.getDagId() != null) {
+          job.setDagId(message.getDagId());
+        }
+        if(message.getDagName() != null) {
+          job.setDagName(message.getDagName());
+        }
+      }
+    }.save();
+    LOG.info("Store Dag Information for job. Job id: {}, dagName: {}, dagId: {}, applicationId: {}", message.getJobId(), message.getDagName(), message.getDagId(), message.getApplicationId());
   }
 
   private Long getUpdatedDuration(Long dateSubmitted) {
@@ -539,4 +579,34 @@ public class JdbcConnector extends HiveActor {
       connectable.disconnect();
     }
   }
+
+  /**
+   * Saves the job to database.
+   */
+  private abstract class JobSaver {
+    private final String jobId;
+
+    JobSaver(String jobId) {
+      this.jobId = jobId;
+    }
+
+    public void save() {
+      try {
+        JobImpl job = storage.load(JobImpl.class, jobId);
+        update(job);
+        storage.store(JobImpl.class, job);
+      } catch (ItemNotFound itemNotFound) {
+        itemNotFound(jobId);
+      }
+    }
+
+    /**
+     * Override to handle Not found exception
+     */
+    private void itemNotFound(String jobId) {
+      // Nothing to do
+    }
+
+    protected abstract void update(JobImpl job);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
index edef9ec..98e60f2 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java
@@ -35,6 +35,7 @@ import org.apache.ambari.view.hive2.actor.message.RegisterActor;
 import org.apache.ambari.view.hive2.actor.message.SQLStatementJob;
 import org.apache.ambari.view.hive2.actor.message.job.CancelJob;
 import org.apache.ambari.view.hive2.actor.message.job.FetchFailed;
+import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector;
 import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector;
 import org.apache.ambari.view.hive2.internal.ContextSupplier;
@@ -145,6 +146,10 @@ public class OperationController extends HiveActor {
     if (message instanceof DestroyConnector) {
       destroyConnector((DestroyConnector) message);
     }
+
+    if (message instanceof SaveDagInformation) {
+      saveDagInformation((SaveDagInformation) message);
+    }
   }
 
   private void cancelJob(CancelJob message) {
@@ -160,6 +165,16 @@ public class OperationController extends HiveActor {
     }
   }
 
+  private void saveDagInformation(SaveDagInformation message) {
+    ActorRef jdbcConnection = asyncBusyConnections.get(context.getUsername()).get(message.getJobId());
+    if(jdbcConnection != null) {
+      jdbcConnection.tell(message, sender());
+    } else {
+      String msg = String.format("Cannot update Dag Information for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
+      LOG.error(msg);
+    }
+  }
+
   private void fetchError(FetchError message) {
     String jobId = message.getJobId();
     String username = message.getUsername();

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
index 5b4f76c..d7b4f54 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.ambari.view.hive2.actor;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
 import com.google.common.base.Optional;
 import org.apache.ambari.view.hive2.ConnectionDelegate;
@@ -82,7 +81,7 @@ public class StatementExecutor extends HiveActor {
       }
 
       if (message.shouldStartGUIDFetch() && message.getJobId().isPresent()) {
-        startGUIDFetch(statement, message.getJobId().get());
+        startGUIDFetch(message.getId(), statement, message.getJobId().get());
       }
       LOG.info("Statement executor is executing statement: {}, Statement id: {}, JobId: {}", message.getStatement(), message.getId(), message.getJobId().or("SYNC JOB"));
       Optional<ResultSet> resultSetOptional = connectionDelegate.execute(message.getStatement());
@@ -102,13 +101,13 @@ public class StatementExecutor extends HiveActor {
     }
   }
 
-  private void startGUIDFetch(HiveStatement statement, String jobId) {
+  private void startGUIDFetch(int statementId, HiveStatement statement, String jobId) {
     if (guidFetcher == null) {
-      guidFetcher = getContext().actorOf(Props.create(YarnAtsGUIDFetcher.class, storage)
+      guidFetcher = getContext().actorOf(Props.create(YarnAtsGUIDFetcher.class, sender())
         .withDispatcher("akka.actor.misc-dispatcher"), "YarnAtsGUIDFetcher:" + UUID.randomUUID().toString());
     }
     LOG.info("Fetching guid for Job Id: {}", jobId);
-    guidFetcher.tell(new UpdateYarnAtsGuid(statement, jobId), self());
+    guidFetcher.tell(new UpdateYarnAtsGuid(statementId, statement, jobId), self());
   }
 
   private void stopGUIDFetch() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
index 4bcb815..e6c8084 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java
@@ -18,11 +18,10 @@
 
 package org.apache.ambari.view.hive2.actor;
 
+import akka.actor.ActorRef;
 import org.apache.ambari.view.hive2.actor.message.HiveMessage;
+import org.apache.ambari.view.hive2.actor.message.job.SaveGuidToDB;
 import org.apache.ambari.view.hive2.actor.message.job.UpdateYarnAtsGuid;
-import org.apache.ambari.view.hive2.persistence.Storage;
-import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl;
 import org.apache.hive.jdbc.HiveStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,10 +37,10 @@ public class YarnAtsGUIDFetcher extends HiveActor {
 
   private final Logger LOG = LoggerFactory.getLogger(getClass());
 
-  private final Storage storage;
+  private final ActorRef jdbcConnectorActor;
 
-  public YarnAtsGUIDFetcher(Storage storage) {
-    this.storage = storage;
+  public YarnAtsGUIDFetcher(ActorRef jdbcConnectorActor) {
+    this.jdbcConnectorActor = jdbcConnectorActor;
   }
 
   @Override
@@ -65,14 +64,8 @@ public class YarnAtsGUIDFetcher extends HiveActor {
       getContext().system().scheduler()
         .scheduleOnce(Duration.create(1, TimeUnit.SECONDS), getSelf(), message, getContext().dispatcher(), null);
     } else {
-      try {
-        JobImpl job = storage.load(JobImpl.class, jobId);
-        job.setGuid(yarnAtsGuid);
-        storage.store(JobImpl.class, job);
-        LOG.info("Stored guid: {} for job id: {} in database", yarnAtsGuid, jobId);
-      } catch (ItemNotFound itemNotFound) {
-        // Cannot do anything if the job is not present
-      }
+      jdbcConnectorActor.tell(new SaveGuidToDB(message.getStatementId(), yarnAtsGuid, jobId), self());
+      LOG.info("Message send to save GUID for Statement Id: {}, Job id: {}, Guid: {}", message.getStatementId(), message.getJobId(), yarnAtsGuid);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveDagInformation.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveDagInformation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveDagInformation.java
new file mode 100644
index 0000000..187bb67
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveDagInformation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * Message to save the Dag Information like the dagName, dagId and ApplicationId
+ */
+public class SaveDagInformation {
+  private final String jobId;
+  private final String dagName;
+  private final String dagId;
+  private final String applicationId;
+
+  public SaveDagInformation(String jobId, String dagName, String dagId, String applicationId) {
+    this.jobId = jobId;
+    this.dagName = dagName;
+    this.dagId = dagId;
+    this.applicationId = applicationId;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
+
+  public String getDagId() {
+    return dagId;
+  }
+
+  public String getApplicationId() {
+    return applicationId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveGuidToDB.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveGuidToDB.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveGuidToDB.java
new file mode 100644
index 0000000..c447d0b
--- /dev/null
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/SaveGuidToDB.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive2.actor.message.job;
+
+/**
+ * Message to ask JdbcConnector for job to update the GUID for the current statement in the database for the job.
+ */
+public class SaveGuidToDB {
+  private final int statementId;
+  private final String guid;
+  private final String jobId;
+
+  public SaveGuidToDB(int statementId, String guid, String jobId) {
+    this.statementId = statementId;
+    this.guid = guid;
+    this.jobId = jobId;
+  }
+
+  public int getStatementId() {
+    return statementId;
+  }
+
+  public String getGuid() {
+    return guid;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java
index 7edbab4..1998a2a 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java
@@ -21,13 +21,19 @@ package org.apache.ambari.view.hive2.actor.message.job;
 import org.apache.hive.jdbc.HiveStatement;
 
 public class UpdateYarnAtsGuid {
+  private final int statementId;
   private final HiveStatement statement;
   private final String jobId;
-  public UpdateYarnAtsGuid(HiveStatement statement, String jobId) {
+  public UpdateYarnAtsGuid(int statementId, HiveStatement statement, String jobId) {
+    this.statementId = statementId;
     this.statement = statement;
     this.jobId = jobId;
   }
 
+  public int getStatementId() {
+    return statementId;
+  }
+
   public HiveStatement getStatement() {
     return statement;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
index b86eb81..99faeca 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
@@ -18,6 +18,8 @@
 
 package org.apache.ambari.view.hive2.resources.jobs;
 
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
 import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy;
 import org.apache.ambari.view.hive2.persistence.utils.Indexed;
 import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
@@ -63,11 +65,13 @@ public class Aggregator {
 
   private final IATSParser ats;
   private IResourceManager<Job> viewJobResourceManager;
+  private final ActorRef operationController;
 
   public Aggregator(IResourceManager<Job> jobResourceManager,
-                    IATSParser ats) {
+                    IATSParser ats, ActorRef operationController) {
     this.viewJobResourceManager = jobResourceManager;
     this.ats = ats;
+    this.operationController = operationController;
   }
 
   /**
@@ -257,7 +261,7 @@ public class Aggregator {
 
     TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
 
-    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob, true);
     return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
   }
 
@@ -298,17 +302,37 @@ public class Aggregator {
   }
 
   protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound {
+    saveJobInfoIfNeeded(hiveQueryId, tezDagId, viewJob, false);
+  }
+
+  protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob, boolean useActorSystem) throws ItemNotFound {
+    boolean updateDb = false;
+    String dagName = null;
+    String dagId = null;
+    String applicationId = null;
     if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) {
       if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) {
-        viewJob.setDagName(hiveQueryId.dagNames.get(0));
-        viewJobResourceManager.update(viewJob, viewJob.getId());
+        dagName = hiveQueryId.dagNames.get(0);
+        updateDb = true;
       }
     }
     if (tezDagId.status != null && (tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) &&
-      !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) {
-      viewJob.setDagId(tezDagId.entity);
-      viewJob.setApplicationId(tezDagId.applicationId);
-      viewJobResourceManager.update(viewJob, viewJob.getId());
+        !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) {
+      dagId = tezDagId.entity;
+      applicationId = tezDagId.applicationId;
+      updateDb = true;
+    }
+
+    if(updateDb) {
+      if (useActorSystem) {
+        LOG.info("Saving DAG information via actor system for job id: {}", viewJob.getId());
+        operationController.tell(new SaveDagInformation(viewJob.getId(), dagName, dagId, applicationId), ActorRef.noSender());
+      } else {
+        viewJob.setDagName(dagName);
+        viewJob.setDagId(dagId);
+        viewJob.setApplicationId(applicationId);
+        viewJobResourceManager.update(viewJob, viewJob.getId());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/cf5788e1/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java
index c3ecf67..f054a9d 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.view.hive2.resources.jobs;
 
+import akka.actor.ActorRef;
 import com.beust.jcommander.internal.Lists;
 import com.google.common.base.Optional;
 import org.apache.ambari.view.ViewResourceHandler;
@@ -111,7 +112,8 @@ public class JobService extends BaseService {
   protected Aggregator getAggregator() {
     if (aggregator == null) {
       IATSParser atsParser = getSharedObjectsFactory().getATSParser();
-      aggregator = new Aggregator(getResourceManager(), atsParser);
+      ActorRef operationController = ConnectionSystem.getInstance().getOperationController(context);
+      aggregator = new Aggregator(getResourceManager(), atsParser, operationController);
     }
     return aggregator;
   }


Mime
View raw message