hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rohithsharm...@apache.org
Subject [hadoop] 01/05: YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi.
Date Thu, 05 Sep 2019 12:52:17 GMT
This is an automated email from the ASF dual-hosted git repository.

rohithsharmaks pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit bcacb5711425f938c87dc985afdc5ec2464d58f2
Author: Vrushali C <vrushali@apache.org>
AuthorDate: Fri Apr 5 12:06:51 2019 -0700

    YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend
is unreachable for async calls. Contributed by Abhishek Modi.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  9 ++++
 .../src/main/resources/yarn-default.xml            |  7 +++
 .../collector/TimelineCollector.java               | 24 +++++++++-
 .../collector/TestTimelineCollector.java           | 51 ++++++++++++++++++++--
 4 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 90d4298..c69d857 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2706,6 +2706,15 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
 
+  /** The setting that controls the capacity of the queue for async writes
+   * to timeline collector.
+   */
+  public static final String TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY =
+      TIMELINE_SERVICE_PREFIX + "writer.async.queue.capacity";
+
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = 100;
+
   /**
    * The name for setting that controls how long the final value of
    * a metric of a completed app is retained before merging
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 72125b1..3b9cf5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2568,6 +2568,13 @@
   </property>
 
   <property>
+    <description>The setting that decides the capacity of the queue to hold
+    asynchronous timeline entities.</description>
+    <name>yarn.timeline-service.writer.async.queue.capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
     <description>Time period till which the application collector will be alive
      in NM, after the  application master container finishes.</description>
     <name>yarn.timeline-service.app-collector.linger-period.ms</name>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index e9eeb43..c0004e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -23,8 +23,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +65,7 @@ public abstract class TimelineCollector extends CompositeService {
       = new ConcurrentHashMap<>();
   private static Set<String> entityTypesSkipAggregation
       = new HashSet<>();
+  private ThreadPoolExecutor pool;
 
   private volatile boolean readyToAggregate = false;
 
@@ -73,6 +78,14 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+    int capacity = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY
+    );
+    pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(capacity));
+    pool.setRejectedExecutionHandler(
+        new ThreadPoolExecutor.DiscardOldestPolicy());
   }
 
   @Override
@@ -83,6 +96,7 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
     isStopped = true;
+    pool.shutdownNow();
     super.serviceStop();
   }
 
@@ -221,7 +235,15 @@ public abstract class TimelineCollector extends CompositeService {
           callerUgi + ")");
     }
 
-    writeTimelineEntities(entities, callerUgi);
+    pool.execute(new Runnable() {
+      @Override public void run() {
+        try {
+          writeTimelineEntities(entities, callerUgi);
+        } catch (IOException ie) {
+          LOG.error("Got an exception while writing entity", ie);
+        }
+      }
+    });
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index 88e4f25..b2fc80e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -27,11 +27,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -46,6 +50,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestTimelineCollector {
 
@@ -165,17 +170,57 @@ public class TestTimelineCollector {
    * putEntityAsync() calls.
    */
   @Test
-  public void testPutEntityAsync() throws IOException {
+  public void testPutEntityAsync() throws Exception {
     TimelineWriter writer = mock(TimelineWriter.class);
     TimelineCollector collector = new TimelineCollectorForTest(writer);
-
+    collector.init(new Configuration());
+    collector.start();
     TimelineEntities entities = generateTestEntities(1, 1);
     collector.putEntitiesAsync(
         entities, UserGroupInformation.createRemoteUser("test-user"));
-
+    Thread.sleep(1000);
     verify(writer, times(1)).write(any(TimelineCollectorContext.class),
         any(TimelineEntities.class), any(UserGroupInformation.class));
     verify(writer, never()).flush();
+    collector.stop();
+  }
+
+  /**
+   * Test TimelineCollector's discarding entities in case of async writes if
+   * write is taking too much time.
+   */
+  @Test
+  public void testAsyncEntityDiscard() throws Exception {
+    TimelineWriter writer = mock(TimelineWriter.class);
+
+    when(writer.write(any(), any(), any()))
+        .thenAnswer(new Answer<TimelineWriteResponse>() {
+          @Override
+          public TimelineWriteResponse answer(InvocationOnMock invocation)
+              throws InterruptedException {
+            Thread.sleep(500);
+            return new TimelineWriteResponse();
+          }
+        });
+
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+    Configuration config = new Configuration();
+    config
+        .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+            3);
+    collector.init(config);
+    collector.start();
+    for (int i = 0; i < 10; ++i) {
+      TimelineEntities entities = generateTestEntities(i + 1, 1);
+      collector.putEntitiesAsync(entities,
+          UserGroupInformation.createRemoteUser("test-user"));
+    }
+    Thread.sleep(3000);
+    verify(writer, times(4))
+        .write(any(TimelineCollectorContext.class), any(TimelineEntities.class),
+            any(UserGroupInformation.class));
+    verify(writer, never()).flush();
+    collector.stop();
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message