apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-1965) Create a WAL in Malhar
Date Wed, 20 Apr 2016 06:00:33 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249325#comment-15249325 ]

ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------

Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60353719
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,594 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  // Base path of the WAL; part files are named filePath + "_" + partNumber (see getPartFilePath).
    +  @NotNull
    +  private String filePath;
    +
    +  // Maximum length of a WAL part file. When left at 0, setup() replaces it with the
    +  // default block size of the file system.
    +  @Min(0)
    +  private long maxLength;
    +
    +  // Reader over the WAL; replaceable through setFileSystemWALReader.
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  // Writer of the WAL; replaceable through setFileSystemWALWriter.
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  // part number => temporary part-file path. The reader prefers a temporary path over the
    +  // final part-file path when one is registered for a part.
    +  private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new ConcurrentSkipListMap<>();
    +
    +  // Window id recorded by the most recent beforeCheckpoint call; Stateless.WINDOW_ID until then.
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  /**
    +   * Records {@code window} as the last checkpointed window and flushes the writer.
    +   *
    +   * @param window id of the window being checkpointed
    +   * @throws RuntimeException wrapping an {@link IOException} from the flush
    +   */
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    lastCheckpointedWindow = window;
    +    try {
    +      fileSystemWALWriter.flush();
    +    } catch (IOException ioe) {
    +      throw new RuntimeException("during before cp", ioe);
    +    }
    +  }
    +
    +  /**
    +   * Delegates to the writer's {@code finalizeFiles} for the committed {@code window}.
    +   *
    +   * @param window id of the committed window
    +   * @throws RuntimeException wrapping an {@link IOException} from the writer
    +   */
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException ioe) {
    +      throw new RuntimeException("during committed", ioe);
    +    }
    +  }
    +
    +  /**
    +   * Closes the reader and then the writer. The writer is closed even when closing the reader
    +   * fails, so neither stream is leaked.
    +   *
    +   * @throws RuntimeException wrapping the first {@link IOException} encountered; a second
    +   *         failure is attached to it as a suppressed exception
    +   */
    +  @Override
    +  public void teardown()
    +  {
    +    IOException failure = null;
    +    try {
    +      fileSystemWALReader.close();
    +    } catch (IOException e) {
    +      failure = e;
    +    }
    +    try {
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      if (failure == null) {
    +        failure = e;
    +      } else {
    +        failure.addSuppressed(e);
    +      }
    +    }
    +    if (failure != null) {
    +      throw new RuntimeException("during teardown", failure);
    +    }
    +  }
    +
    +  /**
    +   * @return the window id recorded by the latest {@code beforeCheckpoint} call
    +   */
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return this.lastCheckpointedWindow;
    +  }
    +
    +  /**
    +   * Builds the final path of a WAL part file: the WAL file path, an underscore, then the part number.
    +   *
    +   * @param partNumber part number
    +   * @return path of that part file
    +   */
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return new StringBuilder().append(filePath).append('_').append(partNumber).toString();
    +  }
    +
    +  /**
    +   * @return the reader currently configured for this WAL
    +   */
    +  @Override
    +  public FileSystemWALReader getReader()
    +  {
    +    return this.fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem
wal reader");
    +  }
    +
    +  /**
    +   * @return the writer currently configured for this WAL
    +   */
    +  @Override
    +  public FileSystemWALWriter getWriter()
    +  {
    +    return this.fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   *
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem
wal writer");
    +  }
    +
    +  /**
    +   * @return base path of the WAL, from which part-file paths are derived
    +   */
    +  public String getFilePath()
    +  {
    +    return this.filePath;
    +  }
    +
    +  /**
    +   * Sets the base path of the WAL.
    +   *
    +   * @param filePath WAL file path; must not be null
    +   * @throws NullPointerException if {@code filePath} is null
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    Preconditions.checkNotNull(filePath, "filePath");
    +    this.filePath = filePath;
    +  }
    +
    +  /**
    +   * @return the configured maximum length of a WAL part file (0 until setup() resolves a default)
    +   */
    +  public long getMaxLength()
    +  {
    +    return this.maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file. Leaving it at 0 makes setup() use the file
    +   * system's default block size instead.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset +
'}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   */
    +  public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
    +  {
    +    // Position the reader reads from: part number and byte offset; updated by seek() and nextSegment().
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    // Stream over the part file currently being read; null when no stream is open (see close()).
    +    private transient DataInputStream inputStream;
    +    // Path (temporary or final part file) that inputStream was opened from.
    +    private transient Path currentOpenPath;
    +
    +    // Owning WAL: supplies part-file paths and the registry of temporary part files.
    +    private final FileSystemWAL fileSystemWAL;
    +    // File context used to check for and open part files; assigned in open().
    +    private transient FileContext fileContext;
    +
    +    /**
    +     * No-argument constructor used by Kryo; the owning WAL is left unset.
    +     */
    +    private FileSystemWALReader()
    +    {
    +      this.fileSystemWAL = null;
    +    }
    +
    +    /**
    +     * @param fileSystemWal the WAL this reader reads from; must not be null
    +     * @throws NullPointerException if {@code fileSystemWal} is null
    +     */
    +    public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
    +    {
    +      Preconditions.checkNotNull(fileSystemWal, "wal");
    +      this.fileSystemWAL = fileSystemWal;
    +    }
    +
    +    /**
    +     * Binds the reader to the file context used to locate and open part files.
    +     * This implementation performs no I/O itself.
    +     *
    +     * @param fileContext file context for the WAL path; must not be null
    +     * @throws IOException never thrown by this implementation
    +     */
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      Preconditions.checkNotNull(fileContext, "fileContext");
    +      this.fileContext = fileContext;
    +    }
    +
    +    /**
    +     * Closes the currently open part-file stream, if any, and forgets it.
    +     *
    +     * @throws IOException if closing the stream fails
    +     */
    +    protected void close() throws IOException
    +    {
    +      if (inputStream == null) {
    +        return;
    +      }
    +      inputStream.close();
    +      inputStream = null;
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Advances to offset 0 of the next WAL part file, closing the current stream first.
    +     *
    +     * @return true if the next part file exists and is now open; false otherwise
    +     * @throws IOException if opening the next part file fails
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +
    +      FileSystemWALPointer following = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      currentPointer = following;
    +      inputStream = getInputStream(following);
    +      boolean opened = inputStream != null;
    +      return opened;
    +    }
    +
    +    /**
    +     * Opens the part file addressed by {@code walPointer} and positions the stream at its offset.
    +     * A temporary part file registered for the part number takes precedence over the final
    +     * part-file path.
    +     *
    +     * @param walPointer part number and byte offset to open
    +     * @return an open stream positioned at the requested offset (or at end-of-file if the file is
    +     *         shorter), or {@code null} if the part file does not exist
    +     * @throws IOException if opening or positioning the file fails; the stream is closed in that case
    +     */
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Preconditions.checkArgument(inputStream == null, "input stream not null");
    +      Path pathToReadFrom;
    +      String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
    +      if (tmpPath != null) {
    +        pathToReadFrom = new Path(tmpPath);
    +      } else {
    +        pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      }
    +
    +      LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
    +      if (!fileContext.util().exists(pathToReadFrom)) {
    +        return null;
    +      }
    +      DataInputStream stream = fileContext.open(pathToReadFrom);
    +      try {
    +        skipFully(stream, walPointer.offset);
    +      } catch (IOException e) {
    +        // do not leak the freshly opened stream when positioning fails
    +        try {
    +          stream.close();
    +        } catch (IOException closeFailure) {
    +          e.addSuppressed(closeFailure);
    +        }
    +        throw e;
    +      }
    +      currentOpenPath = pathToReadFrom;
    +      return stream;
    +    }
    +
    +    /**
    +     * Skips {@code count} bytes. {@link java.io.InputStream#skip(long)} may skip fewer bytes than
    +     * requested, so it is called repeatedly until the target is reached or end-of-file is hit.
    +     */
    +    private static void skipFully(DataInputStream stream, long count) throws IOException
    +    {
    +      long remaining = count;
    +      while (remaining > 0) {
    +        long skipped = stream.skip(remaining);
    +        if (skipped > 0) {
    +          remaining -= skipped;
    +        } else if (stream.read() != -1) {
    +          // skip() made no progress; reading one byte tells a stall apart from end-of-file
    +          remaining--;
    +        } else {
    +          // leave the stream at end-of-file rather than failing the read
    +          LOG.warn("offset {} is beyond the end of the file; {} bytes could not be skipped", count, remaining);
    +          return;
    +        }
    +      }
    +    }
    +
    +    @Override
    +    public Slice next() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null && !fileContext.util().exists(currentOpenPath))
{
    --- End diff --
    
    OK, will do that. I was trying to minimize the overlap between the reader and the writer.


> Create a WAL in Malhar
> ----------------------
>
>                 Key: APEXMALHAR-1965
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chandni Singh
>            Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager, which we use as a Write-Ahead Logger. There are other places where we have created different flavors of Write-Ahead Logger.
> We need to find the overlap between all these flavors and create a common Write-Ahead Logger for use in Apex Core and Apex Malhar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message