apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilooner <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...
Date Thu, 11 Aug 2016 06:34:14 GMT
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/322#discussion_r74373686
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
---
    @@ -51,291 +66,607 @@
     public class FSWindowDataManager implements WindowDataManager
     {
       private static final String DEF_RECOVERY_PATH = "idempotentState";
    -
    -  protected transient FSStorageAgent storageAgent;
    +  private static final String WAL_FILE_NAME = "wal";
     
       /**
    -   * Recovery path relative to app path where state is saved.
    +   * Recovery filePath relative to app filePath where state is saved.
        */
       @NotNull
    -  private String recoveryPath;
    +  private String recoveryPath = DEF_RECOVERY_PATH;
     
       private boolean isRecoveryPathRelativeToAppPath = true;
     
       /**
    -   * largest window for which there is recovery data across all physical operator instances.
    +   * This is not null only for one physical instance.<br/>
    +   * It consists of operator ids which have been deleted but have some state that can
be replayed.
    +   * Only one of the instances would be handling (modifying) the files that belong to
this state. <br/>
    +   * The value is assigned during partitioning.
        */
    -  protected transient long largestRecoveryWindow;
    +  private Set<Integer> deletedOperators;
    +
    +  private boolean repartitioned;
     
       /**
    -   * This is not null only for one physical instance.<br/>
    -   * It consists of operator ids which have been deleted but have some state that can
be replayed.
    -   * Only one of the instances would be handling (modifying) the files that belong to
this state.
    +   * Used when it is not necessary to replay every streaming/app window.
    +   * Used by {@link IncrementalCheckpointManager}
        */
    -  protected Set<Integer> deletedOperators;
    +  private boolean relyOnCheckpoints;
     
       /**
    -   * Sorted mapping from window id to all the operators that have state to replay for
that window.
    +   * largest window for which there is recovery data across all physical operator instances.
        */
    -  protected final transient TreeMultimap<Long, Integer> replayState;
    +  private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
    +
    +  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
     
    -  protected transient FileSystem fs;
    -  protected transient Path appPath;
    +  //operator id -> wals (sorted)
    +  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
    +
    +  private transient String statePath;
    +  private transient int operatorId;
    +
    +  private final transient Kryo kryo = new Kryo();
    +
    +  private transient FileContext fileContext;
     
       public FSWindowDataManager()
       {
    -    replayState = TreeMultimap.create();
    -    largestRecoveryWindow = Stateless.WINDOW_ID;
    -    recoveryPath = DEF_RECOVERY_PATH;
    +    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
    -    Configuration configuration = new Configuration();
    +    operatorId = context.getId();
    +
         if (isRecoveryPathRelativeToAppPath) {
    -      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
    +      statePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath;
         } else {
    -      appPath = new Path(recoveryPath);
    +      statePath = recoveryPath;
         }
     
         try {
    -      storageAgent = new FSStorageAgent(appPath.toString(), configuration);
    +      fileContext = FileContextUtils.getFileContext(statePath);
    +      setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
     
    -      fs = FileSystem.newInstance(appPath.toUri(), configuration);
    +  private void setupWals(long activationWindow) throws IOException
    +  {
    +    findFiles(wal, operatorId);
    +    configureWal(wal, operatorId, !relyOnCheckpoints);
    +
    +    if (repartitioned) {
    +      createReadOnlyWals();
    +      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet())
{
    +        findFiles(entry.getValue(), entry.getKey());
    +        configureWal(entry.getValue(), entry.getKey(), true);
    +      }
    +    }
     
    -      if (fs.exists(appPath)) {
    -        FileStatus[] fileStatuses = fs.listStatus(appPath);
    +    //find largest recovery window
    +    if (!relyOnCheckpoints) {
    +      long recoveryWindow = findLargestRecoveryWindow(wal, null);
    +      //committed will not delete temp files so it is possible that when reading from
files, a smaller window
    +      //than the activation window is found.
    +      if (recoveryWindow > activationWindow) {
    +        largestRecoveryWindow = recoveryWindow;
    +      }
    +      if (wal.getReader().getCurrentPointer() != null) {
    +        wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
    +      }
    +    } else {
    +      wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
    +      largestRecoveryWindow = wal.getLastCheckpointedWindow();
    +    }
     
    -        for (FileStatus operatorDirStatus : fileStatuses) {
    -          int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
    +    if (repartitioned && largestRecoveryWindow > Stateless.WINDOW_ID) {
    +      //find the min of max window ids: a downstream will not finish a window until all
the upstream have finished it.
    +      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet())
{
     
    -          for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
    -            String fileName = status.getPath().getName();
    -            if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
    -              continue;
    -            }
    -            long windowId = Long.parseLong(fileName, 16);
    -            replayState.put(windowId, operatorId);
    -            if (windowId > largestRecoveryWindow) {
    -              largestRecoveryWindow = windowId;
    -            }
    +        long recoveryWindow = Stateless.WINDOW_ID;
    +        if (!relyOnCheckpoints) {
    +          long window = findLargestRecoveryWindow(entry.getValue(), null);
    +          if (window > activationWindow) {
    +            recoveryWindow = window;
               }
    +        } else {
    +          recoveryWindow = findLargestRecoveryWindow(entry.getValue(), activationWindow);
    +        }
    +
    +        if (recoveryWindow < largestRecoveryWindow) {
    +          largestRecoveryWindow = recoveryWindow;
             }
           }
    -    } catch (IOException e) {
    -      throw new RuntimeException(e);
         }
    +
    +    //reset readers
    +    wal.getReader().seek(wal.walStartPointer);
    +    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
    +      wal.getReader().seek(wal.walStartPointer);
    +    }
    +
    +    wal.setup();
    +    for (FSWindowReplayWAL wal : readOnlyWals.values()) {
    +      wal.setup();
    +    }
    +
       }
     
    -  @Override
    -  public void save(Object object, int operatorId, long windowId) throws IOException
    +  protected void createReadOnlyWals() throws IOException
       {
    -    storageAgent.save(object, operatorId, windowId);
    +    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(statePath));
    +    while (operatorsIter.hasNext()) {
    +      FileStatus status = operatorsIter.next();
    +      int operatorId = Integer.parseInt(status.getPath().getName());
    +
    +      if (operatorId != this.operatorId) {
    +        //create read-only wal for other partitions
    +        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
    +        readOnlyWals.put(operatorId, wal);
    +      }
    +    }
       }
     
    -  @Override
    -  public Object load(int operatorId, long windowId) throws IOException
    +  private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState)
