apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2063) Integrate WAL to FS WindowDataManager
Date Thu, 11 Aug 2016 05:40:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416592#comment-15416592 ]

ASF GitHub Bot commented on APEXMALHAR-2063:
--------------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/322#discussion_r74370558
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---
    @@ -51,291 +66,607 @@
     public class FSWindowDataManager implements WindowDataManager
     {
       private static final String DEF_RECOVERY_PATH = "idempotentState";
    -
    -  protected transient FSStorageAgent storageAgent;
    +  private static final String WAL_FILE_NAME = "wal";
     
       /**
    -   * Recovery path relative to app path where state is saved.
    +   * Recovery filePath relative to app filePath where state is saved.
        */
       @NotNull
    -  private String recoveryPath;
    +  private String recoveryPath = DEF_RECOVERY_PATH;
     
       private boolean isRecoveryPathRelativeToAppPath = true;
     
       /**
    -   * largest window for which there is recovery data across all physical operator instances.
    +   * This is not null only for one physical instance.<br/>
    +   * It consists of operator ids which have been deleted but have some state that can be replayed.
    +   * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
    +   * The value is assigned during partitioning.
        */
    -  protected transient long largestRecoveryWindow;
    +  private Set<Integer> deletedOperators;
    +
    +  private boolean repartitioned;
     
       /**
    -   * This is not null only for one physical instance.<br/>
    -   * It consists of operator ids which have been deleted but have some state that can be replayed.
    -   * Only one of the instances would be handling (modifying) the files that belong to this state.
    +   * Used when it is not necessary to replay every streaming/app window.
    +   * Used by {@link IncrementalCheckpointManager}
        */
    -  protected Set<Integer> deletedOperators;
    +  private boolean relyOnCheckpoints;
     
       /**
    -   * Sorted mapping from window id to all the operators that have state to replay for that window.
    +   * largest window for which there is recovery data across all physical operator instances.
        */
    -  protected final transient TreeMultimap<Long, Integer> replayState;
    +  private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
    +
    +  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
     
    -  protected transient FileSystem fs;
    -  protected transient Path appPath;
    +  //operator id -> wals (sorted)
    +  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
    +
    +  private transient String statePath;
    +  private transient int operatorId;
    +
    +  private final transient Kryo kryo = new Kryo();
    +
    +  private transient FileContext fileContext;
     
       public FSWindowDataManager()
       {
    -    replayState = TreeMultimap.create();
    -    largestRecoveryWindow = Stateless.WINDOW_ID;
    -    recoveryPath = DEF_RECOVERY_PATH;
    +    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
    --- End diff --
    
    for my learning :) why is this needed?


> Integrate WAL to FS WindowDataManager
> -------------------------------------
>
>                 Key: APEXMALHAR-2063
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> FS Window Data Manager saves meta-data that helps replay tuples for every completed application window after a failure. For this it saves the meta-data in one file per window. Having many small files on HDFS causes issues, as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can use the WAL to write data and maintain a mapping of how much data was flushed to the WAL in each window.
> To use FileSystemWAL for replaying the data of a finished window, a few changes are made to FileSystemWAL, for the following reasons:
> 1. WindowDataManager needs to replay the data of every finished window. This window may not be checkpointed.
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery, which poses a problem.
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temporary files to the actual file is part of its state, which is checkpointed. Since WindowDataManager replays data of a window not yet checkpointed, it needs to know the actual temporary file the data is being persisted to.
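
The idea in the description of writing all windows to one log and keeping a mapping of how much data was flushed per window can be sketched roughly as below. This is only an illustration under stated assumptions: WindowOffsetLog and its methods are hypothetical names, an in-memory buffer stands in for the WAL file, windows are assumed to finish in increasing id order, and none of this is the actual FSWindowDataManager or FileSystemWAL code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the FSWindowDataManager or FileSystemWAL API.
class WindowOffsetLog
{
  // In-memory stand-in for a single append-only WAL file.
  private final ByteArrayOutputStream log = new ByteArrayOutputStream();
  // window id -> log offset reached when that window finished (sorted by window id)
  private final TreeMap<Long, Integer> endOffsets = new TreeMap<>();

  // Appends one length-prefixed record to the log.
  void append(byte[] record)
  {
    byte[] header = ByteBuffer.allocate(4).putInt(record.length).array();
    log.write(header, 0, header.length);
    log.write(record, 0, record.length);
  }

  // Marks the window as finished by remembering how far the log has grown.
  // Assumes windows are finished in increasing id order.
  void endWindow(long windowId)
  {
    endOffsets.put(windowId, log.size());
  }

  // Returns the records written during the given finished window,
  // whether or not that window was ever checkpointed.
  List<byte[]> replay(long windowId)
  {
    Integer end = endOffsets.get(windowId);
    if (end == null) {
      throw new IllegalArgumentException("no data recorded for window " + windowId);
    }
    // The window's data starts where the previous finished window ended.
    Map.Entry<Long, Integer> previous = endOffsets.lowerEntry(windowId);
    int start = previous == null ? 0 : previous.getValue();
    ByteBuffer buffer = ByteBuffer.wrap(log.toByteArray(), start, end - start);
    List<byte[]> records = new ArrayList<>();
    while (buffer.hasRemaining()) {
      byte[] record = new byte[buffer.getInt()];
      buffer.get(record);
      records.add(record);
    }
    return records;
  }
}
```

With this layout a single file holds many windows, and replaying one window only needs the two offsets that bracket it. The sketch also shows why truncating the log back to the last checkpointed offset on recovery would be a problem: the bytes of finished but not yet checkpointed windows would be lost.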



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
