apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-1965) Create a WAL in Malhar
Date Tue, 19 Apr 2016 21:56:25 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248761#comment-15248761
] 

ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------

Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60319166
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader,
FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem
wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem
wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset +
'}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen())
{
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current point", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which
can leave some stray tmp files
    +      //which aren't accounted by tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION)
&&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    +      int entryLength = slice.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(slice.length);
    +      outputStream.write(slice);
    +      currentPointer.offset += entryLength;
    +
    +      if (currentPointer.offset == fileSystemWAL.maxLength) {
    +        //if the file is completed then we can rotate it. do not have to wait for next
entry
    +        rotate(false);
    +      }
    +
    +      return entryLength;
    +    }
    +
    +    protected void flush() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.flush();
    +        if (outputStream instanceof FSDataOutputStream) {
    +          ((FSDataOutputStream)outputStream).hflush();
    --- End diff --
    
    ```
          if (outputStream instanceof Syncable) {
              Syncable syncableOutputStream = (Syncable)outputStream;
              syncableOutputStream.hflush();
              syncableOutputStream.hsync();
            } else {
              //On local file system the write stream needs to be closed for the reader to
see any data
              outputStream.close();
            }
    ```
    Does the above look good? On the local file system, the reader wasn't seeing data unless the output stream was closed.


> Create a WAL in Malhar
> ----------------------
>
>                 Key: APEXMALHAR-1965
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chandni Singh
>            Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager which we use like a Write Ahead Logger. There have been some other places where we have created a different flavor of Write Ahead Logger.
> We need to find overlap between all these flavors and create a common Write Ahead Logger for use in Apex core and Apex Malhar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message