throws IOException
       {
    -    Set<Integer> operators = replayState.get(windowId);
    -    if (operators == null || !operators.contains(operatorId)) {
    -      return null;
    +    String operatorDir = statePath + Path.SEPARATOR + operatorId;
    +    wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
    +    wal.fileContext = fileContext;
    +
    +    if (updateWalState) {
    +      if (!wal.fileDescriptors.isEmpty()) {
    +        SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
    +
    +        wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(),
0);
    +
    +        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last();
    +        if (last.isTmp) {
    +          wal.tempPartFiles.put(last.part, last.filePath.toString());
    +        }
    +      }
         }
    -    return storageAgent.load(operatorId, windowId);
       }
     
    -  @Override
    -  public void delete(int operatorId, long windowId) throws IOException
    +  private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException
       {
    -    storageAgent.delete(operatorId, windowId);
    +    String operatorDir = statePath + Path.SEPARATOR + operatorId;
    +    Path operatorPath = new Path(operatorDir);
    +    if (fileContext.util().exists(operatorPath)) {
    +      RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath);
    +
    +      while (walFilesIter.hasNext()) {
    +        FileStatus fileStatus = walFilesIter.next();
    +        FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
    +        wal.fileDescriptors.put(descriptor.part, descriptor);
    +      }
    +    }
       }
     
    -  @Override
    -  public Map<Integer, Object> load(long windowId) throws IOException
    +  private long findLargestRecoveryWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws
IOException
       {
    -    Set<Integer> operators = replayState.get(windowId);
    -    if (operators == null) {
    -      return null;
    -    }
    -    Map<Integer, Object> data = Maps.newHashMap();
    -    for (int operatorId : operators) {
    -      data.put(operatorId, load(operatorId, windowId));
    +    if (!wal.fileDescriptors.isEmpty()) {
    +      FileSystemWAL.FileSystemWALReader reader = wal.getReader();
    +
    +      //to find the largest window, we only need to look at the last file.
    +      NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
    +      for (int part : descendingParts) {
    +        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last();
    +        reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
    +
    +        long endOffset = -1;
    +
    +        long lastWindow = Stateless.WINDOW_ID;
    +        Slice slice = readNext(reader);
    +
    +        while (slice != null) {
    +          boolean skipComplete = skipNext(reader);
    +          if (!skipComplete) {
    +            //artifact not saved so this window was not finished.
    +            break;
    +          }
    +          Slice windowSlice = slice;
    +          long offset = reader.getCurrentPointer().getOffset();
    +          slice = readNext(reader);  //either null, deleted or next window
    +
    +          if (slice == null || !slice.equals(DELETED)) {
    +            //delete entry not found so window was not deleted
    +            long window = Longs.fromByteArray(windowSlice.toByteArray());
    +
    +            if (ceilingWindow != null && window > ceilingWindow) {
    +              break;
    +            }
    +            endOffset = offset;
    +            lastWindow = window;
    --- End diff --
    
    Would `endOffset` also have to be rolled back?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact Infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message