apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
Date Thu, 11 Aug 2016 06:20:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416630#comment-15416630 ]

ASF GitHub Bot commented on APEXMALHAR-2063:
--------------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/322#discussion_r74372710
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---
    @@ -51,291 +66,607 @@
     public class FSWindowDataManager implements WindowDataManager
     {
       private static final String DEF_RECOVERY_PATH = "idempotentState";
    -
    -  protected transient FSStorageAgent storageAgent;
    +  private static final String WAL_FILE_NAME = "wal";
     
       /**
    -   * Recovery path relative to app path where state is saved.
    +   * Recovery filePath relative to app filePath where state is saved.
        */
       @NotNull
    -  private String recoveryPath;
    +  private String recoveryPath = DEF_RECOVERY_PATH;
     
       private boolean isRecoveryPathRelativeToAppPath = true;
     
       /**
    -   * largest window for which there is recovery data across all physical operator instances.
    +   * This is not null only for one physical instance.<br/>
    +   * It consists of operator ids which have been deleted but have some state that can be replayed.
    +   * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
    +   * The value is assigned during partitioning.
        */
    -  protected transient long largestRecoveryWindow;
    +  private Set<Integer> deletedOperators;
    +
    +  private boolean repartitioned;
     
       /**
    -   * This is not null only for one physical instance.<br/>
    -   * It consists of operator ids which have been deleted but have some state that can be replayed.
    -   * Only one of the instances would be handling (modifying) the files that belong to this state.
    +   * Used when it is not necessary to replay every streaming/app window.
    +   * Used by {@link IncrementalCheckpointManager}
        */
    -  protected Set<Integer> deletedOperators;
    +  private boolean relyOnCheckpoints;
     
       /**
    -   * Sorted mapping from window id to all the operators that have state to replay for that window.
    +   * largest window for which there is recovery data across all physical operator instances.
        */
    -  protected final transient TreeMultimap<Long, Integer> replayState;
    +  private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
    +
    +  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
     
    -  protected transient FileSystem fs;
    -  protected transient Path appPath;
    +  //operator id -> wals (sorted)
    +  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
    +
    +  private transient String statePath;
    +  private transient int operatorId;
    +
    +  private final transient Kryo kryo = new Kryo();
    +
    +  private transient FileContext fileContext;
     
       public FSWindowDataManager()
       {
    -    replayState = TreeMultimap.create();
    -    largestRecoveryWindow = Stateless.WINDOW_ID;
    -    recoveryPath = DEF_RECOVERY_PATH;
    +    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
    -    Configuration configuration = new Configuration();
    +    operatorId = context.getId();
    +
         if (isRecoveryPathRelativeToAppPath) {
    -      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
    +      statePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath;
         } else {
    -      appPath = new Path(recoveryPath);
    +      statePath = recoveryPath;
         }
     
         try {
    -      storageAgent = new FSStorageAgent(appPath.toString(), configuration);
    +      fileContext = FileContextUtils.getFileContext(statePath);
    +      setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
     
    -      fs = FileSystem.newInstance(appPath.toUri(), configuration);
    +  private void setupWals(long activationWindow) throws IOException
    +  {
    +    findFiles(wal, operatorId);
    +    configureWal(wal, operatorId, !relyOnCheckpoints);
    +
    +    if (repartitioned) {
    +      createReadOnlyWals();
    +      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
    +        findFiles(entry.getValue(), entry.getKey());
    +        configureWal(entry.getValue(), entry.getKey(), true);
    +      }
    +    }
     
    -      if (fs.exists(appPath)) {
    -        FileStatus[] fileStatuses = fs.listStatus(appPath);
    +    //find largest recovery window
    +    if (!relyOnCheckpoints) {
    +      long recoveryWindow = findLargestRecoveryWindow(wal, null);
    +      //committed will not delete temp files so it is possible that when reading from files, a smaller window
    +      //than the activation window is found.
    +      if (recoveryWindow > activationWindow) {
    +        largestRecoveryWindow = recoveryWindow;
    +      }
    +      if (wal.getReader().getCurrentPointer() != null) {
    +        wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
    +      }
    +    } else {
    +      wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
    +      largestRecoveryWindow = wal.getLastCheckpointedWindow();
    +    }
     
    -        for (FileStatus operatorDirStatus : fileStatuses) {
    -          int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
    +    if (repartitioned && largestRecoveryWindow > Stateless.WINDOW_ID) {
    +      //find the min of max window ids: a downstream will not finish a window until all the upstream have finished it.
    +      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
     
    -          for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
    -            String fileName = status.getPath().getName();
    -            if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
    -              continue;
    -            }
    -            long windowId = Long.parseLong(fileName, 16);
    -            replayState.put(windowId, operatorId);
    -            if (windowId > largestRecoveryWindow) {
    -              largestRecoveryWindow = windowId;
    -            }
    +        long recoveryWindow = Stateless.WINDOW_ID;
    +        if (!relyOnCheckpoints) {
    +          long window = findLargestRecoveryWindow(entry.getValue(), null);
    +          if (window > activationWindow) {
    +            recoveryWindow = window;
               }
    +        } else {
    +          recoveryWindow = findLargestRecoveryWindow(entry.getValue(), activationWindow);
    +        }
    +
    +        if (recoveryWindow < largestRecoveryWindow) {
    +          largestRecoveryWindow = recoveryWindow;
             }
           }
    -    } catch (IOException e) {
    -      throw new RuntimeException(e);
         }
    +
    +    //reset readers
    +    wal.getReader().seek(wal.walStartPointer);
    +    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
    +      wal.getReader().seek(wal.walStartPointer);
    +    }
    +
    +    wal.setup();
    +    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
    +      wal.setup();
    +    }
    +
       }
     
    -  @Override
    -  public void save(Object object, int operatorId, long windowId) throws IOException
    +  protected void createReadOnlyWals() throws IOException
       {
    -    storageAgent.save(object, operatorId, windowId);
    +    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(statePath));
    +    while (operatorsIter.hasNext()) {
    +      FileStatus status = operatorsIter.next();
    +      int operatorId = Integer.parseInt(status.getPath().getName());
    +
    +      if (operatorId != this.operatorId) {
    +        //create read-only wal for other partitions
    +        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
    +        readOnlyWals.put(operatorId, wal);
    +      }
    +    }
       }
     
    -  @Override
    -  public Object load(int operatorId, long windowId) throws IOException
    +  private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException
       {
    -    Set<Integer> operators = replayState.get(windowId);
    -    if (operators == null || !operators.contains(operatorId)) {
    -      return null;
    +    String operatorDir = statePath + Path.SEPARATOR + operatorId;
    +    wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
    +    wal.fileContext = fileContext;
    +
    +    if (updateWalState) {
    +      if (!wal.fileDescriptors.isEmpty()) {
    +        SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
    +
    +        wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);
    +
    +        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last();
    +        if (last.isTmp) {
    +          wal.tempPartFiles.put(last.part, last.filePath.toString());
    +        }
    +      }
         }
    -    return storageAgent.load(operatorId, windowId);
       }
     
    -  @Override
    -  public void delete(int operatorId, long windowId) throws IOException
    +  private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException
       {
    -    storageAgent.delete(operatorId, windowId);
    +    String operatorDir = statePath + Path.SEPARATOR + operatorId;
    +    Path operatorPath = new Path(operatorDir);
    +    if (fileContext.util().exists(operatorPath)) {
    +      RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath);
    +
    +      while (walFilesIter.hasNext()) {
    +        FileStatus fileStatus = walFilesIter.next();
    +        FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
    +        wal.fileDescriptors.put(descriptor.part, descriptor);
    +      }
    +    }
       }
     
    -  @Override
    -  public Map<Integer, Object> load(long windowId) throws IOException
    +  private long findLargestRecoveryWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException
       {
    -    Set<Integer> operators = replayState.get(windowId);
    -    if (operators == null) {
    -      return null;
    -    }
    -    Map<Integer, Object> data = Maps.newHashMap();
    -    for (int operatorId : operators) {
    -      data.put(operatorId, load(operatorId, windowId));
    +    if (!wal.fileDescriptors.isEmpty()) {
    +      FileSystemWAL.FileSystemWALReader reader = wal.getReader();
    +
    +      //to find the largest window, we only need to look at the last file.
    +      NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
    +      for (int part : descendingParts) {
    +        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last();
    +        reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
    +
    +        long endOffset = -1;
    +
    +        long lastWindow = Stateless.WINDOW_ID;
    +        Slice slice = readNext(reader);
    +
    +        while (slice != null) {
    +          boolean skipComplete = skipNext(reader);
    +          if (!skipComplete) {
    +            //artifact not saved so this window was not finished.
    +            break;
    +          }
    +          Slice windowSlice = slice;
    +          long offset = reader.getCurrentPointer().getOffset();
    +          slice = readNext(reader);  //either null, deleted or next window
    +
    +          if (slice == null || !slice.equals(DELETED)) {
    +            //delete entry not found so window was not deleted
    +            long window = Longs.fromByteArray(windowSlice.toByteArray());
    +
    +            if (ceilingWindow != null && window > ceilingWindow) {
    +              break;
    +            }
    +            endOffset = offset;
    +            lastWindow = window;
    --- End diff --
    
    I'm not sure I'm understanding correctly how delete works, but should lastWindow be rolled back to the previous window when we encounter DELETED?
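To reason about the question above, the scan in findLargestRecoveryWindow can be modeled in memory. This is a hypothetical sketch, not Malhar code: the record layout (window id, artifact, then an optional DELETED marker immediately after that window) is an assumption drawn from the "//either null, deleted or next window" comment in the diff, NO_WINDOW is only a stand-in for Stateless.WINDOW_ID, and because the diff ends before the DELETED branch, this model simply moves on to the next window after a deletion. Under these assumptions a deleted window never updates lastWindow, so lastWindow already holds the previous non-deleted window.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical in-memory model of the WAL scan in findLargestRecoveryWindow.
 * Assumed record layout: windowKey, artifact, [DELETED], windowKey, artifact, ...
 */
public class LargestWindowScan
{
  // Assumed marker appended right after a window's artifact when that window is deleted.
  public static final byte[] DELETED = "DELETED".getBytes(StandardCharsets.UTF_8);
  // Stand-in for Stateless.WINDOW_ID ("no window found").
  public static final long NO_WINDOW = -1L;

  public static byte[] windowKey(long windowId)
  {
    return ByteBuffer.allocate(Long.BYTES).putLong(windowId).array();
  }

  /**
   * Returns the largest window that has a complete artifact, is not followed by DELETED
   * and is not above ceilingWindow (when ceilingWindow is non-null).
   */
  public static long findLargestWindow(List<byte[]> records, Long ceilingWindow)
  {
    long lastWindow = NO_WINDOW;
    int i = 0;
    while (i < records.size()) {
      byte[] windowSlice = records.get(i++);
      if (i >= records.size()) {
        break; // artifact missing: this window was never finished
      }
      i++; // skip over the artifact
      if (i < records.size() && Arrays.equals(records.get(i), DELETED)) {
        i++; // deleted window: consume the marker, lastWindow keeps its previous value
        continue;
      }
      long window = ByteBuffer.wrap(windowSlice).getLong();
      if (ceilingWindow != null && window > ceilingWindow) {
        break; // assumes windows are appended in increasing order
      }
      lastWindow = window;
    }
    return lastWindow;
  }

  /** Window 1 kept, window 2 deleted, window 3 kept, window 4 without an artifact. */
  public static List<byte[]> sampleLog()
  {
    List<byte[]> log = new ArrayList<>();
    log.add(windowKey(1));
    log.add(new byte[] {1});
    log.add(windowKey(2));
    log.add(new byte[] {2});
    log.add(DELETED);
    log.add(windowKey(3));
    log.add(new byte[] {3});
    log.add(windowKey(4)); // artifact never written
    return log;
  }

  public static void main(String[] args)
  {
    List<byte[]> log = sampleLog();
    System.out.println(findLargestWindow(log, null)); // 3
    System.out.println(findLargestWindow(log, 2L));   // 1: window 2 is deleted
  }
}
```

With the sample log, the unbounded scan returns 3; with a ceiling of 2 it returns 1, since window 2 is skipped as deleted and window 3 is above the ceiling.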


> Integrate WAL to FS WindowDataManager
> -------------------------------------
>
>                 Key: APEXMALHAR-2063
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying the tuples of every completed application window after a failure. For this, it saves meta-data in a file per window. Having many small files on HDFS causes issues, as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can use the WAL to write data and maintain a mapping of how much data was flushed to the WAL in each window.
> In order to use FileSystemWAL for replaying the data of a finished window, a few changes are made to FileSystemWAL, for the following reasons:
> 1. WindowDataManager needs to replay the data of every finished window. This window may not be checkpointed.
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery, so this poses a problem.
> WindowDataManager should be able to control the recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temp files to actual files is part of its state, which is checkpointed. Since WindowDataManager replays the data of a window that is not yet checkpointed, it needs to know the actual temporary file the data is being persisted to.
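The approach described in the issue (a single WAL plus a per-window record of how much data was flushed) can be sketched in memory. Assumptions: a ByteArrayOutputStream stands in for the WAL file, and WindowOffsetLog, save, load, committed and largestRecoveryWindow are hypothetical names chosen for illustration, not the FileSystemWAL or WindowDataManager API. There is no part-file rolling, temp-file handling or persistence.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.TreeMap;

/**
 * Hypothetical sketch: one append-only log for all windows instead of one file per window.
 * ByteArrayOutputStream stands in for the WAL; nothing here is Malhar API.
 */
public class WindowOffsetLog
{
  private final ByteArrayOutputStream wal = new ByteArrayOutputStream();
  // windowId -> {startOffset, endOffset} of the bytes flushed for that window
  private final TreeMap<Long, int[]> windowOffsets = new TreeMap<>();

  /** Appends one window's data to the shared log and records the byte range it occupies. */
  public void save(long windowId, byte[] data)
  {
    int start = wal.size();
    wal.write(data, 0, data.length);
    windowOffsets.put(windowId, new int[] {start, wal.size()});
  }

  /** Returns the bytes saved for windowId, or null if nothing is recorded for it. */
  public byte[] load(long windowId)
  {
    int[] range = windowOffsets.get(windowId);
    if (range == null) {
      return null;
    }
    // Copies the whole buffer for simplicity; a real WAL reader would seek to range[0].
    return Arrays.copyOfRange(wal.toByteArray(), range[0], range[1]);
  }

  /** Forgets the offsets of all windows up to and including committedWindowId. */
  public void committed(long committedWindowId)
  {
    windowOffsets.headMap(committedWindowId, true).clear();
  }

  /** Largest window with replayable data, or -1 if there is none. */
  public long largestRecoveryWindow()
  {
    return windowOffsets.isEmpty() ? -1L : windowOffsets.lastKey();
  }
}
```

Because every window appends to the same stream, the number of files no longer grows with the number of windows, which is the small-files concern raised above. The cost is that replay depends on the position information, and a real implementation must be able to rebuild it after a failure, which this in-memory sketch ignores.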



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
