apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyfar...@apache.org
Subject [1/2] incubator-apex-malhar git commit: MLHR-1886: moving restoring of a file in its own method and added a test that covers re-writing the file when it isn't closed before failure
Date Tue, 03 Nov 2015 22:31:08 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8f671f2dd -> d081d0cee


MLHR-1886: moving restoring of a file in its own method and added a test that covers re-writing
the file when it isn't closed before failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8f0c27b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8f0c27b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8f0c27b6

Branch: refs/heads/devel-3
Commit: 8f0c27b6df5511c5048774baa78f7a2e25dd200c
Parents: 8f671f2
Author: Chandni Singh <csingh@apache.org>
Authored: Tue Nov 3 13:06:25 2015 -0800
Committer: Chandni Singh <csingh@apache.org>
Committed: Tue Nov 3 13:14:48 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 113 +++++++++++--------
 .../io/fs/AbstractFileOutputOperatorTest.java   |  52 +++++++++
 2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 09294a2..744f024 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -351,50 +351,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
           }
 
           if (fs.exists(activeFilePath)) {
-            LOG.debug("path exists {}", activeFilePath);
-            long offset = endOffsets.get(seenFileName).longValue();
-            FSDataInputStream inputStream = fs.open(activeFilePath);
-            FileStatus status = fs.getFileStatus(activeFilePath);
-
-            if (status.getLen() != offset) {
-              LOG.info("path corrupted {} {} {}", activeFilePath, offset, status.getLen());
-              byte[] buffer = new byte[COPY_BUFFER_SIZE];
-              String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-              Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
-              FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
-
-              while (inputStream.getPos() < offset) {
-                long remainingBytes = offset - inputStream.getPos();
-                int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int) remainingBytes
: COPY_BUFFER_SIZE;
-                inputStream.read(buffer);
-                fsOutput.write(buffer, 0, bytesToWrite);
-              }
-
-              flush(fsOutput);
-              fsOutput.close();
-              inputStream.close();
-
-              FileContext fileContext = FileContext.getFileContext(fs.getUri());
-              LOG.debug("active {} recovery {} ", activeFilePath, recoveryFilePath);
-
-              if (alwaysWriteToTmp) {
-                //recovery file is used as the new tmp file and we cannot delete the old
tmp file because when the operator
-                //is restored to an earlier check-pointed window, it will look for an older
tmp.
-                fileNameToTmpName.put(seenFileNamePart, recoveryFileName);
-              } else {
-                LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
-                fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
-              }
-            } else {
-              if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName))
{
-                String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() +
TMP_EXTENSION;
-                FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
-                IOUtils.copy(inputStream, outputStream);
-                outputStream.close();
-                fileNameToTmpName.put(seenFileNamePart, currentTmp);
-              }
-              inputStream.close();
-            }
+            recoverFile(seenFileName, seenFileNamePart, activeFilePath);
           }
         }
       }
@@ -448,10 +405,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
           }
         }
       }
-
       LOG.debug("setup completed");
-      LOG.debug("end-offsets {}", endOffsets);
-
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -463,6 +417,68 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
   }
 
   /**
+   * Recovers a file which exists on the disk. If the length of the file is not same as the
+   * length which the operator remembers then the file is truncated. <br/>
+   * When always writing to a temporary file, then a file is restored even when the length is same as what the
+   * operator remembers however this is done only for files which had open streams that weren't closed before
+   * failure.
+   *
+   * @param filename     name of the actual file.
+   * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
+   *                     latest open part file name.
+   * @param filepath     path of the file. When always writing to temp file, this is the path of the temp file; otherwise
+   *                     path of the actual file.
+   * @throws IOException
+   */
+  private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
+  {
+    LOG.debug("path exists {}", filepath);
+    long offset = endOffsets.get(filename).longValue();
+    FSDataInputStream inputStream = fs.open(filepath);
+    FileStatus status = fs.getFileStatus(filepath);
+
+    if (status.getLen() != offset) {
+      LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
+      byte[] buffer = new byte[COPY_BUFFER_SIZE];
+      String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+      Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
+      FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
+
+      while (inputStream.getPos() < offset) {
+        long remainingBytes = offset - inputStream.getPos();
+        int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE;
+        inputStream.read(buffer);
+        fsOutput.write(buffer, 0, bytesToWrite);
+      }
+
+      flush(fsOutput);
+      fsOutput.close();
+      inputStream.close();
+
+      FileContext fileContext = FileContext.getFileContext(fs.getUri());
+      LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
+
+      if (alwaysWriteToTmp) {
+        //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
+        //is restored to an earlier check-pointed window, it will look for an older tmp.
+        fileNameToTmpName.put(partFileName, recoveryFileName);
+      } else {
+        LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
+        fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+      }
+    } else {
+      if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
+        String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+        FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
+        IOUtils.copy(inputStream, outputStream);
+        streamsCache.put(filename, new FSFilterStreamContext(outputStream));
+        fileNameToTmpName.put(partFileName, currentTmp);
+      }
+      inputStream.close();
+    }
+  }
+
+  /**
    * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
    * @return cache loader
    */
@@ -656,7 +672,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
       fileName = getPartFileNamePri(fileName);
       part.setValue(currentOpenPart.getValue());
     }
-    LOG.debug("request finalize {}", fileName);
     filesPerWindow.add(fileName);
   }
 
@@ -1215,7 +1230,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
         //a tmp file has tmp extension always preceded by timestamp
         String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
         if (fileName.equals(actualFileName)) {
-          LOG.debug("deleting vagrant file {}", statusName);
+          LOG.debug("deleting stray file {}", statusName);
           fs.delete(status.getPath(), true);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index f7d1731..cbcc8b4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1883,6 +1883,58 @@ public class AbstractFileOutputOperatorTest
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
+  @Test
+  public void testRecoveryOfOpenFiles()
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setMaxLength(4);
+    File meta = new File(testMeta.getDir());
+    writer.setFilePath(meta.getAbsolutePath());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setup(testMeta.testOperatorContext);
+
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.input.put(3);
+    writer.endWindow();
+
+    //failure and restored
+    writer.setup(testMeta.testOperatorContext);
+    writer.input.put(4);
+    writer.input.put(5);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.input.put(8);
+    writer.input.put(9);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.endWindow();
+
+    writer.committed(1);
+
+    //Part 0 checks
+    String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
+    String correctContents = "0\n" + "2\n" + "4\n";
+    checkOutput(0, evenFileName, correctContents);
+
+    String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
+    correctContents = "1\n" + "3\n" + "5\n";
+    checkOutput(0, oddFileName, correctContents);
+
+
+    //Part 1 checks
+    correctContents = "6\n" + "8\n" + "6\n";
+    checkOutput(1, evenFileName, correctContents);
+
+    correctContents = "7\n" + "9\n" + "7\n";
+    checkOutput(1, oddFileName, correctContents);
+  }
+
   private void checkCompressedFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) throws IOException
   {
     FileInputStream fis;


Mime
View raw message