tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haris...@apache.org
Subject [2/2] tez git commit: TEZ-3915. Create protobuf based history event logger. (Harish Jaiprakash, reviewed by Gunther Hagleitner)
Date Fri, 20 Apr 2018 08:16:09 GMT
TEZ-3915. Create protobuf based history event logger. (Harish Jaiprakash, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24b872a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24b872a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24b872a7

Branch: refs/heads/master
Commit: 24b872a7fc7a9bca11cbf4b5da80198386485547
Parents: 871ea80
Author: Harish JP <harishjp@gmail.com>
Authored: Fri Apr 20 13:41:37 2018 +0530
Committer: Harish JP <harishjp@gmail.com>
Committed: Fri Apr 20 13:41:37 2018 +0530

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../apache/tez/dag/api/TezConfiguration.java    |  21 +
 tez-dist/pom.xml                                |  10 +
 tez-plugins/pom.xml                             |   2 +
 .../findbugs-exclude.xml                        |  50 ++
 tez-plugins/tez-protobuf-history-plugin/pom.xml |  92 +++
 .../logging/proto/DagManifesFileScanner.java    | 150 ++++
 .../logging/proto/DatePartitionedLogger.java    | 167 +++++
 .../proto/HistoryEventProtoConverter.java       | 498 +++++++++++++
 .../proto/ProtoHistoryLoggingService.java       | 211 ++++++
 .../logging/proto/ProtoMessageReader.java       |  66 ++
 .../logging/proto/ProtoMessageWritable.java     | 101 +++
 .../logging/proto/ProtoMessageWriter.java       |  71 ++
 .../history/logging/proto/TezProtoLoggers.java  |  64 ++
 .../src/main/proto/HistoryLogger.proto          |  49 ++
 .../proto/TestDagManifestFileScanner.java       | 118 +++
 .../proto/TestHistoryEventProtoConverter.java   | 716 +++++++++++++++++++
 .../proto/TestProtoHistoryLoggingService.java   | 195 +++++
 18 files changed, 2582 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 34240cf..16745f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
   <properties>
     <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
     <clover.license>${user.home}/clover.license</clover.license>
-    <hadoop.version>2.7.0</hadoop.version>
+    <hadoop.version>2.7.2</hadoop.version>
     <jetty.version>9.3.22.v20171030</jetty.version>
     <netty.version>3.6.2.Final</netty.version>
     <pig.version>0.13.0</pig.version>

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6d3050d..243f278 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1439,6 +1439,27 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
 
   /**
+   * String value. The base directory into which history data will be written when proto history
+   * logging service is used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}.
+   * If this is not set, then logging is disabled for ProtoHistoryLoggingService.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty
+  public static final String TEZ_HISTORY_LOGGING_PROTO_BASE_DIR =
+      TEZ_PREFIX + "history.logging.proto-base-dir";
+
+  /**
+   * Long value. The amount of time in seconds to wait to ensure all events for a day are synced
+   * to disk. This should be the maximum time variation between machines plus the maximum time to
+   * sync file content and metadata.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="long")
+  public static final String TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS =
+      TEZ_PREFIX + "history.logging.proto-sync-window-secs";
+  public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
+
+  /**
    * Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown.
    * Expert level setting.
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index 854a548..9447fe7 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -69,6 +69,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.tez</groupId>
+          <artifactId>tez-protobuf-history-plugin</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
           <artifactId>hadoop-shim-2.7</artifactId>
           <version>${project.version}</version>
         </dependency>
@@ -97,6 +102,11 @@
         </dependency>
         <dependency>
           <groupId>org.apache.tez</groupId>
+          <artifactId>tez-protobuf-history-plugin</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
           <artifactId>hadoop-shim-2.8</artifactId>
           <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
index fb0691a..f90cae7 100644
--- a/tez-plugins/pom.xml
+++ b/tez-plugins/pom.xml
@@ -35,6 +35,7 @@
         </property>
       </activation>
       <modules>
+        <module>tez-protobuf-history-plugin</module>
         <module>tez-yarn-timeline-history</module>
         <module>tez-yarn-timeline-history-with-acls</module>
         <module>tez-history-parser</module>
@@ -47,6 +48,7 @@
         <activeByDefault>false</activeByDefault>
       </activation>
       <modules>
+        <module>tez-protobuf-history-plugin</module>
         <module>tez-yarn-timeline-history</module>
         <module>tez-yarn-timeline-history-with-acls</module>
         <module>tez-yarn-timeline-cache-plugin</module>

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml b/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml
new file mode 100644
index 0000000..c91265d
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair\$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto\$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/pom.xml b/tez-plugins/tez-protobuf-history-plugin/pom.xml
new file mode 100644
index 0000000..880aca9
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/pom.xml
@@ -0,0 +1,92 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.9.2-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-protobuf-history-plugin</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>HistoryLogger.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java
new file mode 100644
index 0000000..c8ea02f
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Helper class to scan all the dag manifest files to get manifest entries.
+ */
+public class DagManifesFileScanner implements Closeable {
+  // Version stamp written into serialized offsets; bump whenever DagManifestOffset changes shape
+  // so stale offset strings are rejected in setOffset(String).
+  private static final int OFFSET_VERSION = 1;
+
+  private final ObjectMapper mapper = new ObjectMapper();
+  private final DatePartitionedLogger<ManifestEntryProto> manifestLogger;
+  // Sync window in seconds, read from config in the constructor; loadMore() uses it to delay
+  // advancing past yesterday's directory just after midnight UTC.
+  private final long syncTime;
+
+  // Current date-partition directory being scanned (form "date=yyyy-MM-dd").
+  private String scanDir;
+  // File name -> byte offset already consumed; used to detect changed files on rescans.
+  private Map<String, Long> offsets;
+  // Files discovered but not yet opened for reading.
+  private List<Path> newFiles;
+
+  // Reader for the file currently being consumed; null between files.
+  private ProtoMessageReader<ManifestEntryProto> reader;
+
+  /**
+   * Creates a scanner positioned at the epoch (1970-01-01) partition, i.e. it will scan
+   * everything from the beginning.
+   *
+   * @param manifestLogger the logger whose date-partitioned directory layout is scanned.
+   */
+  public DagManifesFileScanner(DatePartitionedLogger<ManifestEntryProto> manifestLogger) {
+    this.manifestLogger = manifestLogger;
+    this.syncTime = manifestLogger.getConfig().getLong(
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS,
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT);
+    this.setOffset(LocalDate.ofEpochDay(0));
+  }
+
+  // All public to simplify json conversion (Jackson binds these fields directly).
+  public static class DagManifestOffset {
+    public int version;
+    public String scanDir;
+    public Map<String, Long> offsets;
+  }
+
+  /**
+   * Restores scanner state from a json offset string previously produced by {@link #getOffset()}.
+   *
+   * @throws IllegalArgumentException if the json cannot be parsed or its version field does not
+   *           match {@code OFFSET_VERSION}.
+   */
+  public void setOffset(String offset) {
+    try {
+      DagManifestOffset dagOffset = mapper.readValue(offset, DagManifestOffset.class);
+      if (dagOffset.version != OFFSET_VERSION) {
+        throw new IllegalArgumentException("Version mismatch: " + dagOffset.version);
+      }
+      this.scanDir = dagOffset.scanDir;
+      this.offsets = dagOffset.offsets;
+      this.newFiles = new ArrayList<>();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Invalid offset", e);
+    }
+  }
+
+  /**
+   * Repositions the scanner at the start of the given date's partition, discarding all
+   * previously recorded per-file offsets.
+   */
+  public void setOffset(LocalDate date) {
+    this.scanDir = manifestLogger.getDirForDate(date);
+    this.offsets = new HashMap<>();
+    this.newFiles = new ArrayList<>();
+  }
+
+  /**
+   * Serializes the current scan position (directory + per-file offsets) to a json string that
+   * can later be passed to {@link #setOffset(String)} to resume scanning.
+   */
+  public String getOffset() {
+    try {
+      DagManifestOffset offset = new DagManifestOffset();
+      offset.version = OFFSET_VERSION;
+      offset.scanDir = scanDir;
+      offset.offsets = offsets;
+      return mapper.writeValueAsString(offset);
+    } catch (IOException e) {
+      throw new RuntimeException("Unexpected exception while converting to json.", e);
+    }
+  }
+
+  /**
+   * Returns the next manifest entry, advancing through files and date directories as needed.
+   *
+   * @return the next entry, or null if the scanner is fully caught up (no newer data yet).
+   */
+  public ManifestEntryProto getNext() throws IOException {
+    while (true) {
+      if (reader != null) {
+        ManifestEntryProto evt = reader.readEvent();
+        if (evt != null) {
+          // Record how far we have read in this file so a rescan can skip consumed data.
+          offsets.put(reader.getFilePath().getName(), reader.getOffset());
+          return evt;
+        } else {
+          // Current file exhausted; close it and fall through to pick the next one.
+          IOUtils.closeQuietly(reader);
+          reader = null;
+        }
+      }
+      if (!newFiles.isEmpty()) {
+        this.reader = manifestLogger.getReader(newFiles.remove(0));
+      } else {
+        if (!loadMore()) {
+          return null;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  /**
+   * Scans for changed files in the current directory, advancing to later date directories when
+   * the current one has nothing new.
+   *
+   * @return true if newFiles is now non-empty; false if there is nothing more to read yet.
+   */
+  private boolean loadMore() throws IOException {
+    newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets);
+    while (newFiles.isEmpty()) {
+      LocalDateTime utcNow = manifestLogger.getNow();
+      // Seconds elapsed since midnight UTC compared against the configured sync window.
+      if (utcNow.getHour() * 3600 + utcNow.getMinute() * 60 + utcNow.getSecond() < syncTime) {
+        // We are in the delay window for today, do not advance date if we are moving from
+        // yesterday.
+        String yesterDir = manifestLogger.getDirForDate(utcNow.toLocalDate().minusDays(1));
+        if (yesterDir.equals(scanDir)) {
+          return false;
+        }
+      }
+      String nextDir = manifestLogger.getNextDirectory(scanDir);
+      if (nextDir == null) {
+        return false;
+      }
+      // Move to the next available date partition and rescan from scratch.
+      scanDir = nextDir;
+      offsets = new HashMap<>();
+      newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets);
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
new file mode 100644
index 0000000..8f89b2e
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.util.Clock;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Class to create proto reader and writer for a date partitioned directory structure.
+ *
+ * @param <T> The proto message type.
+ */
+public class DatePartitionedLogger<T extends MessageLite> {
+  // Everyone has permission to write, but with sticky set so that delete is restricted.
+  // This is required, since the path is same for all users and everyone writes into it.
+  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+
+  // Parser for the proto message type, handed to readers created by this class.
+  private final Parser<T> parser;
+  // Fully resolved root under which the date=yyyy-MM-dd partitions live.
+  private final Path basePath;
+  private final Configuration conf;
+  // Injected clock so getNow() is deterministic in tests.
+  private final Clock clock;
+  private final FileSystem fileSystem;
+
+  /**
+   * @param parser parser for the proto message type stored in the files.
+   * @param baseDir root directory; created (with sticky, world-writable permission) if absent.
+   * @param conf configuration used to resolve the file system for baseDir.
+   * @param clock clock backing {@link #getNow()}.
+   * @throws IOException if the base directory cannot be created or resolved.
+   */
+  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
+      throws IOException {
+    this.conf = conf;
+    this.clock = clock;
+    this.parser = parser;
+    this.fileSystem = baseDir.getFileSystem(conf);
+    if (!fileSystem.exists(baseDir)) {
+      fileSystem.mkdirs(baseDir);
+      fileSystem.setPermission(baseDir, DIR_PERMISSION);
+    }
+    this.basePath = fileSystem.resolvePath(baseDir);
+  }
+
+  /**
+   * Creates a writer for the given fileName, with date as today (per {@link #getNow()}, UTC).
+   */
+  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
+    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
+    return new ProtoMessageWriter<>(conf, filePath, parser);
+  }
+
+  /**
+   * Creates a reader for the given filePath, no validation is done.
+   */
+  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
+    return new ProtoMessageReader<>(conf, filePath, parser);
+  }
+
+  /**
+   * Create a path for the given date and fileName. This can be used to create a reader.
+   * The date directory is created (with sticky permission) if it does not already exist.
+   */
+  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
+    Path path = new Path(basePath, getDirForDate(date));
+    if (!fileSystem.exists(path)) {
+      fileSystem.mkdirs(path);
+      fileSystem.setPermission(path, DIR_PERMISSION);
+    }
+    return new Path(path, fileName);
+  }
+
+  /**
+   * Extract the date from the directory name, this should be a directory created by this class.
+   *
+   * @throws IllegalArgumentException if dirName does not start with the "date=" prefix.
+   */
+  public LocalDate getDateFromDir(String dirName) {
+    if (!dirName.startsWith("date=")) {
+      throw new IllegalArgumentException("Invalid directory: "+ dirName);
+    }
+    // Substring(5) skips the "date=" prefix; the remainder is an ISO local date.
+    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  /**
+   * Returns the directory name for a given date, of the form "date=yyyy-MM-dd".
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Find next available directory, after the given directory.
+   *
+   * @return the name of the earliest directory sorting after currentDir, or null if none exists.
+   */
+  public String getNextDirectory(String currentDir) throws IOException {
+    // Fast check, if the next day directory exists return it.
+    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    if (fileSystem.exists(new Path(basePath, nextDate))) {
+      return nextDate;
+    }
+    // Have to scan the directory to find min date greater than currentDir.
+    String dirName = null;
+    for (FileStatus status : fileSystem.listStatus(basePath)) {
+      String name = status.getPath().getName();
+      // String comparison is good enough, since its of form date=yyyy-MM-dd
+      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
+        dirName = name;
+      }
+    }
+    return dirName;
+  }
+
+  /**
+   * Returns new or changed files in the given directory. The offsets are used to find
+   * changed files: a file is returned when it has no recorded offset, or its current length
+   * exceeds the recorded offset.
+   *
+   * @param subDir directory name relative to the base path.
+   * @param currentOffsets file name -> byte offset already consumed by the caller.
+   */
+  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+      throws IOException {
+    Path dirPath = new Path(basePath, subDir);
+    List<Path> newFiles = new ArrayList<>();
+    if (!fileSystem.exists(dirPath)) {
+      return newFiles;
+    }
+    for (FileStatus status : fileSystem.listStatus(dirPath)) {
+      String fileName = status.getPath().getName();
+      Long offset = currentOffsets.get(fileName);
+      // If the offset was never added or offset < fileSize.
+      if (offset == null || offset < status.getLen()) {
+        newFiles.add(new Path(dirPath, fileName));
+      }
+    }
+    return newFiles;
+  }
+
+  /**
+   * Returns the current time, using the underlying clock in UTC time.
+   */
+  public LocalDateTime getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
+  }
+
+  public Configuration getConfig() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
new file mode 100644
index 0000000..44dccb6
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
+import org.apache.tez.dag.app.web.AMWebController;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert history event into HistoryEventProto message.
+ */
+public class HistoryEventProtoConverter {
+  private static final Logger log =
+      LoggerFactory.getLogger(HistoryEventProtoConverter.class);
+
+  // Reused for all map -> json conversions performed by this converter.
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Convert a given history event to HistoryEventProto message.
+   *
+   * @throws UnsupportedOperationException if the event does not support history logging
+   *         or is a commit/kill-request event that has no proto representation.
+   */
+  public HistoryEventProto convert(HistoryEvent historyEvent) {
+    validateEvent(historyEvent);
+    switch (historyEvent.getEventType()) {
+    case APP_LAUNCHED:
+      return convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+    case AM_LAUNCHED:
+      return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+    case AM_STARTED:
+      return convertAMStartedEvent((AMStartedEvent) historyEvent);
+    case CONTAINER_LAUNCHED:
+      return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+    case CONTAINER_STOPPED:
+      return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+    case DAG_SUBMITTED:
+      return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+    case DAG_INITIALIZED:
+      return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+    case DAG_STARTED:
+      return convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+    case DAG_FINISHED:
+      return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+    case VERTEX_INITIALIZED:
+      return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+    case VERTEX_STARTED:
+      return convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+    case VERTEX_FINISHED:
+      return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+    case TASK_STARTED:
+      return convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+    case TASK_FINISHED:
+      return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+    case TASK_ATTEMPT_STARTED:
+      return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+    case TASK_ATTEMPT_FINISHED:
+      return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+    case VERTEX_CONFIGURE_DONE:
+      return convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent);
+    case DAG_RECOVERED:
+      return convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent);
+    case VERTEX_COMMIT_STARTED:
+    case VERTEX_GROUP_COMMIT_STARTED:
+    case VERTEX_GROUP_COMMIT_FINISHED:
+    case DAG_COMMIT_STARTED:
+    case DAG_KILL_REQUEST:
+      throw new UnsupportedOperationException("Invalid Event, does not support history, eventType="
+          + historyEvent.getEventType());
+      // Do not add default, if a new event type is added, we'll get a warning for the
+      // switch.
+    }
+    throw new UnsupportedOperationException(
+        "Unhandled Event, eventType=" + historyEvent.getEventType());
+  }
+
+  /** Rejects events that are not marked as history events. */
+  private void validateEvent(HistoryEvent event) {
+    if (!event.isHistoryEvent()) {
+      throw new UnsupportedOperationException(
+          "Invalid Event, does not support history" + ", eventType=" + event.getEventType());
+    }
+  }
+
+  /**
+   * Creates a proto builder with the common fields filled in. Any id can be null; ids
+   * that can be derived from a more specific id are back-filled (taskAttemptId -> taskId
+   * -> vertexId -> dagId -> appId), so callers only need to pass the most specific id
+   * they have.
+   */
+  private HistoryEventProto.Builder makeBuilderForEvent(HistoryEvent event, long time,
+      TezDAGID dagId, ApplicationId appId, ApplicationAttemptId appAttemptId, TezVertexID vertexId,
+      TezTaskID taskId, TezTaskAttemptID taskAttemptId, String user) {
+    HistoryEventProto.Builder builder = HistoryEventProto.newBuilder();
+    builder.setEventType(event.getEventType().name());
+    builder.setEventTime(time);
+    if (taskAttemptId != null) {
+      builder.setTaskAttemptId(taskAttemptId.toString());
+      taskId = taskAttemptId.getTaskID();
+    }
+    if (taskId != null) {
+      builder.setTaskId(taskId.toString());
+      vertexId = taskId.getVertexID();
+    }
+    if (vertexId != null) {
+      builder.setVertexId(vertexId.toString());
+      dagId = vertexId.getDAGId();
+    }
+    if (dagId != null) {
+      builder.setDagId(dagId.toString());
+      if (appId == null) {
+        appId = dagId.getApplicationId();
+      }
+    }
+    if (appAttemptId != null) {
+      builder.setAppAttemptId(appAttemptId.toString());
+      if (appId == null) {
+        appId = appAttemptId.getApplicationId();
+      }
+    }
+    if (appId != null) {
+      builder.setAppId(appId.toString());
+    }
+    if (user != null) {
+      builder.setUser(user);
+    }
+    return builder;
+  }
+
+  /** Adds a key/value pair to the event data; null values are silently skipped. */
+  private void addEventData(HistoryEventProto.Builder builder, String key, String value) {
+    if (value == null) {
+      return;
+    }
+    builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value));
+  }
+
+  /** Adds a numeric key/value pair, stored as the number's string form. */
+  private void addEventData(HistoryEventProto.Builder builder, String key, Number value) {
+    builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value.toString()));
+  }
+
+  /**
+   * Adds a map-valued key/value pair, serialized as json. On serialization failure the
+   * pair is dropped and the error logged; the event itself is still produced.
+   */
+  private void addEventData(HistoryEventProto.Builder builder, String key,
+      Map<String, Object> value) {
+    try {
+      builder.addEventData(
+          KVPair.newBuilder().setKey(key).setValue(mapper.writeValueAsString(value)));
+    } catch (IOException e) {
+      log.error("Error converting value for key {} to json: ", key, e);
+    }
+  }
+
+  private HistoryEventProto convertAppLaunchedEvent(AppLaunchedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
+        event.getApplicationId(), null, null, null, null, event.getUser());
+    // This is ok as long as we do not modify the underlying map.
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    Map<String, Object> confMap = (Map)DAGUtils.convertConfigurationToATSMap(event.getConf());
+    addEventData(builder, ATSConstants.CONFIG, confMap);
+    if (event.getVersion() != null) {
+      addEventData(builder, ATSConstants.TEZ_VERSION,
+          DAGUtils.convertTezVersionToATSMap(event.getVersion()));
+    }
+    addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
+    return builder.build();
+  }
+
+  private HistoryEventProto convertAMLaunchedEvent(AMLaunchedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
+        null, event.getApplicationAttemptId(), null, null, null, event.getUser());
+    addEventData(builder, ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertAMStartedEvent(AMStartedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(), null,
+        null, event.getApplicationAttemptId(), null, null, null, event.getUser());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null,
+        null, event.getApplicationAttemptId(), null, null, null, null);
+    addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertContainerStoppedEvent(ContainerStoppedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStoppedTime(), null,
+        null, event.getApplicationAttemptId(), null, null, null, null);
+    addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    addEventData(builder, ATSConstants.EXIT_STATUS, event.getExitStatus());
+    addEventData(builder, ATSConstants.FINISH_TIME, event.getStoppedTime());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getSubmitTime(),
+        event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
+        event.getUser());
+    addEventData(builder, ATSConstants.DAG_NAME, event.getDAGName());
+    if (event.getDAGPlan().hasCallerContext() &&
+        event.getDAGPlan().getCallerContext().hasCallerId()) {
+      CallerContextProto callerContext = event.getDAGPlan().getCallerContext();
+      addEventData(builder, ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
+      addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE, callerContext.getCallerType());
+      addEventData(builder, ATSConstants.CALLER_CONTEXT, callerContext.getContext());
+    }
+    if (event.getQueueName() != null) {
+      addEventData(builder, ATSConstants.DAG_QUEUE_NAME, event.getQueueName());
+    }
+    addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION);
+    addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_" +
+        event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
+    try {
+      addEventData(builder, ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    return builder.build();
+  }
+
+  private HistoryEventProto convertDAGInitializedEvent(DAGInitializedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getInitTime(),
+        event.getDagID(), null, null, null, null, null, event.getUser());
+    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
+
+    if (event.getVertexNameIDMap() != null) {
+      // TreeMap keeps the mapping sorted by vertex name in the serialized json.
+      Map<String, Object> nameIdStrMap = new TreeMap<String, Object>();
+      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+      }
+      addEventData(builder, ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+    }
+    return builder.build();
+  }
+
+  private HistoryEventProto convertDAGStartedEvent(DAGStartedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
+        event.getDagID(), null, null, null, null, null, event.getUser());
+
+    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
+    addEventData(builder, ATSConstants.STATUS, event.getDagState().name());
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertDAGFinishedEvent(DAGFinishedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
+        event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
+        event.getUser());
+    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
+    if (event.getDAGPlan().hasCallerContext()) {
+      if (event.getDAGPlan().getCallerContext().hasCallerType()) {
+        addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE,
+            event.getDAGPlan().getCallerContext().getCallerType());
+      }
+      if (event.getDAGPlan().getCallerContext().hasCallerId()) {
+        addEventData(builder, ATSConstants.CALLER_CONTEXT_ID,
+            event.getDAGPlan().getCallerContext().getCallerId());
+      }
+    }
+    addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
+    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    addEventData(builder, ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    addEventData(builder, ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    Map<String, Integer> dagTaskStats = event.getDagTaskStats();
+    if (dagTaskStats != null) {
+      for (Entry<String, Integer> entry : dagTaskStats.entrySet()) {
+        addEventData(builder, entry.getKey(), entry.getValue());
+      }
+    }
+    return builder.build();
+  }
+
+  private HistoryEventProto convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
+        null, null, null, null, null, event.getTaskAttemptID(), null);
+    if (event.getInProgressLogsUrl() != null) {
+      addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    }
+    if (event.getCompletedLogsUrl() != null) {
+      addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    }
+    addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
+    addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    addEventData(builder, ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
+        null, null, null, null, null, event.getTaskAttemptID(), null);
+
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskFailureType() != null) {
+      addEventData(builder, ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name());
+    }
+
+    addEventData(builder, ATSConstants.CREATION_TIME, event.getCreationTime());
+    addEventData(builder, ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+    addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
+    if (event.getCreationCausalTA() != null) {
+      addEventData(builder, ATSConstants.CREATION_CAUSAL_ATTEMPT,
+          event.getCreationCausalTA().toString());
+    }
+    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    if (event.getTaskAttemptError() != null) {
+      addEventData(builder, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
+          event.getTaskAttemptError().name());
+    }
+    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    addEventData(builder, ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getCounters()));
+    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+      addEventData(builder, ATSConstants.LAST_DATA_EVENTS,
+          DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
+    }
+    if (event.getNodeId() != null) {
+      addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
+    }
+    if (event.getContainerId() != null) {
+      addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    }
+    if (event.getInProgressLogsUrl() != null) {
+      addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    }
+    if (event.getCompletedLogsUrl() != null) {
+      addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    }
+    if (event.getNodeHttpAddress() != null) {
+      addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    }
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertTaskFinishedEvent(TaskFinishedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
+        null, null, null, null, event.getTaskID(), null, null);
+
+    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+    addEventData(builder, ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, event.getNumFailedAttempts());
+    if (event.getSuccessfulAttemptID() != null) {
+      addEventData(builder, ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+          event.getSuccessfulAttemptID().toString());
+    }
+
+    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    addEventData(builder, ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertTaskStartedEvent(TaskStartedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
+        null, null, null, null, event.getTaskID(), null, null);
+
+    addEventData(builder, ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertVertexFinishedEvent(VertexFinishedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
+        null, null, null, event.getVertexID(), null, null, null);
+
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+    addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
+    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    addEventData(builder, ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    addEventData(builder, ATSConstants.STATS,
+        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+    if (event.getServicePluginInfo() != null) {
+      addEventData(builder, ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+    }
+
+    final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
+    if (vertexTaskStats != null) {
+      for (Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
+        addEventData(builder, entry.getKey(), entry.getValue());
+      }
+    }
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertVertexInitializedEvent(VertexInitializedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getInitedTime(),
+        null, null, null, event.getVertexID(), null, null, null);
+    addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
+    addEventData(builder, ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
+    addEventData(builder, ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+    if (event.getServicePluginInfo() != null) {
+      addEventData(builder, ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+    }
+
+    return builder.build();
+  }
+
+  private HistoryEventProto convertVertexStartedEvent(VertexStartedEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
+        null, null, null, event.getVertexID(), null, null, null);
+    addEventData(builder, ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    addEventData(builder, ATSConstants.STATUS, event.getVertexState().name());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getReconfigureDoneTime(),
+        null, null, null, event.getVertexID(), null, null, null);
+    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
+      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
+      for (Entry<String, EdgeProperty> entry : event.getSourceEdgeProperties().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(), DAGUtils.convertEdgeProperty(entry.getValue()));
+      }
+      addEventData(builder, ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
+    return builder.build();
+  }
+
+  private HistoryEventProto convertDAGRecoveredEvent(DAGRecoveredEvent event) {
+    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getRecoveredTime(),
+        event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
+        event.getUser());
+    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
+    if (event.getRecoveredDagState() != null) {
+      addEventData(builder, ATSConstants.DAG_STATE, event.getRecoveredDagState().name());
+    }
+    if (event.getRecoveryFailureReason() != null) {
+      addEventData(builder, ATSConstants.RECOVERY_FAILURE_REASON,
+          event.getRecoveryFailureReason());
+    }
+    addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_" +
+        event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
new file mode 100644
index 0000000..60cbda5
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
+import org.apache.tez.dag.records.TezDAGID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Logging service to write history events serialized using protobuf into sequence files.
+ * This can be used as external tables in hive. Or the reader can be used independently to
+ * read the data from these files.
+ */
+public class ProtoHistoryLoggingService extends HistoryLoggingService {
+  private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class);
+  private final HistoryEventProtoConverter converter =
+      new HistoryEventProtoConverter();
+  private boolean loggingDisabled = false;
+
+  // Bounded queue decouples event producers from the single writer thread.
+  private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>(10000);
+  private Thread eventHandlingThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private TezProtoLoggers loggers;
+  private ProtoMessageWriter<HistoryEventProto> appEventsWriter;
+  private ProtoMessageWriter<HistoryEventProto> dagEventsWriter;
+  private TezDAGID currentDagId;
+  private long dagSubmittedEventOffset = -1;
+
+  private String appEventsFile;
+  private long appLaunchedEventOffset;
+
+  public ProtoHistoryLoggingService() {
+    super(ProtoHistoryLoggingService.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initing ProtoHistoryLoggingService");
+    setConfig(conf);
+    loggingDisabled = !conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
+    LOG.info("Inited ProtoHistoryLoggingService");
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting ProtoHistoryLoggingService");
+    if (!loggingDisabled) {
+      loggers = new TezProtoLoggers();
+      if (!loggers.setup(getConfig(), appContext.getClock())) {
+        LOG.warn("Log file location for ProtoHistoryLoggingService not specified, " +
+            "logging disabled");
+        loggingDisabled = true;
+        return;
+      }
+      appEventsWriter = loggers.getAppEventsLogger().getWriter(
+          appContext.getApplicationAttemptId().toString());
+      eventHandlingThread = new Thread(this::loop, "HistoryEventHandlingThread");
+      eventHandlingThread.start();
+    }
+    LOG.info("Started ProtoHistoryLoggingService");
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping ProtoHistoryLoggingService, eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    // The thread is only created when logging is enabled and setup succeeded; guard
+    // against an NPE when stop is called without a successful start.
+    if (eventHandlingThread != null) {
+      eventHandlingThread.join();
+    }
+    IOUtils.closeQuietly(appEventsWriter);
+    IOUtils.closeQuietly(dagEventsWriter);
+    LOG.info("Stopped ProtoHistoryLoggingService");
+  }
+
+  @Override
+  public void handle(DAGHistoryEvent event) {
+    if (loggingDisabled || stopped.get()) {
+      return;
+    }
+    try {
+      eventQueue.add(event);
+    } catch (IllegalStateException e) {
+      // Queue is full; drop the event rather than block the caller.
+      LOG.error("Queue capacity filled up, ignoring event: " +
+          event.getHistoryEvent().getEventType());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Queue capacity filled up, ignoring event: {}", event.getHistoryEvent());
+      }
+    }
+  }
+
+  private void loop() {
+    // Keep looping while the service is not stopped.
+    // Drain any left over events after the service has been stopped.
+    while (!stopped.get() || !eventQueue.isEmpty()) {
+      DAGHistoryEvent evt = null;
+      try {
+        evt = eventQueue.poll(100, TimeUnit.MILLISECONDS);
+        if (evt != null) {
+          handleEvent(evt);
+        }
+      } catch (InterruptedException e) {
+        LOG.info("EventQueue poll interrupted, ignoring it.", e);
+      } catch (IOException e) {
+        // evt is non-null here: only handleEvent(evt) throws IOException.
+        TezDAGID dagid = evt.getDagID();
+        HistoryEventType type = evt.getHistoryEvent().getEventType();
+        // Retry is hard, because there are several places where this exception can happen
+        // the state will get messed up a lot.
+        LOG.error("Got exception while handling event {} for dag {}.", type, dagid, e);
+      }
+    }
+  }
+
+  /**
+   * Routes one event to the right writer: events without a dag id go to the app events
+   * file; DAG_SUBMITTED rolls over to a new per-dag file; DAG_FINISHED closes it out and
+   * records a manifest entry.
+   */
+  private void handleEvent(DAGHistoryEvent event) throws IOException {
+    if (loggingDisabled) {
+      return;
+    }
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (event.getDagID() == null) {
+      if (historyEvent.getEventType() == HistoryEventType.APP_LAUNCHED) {
+        // Remember where the app launched event lives so the manifest can point at it.
+        appEventsFile = appEventsWriter.getPath().toString();
+        appLaunchedEventOffset = appEventsWriter.getOffset();
+      }
+      appEventsWriter.writeProto(converter.convert(historyEvent));
+    } else {
+      HistoryEventType type = historyEvent.getEventType();
+      TezDAGID dagId = event.getDagID();
+      if (type == HistoryEventType.DAG_FINISHED) {
+        finishCurrentDag((DAGFinishedEvent)historyEvent);
+      } else if (type == HistoryEventType.DAG_SUBMITTED) {
+        finishCurrentDag(null);
+        currentDagId = dagId;
+        dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString());
+        dagSubmittedEventOffset = dagEventsWriter.getOffset();
+        dagEventsWriter.writeProto(converter.convert(historyEvent));
+      } else if (dagEventsWriter != null) {
+        dagEventsWriter.writeProto(converter.convert(historyEvent));
+      }
+    }
+  }
+
+  /**
+   * Finalizes the current dag's event file and writes a manifest entry describing where
+   * its submitted/finished events (and the app launched event) can be found.
+   *
+   * @param event the finished event, or null when a new dag is submitted before the
+   *        previous one finished (finished offset is recorded as -1 in that case).
+   */
+  private void finishCurrentDag(DAGFinishedEvent event) throws IOException {
+    if (dagEventsWriter == null) {
+      return;
+    }
+    ProtoMessageWriter<ManifestEntryProto> writer = null;
+    try {
+      long finishEventOffset = -1;
+      if (event != null) {
+        finishEventOffset = dagEventsWriter.getOffset();
+        dagEventsWriter.writeProto(converter.convert(event));
+      }
+      // Do not cache this writer, it should be created at the time of writing
+      writer = loggers.getManifestEventsLogger()
+          .getWriter(appContext.getApplicationAttemptId().toString());
+      ManifestEntryProto.Builder entry = ManifestEntryProto.newBuilder()
+          .setDagId(currentDagId.toString())
+          .setAppId(currentDagId.getApplicationId().toString())
+          .setDagSubmittedEventOffset(dagSubmittedEventOffset)
+          .setDagFinishedEventOffset(finishEventOffset)
+          .setDagFilePath(dagEventsWriter.getPath().toString())
+          .setAppFilePath(appEventsFile)
+          .setAppLaunchedEventOffset(appLaunchedEventOffset)
+          .setWriteTime(System.currentTimeMillis());
+      if (event != null) {
+        entry.setDagId(event.getDagID().toString());
+      }
+      writer.writeProto(entry.build());
+      appEventsWriter.hflush();
+    } finally {
+      // On an error, clean up everything; this ensures we do not carry one dag's writer
+      // over into another dag.
+      IOUtils.closeQuietly(dagEventsWriter);
+      IOUtils.closeQuietly(writer);
+      dagEventsWriter = null;
+      currentDagId = null;
+      dagSubmittedEventOffset = -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
new file mode 100644
index 0000000..e5f5e6b
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageReader<T extends MessageLite> implements Closeable {
+  private final Path filePath;
+  private final SequenceFile.Reader reader;
+  private final ProtoMessageWritable<T> writable;
+
+  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+    this.filePath = filePath;
+    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    this.writable = new ProtoMessageWritable<>(parser);
+  }
+
+  public Path getFilePath() {
+    return filePath;
+  }
+
+  public void setOffset(long offset) throws IOException {
+    reader.seek(offset);
+  }
+
+  public long getOffset() throws IOException {
+    return reader.getPosition();
+  }
+
+  public T readEvent() throws IOException {
+    if (!reader.next(NullWritable.get(), writable)) {
+      return null;
+    }
+    return writable.getMessage();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
new file mode 100644
index 0000000..34e4701
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
/**
 * Hadoop {@link Writable} adapter that (de)serializes a single protobuf
 * message as a length-delimited record (varint size followed by the message
 * bytes). Used as the SequenceFile value type by {@link ProtoMessageWriter}
 * and {@link ProtoMessageReader}.
 *
 * Not thread-safe: the stream adapters and coded streams are created lazily
 * and reused across calls to avoid per-record allocations.
 */
public class ProtoMessageWritable<T extends MessageLite> implements Writable {
  // Holds the message most recently set or decoded; null until one is set.
  private T message;
  // Parser used by readFields() to decode messages of type T.
  private final Parser<T> parser;
  // Lazily created write-side adapters, reused across write() calls.
  private DataOutputStream dos;
  private CodedOutputStream cos;
  // Lazily created read-side adapters, reused across readFields() calls.
  private DataInputStream din;
  private CodedInputStream cin;

  ProtoMessageWritable(Parser<T> parser) {
    this.parser = parser;
  }

  /** @return the last message set via {@link #setMessage} or decoded by {@link #readFields}. */
  public T getMessage() {
    return message;
  }

  /** Sets the message to be serialized by the next {@link #write} call. */
  public void setMessage(T message) {
    this.message = message;
  }

  // Minimal OutputStream facade over a mutable DataOutput target, so that a
  // single CodedOutputStream can be reused while the destination (supplied by
  // the SequenceFile framework on each write call) changes.
  private static class DataOutputStream extends OutputStream {
    DataOutput out;
    @Override
    public void write(int b) throws IOException {
      out.write(b);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
    }
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Create the adapter and coded stream once; only the DataOutput target
    // is swapped between calls.
    if (dos == null) {
      dos = new DataOutputStream();
      cos = CodedOutputStream.newInstance(dos);
    }
    dos.out = out;
    // writeMessageNoTag emits the varint length prefix followed by the bytes.
    cos.writeMessageNoTag(message);
    // Flush the coded stream's internal buffer into the current target before
    // it is replaced on the next call.
    cos.flush();
  }

  // Minimal InputStream facade over a mutable DataInput source. EOFException
  // from DataInput is translated into the -1 end-of-stream convention that
  // InputStream (and CodedInputStream) expect.
  private static class DataInputStream extends InputStream {
    DataInput in;
    @Override
    public int read() throws IOException {
      try {
        return in.readUnsignedByte();
      } catch (EOFException e) {
        return -1;
      }
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    if (din == null) {
      din = new DataInputStream();
      cin = CodedInputStream.newInstance(din);
      // The reused CodedInputStream counts all bytes ever read toward its
      // size limit (64MB by default); raise it so long-lived readers that
      // decode many records do not fail spuriously.
      cin.setSizeLimit(Integer.MAX_VALUE);
    }
    din.in = in;
    // NOTE(review): the CodedInputStream is reused across records and may
    // buffer ahead of the current message boundary — presumably safe because
    // the underlying DataInput stays the same stream for the lifetime of this
    // writable; confirm if records can ever arrive from different streams.
    message = cin.readMessage(parser, null);
  }
}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
new file mode 100644
index 0000000..ca9ba61
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
+  private final Path filePath;
+  private final SequenceFile.Writer writer;
+  private final ProtoMessageWritable<T> writable;
+
+  ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
+    this.filePath = filePath;
+    this.writer = SequenceFile.createWriter(
+        conf,
+        SequenceFile.Writer.file(filePath),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
+        SequenceFile.Writer.appendIfExists(true),
+        SequenceFile.Writer.compression(CompressionType.RECORD));
+    this.writable = new ProtoMessageWritable<>(parser);
+  }
+
+  public Path getPath() {
+    return filePath;
+  }
+
+  public long getOffset() throws IOException {
+    return writer.getLength();
+  }
+
+  public void writeProto(T message) throws IOException {
+    writable.setMessage(message);
+    writer.append(NullWritable.get(), writable);
+  }
+
+  public void hflush() throws IOException {
+    writer.hflush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/TezProtoLoggers.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/TezProtoLoggers.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/TezProtoLoggers.java
new file mode 100644
index 0000000..44390fc
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/TezProtoLoggers.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
+
+/**
+ * Helper class to create the logger for tez, we would use this to read the events outside
+ * tez and hence isolating the configuration and the paths in this.
+ */
+public class TezProtoLoggers {
+  private DatePartitionedLogger<HistoryEventProto> appEventsLogger;
+  private DatePartitionedLogger<HistoryEventProto> dagEventsLogger;
+  private DatePartitionedLogger<ManifestEntryProto> manifestEventsLogger;
+
+  public boolean setup(Configuration conf, Clock clock) throws IOException {
+    String logDir = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR);
+    if (logDir == null) {
+      return false;
+    }
+    appEventsLogger = new DatePartitionedLogger<>(HistoryEventProto.PARSER,
+        new Path(logDir, "app_data"), conf, clock);
+    dagEventsLogger = new DatePartitionedLogger<>(HistoryEventProto.PARSER,
+        new Path(logDir, "dag_data"), conf, clock);
+    manifestEventsLogger = new DatePartitionedLogger<>(ManifestEntryProto.PARSER,
+        new Path(logDir, "dag_meta"), conf, clock);
+    return true;
+  }
+
+  public DatePartitionedLogger<HistoryEventProto> getAppEventsLogger() {
+    return appEventsLogger;
+  }
+
+  public DatePartitionedLogger<HistoryEventProto> getDagEventsLogger() {
+    return dagEventsLogger;
+  }
+
+  public DatePartitionedLogger<ManifestEntryProto> getManifestEventsLogger() {
+    return manifestEventsLogger;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/proto/HistoryLogger.proto
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/proto/HistoryLogger.proto b/tez-plugins/tez-protobuf-history-plugin/src/main/proto/HistoryLogger.proto
new file mode 100644
index 0000000..a5bbe34
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/proto/HistoryLogger.proto
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.history.logging.proto";
+option java_outer_classname = "HistoryLoggerProtos";
+
// A single string key/value pair; used to carry arbitrary per-event
// attributes in HistoryEventProto.event_data.
message KVPair {
  optional string key = 1;
  optional string value = 2;
}
+
// One Tez history event. The entity id fields (dag/vertex/task/attempt) are
// populated only when they apply to the event type; event-type-specific
// details are carried as key/value pairs in event_data.
message HistoryEventProto {
  optional string event_type = 1;
  // Event timestamp in milliseconds since the epoch.
  optional int64 event_time = 2;
  optional string user = 3;
  optional string app_id = 4;
  optional string app_attempt_id = 5;
  optional string dag_id = 6;
  optional string vertex_id = 7;
  optional string task_id = 8;
  optional string task_attempt_id = 9;
  repeated KVPair event_data = 10;
}
+
// One manifest entry per DAG, pointing into the app and dag event files so
// readers can locate a DAG's events without scanning whole files.
message ManifestEntryProto {
  optional string dag_id = 1;
  optional string app_id = 2;
  // Byte offsets of the DAG submitted/finished events within dag_file_path.
  optional int64 dag_submitted_event_offset = 3;
  optional int64 dag_finished_event_offset = 4;
  optional string dag_file_path = 5;
  // Time this entry was written, in milliseconds since the epoch.
  // Renamed from "writeTime" for snake_case consistency with the other
  // fields; tag number 6 is unchanged, so the wire format and the generated
  // Java accessors (get/setWriteTime) are unaffected.
  optional int64 write_time = 6;
  optional string app_file_path = 7;
  // Byte offset of the app-launched event within app_file_path.
  optional int64 app_launched_event_offset = 8;
}

http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java
new file mode 100644
index 0000000..fcaa315
--- /dev/null
+++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.logging.proto;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestDagManifestFileScanner {
+  private MockClock clock;
+  private DatePartitionedLogger<ManifestEntryProto> manifestLogger;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setupTest() throws Exception {
+    String basePath = tempFolder.newFolder().getAbsolutePath();
+    clock = new MockClock();
+    Configuration conf = new Configuration(false);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
+    TezProtoLoggers loggers = new TezProtoLoggers();
+    loggers.setup(conf, clock);
+    manifestLogger = loggers.getManifestEventsLogger();
+  }
+
+  @Test
+  public void testNormal() throws Exception {
+    clock.setTime(0); // 0th day.
+    createManifestEvents(0, 8);
+    clock.setTime((24 * 60 * 60 + 1) * 1000); // 1 day 1 sec.
+    createManifestEvents(24 * 3600, 5);
+    DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
+    int count = 0;
+    while (scanner.getNext() != null) {
+      ++count;
+    }
+    Assert.assertEquals(8, count);
+
+    // Save offset for later use.
+    String offset = scanner.getOffset();
+
+    // Move time outside the window, no changes and it will give more events.
+    clock.setTime((24 * 60 * 60 + 61) * 1000); // 1 day 61 sec.
+    count = 0;
+    while (scanner.getNext() != null) {
+      ++count;
+    }
+    Assert.assertEquals(5, count);
+
+    // Reset the offset
+    scanner.setOffset(offset);
+    count = 0;
+    while (scanner.getNext() != null) {
+      ++count;
+    }
+    Assert.assertEquals(5, count);
+
+    scanner.close();
+
+    // Not able to test append since the LocalFileSystem does not implement append.
+  }
+
+  private void createManifestEvents(long time, int numEvents) throws IOException {
+    for (int i = 0; i < numEvents; ++i) {
+      ApplicationId appId = ApplicationId.newInstance(1000l, i);
+      ManifestEntryProto proto = ManifestEntryProto.newBuilder()
+          .setAppId(appId.toString())
+          .setDagFilePath("dummy_dag_path_" + i)
+          .setDagSubmittedEventOffset(0)
+          .setDagFinishedEventOffset(1)
+          .setAppFilePath("dummp_app_path_" + i)
+          .setAppLaunchedEventOffset(2)
+          .setWriteTime(clock.getTime())
+          .build();
+      ProtoMessageWriter<ManifestEntryProto> writer = manifestLogger.getWriter(appId.toString());
+      writer.writeProto(proto);
+      writer.close();
+    }
+  }
+
+  private static class MockClock implements Clock {
+    private long time = 0;
+
+    void setTime(long time) {
+      this.time = time;
+    }
+
+    @Override
+    public long getTime() {
+      return time;
+    }
+  }
+}


Mime
View raw message