hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [08/50] [abbrv] hadoop git commit: YARN-6357. Implement putEntitiesAsync API in TimelineCollector (Haibo Chen via Varun Saxena)
Date Tue, 04 Apr 2017 19:56:32 GMT
YARN-6357. Implement putEntitiesAsync API in TimelineCollector (Haibo Chen via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/063b513b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/063b513b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/063b513b

Branch: refs/heads/HDFS-7240
Commit: 063b513b1c10987461caab3d26c8543c6e657bf7
Parents: 01aca54
Author: Varun Saxena <varunsaxena@apache.org>
Authored: Wed Mar 29 03:48:03 2017 +0530
Committer: Varun Saxena <varunsaxena@apache.org>
Committed: Wed Mar 29 03:48:03 2017 +0530

----------------------------------------------------------------------
 .../collector/TimelineCollector.java            | 31 ++++++++--
 .../collector/TimelineCollectorWebService.java  | 12 ++--
 .../collector/TestTimelineCollector.java        | 63 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/063b513b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 2fc3033..353066b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -133,19 +133,35 @@ public abstract class TimelineCollector extends CompositeService {
   public TimelineWriteResponse putEntities(TimelineEntities entities,
       UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
       LOG.debug("putEntities(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
-    TimelineCollectorContext context = getTimelineEntityContext();
 
+    TimelineWriteResponse response = writeTimelineEntities(entities);
+    flushBufferedTimelineEntities();
+
+    return response;
+  }
+
+  private TimelineWriteResponse writeTimelineEntities(
+      TimelineEntities entities) throws IOException {
     // Update application metrics for aggregation
     updateAggregateStatus(entities, aggregationGroups,
         getEntityTypesSkipAggregation());
 
+    final TimelineCollectorContext context = getTimelineEntityContext();
     return writer.write(context.getClusterId(), context.getUserId(),
-        context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
-        context.getAppId(), entities);
+        context.getFlowName(), context.getFlowVersion(),
+        context.getFlowRunId(), context.getAppId(), entities);
+  }
+
+  /**
+   * Flush buffered timeline entities, if any.
+   * @throws IOException if there is any exception encountered while
+   *      flushing buffered entities.
+   */
+  private void flushBufferedTimelineEntities() throws IOException {
+    writer.flush();
   }
 
   /**
@@ -158,14 +174,17 @@ public abstract class TimelineCollector extends CompositeService {
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
+   * @throws IOException if there is any exception encountered while putting
+   *     entities.
    */
   public void putEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // TODO implement
+      UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
           callerUgi + ")");
     }
+
+    writeTimelineEntities(entities);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/063b513b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index f36c636..fe04b7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -152,9 +152,6 @@ public class TimelineCollectorWebService {
       throw new ForbiddenException(msg);
     }
 
-    // TODO how to express async posts and handle them
-    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
     try {
       ApplicationId appID = parseApplicationId(appId);
       if (appID == null) {
@@ -169,7 +166,14 @@ public class TimelineCollectorWebService {
         throw new NotFoundException(); // different exception?
       }
 
-      collector.putEntities(processTimelineEntities(entities), callerUgi);
+      boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+      if (isAsync) {
+        collector.putEntitiesAsync(
+            processTimelineEntities(entities), callerUgi);
+      } else {
+        collector.putEntities(processTimelineEntities(entities), callerUgi);
+      }
+
       return Response.ok().build();
     } catch (Exception e) {
       LOG.error("Error putting entities", e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/063b513b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index 5b4dc50..a55f227 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -18,17 +18,27 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class TestTimelineCollector {
 
@@ -124,4 +134,57 @@ public class TestTimelineCollector {
     }
 
   }
+
+  /**
+   * Test TimelineCollector's interaction with TimelineWriter upon
+   * putEntities() calls.
+   */
+  @Test
+  public void testPutEntity() throws IOException {
+    TimelineWriter writer = mock(TimelineWriter.class);
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+
+    TimelineEntities entities = generateTestEntities(1, 1);
+    collector.putEntities(
+        entities, UserGroupInformation.createRemoteUser("test-user"));
+
+    verify(writer, times(1)).write(
+        anyString(), anyString(), anyString(), anyString(), anyLong(),
+        anyString(), any(TimelineEntities.class));
+    verify(writer, times(1)).flush();
+  }
+
+  /**
+   * Test TimelineCollector's interaction with TimelineWriter upon
+   * putEntitiesAsync() calls.
+   */
+  @Test
+  public void testPutEntityAsync() throws IOException {
+    TimelineWriter writer = mock(TimelineWriter.class);
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+
+    TimelineEntities entities = generateTestEntities(1, 1);
+    collector.putEntitiesAsync(
+        entities, UserGroupInformation.createRemoteUser("test-user"));
+
+    verify(writer, times(1)).write(
+        anyString(), anyString(), anyString(), anyString(), anyLong(),
+        anyString(), any(TimelineEntities.class));
+    verify(writer, never()).flush();
+  }
+
+  private static class TimelineCollectorForTest extends TimelineCollector {
+    private final TimelineCollectorContext context =
+        new TimelineCollectorContext();
+
+    TimelineCollectorForTest(TimelineWriter writer) {
+      super("TimelineCollectorForTest");
+      setWriter(writer);
+    }
+
+    @Override
+    public TimelineCollectorContext getTimelineEntityContext() {
+      return context;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message