ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yus...@apache.org
Subject ambari git commit: AMBARI-14081. HiveView fails to find DAGs corresponding to query. (Dipayan Bhowmick via yusaku)
Date Wed, 02 Dec 2015 05:56:52 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk fec19f364 -> 21b37c20f


AMBARI-14081. HiveView fails to find DAGs corresponding to query. (Dipayan Bhowmick via yusaku)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/21b37c20
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/21b37c20
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/21b37c20

Branch: refs/heads/trunk
Commit: 21b37c20f66f3b87fe2d521d47bb240593b247a6
Parents: fec19f3
Author: Yusaku Sako <yusaku@hortonworks.com>
Authored: Tue Dec 1 21:56:22 2015 -0800
Committer: Yusaku Sako <yusaku@hortonworks.com>
Committed: Tue Dec 1 21:56:22 2015 -0800

----------------------------------------------------------------------
 .../view/hive/resources/jobs/Aggregator.java    | 24 ++++-----
 .../hive/resources/jobs/atsJobs/ATSParser.java  | 14 +++++
 .../jobs/atsJobs/ATSRequestsDelegate.java       |  2 +
 .../jobs/atsJobs/ATSRequestsDelegateImpl.java   | 13 +++++
 .../resources/jobs/atsJobs/HiveQueryId.java     |  3 ++
 .../hive/resources/jobs/atsJobs/IATSParser.java |  2 +
 .../view/hive/resources/jobs/ATSParserTest.java | 56 ++++++++++++++++++++
 .../hive/resources/jobs/AggregatorTest.java     | 42 +++++++++++++++
 8 files changed, 144 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java
index 31eb678..4eb1568 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java
@@ -82,14 +82,7 @@ public class Aggregator {
     List<Job> allJobs = new LinkedList<Job>();
     for (HiveQueryId atsHiveQuery : ats.getHiveQueryIdsList(username)) {
 
-      TezDagId atsTezDag;
-      if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
-        String dagName = atsHiveQuery.dagNames.get(0);
-
-        atsTezDag = ats.getTezDAGByName(dagName);
-      } else {
-        atsTezDag = new TezDagId();
-      }
+      TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
 
       JobImpl atsJob;
       if (hasOperationId(atsHiveQuery)) {
@@ -135,17 +128,24 @@ public class Aggregator {
     String hexGuid = Hex.encodeHexString(operationHandle.getOperationId().getGuid());
     HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexStringToUrlSafeBase64(hexGuid));
 
+    TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+
+    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+  }
+
+  private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) {
     TezDagId atsTezDag;
-    if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
+    if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) {
+      atsTezDag = ats.getTezDAGByEntity(atsHiveQuery.entity);
+    } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
       String dagName = atsHiveQuery.dagNames.get(0);
 
       atsTezDag = ats.getTezDAGByName(dagName);
     } else {
       atsTezDag = new TezDagId();
     }
-
-    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
-    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+    return atsTezDag;
   }
 
   protected boolean hasOperationId(HiveQueryId atsHiveQuery) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
