hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject hadoop git commit: YARN-4237 Support additional queries for ATSv2 Web UI. Contributed by Varun Saxena.
Date Thu, 15 Oct 2015 17:51:03 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 bd5af9c5f -> 581a6b6df


YARN-4237 Support additional queries for ATSv2 Web UI. Contributed by
Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/581a6b6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/581a6b6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/581a6b6d

Branch: refs/heads/YARN-2928
Commit: 581a6b6df70a04728f034b42f23f1fe3bb19ba34
Parents: bd5af9c
Author: Li Lu <gtcarrera9@apache.org>
Authored: Thu Oct 15 10:49:36 2015 -0700
Committer: Li Lu <gtcarrera9@apache.org>
Committed: Thu Oct 15 10:49:36 2015 -0700

----------------------------------------------------------------------
 .../reader/TimelineReaderWebServices.java       |  64 ++++++++++
 .../storage/FlowRunEntityReader.java            |  45 ++++++-
 .../storage/GenericEntityReader.java            |   3 -
 .../storage/TimelineEntityReader.java           |   3 +
 .../storage/flow/FlowRunRowKey.java             |  15 +++
 ...stTimelineReaderWebServicesHBaseStorage.java | 117 ++++++++++++++++++-
 6 files changed, 236 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 610f74c..83062f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -450,6 +450,70 @@ public class TimelineReaderWebServices {
   }
 
   /**
+   * Return a set of flow runs for the given flow id.
+   * Cluster ID is not provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flowruns/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRuns(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("fields") String fields) {
+    return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart,
+        createdTimeEnd, fields);
+  }
+
+  /**
+   * Return a set of flow runs for the given cluster and flow id.
+   */
+  @GET
+  @Path("/flowruns/{clusterid}/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRuns(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("fields") String fields) {
+    String url = req.getRequestURI() +
+        (req.getQueryString() == null ? "" :
+            QUERY_STRING_SEP + req.getQueryString());
+    UserGroupInformation callerUGI = getUser(req);
+    LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
+    long startTime = Time.monotonicNow();
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    Set<TimelineEntity> entities = null;
+    try {
+      entities = timelineReaderManager.getEntities(
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+          null, null, TimelineEntityType.YARN_FLOW_RUN.toString(),
+          parseLongStr(limit), parseLongStr(createdTimeStart),
+          parseLongStr(createdTimeEnd), null, null, null, null, null, null,
+          null, null, parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (Exception e) {
+      handleException(e, url, startTime, "createdTime start/end or limit");
+    }
+    long endTime = Time.monotonicNow();
+    if (entities == null) {
+      entities = Collections.emptySet();
+    }
+    LOG.info("Processed URL " + url +
+        " (Took " + (endTime - startTime) + " ms.)");
+    return entities;
+  }
+
+  /**
    * Return a list of flows for a given cluster id. Cluster ID is not
    * provided by client so default cluster ID has to be taken.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index 90ce28f..c4b4e91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -56,7 +58,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve, false);
+        eventFilters, fieldsToRetrieve, true);
   }
 
   public FlowRunEntityReader(String userId, String clusterId,
@@ -79,11 +81,27 @@ class FlowRunEntityReader extends TimelineEntityReader {
     Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
     Preconditions.checkNotNull(userId, "userId shouldn't be null");
     Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
-    Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+    if (singleEntityRead) {
+      Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+    }
   }
 
   @Override
   protected void augmentParams(Configuration hbaseConf, Connection conn) {
+    if (!singleEntityRead) {
+      if (fieldsToRetrieve == null) {
+        fieldsToRetrieve = EnumSet.noneOf(Field.class);
+      }
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+    }
   }
 
   @Override
@@ -99,8 +117,11 @@ class FlowRunEntityReader extends TimelineEntityReader {
   @Override
   protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
-    throw new UnsupportedOperationException(
-        "multiple entity query is not supported");
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(
+        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
   }
 
   @Override
@@ -108,13 +129,23 @@ class FlowRunEntityReader extends TimelineEntityReader {
     FlowRunEntity flowRun = new FlowRunEntity();
     flowRun.setUser(userId);
     flowRun.setName(flowId);
-    flowRun.setRunId(flowRunId);
+    if (singleEntityRead) {
+      flowRun.setRunId(flowRunId);
+    } else {
+      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
+      flowRun.setRunId(rowKey.getFlowRunId());
+    }
 
     // read the start time
     Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
+    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
+        flowRun.getStartTime() > createdTimeEnd)) {
+      return null;
+    }
+
     // read the end time if available
     Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
@@ -128,7 +159,9 @@ class FlowRunEntityReader extends TimelineEntityReader {
     }
 
     // read metrics
-    readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
+      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+    }
 
     // set the id
     flowRun.setId(flowRun.getId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index c18966f..bbca209 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -60,9 +60,6 @@ class GenericEntityReader extends TimelineEntityReader {
   private static final EntityTable ENTITY_TABLE = new EntityTable();
   private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
 
-  protected static final long DEFAULT_BEGIN_TIME = 0L;
-  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
-
   /**
    * Used to look up the flow context.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
index d4a659c..adaf42e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
  */
 abstract class TimelineEntityReader {
   private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
+  protected static final long DEFAULT_BEGIN_TIME = 0L;
+  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
   protected final boolean singleEntityRead;
 
   protected String userId;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 7ed3651..a14d2bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -55,6 +55,21 @@ public class FlowRunRowKey {
   }
 
   /**
+   * Constructs a row key prefix for the flow run table as follows: {
+   * clusterId!userId!flowId!}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
+        flowId, ""));
+  }
+
+  /**
    * Constructs a row key for the entity table as follows: {
    * clusterId!userI!flowId!Inverted Flow Run Id}
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/581a6b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index a89d2fc..f6a5090 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -172,11 +172,11 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     id = "application_11111111111111_2223";
     entity3.setId(id);
     entity3.setType(type);
-    cTime = 1425016501030L;
+    cTime = 1425016501037L;
     entity3.setCreatedTime(cTime);
     TimelineEvent event2 = new TimelineEvent();
     event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event2.setTimestamp(1436512802030L);
+    event2.setTimestamp(1436512802037L);
     event2.addInfo("foo_event", "test");
     entity3.addEvent(event2);
     te3.addEntity(entity3);
@@ -364,6 +364,119 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     }
   }
 
+
+  @Test
+  public void testGetFlowRuns() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowRunEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L)));
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            entity.getId().equals("user1@flow_name/1002345678920") &&
+            entity.getRunId() == 1002345678920L &&
+            entity.getStartTime() == 1425016501034L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimestart=1425016501030");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            entity.getId().equals("user1@flow_name/1002345678920") &&
+            entity.getRunId() == 1002345678920L &&
+            entity.getStartTime() == 1425016501034L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimestart=1425016500999&createdtimeend=1425016501035");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L)));
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimeend=1425016501030");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+             entity.getId().equals("user1@flow_name/1002345678919") &&
+             entity.getRunId() == 1002345678919L &&
+             entity.getStartTime() == 1425016501000L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L) &&
+            (entity.getMetrics().size() == 2)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L) &&
+            (entity.getMetrics().size() == 0)));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
   @Test
   public void testGetFlows() throws Exception {
     Client client = createClient();


Mime
View raw message