hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtcarre...@apache.org
Subject hadoop git commit: YARN-4696. Improving EntityGroupFSTimelineStore on exception handling, test setup, and concurrency. (Steve Loughran via gtcarrera9)
Date Thu, 10 Mar 2016 18:56:07 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 318c9b68b -> d49cfb350


YARN-4696. Improving EntityGroupFSTimelineStore on exception handling, test setup, and concurrency.
(Steve Loughran via gtcarrera9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d49cfb35
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d49cfb35
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d49cfb35

Branch: refs/heads/trunk
Commit: d49cfb350454c2dfa2f3eb70f79b6d5030ce7bec
Parents: 318c9b6
Author: Li Lu <gtcarrera9@apache.org>
Authored: Thu Mar 10 10:51:55 2016 -0800
Committer: Li Lu <gtcarrera9@apache.org>
Committed: Thu Mar 10 10:51:55 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../hadoop/yarn/client/api/TimelineClient.java  |   4 +-
 .../api/impl/FileSystemTimelineWriter.java      |  51 ++---
 .../client/api/impl/TimelineClientImpl.java     |  13 ++
 .../yarn/client/api/impl/TimelineWriter.java    |  40 +++-
 .../timeline/webapp/TimelineWebServices.java    |  12 +-
 .../yarn/server/timeline/EntityCacheItem.java   |  40 ++--
 .../timeline/EntityGroupFSTimelineStore.java    | 204 ++++++++++++++-----
 .../hadoop/yarn/server/timeline/LogInfo.java    |  11 +-
 .../TestEntityGroupFSTimelineStore.java         |   8 +-
 10 files changed, 279 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 61d1d72..ff4b493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1747,6 +1747,12 @@ public class YarnConfiguration extends Configuration {
   public static final long
       TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
 
+  // This is temporary solution. The configuration will be deleted once we have
+  // the FileSystem API to check whether append operation is supported or not.
+  public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+      = TIMELINE_SERVICE_PREFIX
+      + "entity-file.fs-support-append";
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index 258b9f5..09298b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.client.api;
 
+import java.io.Flushable;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -41,7 +42,8 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
  */
 @Public
 @Evolving
-public abstract class TimelineClient extends AbstractService {
+public abstract class TimelineClient extends AbstractService implements
+    Flushable {
 
   /**
    * Create a timeline client. The current UGI when the user initialize the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index aa1f1f8..9e719b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.Closeable;
+import java.io.FileNotFoundException;
 import java.io.Flushable;
 import java.io.IOException;
 import java.net.URI;
@@ -78,12 +79,6 @@ public class FileSystemTimelineWriter extends TimelineWriter{
   private static final Log LOG = LogFactory
       .getLog(FileSystemTimelineWriter.class);
 
-  // This is temporary solution. The configuration will be deleted once we have
-  // the FileSystem API to check whether append operation is supported or not.
-  private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
-      = YarnConfiguration.TIMELINE_SERVICE_PREFIX
-          + "entity-file.fs-support-append";
-
   // App log directory must be readable by group so server can access logs
   // and writable by group so it can be deleted by server
   private static final short APP_LOG_DIR_PERMISSIONS = 0770;
@@ -122,20 +117,10 @@ public class FileSystemTimelineWriter extends TimelineWriter{
           .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
       YarnConfiguration
           .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+    fs = FileSystem.newInstance(activePath.toUri(), fsConf);
 
-    String scheme = activePath.toUri().getScheme();
-    if (scheme == null) {
-      scheme = FileSystem.getDefaultUri(fsConf).getScheme();
-    }
-    if (scheme != null) {
-      String disableCacheName = String.format("fs.%s.impl.disable.cache",
-          scheme);
-      fsConf.setBoolean(disableCacheName, true);
-    }
-
-    fs = activePath.getFileSystem(fsConf);
     if (!fs.exists(activePath)) {
-      throw new IOException(activePath + " does not exist");
+      throw new FileNotFoundException(activePath + " does not exist");
     }
 
     summaryEntityTypes = new HashSet<String>(
@@ -168,7 +153,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
             timerTaskTTL);
 
     this.isAppendSupported =
-        conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+        conf.getBoolean(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
 
     objMapper = createObjectMapper();
 
@@ -181,7 +167,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
               + "=" + cleanIntervalSecs + ", " +
           YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
               + "=" + ttl + ", " +
-          TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
               + "=" + isAppendSupported + ", " +
           YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
               + "=" + activePath);
@@ -196,6 +182,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
   }
 
   @Override
+  public String toString() {
+    return "FileSystemTimelineWriter writing to " + activePath;
+  }
+
+  @Override
   public TimelinePutResponse putEntities(
       ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
       TimelineEntity... entities) throws IOException, YarnException {
@@ -263,9 +254,20 @@ public class FileSystemTimelineWriter extends TimelineWriter{
   }
 
   @Override
-  public void close() throws Exception {
-    if (this.logFDsCache != null) {
-      this.logFDsCache.close();
+  public synchronized void close() throws Exception {
+    if (logFDsCache != null) {
+      LOG.debug("Closing cache");
+      logFDsCache.flush();
+      logFDsCache.close();
+      logFDsCache = null;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (logFDsCache != null) {
+      LOG.debug("Flushing cache");
+      logFDsCache.flush();
     }
   }
 
@@ -333,6 +335,9 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       if (writerClosed()) {
         prepareForWrite();
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing entity list of size " + entities.size());
+      }
       for (TimelineEntity entity : entities) {
         getObjectMapper().writeValue(getJsonGenerator(), entity);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 195a661..ef46229 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -326,6 +326,13 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   @Override
+  public void flush() throws IOException {
+    if (timelineWriter != null) {
+      timelineWriter.flush();
+    }
+  }
+
+  @Override
   public TimelinePutResponse putEntities(
       TimelineEntity... entities) throws IOException, YarnException {
     return timelineWriter.putEntities(entities);
@@ -432,6 +439,12 @@ public class TimelineClientImpl extends TimelineClient {
     operateDelegationToken(cancelDTAction);
   }
 
+  @Override
+  public String toString() {
+    return super.toString() + " with timeline server " + resURI
+        + " and writer " + timelineWriter;
+  }
+
   private Object operateDelegationToken(
       final PrivilegedExceptionAction<?> action)
       throws IOException, YarnException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
index c616e63..9590f4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.client.api.impl;
 
+import java.io.Flushable;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
@@ -48,7 +50,7 @@ import com.sun.jersey.api.client.WebResource;
  */
 @Private
 @Unstable
-public abstract class TimelineWriter {
+public abstract class TimelineWriter implements Flushable {
 
   private static final Log LOG = LogFactory
       .getLog(TimelineWriter.class);
@@ -68,6 +70,16 @@ public abstract class TimelineWriter {
     // DO NOTHING
   }
 
+  @Override
+  public void flush() throws IOException {
+    // DO NOTHING
+  }
+
+  @Override
+  public String toString() {
+    return "Timeline writer posting to " + resURI;
+  }
+
   public TimelinePutResponse putEntities(
       TimelineEntity... entities) throws IOException, YarnException {
     TimelineEntities entitiesContainer = new TimelineEntities();
@@ -104,19 +116,27 @@ public abstract class TimelineWriter {
         }
       });
     } catch (UndeclaredThrowableException e) {
-      throw new IOException(e.getCause());
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException)cause;
+      } else {
+        throw new IOException(cause);
+      }
     } catch (InterruptedException ie) {
-      throw new IOException(ie);
+      throw (IOException)new InterruptedIOException().initCause(ie);
     }
     if (resp == null ||
         resp.getClientResponseStatus() != ClientResponse.Status.OK) {
       String msg =
           "Failed to get the response from the timeline server.";
       LOG.error(msg);
-      if (LOG.isDebugEnabled() && resp != null) {
-        String output = resp.getEntity(String.class);
-        LOG.debug("HTTP error code: " + resp.getStatus()
-            + " Server response : \n" + output);
+      if (resp != null) {
+        msg += " HTTP error code: " + resp.getStatus();
+        if (LOG.isDebugEnabled()) {
+          String output = resp.getEntity(String.class);
+          LOG.debug("HTTP error code: " + resp.getStatus()
+              + " Server response : \n" + output);
+        }
       }
       throw new YarnException(msg);
     }
@@ -128,10 +148,16 @@ public abstract class TimelineWriter {
   public ClientResponse doPostingObject(Object object, String path) {
     WebResource webResource = client.resource(resURI);
     if (path == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("POST to " + resURI);
+      }
       return webResource.accept(MediaType.APPLICATION_JSON)
           .type(MediaType.APPLICATION_JSON)
           .post(ClientResponse.class, object);
     } else if (path.equals("domain")) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PUT to " + resURI +"/" + path);
+      }
       return webResource.path(path).accept(MediaType.APPLICATION_JSON)
           .type(MediaType.APPLICATION_JSON)
           .put(ClientResponse.class, object);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
index eb47ef2..e1e684b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
@@ -129,9 +129,9 @@ public class TimelineWebServices {
           getUser(req));
     } catch (NumberFormatException e) {
       throw new BadRequestException(
-          "windowStart, windowEnd or limit is not a numeric value.");
+        "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e);
     } catch (IllegalArgumentException e) {
-      throw new BadRequestException("requested invalid field.");
+      throw new BadRequestException("requested invalid field: " + e);
     } catch (Exception e) {
       LOG.error("Error getting entities", e);
       throw new WebApplicationException(e,
@@ -160,8 +160,7 @@ public class TimelineWebServices {
           parseFieldsStr(fields, ","),
           getUser(req));
     } catch (IllegalArgumentException e) {
-      throw new BadRequestException(
-          "requested invalid field.");
+      throw new BadRequestException(e);
     } catch (Exception e) {
       LOG.error("Error getting entity", e);
       throw new WebApplicationException(e,
@@ -201,8 +200,9 @@ public class TimelineWebServices {
           parseLongStr(limit),
           getUser(req));
     } catch (NumberFormatException e) {
-      throw new BadRequestException(
-          "windowStart, windowEnd or limit is not a numeric value.");
+      throw (BadRequestException)new BadRequestException(
+          "windowStart, windowEnd or limit is not a numeric value.")
+          .initCause(e);
     } catch (Exception e) {
       LOG.error("Error getting entity timelines", e);
       throw new WebApplicationException(e,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index efbf994..7eec7c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -107,30 +107,30 @@ public class EntityCacheItem {
           store.init(config);
           store.start();
         }
-        TimelineDataManager tdm = new TimelineDataManager(store,
-            aclManager);
-        tdm.init(config);
-        tdm.start();
-        List<LogInfo> removeList = new ArrayList<LogInfo>();
-        for (LogInfo log : appLogs.getDetailLogs()) {
-          LOG.debug("Try refresh logs for {}", log.getFilename());
-          // Only refresh the log that matches the cache id
-          if (log.matchesGroupId(groupId)) {
-            Path appDirPath = appLogs.getAppDirPath();
-            if (fs.exists(log.getPath(appDirPath))) {
-              LOG.debug("Refresh logs for cache id {}", groupId);
-              log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
-                  objMapper, fs);
-            } else {
-              // The log may have been removed, remove the log
-              removeList.add(log);
-              LOG.info("File {} no longer exists, remove it from log list",
-                  log.getPath(appDirPath));
+        List<LogInfo> removeList = new ArrayList<>();
+        try(TimelineDataManager tdm =
+                new TimelineDataManager(store, aclManager)) {
+          tdm.init(config);
+          tdm.start();
+          for (LogInfo log : appLogs.getDetailLogs()) {
+            LOG.debug("Try refresh logs for {}", log.getFilename());
+            // Only refresh the log that matches the cache id
+            if (log.matchesGroupId(groupId)) {
+              Path appDirPath = appLogs.getAppDirPath();
+              if (fs.exists(log.getPath(appDirPath))) {
+                LOG.debug("Refresh logs for cache id {}", groupId);
+                log.parseForStore(tdm, appDirPath, appLogs.isDone(),
+                    jsonFactory, objMapper, fs);
+              } else {
+                // The log may have been removed, remove the log
+                removeList.add(log);
+                LOG.info("File {} no longer exists, removing it from log list",
+                    log.getPath(appDirPath));
+              }
             }
           }
         }
         appLogs.getDetailLogs().removeAll(removeList);
-        tdm.close();
       }
       updateRefreshTimeToNow();
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index b1fbd13..34a2072 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -55,6 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -71,12 +73,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Plugin timeline storage to support timeline server v1.5 API. This storage
  * uses a file system to store timeline entities in their groups.
  */
-public class EntityGroupFSTimelineStore extends AbstractService
+public class EntityGroupFSTimelineStore extends CompositeService
     implements TimelineStore {
 
   static final String DOMAIN_LOG_PREFIX = "domainlog-";
@@ -110,6 +113,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
   private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
       new ConcurrentHashMap<ApplicationId, AppLogs>();
   private ScheduledThreadPoolExecutor executor;
+  private AtomicBoolean stopExecutors = new AtomicBoolean(false);
   private FileSystem fs;
   private ObjectMapper objMapper;
   private JsonFactory jsonFactory;
@@ -128,7 +132,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     summaryStore = createSummaryStore();
-    summaryStore.init(conf);
+    addService(summaryStore);
+
     long logRetainSecs = conf.getLong(
         YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
         YarnConfiguration
@@ -170,17 +175,28 @@ public class EntityGroupFSTimelineStore extends AbstractService
       });
     cacheIdPlugins = loadPlugIns(conf);
     // Initialize yarn client for application status
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(conf);
+    yarnClient = createAndInitYarnClient(conf);
+    // if non-null, hook its lifecycle up
+    addIfService(yarnClient);
+    activeRootPath = new Path(conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+    doneRootPath = new Path(conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
+    fs = activeRootPath.getFileSystem(conf);
     super.serviceInit(conf);
   }
 
   private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
       throws RuntimeException {
-    Collection<String> pluginNames = conf.getStringCollection(
+    Collection<String> pluginNames = conf.getTrimmedStringCollection(
         YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
     List<TimelineEntityGroupPlugin> pluginList
         = new LinkedList<TimelineEntityGroupPlugin>();
+    Exception caught = null;
     for (final String name : pluginNames) {
       LOG.debug("Trying to load plugin class {}", name);
       TimelineEntityGroupPlugin cacheIdPlugin = null;
@@ -191,10 +207,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
                 clazz, conf);
       } catch (Exception e) {
         LOG.warn("Error loading plugin " + name, e);
+        caught = e;
       }
 
       if (cacheIdPlugin == null) {
-        throw new RuntimeException("No class defined for " + name);
+        throw new RuntimeException("No class defined for " + name, caught);
       }
       LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
       pluginList.add(cacheIdPlugin);
@@ -210,8 +227,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
+
+    super.serviceStart();
     LOG.info("Starting {}", getName());
-    yarnClient.start();
     summaryStore.start();
 
     Configuration conf = getConfig();
@@ -219,16 +237,10 @@ public class EntityGroupFSTimelineStore extends AbstractService
     aclManager.setTimelineStore(summaryStore);
     summaryTdm = new TimelineDataManager(summaryStore, aclManager);
     summaryTdm.init(conf);
-    summaryTdm.start();
-    activeRootPath = new Path(conf.get(
-        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
-        YarnConfiguration
-            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
-    doneRootPath = new Path(conf.get(
-        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
-        YarnConfiguration
-            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
-    fs = activeRootPath.getFileSystem(conf);
+    addService(summaryTdm);
+    // start child services that aren't already started
+    super.serviceStart();
+
     if (!fs.exists(activeRootPath)) {
       fs.mkdirs(activeRootPath);
       fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
@@ -257,7 +269,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
         YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
         YarnConfiguration
             .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
-    LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
+    LOG.info("Scanning active directory {} every {} seconds", activeRootPath,
+        scanIntervalSecs);
     LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
 
     executor = new ScheduledThreadPoolExecutor(numThreads,
@@ -267,12 +280,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
         TimeUnit.SECONDS);
     executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
         cleanerIntervalSecs, TimeUnit.SECONDS);
-    super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
     LOG.info("Stopping {}", getName());
+    stopExecutors.set(true);
     if (executor != null) {
       executor.shutdown();
       if (executor.isTerminating()) {
@@ -286,18 +299,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
         }
       }
     }
-    if (summaryTdm != null) {
-      summaryTdm.stop();
-    }
-    if (summaryStore != null) {
-      summaryStore.stop();
-    }
-    if (yarnClient != null) {
-      yarnClient.stop();
-    }
     synchronized (cachedLogs) {
       for (EntityCacheItem cacheItem : cachedLogs.values()) {
-        cacheItem.getStore().close();
+        ServiceOperations.stopQuietly(cacheItem.getStore());
       }
     }
     super.serviceStop();
@@ -305,17 +309,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
 
   @InterfaceAudience.Private
   @VisibleForTesting
-  void scanActiveLogs() throws IOException {
-    RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
+  int scanActiveLogs() throws IOException {
+    RemoteIterator<FileStatus> iter = list(activeRootPath);
+    int logsToScanCount = 0;
     while (iter.hasNext()) {
       FileStatus stat = iter.next();
-      ApplicationId appId = parseApplicationId(stat.getPath().getName());
+      String name = stat.getPath().getName();
+      ApplicationId appId = parseApplicationId(name);
       if (appId != null) {
         LOG.debug("scan logs for {} in {}", appId, stat.getPath());
+        logsToScanCount++;
         AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
         executor.execute(new ActiveLogParser(logs));
+      } else {
+        LOG.debug("Unable to parse entry {}", name);
       }
     }
+    return logsToScanCount;
+  }
+
+  /**
+   * List a directory, returning an iterator which will fail fast if this
+   * service has been stopped
+   * @param path path to list
+   * @return an iterator over the contents of the directory
+   * @throws IOException
+   */
+  private RemoteIterator<FileStatus> list(Path path) throws IOException {
+    return new StoppableRemoteIterator(fs.listStatusIterator(path));
   }
 
   private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
@@ -377,11 +398,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
    */
   @InterfaceAudience.Private
   @VisibleForTesting
-  static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
+  void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
       throws IOException {
     long now = Time.now();
     // Depth first search from root directory for all application log dirs
-    RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
+    RemoteIterator<FileStatus> iter = list(dirpath);
     while (iter.hasNext()) {
       FileStatus stat = iter.next();
       if (stat.isDirectory()) {
@@ -456,7 +477,42 @@ public class EntityGroupFSTimelineStore extends AbstractService
             bucket1, bucket2, appId.toString()));
   }
 
-  // This method has to be synchronized to control traffic to RM
+  /**
+   * Create and initialize the YARN Client. Tests may override/mock this.
+   * If they return null, then {@link #getAppState(ApplicationId)} MUST
+   * also be overridden
+   * @param conf configuration
+   * @return the yarn client, or null.
+   *
+   */
+  @VisibleForTesting
+  protected YarnClient createAndInitYarnClient(Configuration conf) {
+    YarnClient client = YarnClient.createYarnClient();
+    client.init(conf);
+    return client;
+  }
+
+  /**
+   * Get the application state.
+   * @param appId application ID
+   * @return the state or {@link AppState#UNKNOWN} if it could not
+   * be determined
+   * @throws IOException on IO problems
+   */
+  @VisibleForTesting
+  protected AppState getAppState(ApplicationId appId) throws IOException {
+    return getAppState(appId, yarnClient);
+  }
+
+  /**
+   * Ask the RM for the state of the application.
+   * This method has to be synchronized to control traffic to RM
+   * @param appId application ID
+   * @param yarnClient
+   * @return the state or {@link AppState#UNKNOWN} if it could not
+   * be determined
+   * @throws IOException
+   */
   private static synchronized AppState getAppState(ApplicationId appId,
       YarnClient yarnClient) throws IOException {
     AppState appState = AppState.ACTIVE;
@@ -474,9 +530,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
     return appState;
   }
 
+  /**
+   * Application states,
+   */
   @InterfaceAudience.Private
   @VisibleForTesting
-  enum AppState {
+  public enum AppState {
     ACTIVE,
     UNKNOWN,
     COMPLETED
@@ -526,7 +585,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
       if (!isDone()) {
         LOG.debug("Try to parse summary log for log {} in {}",
             appId, appDirPath);
-        appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
+        appState = getAppState(appId);
         long recentLogModTime = scanForLogs();
         if (appState == AppState.UNKNOWN) {
           if (Time.now() - recentLogModTime > unknownActiveMillis) {
@@ -559,8 +618,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
     long scanForLogs() throws IOException {
       LOG.debug("scanForLogs on {}", appDirPath);
       long newestModTime = 0;
-      RemoteIterator<FileStatus> iterAttempt =
-          fs.listStatusIterator(appDirPath);
+      RemoteIterator<FileStatus> iterAttempt = list(appDirPath);
       while (iterAttempt.hasNext()) {
         FileStatus statAttempt = iterAttempt.next();
         LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
@@ -572,8 +630,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
           continue;
         }
         String attemptDirName = statAttempt.getPath().getName();
-        RemoteIterator<FileStatus> iterCache
-            = fs.listStatusIterator(statAttempt.getPath());
+        RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath());
         while (iterCache.hasNext()) {
           FileStatus statCache = iterCache.next();
           if (!statCache.isFile()) {
@@ -659,14 +716,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
     }
   }
 
+  /**
+   * Extract any nested throwable forwarded from IPC operations.
+   * @param e exception
+   * @return either the exception passed an an argument, or any nested
+   * exception which was wrapped inside an {@link UndeclaredThrowableException}
+   */
+  private Throwable extract(Exception e) {
+    Throwable t = e;
+    if (e instanceof UndeclaredThrowableException && e.getCause() != null) {
+      t = e.getCause();
+    }
+    return t;
+  }
+
   private class EntityLogScanner implements Runnable {
     @Override
     public void run() {
       LOG.debug("Active scan starting");
       try {
-        scanActiveLogs();
+        int scanned = scanActiveLogs();
+        LOG.debug("Scanned {} active applications", scanned);
       } catch (Exception e) {
-        LOG.error("Error scanning active files", e);
+        Throwable t = extract(e);
+        if (t instanceof InterruptedException) {
+          LOG.info("File scanner interrupted");
+        } else {
+          LOG.error("Error scanning active files", t);
+        }
       }
       LOG.debug("Active scan complete");
     }
@@ -690,7 +767,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
         }
         LOG.debug("End parsing summary logs. ");
       } catch (Exception e) {
-        LOG.error("Error processing logs for " + appLogs.getAppId(), e);
+        Throwable t = extract(e);
+        if (t instanceof InterruptedException) {
+          LOG.info("Log parser interrupted");
+        } else {
+          LOG.error("Error processing logs for " + appLogs.getAppId(), t);
+        }
       }
     }
   }
@@ -702,7 +784,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
       try {
         cleanLogs(doneRootPath, fs, logRetainMillis);
       } catch (Exception e) {
-        LOG.error("Error cleaning files", e);
+        Throwable t = extract(e);
+        if (t instanceof InterruptedException) {
+          LOG.info("Cleaner interrupted");
+        } else {
+          LOG.error("Error cleaning files", e);
+        }
       }
       LOG.debug("Cleaner finished");
     }
@@ -892,4 +979,29 @@ public class EntityGroupFSTimelineStore extends AbstractService
   public void put(TimelineDomain domain) throws IOException {
     summaryStore.put(domain);
   }
+
+  /**
+   * This is a special remote iterator whose {@link #hasNext()} method
+   * returns false if {@link #stopExecutors} is true.
+   *
+   * This provides an implicit shutdown of all iterative file list and scan
+   * operations without needing to implement it in the while loops themselves.
+   */
+  private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
+    private final RemoteIterator<FileStatus> remote;
+
+    public StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
+      this.remote = remote;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return !stopExecutors.get() && remote.hasNext();
+    }
+
+    @Override
+    public FileStatus next() throws IOException {
+      return remote.next();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
index 4caed8d..bc80175 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timeline;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
@@ -103,7 +104,8 @@ abstract class LogInfo {
     LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
         attemptDirName);
     Path logPath = getPath(appDirPath);
-    if (fs.exists(logPath)) {
+    FileStatus status = fs.getFileStatus(logPath);
+    if (status != null) {
       long startTime = Time.monotonicNow();
       try {
         LOG.debug("Parsing {} at offset {}", logPath, offset);
@@ -112,8 +114,11 @@ abstract class LogInfo {
         LOG.info("Parsed {} entities from {} in {} msec",
             count, logPath, Time.monotonicNow() - startTime);
       } catch (RuntimeException e) {
-        if (e.getCause() instanceof JsonParseException) {
-          // If AppLogs cannot parse this log, it may be corrupted
+        // If AppLogs cannot parse this log, it may be corrupted or just empty
+        if (e.getCause() instanceof JsonParseException &&
+            (status.getLen() > 0 || offset > 0)) {
+          // log on parse problems if the file as been read in the past or
+          // is visibly non-empty
           LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index e43b705..3e5bc06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -116,14 +116,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils
{
           EntityGroupPlugInForTest.class.getName());
     }
     store.init(config);
-    store.start();
     store.setFs(fs);
+    store.start();
   }
 
   @After
   public void tearDown() throws Exception {
-    fs.delete(TEST_APP_DIR_PATH, true);
     store.stop();
+    fs.delete(TEST_APP_DIR_PATH, true);
   }
 
   @AfterClass
@@ -222,7 +222,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils
{
     fs.mkdirs(dirPathEmpty);
 
     // Should retain all logs after this run
-    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
+    store.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
     assertTrue(fs.exists(irrelevantDirPath));
     assertTrue(fs.exists(irrelevantFilePath));
     assertTrue(fs.exists(filePath));
@@ -239,7 +239,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils
{
     // Touch the third application by creating a new dir
     fs.mkdirs(new Path(dirPathHold, "holdByMe"));
 
-    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
+    store.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
 
     // Verification after the second cleaner call
     assertTrue(fs.exists(irrelevantDirPath));


Mime
View raw message