index 1138767..c4c85ad 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
@@ -94,6 +94,16 @@ public class ATSParser implements IATSParser {
   @Override
   public TezDagId getTezDAGByName(String name) {
     JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  @Override
+  public TezDagId getTezDAGByEntity(String entity) {
+    JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  private TezDagId parseTezDag(JSONArray tezDagEntities) {
     assert tezDagEntities.size() <= 1;
     if (tezDagEntities.size() == 0) {
       return new TezDagId();
@@ -151,6 +161,10 @@ public class ATSParser implements IATSParser {
       parsedJob.dagNames = dagIds;
       parsedJob.stages = stagesList;
     }
+
+    if (otherinfo.get("VERSION") != null) {
+      parsedJob.version = (Long) otherinfo.get("VERSION");
+    }
     return parsedJob;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
index 2e59ea0..02091f8 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
@@ -38,4 +38,6 @@ public interface ATSRequestsDelegate {
   JSONObject tezDagByName(String name);
 
   JSONObject tezVerticesListForDAG(String dagId);
+
+  JSONObject tezDagByEntity(String entity);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
index 8f12991..8989449 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
@@ -97,11 +97,24 @@ public class ATSRequestsDelegateImpl implements ATSRequestsDelegate {
   }
 
   @Override
+  public JSONObject tezDagByEntity(String entity) {
+    String tezDagEntityUrl = tezDagEntityUrl(entity);
+    String response = readFromWithDefault(tezDagEntityUrl, EMPTY_ENTITIES_JSON);
+    return (JSONObject) JSONValue.parse(response);
+  }
+
+  private String tezDagEntityUrl(String entity) {
+    return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=callerId:" + entity;
+  }
+
+  @Override
   public JSONObject tezVerticesListForDAG(String dagId) {
     String response = readFromWithDefault(tezVerticesListForDAGUrl(dagId), "{ \"entities\" : [  ] }");
     return (JSONObject) JSONValue.parse(response);
   }
 
+
+
   protected String readFromWithDefault(String atsUrl, String defaultResponse) {
     String response;
     try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
index af54dbb..bb81fef 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
@@ -23,6 +23,8 @@ import org.json.simple.JSONObject;
 import java.util.List;
 
 public class HiveQueryId {
+  public static long ATS_15_RESPONSE_VERSION = 2; // version returned from ATS 1.5 release
+
   public String url;
 
   public String entity;
@@ -36,4 +38,5 @@ public class HiveQueryId {
   public long duration;
   public String operationId;
   public String user;
+  public long version;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
index 292ead8..f51b880 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
@@ -28,4 +28,6 @@ public interface IATSParser {
   HiveQueryId getHiveQueryIdByOperationId(String guidString);
 
   TezDagId getTezDAGByName(String name);
+
+  TezDagId getTezDAGByEntity(String entity);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
index a0cf57a..d8e60c4 100644
--- a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
+++ b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/ATSParserTest.java
@@ -54,6 +54,13 @@ public class ATSParserTest {
     Assert.assertEquals("hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0:4", job.dagNames.get(0));
 
     Assert.assertEquals(2, job.stages.size());
+    Assert.assertTrue(HiveQueryId.ATS_15_RESPONSE_VERSION > job.version);
+
+    jobLoader = new ATSParser(new ATSV15RequestsDelegateStub());
+    List<HiveQueryId> jobsv2 = jobLoader.getHiveQueryIdsList("hive");
+    Assert.assertEquals(1, jobsv2.size());
+    HiveQueryId jobv2 = jobsv2.get(0);
+    Assert.assertTrue(HiveQueryId.ATS_15_RESPONSE_VERSION <= jobv2.version);
   }
 
   @Test
@@ -67,6 +74,50 @@ public class ATSParserTest {
     Assert.assertEquals("SUCCEEDED", tezDag.status);
   }
 
+  @Test
+  public void testGetTezDagByEntity() throws Exception {
+    IATSParser jobLoader = new ATSParser(new ATSV15RequestsDelegateStub());
+
+    TezDagId tezDag = jobLoader.getTezDAGByEntity("hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0:4");
+
+    Assert.assertEquals("dag_1423156117563_0005_2", tezDag.entity);
+    Assert.assertEquals("application_1423156117563_0005", tezDag.applicationId);
+    Assert.assertEquals("SUCCEEDED", tezDag.status);
+  }
+
+  protected static class ATSV15RequestsDelegateStub extends ATSRequestsDelegateStub {
+    /**
+     * This returns the version field that the ATS v1.5 returns.
+     */
+    @Override
+    public JSONObject hiveQueryIdList(String username) {
+      return (JSONObject) JSONValue.parse(
+        "{ \"entities\" : [ { \"domain\" : \"DEFAULT\",\n" +
+          "        \"entity\" : \"hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0\",\n" +
+          "        \"entitytype\" : \"HIVE_QUERY_ID\",\n" +
+          "        \"events\" : [ { \"eventinfo\" : {  },\n" +
+          "              \"eventtype\" : \"QUERY_COMPLETED\",\n" +
+          "              \"timestamp\" : 1423493342843\n" +
+          "            },\n" +
+          "            { \"eventinfo\" : {  },\n" +
+          "              \"eventtype\" : \"QUERY_SUBMITTED\",\n" +
+          "              \"timestamp\" : 1423493324355\n" +
+          "            }\n" +
+          "          ],\n" +
+          "        \"otherinfo\" : { \"MAPRED\" : false,\n" +
+          "            \"QUERY\" : \"{\\\"queryText\\\":\\\"select count(*) from z\\\",\\\"queryPlan\\\":{\\\"STAGE PLANS\\\":{\\\"Stage-1\\\":{\\\"Tez\\\":{\\\"DagName:\\\":\\\"hive_20150209144848_c3a5a07b-c3b6-4f57-a6d5-3dadecdd6fd0:4\\\",\\\"Vertices:\\\":{\\\"Reducer 2\\\":{\\\"Reduce Operator Tree:\\\":{\\\"Group By Operator\\\":{\\\"mode:\\\":\\\"mergepartial\\\",\\\"aggregations:\\\":[\\\"count(VALUE._col0)\\\"],\\\"outputColumnNames:\\\":[\\\"_col0\\\"],\\\"children\\\":{\\\"Select Operator\\\":{\\\"expressions:\\\":\\\"_col0 (type: bigint)\\\",\\\"outputColumnNames:\\\":[\\\"_col0\\\"],\\\"children\\\":{\\\"File Output Operator\\\":{\\\"Statistics:\\\":\\\"Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE\\\",\\\"compressed:\\\":\\\"false\\\",\\\"table:\\\":{\\\"serde:\\\":\\\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\\\",\\\"input format:\\\":\\\"org.apache.hadoop.mapred.TextInputFormat\\\",\\\"output format:\\\":\\\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\\\"}}},\\\"Statistics:\\\":\\\"Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE\\\"}},\\\"Statistics:\\\":\\\"Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE\\\"}}},\\\"Map 1\\\":{\\\"Map Operator Tree:\\\":[{\\\"TableScan\\\":{\\\"alias:\\\":\\\"z\\\",\\\"children\\\":{\\\"Select Operator\\\":{\\\"children\\\":{\\\"Group By Operator\\\":{\\\"mode:\\\":\\\"hash\\\",\\\"aggregations:\\\":[\\\"count()\\\"],\\\"outputColumnNames:\\\":[\\\"_col0\\\"],\\\"children\\\":{\\\"Reduce Output Operator\\\":{\\\"sort order:\\\":\\\"\\\",\\\"value expressions:\\\":\\\"_col0 (type: bigint)\\\",\\\"Statistics:\\\":\\\"Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE\\\"}},\\\"Statistics:\\\":\\\"Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE\\\"}},\\\"Statistics:\\\":\\\"Num rows: 0 Data size: 40 Basic stats: PARTIAL Column stats: COMPLETE\\\"}},\\\"Statistics:\\\":\\\"Num rows: 0 Data size: 40 Basic stats: PARTIAL Column stats: COMPLETE\\\"}}]}},\\\"Edges:\\\":{\\\"Reducer 2\\\":{\\\"parent\\\":\\\"Map 1\\\",\\\"type\\\":\\\"SIMPLE_EDGE\\\"}}}},\\\"Stage-0\\\":{\\\"Fetch Operator\\\":{\\\"limit:\\\":\\\"-1\\\",\\\"Processor Tree:\\\":{\\\"ListSink\\\":{}}}}},\\\"STAGE DEPENDENCIES\\\":{\\\"Stage-1\\\":{\\\"ROOT STAGE\\\":\\\"TRUE\\\"},\\\"Stage-0\\\":{\\\"DEPENDENT STAGES\\\":\\\"Stage-1\\\"}}}}\",\n" +
+          "            \"STATUS\" : true,\n" +
+          "            \"TEZ\" : true\n" +
+          "            \"VERSION\" : 2\n" +
+          "          },\n" +
+          "        \"primaryfilters\" : { \"user\" : [ \"hive\" ] },\n" +
+          "        \"relatedentities\" : {  },\n" +
+          "        \"starttime\" : 1423493324355\n" +
+          "      } ] }"
+      );
+    }
+  }
+
   protected static class ATSRequestsDelegateStub implements ATSRequestsDelegate {
 
     @Override
@@ -438,5 +489,10 @@ public class ATSParserTest {
     public JSONObject tezVerticesListForDAG(String dagId) {
       return null;
     }
+
+    @Override
+    public JSONObject tezDagByEntity(String entity) {
+      return tezDagByName(entity);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/21b37c20/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
index 3347d15..e1f7c7c 100644
--- a/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
+++ b/contrib/views/hive/src/test/java/org/apache/ambari/view/hive/resources/jobs/AggregatorTest.java
@@ -140,6 +140,33 @@ public class AggregatorTest {
   }
 
   @Test
+  public void testReadJobBothATSAndViewV2() throws Exception {
+    HiveQueryId hiveQueryId = getSampleHiveQueryIdV2("ENTITY-NAME");
+    hiveQueryId.operationId = Aggregator.hexStringToUrlSafeBase64("1b2b");
+    MockATSParser atsParser = getMockATSWithQueries(hiveQueryId);
+
+    MockJobResourceManager jobResourceManager = getJobResourceManagerWithJobs(getSampleViewJob("1"));
+
+    StoredOperationHandle operationHandle = getSampleOperationHandle("5", "1");
+    operationHandle.setGuid("1b2b");
+    MockOperationHandleResourceManager operationHandleResourceManager = getOperationHandleRMWithEntities(operationHandle);
+
+    Aggregator aggregator = new Aggregator(jobResourceManager,
+      operationHandleResourceManager,
+      atsParser);
+
+    List<Job> aggregated = aggregator.readAll("luke");
+
+    Assert.assertEquals(1, aggregated.size());
+    Job job = aggregated.get(0);
+    Assert.assertEquals("1", job.getId());
+    Assert.assertEquals("app_test_1", job.getApplicationId());
+    Assert.assertEquals("ENTITY-NAME", job.getDagId());
+    Assert.assertEquals("SUCCEEDED", job.getStatus());
+  }
+
+
+  @Test
   public void testReadJobComplex() throws Exception {
     //job both on ATS and View
     HiveQueryId hiveQueryId1 = getSampleHiveQueryId("ENTITY-NAME");
@@ -232,6 +259,12 @@ public class AggregatorTest {
     return hiveQueryId;
   }
 
+  private HiveQueryId getSampleHiveQueryIdV2(String id) {
+    HiveQueryId hiveQueryId = getSampleHiveQueryId(id);
+    hiveQueryId.version = HiveQueryId.ATS_15_RESPONSE_VERSION;
+    return hiveQueryId;
+  }
+
   @Test
   public void testGetJobByOperationId() throws Exception {
 
@@ -412,6 +445,15 @@ public class AggregatorTest {
       return new TezDagId();
     }
 
+    @Override
+    public TezDagId getTezDAGByEntity(String entity) {
+      TezDagId dagId = new TezDagId();
+      dagId.applicationId = "app_test_1";
+      dagId.entity = entity;
+      dagId.status = "SUCCEEDED";
+      return dagId;
+    }
+
     public List<HiveQueryId> getHiveQueryIds() {
       return hiveQueryIds;
     }


Mime
View raw message