apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pri...@apache.org
Subject [1/3] incubator-apex-malhar git commit: APEXMALHAR-2013 : HDFS output module for file copy 1. Added operators BlockWriter, Synchronizer, FileStitcher, FileMerger, HDFSFileMerger 2. Added junit tests 3. Added sample app for HDFS to HDFS file copy app 4
Date Fri, 22 Apr 2016 08:46:59 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master dad9f56fd -> bdd93488e


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/51176dd7/library/src/test/java/com/datatorrent/lib/io/fs/FileMergerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileMergerTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileMergerTest.java
new file mode 100644
index 0000000..d869074
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileMergerTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.BlockWriter;
+import com.datatorrent.lib.io.fs.FileStitcher.BlockNotFoundException;
+import com.datatorrent.lib.io.fs.Synchronizer.OutputFileMetadata;
+import com.datatorrent.lib.io.fs.Synchronizer.StitchBlock;
+import com.datatorrent.lib.io.fs.Synchronizer.StitchBlockMetaData;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link FileMerger}
+ */
+public class FileMergerTest
+{
+  private static OperatorContext context;
+  private static final long[] blockIds = new long[] {1, 2, 3 };
+
+  private static final String FILE_DATA = "0123456789";
+  private static final String[] BLOCKS_DATA = {"0123", "4567", "89" };
+
+  private static final String dummyDir = "dummpDir/anotherDummDir/";
+  private static final String dummyFile = "dummy.txt";
+
+  public static class TestFileMerger extends TestWatcher
+  {
+    public String recoveryDir = "";
+    public String baseDir = "";
+    public String blocksDir = "";
+    public String outputDir = "";
+    public String outputFileName = "";
+
+    public File[] blockFiles = new File[blockIds.length];
+
+    public FileMerger underTest;
+    @Mock
+    public OutputFileMetadata fileMetaDataMock;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String className = description.getClassName();
+
+      this.baseDir = "target" + Path.SEPARATOR + className + Path.SEPARATOR + description.getMethodName()
+          + Path.SEPARATOR;
+      this.blocksDir = baseDir + Path.SEPARATOR + BlockWriter.DEFAULT_BLOCKS_DIR + Path.SEPARATOR;
+      this.recoveryDir = baseDir + Path.SEPARATOR + "recovery";
+      this.outputDir = baseDir + Path.SEPARATOR + "output" + Path.SEPARATOR;
+      outputFileName = "output.txt";
+
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, description.getMethodName());
+      attributes.put(DAGContext.APPLICATION_PATH, baseDir);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(new File(baseDir).getAbsolutePath()), true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      this.underTest = new FileMerger();
+      this.underTest.setFilePath(outputDir);
+      this.underTest.setup(context);
+
+      MockitoAnnotations.initMocks(this);
+      when(fileMetaDataMock.getFileName()).thenReturn(outputFileName);
+      when(fileMetaDataMock.getRelativePath()).thenReturn(outputFileName);
+      when(fileMetaDataMock.getStitchedFileRelativePath()).thenReturn(outputFileName);
+      when(fileMetaDataMock.getNumberOfBlocks()).thenReturn(3);
+      when(fileMetaDataMock.getBlockIds()).thenReturn(new long[] {1, 2, 3 });
+      when(fileMetaDataMock.isDirectory()).thenReturn(false);
+      when(fileMetaDataMock.getNumberOfBlocks()).thenReturn(blockIds.length);
+
+      List<StitchBlock> outputBlockMetaDataList = Lists.newArrayList();
+      try {
+        for (int i = 0; i < blockIds.length; i++) {
+          blockFiles[i] = new File(blocksDir + blockIds[i]);
+          FileUtils.write(blockFiles[i], BLOCKS_DATA[i]);
+          FileBlockMetadata fmd = new FileBlockMetadata(blockFiles[i].getPath(), blockIds[i], 0,
+              BLOCKS_DATA[i].length(), (i == blockIds.length - 1), -1);
+          StitchBlockMetaData outputFileBlockMetaData = new StitchBlockMetaData(fmd, outputFileName,
+              (i == blockIds.length - 1));
+          outputBlockMetaDataList.add(outputFileBlockMetaData);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      when(fileMetaDataMock.getStitchBlocksList()).thenReturn(outputBlockMetaDataList);
+
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      this.underTest.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + FileMergerTest.class.getName()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Rule
+  public TestFileMerger testFM = new TestFileMerger();
+
+  @Test
+  public void testMergeFile() throws IOException
+  {
+    testFM.underTest.mergeOutputFile(testFM.fileMetaDataMock);
+    Assert.assertEquals("File size differes", FILE_DATA.length(),
+        FileUtils.sizeOf(new File(testFM.outputDir, testFM.outputFileName)));
+  }
+
+  @Test
+  public void testBlocksPath()
+  {
+    Assert.assertEquals("Blocks path not initialized in application context",
+        context.getValue(DAGContext.APPLICATION_PATH) + Path.SEPARATOR + BlockWriter.DEFAULT_BLOCKS_DIR + Path.SEPARATOR,
+        testFM.blocksDir);
+  }
+
+  @Test
+  public void testOverwriteFlag() throws IOException, InterruptedException
+  {
+    FileUtils.write(new File(testFM.outputDir, testFM.outputFileName), "");
+    long modTime = testFM.underTest.outputFS.getFileStatus(new Path(testFM.outputDir, testFM.outputFileName))
+        .getModificationTime();
+    when(testFM.fileMetaDataMock.getNumberOfBlocks()).thenReturn(0);
+    when(testFM.fileMetaDataMock.isDirectory()).thenReturn(false);
+    when(testFM.fileMetaDataMock.getBlockIds()).thenReturn(new long[] {});
+
+    Thread.sleep(1000);
+    testFM.underTest.setOverwriteOnConflict(true);
+    testFM.underTest.processCommittedData(testFM.fileMetaDataMock);
+    FileStatus fileStatus = testFM.underTest.outputFS.getFileStatus(new Path(testFM.outputDir, testFM.outputFileName));
+    Assert.assertTrue(fileStatus.getModificationTime() > modTime);
+  }
+
+  // Using a bit of reconciler during testing, so using committed call explicitly
+  @Test
+  public void testOverwriteFlagForDirectory() throws IOException, InterruptedException
+  {
+    FileUtils.forceMkdir(new File(testFM.outputDir + dummyDir));
+    when(testFM.fileMetaDataMock.isDirectory()).thenReturn(true);
+    when(testFM.fileMetaDataMock.getStitchedFileRelativePath()).thenReturn(dummyDir);
+    testFM.underTest.setOverwriteOnConflict(true);
+
+    testFM.underTest.beginWindow(1L);
+    testFM.underTest.input.process(testFM.fileMetaDataMock);
+    testFM.underTest.endWindow();
+    testFM.underTest.checkpointed(1);
+    testFM.underTest.committed(1);
+    Thread.sleep(1000);
+
+    File statsFile = new File(testFM.outputDir, dummyDir);
+    Assert.assertTrue(statsFile.exists() && statsFile.isDirectory());
+  }
+
+  @Test(expected = BlockNotFoundException.class)
+  public void testMissingBlock() throws IOException, BlockNotFoundException
+  {
+    FileUtils.deleteQuietly(testFM.blockFiles[2]);
+    testFM.underTest.tempOutFilePath = new Path(testFM.baseDir, testFM.fileMetaDataMock.getStitchedFileRelativePath()
+        + '.' + System.currentTimeMillis() + FileStitcher.PART_FILE_EXTENTION);
+    testFM.underTest.writeTempOutputFile(testFM.fileMetaDataMock);
+    fail("Failed when one block missing.");
+  }
+
+  @Test
+  public void testDirectory() throws IOException
+  {
+    when(testFM.fileMetaDataMock.getFileName()).thenReturn(dummyDir);
+    when(testFM.fileMetaDataMock.getRelativePath()).thenReturn(dummyDir);
+    when(testFM.fileMetaDataMock.getStitchedFileRelativePath()).thenReturn(dummyDir);
+    when(testFM.fileMetaDataMock.isDirectory()).thenReturn(true); // is a directory
+    when(testFM.fileMetaDataMock.getNumberOfBlocks()).thenReturn(0);
+    when(testFM.fileMetaDataMock.getBlockIds()).thenReturn(new long[] {});
+
+    testFM.underTest.mergeOutputFile(testFM.fileMetaDataMock);
+    File statsFile = new File(testFM.outputDir, dummyDir);
+    Assert.assertTrue(statsFile.exists() && statsFile.isDirectory());
+  }
+
+  @Test
+  public void testFileWithRelativePath() throws IOException
+  {
+    FileUtils.write(new File(testFM.outputDir, dummyDir + dummyFile), FILE_DATA);
+    when(testFM.fileMetaDataMock.getFileName()).thenReturn(dummyDir + dummyFile);
+    when(testFM.fileMetaDataMock.getRelativePath()).thenReturn(dummyDir + dummyFile);
+    when(testFM.fileMetaDataMock.getStitchedFileRelativePath()).thenReturn(dummyDir + dummyFile);
+
+    testFM.underTest.mergeOutputFile(testFM.fileMetaDataMock);
+    File statsFile = new File(testFM.outputDir, dummyDir + dummyFile);
+    Assert.assertTrue(statsFile.exists() && !statsFile.isDirectory());
+    Assert.assertEquals("File size differes", FILE_DATA.length(),
+        FileUtils.sizeOf(new File(testFM.outputDir, dummyDir + dummyFile)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/51176dd7/library/src/test/java/com/datatorrent/lib/io/fs/FileStitcherTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileStitcherTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileStitcherTest.java
new file mode 100644
index 0000000..4833db2
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileStitcherTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.BlockWriter;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.io.fs.Synchronizer.OutputFileMetadata;
+import com.datatorrent.lib.io.fs.Synchronizer.StitchBlock;
+import com.datatorrent.lib.io.fs.Synchronizer.StitchBlockMetaData;
+
+/**
+ * Unit tests for {@link FileStitcher}
+ */
+public class FileStitcherTest
+{
+  public static final String[] FILE_CONTENTS = {"abcdefghi", "pqr", "hello world", "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
+      "0123456789" };
+
+  public static final int BLOCK_SIZE = 5;
+
+  private class TestMeta extends TestWatcher
+  {
+    String outputPath;
+    List<FileMetadata> fileMetadataList = Lists.newArrayList();
+
+    FileStitcher<OutputFileMetadata> oper;
+    File blocksDir;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
+
+      oper = new FileStitcher<OutputFileMetadata>();
+      oper.setFilePath(outputPath);
+      String appDirectory = outputPath;
+
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
+      attributes.put(DAG.DAGContext.APPLICATION_PATH, appDirectory);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      oper.setup(context);
+
+      try {
+        File outDir = new File(outputPath);
+        FileUtils.forceMkdir(outDir);
+
+        blocksDir = new File(context.getValue(Context.DAGContext.APPLICATION_PATH), BlockWriter.DEFAULT_BLOCKS_DIR);
+        blocksDir.mkdirs();
+
+        long blockID = 1000;
+
+        for (int i = 0; i < FILE_CONTENTS.length; i++) {
+          List<Long> blockIDs = Lists.newArrayList();
+
+          File file = new File(outputPath, i + ".txt");
+
+          FileUtils.write(file, FILE_CONTENTS[i]);
+
+          int offset = 0;
+          for (; offset < FILE_CONTENTS[i].length(); offset += BLOCK_SIZE, blockID++) {
+            String blockContents;
+            if (offset + BLOCK_SIZE < FILE_CONTENTS[i].length()) {
+              blockContents = FILE_CONTENTS[i].substring(offset, offset + BLOCK_SIZE);
+            } else {
+              blockContents = FILE_CONTENTS[i].substring(offset);
+            }
+            FileUtils.write(new File(blocksDir, blockID + ""), blockContents);
+            blockIDs.add(blockID);
+          }
+
+          FileMetadata fileMetadata = new FileMetadata(file.getPath());
+          fileMetadata.setBlockIds(ArrayUtils.toPrimitive(blockIDs.toArray(new Long[0])));
+          fileMetadataList.add(fileMetadata);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#finished(org.junit.runner.Description)
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+
+      try {
+        FileUtils.deleteDirectory(new File(outputPath));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testFileMerger() throws IOException, InterruptedException
+  {
+    long[][][] partitionMeta = {
+        {{1000, 0, 5 }, {1001, 0, 4 } }, {{1002, 0, 3 } }, //testing multiple blocks from same block file
+        {{1003, 0, 5 }, {1004, 0, 5 }, {1005, 0, 1 } },
+        {{1006, 0, 5 }, {1007, 0, 5 }, {1008, 0, 5 }, {1009, 0, 5 }, {1010, 0, 5 }, {1011, 0, 1 } },
+        {{1012, 0, 5 }, {1013, 0, 5 } } };
+
+    testMeta.oper.beginWindow(0);
+    long fileID = 0;
+    for (int tupleIndex = 0; tupleIndex < partitionMeta.length; tupleIndex++) {
+      OutputFileMetadata ingestionFileMetaData = new OutputFileMetadata();
+      String fileName = fileID++ + ".txt";
+      ingestionFileMetaData.setRelativePath(fileName);
+      List<StitchBlock> outputBlocks = Lists.newArrayList();
+      for (long[] block : partitionMeta[tupleIndex]) {
+        String blockFilePath = new Path(testMeta.outputPath, Long.toString(block[0])).toString();
+        FileBlockMetadata fileBlockMetadata = new FileBlockMetadata(blockFilePath, block[0], block[1], block[2], false,
+            -1);
+        StitchBlockMetaData outputFileBlockMetaData = new StitchBlockMetaData(fileBlockMetadata, fileName, false);
+        outputBlocks.add(outputFileBlockMetaData);
+      }
+      ingestionFileMetaData.setOutputBlockMetaDataList(outputBlocks);
+      testMeta.oper.input.process(ingestionFileMetaData);
+    }
+    testMeta.oper.endWindow();
+    testMeta.oper.committed(0);
+    //give some time to complete postCommit operations
+    Thread.sleep(2 * 1000);
+
+    for (int fileId = 0; fileId < partitionMeta.length; fileId++) {
+      String fromFile = FileUtils.readFileToString(new File(testMeta.oper.getFilePath(), fileId + ".txt"));
+      Assert.assertEquals("File " + fileId + "not matching", FILE_CONTENTS[fileId], fromFile);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/51176dd7/library/src/test/java/com/datatorrent/lib/io/fs/SynchronizerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/SynchronizerTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/SynchronizerTest.java
new file mode 100644
index 0000000..b017f20
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/SynchronizerTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.io.fs.Synchronizer.OutputFileMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Unit tests for {@link Synchronizer}
+ */
+public class SynchronizerTest
+{
+  public static final String[] FILE_NAMES = {"a.txt", "b.txt", "c.txt", "d.txt", "e.txt" };
+
+  public static final long[][] BLOCK_IDS = {
+      //Block ids for file1 (a.txt) 
+      {1001, 1002, 1003 },
+      //Block ids for file2 (b.txt)
+      {1004, 1005, 1006, 1007 },
+      //c.txt
+      {1008, 1009, 1010 },
+      //d.txt
+      {1011, 1012 },
+      //e.txt
+      {1013, 1014 } };
+
+  List<FileMetadata> fileMetadataList;
+
+  List<FileBlockMetadata> blockMetadataList;
+
+  Synchronizer underTest;
+
+  public SynchronizerTest()
+  {
+
+    underTest = new Synchronizer();
+    fileMetadataList = Lists.newArrayList();
+    blockMetadataList = Lists.newArrayList();
+
+    for (int i = 0; i < FILE_NAMES.length; i++) {
+      FileMetadata fileMetadata = new FileMetadata(FILE_NAMES[i]);
+      fileMetadata.setFileName(FILE_NAMES[i]);
+      fileMetadata.setBlockIds(BLOCK_IDS[i]);
+      fileMetadata.setNumberOfBlocks(BLOCK_IDS[i].length);
+
+      for (int blockIndex = 0; blockIndex < BLOCK_IDS[i].length; blockIndex++) {
+        FileBlockMetadata fileBlockMetadata = new FileBlockMetadata(FILE_NAMES[i]);
+        fileBlockMetadata.setBlockId(BLOCK_IDS[i][blockIndex]);
+        blockMetadataList.add(fileBlockMetadata);
+      }
+
+      fileMetadataList.add(fileMetadata);
+    }
+  }
+
+  @Test
+  public void testSynchronizer()
+  {
+
+    CollectorTestSink<OutputFileMetadata> sink = new CollectorTestSink<OutputFileMetadata>();
+    underTest.trigger.setSink((CollectorTestSink)sink);
+
+    underTest.filesMetadataInput.process(fileMetadataList.get(0));
+    Assert.assertEquals(0, sink.collectedTuples.size());
+    underTest.blocksMetadataInput.process(blockMetadataList.get(0));
+    underTest.blocksMetadataInput.process(blockMetadataList.get(1));
+    Assert.assertEquals(0, sink.collectedTuples.size());
+    underTest.blocksMetadataInput.process(blockMetadataList.get(2));
+    Assert.assertEquals(1, sink.collectedTuples.size());
+    Assert.assertEquals("a.txt", sink.collectedTuples.get(0).getFileName());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(3));
+    underTest.blocksMetadataInput.process(blockMetadataList.get(4));
+    Assert.assertEquals(1, sink.collectedTuples.size());
+
+    underTest.filesMetadataInput.process(fileMetadataList.get(1));
+    Assert.assertEquals(1, sink.collectedTuples.size());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(5));
+    Assert.assertEquals(1, sink.collectedTuples.size());
+    underTest.blocksMetadataInput.process(blockMetadataList.get(6));
+    Assert.assertEquals(2, sink.collectedTuples.size());
+    Assert.assertEquals("b.txt", sink.collectedTuples.get(1).getFileName());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(7));
+    underTest.blocksMetadataInput.process(blockMetadataList.get(8));
+    Assert.assertEquals(2, sink.collectedTuples.size());
+    underTest.blocksMetadataInput.process(blockMetadataList.get(9));
+    Assert.assertEquals(2, sink.collectedTuples.size());
+
+    underTest.filesMetadataInput.process(fileMetadataList.get(2));
+    Assert.assertEquals(3, sink.collectedTuples.size());
+    Assert.assertEquals("c.txt", sink.collectedTuples.get(2).getFileName());
+
+    underTest.filesMetadataInput.process(fileMetadataList.get(3));
+    underTest.filesMetadataInput.process(fileMetadataList.get(4));
+    Assert.assertEquals(3, sink.collectedTuples.size());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(10));
+    Assert.assertEquals(3, sink.collectedTuples.size());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(11));
+
+    Assert.assertEquals(4, sink.collectedTuples.size());
+    Assert.assertEquals("d.txt", sink.collectedTuples.get(3).getFileName());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(12));
+    Assert.assertEquals(4, sink.collectedTuples.size());
+
+    underTest.blocksMetadataInput.process(blockMetadataList.get(13));
+    Assert.assertEquals(5, sink.collectedTuples.size());
+    Assert.assertEquals("e.txt", sink.collectedTuples.get(4).getFileName());
+
+  }
+
+}


Mime
View raw message