apex-commits mailing list archives

From t..@apache.org
Subject [03/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.
Date Tue, 23 May 2017 01:24:01 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
deleted file mode 100644
index 97e9aa8..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.primitives.Ints;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>HDFSStoragePerformanceTest class.</p>
- *
- * @since 1.0.1
- */
-public class HDFSStoragePerformanceTest
-{
-
-  public static void main(String[] args)
-  {
-    HDFSStorage storage = new HDFSStorage();
-    storage.setBaseDir(args[0]);
-    storage.setId(args[1]);
-    storage.setRestore(true);
-    storage.setup(null);
-    int count = 100000000;
-
-    logger.debug(" start time {}", System.currentTimeMillis());
-    int index = 10000;
-    byte[] b = Ints.toByteArray(index);
-    // write `count` records five times, flushing after each batch
-    for (int batch = 0; batch < 5; batch++) {
-      for (int i = 0; i < count; i++) {
-        storage.store(new Slice(b, 0, b.length));
-        index++;
-        b = Ints.toByteArray(index);
-      }
-      storage.flush();
-    }
-    logger.debug(" end time {}", System.currentTimeMillis());
-    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
-    b = storage.retrieve(new byte[8]);
-    int org_index = index;
-    index = 10000;
-    match(b, index);
-    while (true) {
-      index++;
-      b = storage.retrieveNext();
-      if (b == null) {
-        logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index);
-        return;
-      } else {
-        if (!match(b, index)) {
-          throw new RuntimeException("failed : " + index);
-        }
-      }
-    }
-
-  }
-
-  public static boolean match(byte[] data, int match)
-  {
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    int dataR = Ints.fromByteArray(tempData);
-    //logger.debug("input: {}, output: {}", match, dataR);
-    return match == dataR;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class);
-}
-
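
The benchmark above exercises a simple store/flush/retrieve contract. A minimal usage sketch follows, assuming only the HDFSStorage API visible in the diff: store() returns an 8-byte address, retrieve(new byte[8]) scans from the start, and every retrieved record carries an 8-byte address prefix. The base directory and id below are hypothetical.

  import com.google.common.primitives.Ints;
  import com.datatorrent.netlet.util.Slice;
  import com.datatorrent.flume.storage.HDFSStorage;

  public class StorageContractSketch
  {
    public static void main(String[] args)
    {
      HDFSStorage storage = new HDFSStorage();
      storage.setBaseDir("/tmp/storage-sketch"); // hypothetical base directory
      storage.setId("sketch");                   // hypothetical storage id
      storage.setRestore(false);
      storage.setup(null);

      byte[] payload = Ints.toByteArray(10000);
      storage.store(new Slice(payload, 0, payload.length));
      storage.flush(); // records become retrievable only after a flush

      byte[] record = storage.retrieve(new byte[8]); // all-zero address scans from the start
      byte[] value = new byte[record.length - 8];    // strip the 8-byte address prefix
      System.arraycopy(record, 8, value, 0, value.length);
      System.out.println(Ints.fromByteArray(value)); // prints 10000
    }
  }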

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
deleted file mode 100644
index 0cb9935..0000000
--- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.storage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flume.Context;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- *
- */
-public class HDFSStorageTest
-{
-  public static class TestMeta extends TestWatcher
-  {
-    public String baseDir;
-    public String testFile;
-    private String testData = "No and yes. There is also IdleTimeHandler that allows the operator to emit tuples. " +
-        "There is overlap, why not have a single interface. \n" +
-        "Also consider the possibility of an operator that does other processing and not consume nor emit tuples,";
-
-    @Override
-    protected void starting(Description description)
-    {
-      String className = description.getClassName();
-      baseDir = "target/" + className;
-      try {
-        baseDir = (new File(baseDir)).getAbsolutePath();
-        FileUtils.forceMkdir(new File(baseDir));
-        testFile = baseDir + "/testInput.txt";
-        FileOutputStream outputStream = FileUtils.openOutputStream(new File(testFile));
-        outputStream.write(testData.getBytes());
-        outputStream.close();
-
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      try {
-        FileUtils.deleteDirectory(new File(baseDir));
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  private String STORAGE_DIRECTORY;
-
-  private HDFSStorage getStorage(String id, boolean restore)
-  {
-    Context ctx = new Context();
-    STORAGE_DIRECTORY = testMeta.baseDir;
-    ctx.put(HDFSStorage.BASE_DIR_KEY, testMeta.baseDir);
-    ctx.put(HDFSStorage.RESTORE_KEY, Boolean.toString(restore));
-    ctx.put(HDFSStorage.ID, id);
-    ctx.put(HDFSStorage.BLOCKSIZE, "256");
-    HDFSStorage lstorage = new HDFSStorage();
-    lstorage.configure(ctx);
-    lstorage.setup(null);
-    return lstorage;
-  }
-
-  private HDFSStorage storage;
-
-  @Before
-  public void setup()
-  {
-    storage = getStorage("1", false);
-  }
-
-  @After
-  public void teardown()
-  {
-    storage.teardown();
-    try {
-      Thread.sleep(100);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-    storage.cleanHelperFiles();
-  }
-
-  /**
-   * This test covers the following use case: 1. Some data is stored. 2. The file is flushed but not closed. 3. Some
-   * more data is stored but the file doesn't roll over. 4. Retrieve is called for the last returned address and it
-   * returns null. 5. Some more data is stored again but the returned address is null because of the previous
-   * retrieve call.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlush() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = "ab".getBytes();
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    b = "cb".getBytes();
-    byte[] addr = storage.store(new Slice(b, 0, b.length));
-    match(storage.retrieve(new byte[8]), "ab");
-    Assert.assertNull(storage.retrieve(addr));
-    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    match(storage.retrieve(address), "cb");
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-  }
-
-  /**
-   * This test covers the following use case: 1. Some data is stored to make sure that there is no roll over. 2. The
-   * file is flushed but not closed. 3. Some more data is stored; it is enough to make the file roll over.
-   * 4. Retrieve is called for the last returned address and it returns null as the data is not flushed. 5. Some more
-   * data is stored again but the returned address is null because of the previous retrieve call. 6. The data is
-   * flushed to make sure that it is committed. 7. Now the data is retrieved from the start and the data returned
-   * matches the data stored.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushRollOver() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
-        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
-        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
-        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
-        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
-        48, 46, 48, 1, 48, 46, 48};
-    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
-        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
-        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
-        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
-        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
-        1, 48, 46, 48, 1, 48, 46, 48};
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    byte[] addr = null;
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      addr = storage.store(new Slice(b, 0, b.length));
-    }
-    Assert.assertNull(storage.retrieve(addr));
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    }
-    storage.flush();
-    match(storage.retrieve(new byte[8]), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieve(address), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-
-  }
-
-  /**
-   * This test covers the following use case: 1. Some data is stored to make sure that there is no roll over. 2. The
-   * file is flushed but not closed. 3. Some more data is stored; it is enough to make the file roll over. 4. The
-   * storage crashes and a new storage is instantiated. 5. Retrieve is called for the last returned address and it
-   * returns null as the data is not flushed. 6. Some more data is stored again but the returned address is null
-   * because of the previous retrieve call. 7. The data is flushed to make sure that it is committed. 8. Now the data
-   * is retrieved from the start and the data returned matches the data stored.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushRollOverWithFailure() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
-        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
-        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
-        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
-        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
-        48, 46, 48, 1, 48, 46, 48};
-    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
-        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
-        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
-        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
-        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
-        1, 48, 46, 48, 1, 48, 46, 48};
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    byte[] addr = null;
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      addr = storage.store(new Slice(b, 0, b.length));
-    }
-    storage = getStorage("1", true);
-    Assert.assertNull(storage.retrieve(addr));
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    }
-    storage.flush();
-    match(storage.retrieve(new byte[8]), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieve(address), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-
-  }
-
-  /**
-   * This tests clean when the file doesn't roll over
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushWithClean() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = "ab".getBytes();
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    storage.clean(address);
-    b = "cb".getBytes();
-    byte[] addr = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNull(storage.retrieve(addr));
-    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    match(storage.retrieve(new byte[8]), "cb");
-    match(storage.retrieve(address), "cb");
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-  }
-
-  /**
-   * This tests clean, with a storage failure, when the file doesn't roll over
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushWithCleanAndFailure() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = "ab".getBytes();
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    storage.clean(address);
-    b = "cb".getBytes();
-    byte[] addr = storage.store(new Slice(b, 0, b.length));
-    storage = getStorage("1", true);
-    Assert.assertNull(storage.retrieve(addr));
-    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    match(storage.retrieve(new byte[8]), "cb");
-    match(storage.retrieve(address), "cb");
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-  }
-
-  /**
-   * This test covers the following use case: 1. Some data is stored to make sure that there is no roll over. 2. The
-   * file is flushed but not closed. 3. The data is cleaned up to the last returned address. 4. Some more data is
-   * stored; it is enough to make the file roll over. 5. Retrieve is called for the last returned address and it
-   * returns null as the data is not flushed. 6. Some more data is stored again but the returned address is null
-   * because of the previous retrieve call. 7. The data is flushed to make sure that it is committed. 8. Now the data
-   * is retrieved from the start and the data returned matches the data stored.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushWithCleanAndRollOver() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
-        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
-        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
-        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
-        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
-        48, 46, 48, 1, 48, 46, 48};
-    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
-        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
-        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
-        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
-        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
-        1, 48, 46, 48, 1, 48, 46, 48};
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    storage.clean(address);
-
-    byte[] addr = null;
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      addr = storage.store(new Slice(b, 0, b.length));
-    }
-    Assert.assertNull(storage.retrieve(addr));
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    }
-    storage.flush();
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieve(new byte[8]), new String(b_org));
-    match(storage.retrieve(address), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-
-  }
-
-  /**
-   * This tests clean when the files roll over and the storage fails
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushWithCleanAndRollOverAndFailure() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
-        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
-        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
-        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
-        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
-        48, 46, 48, 1, 48, 46, 48};
-    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
-        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
-        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
-        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
-        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
-        1, 48, 46, 48, 1, 48, 46, 48};
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    storage.clean(address);
-    byte[] addr = null;
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      addr = storage.store(new Slice(b, 0, b.length));
-    }
-    storage = getStorage("1", true);
-    Assert.assertNull(storage.retrieve(addr));
-    for (int i = 0; i < 5; i++) {
-      b[0] = (byte)(b[0] + 1);
-      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    }
-    storage.flush();
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieve(address), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-    b_org[0] = (byte)(b_org[0] + 1);
-    match(storage.retrieveNext(), new String(b_org));
-
-  }
-
-  /**
-   * This test covers the following use case: the file is flushed and then more data is written to the same file, but
-   * the new data is not flushed, the file does not roll over, and the storage fails. The new storage comes up and the
-   * client asks for data at the last address returned by the earlier storage instance; the new storage returns null.
-   * The client stores the data again, the address returned this time is null, and retrieval of the earlier address
-   * now returns data.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testPartialFlushWithFailure() throws Exception
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = "ab".getBytes();
-    byte[] address = storage.store(new Slice(b, 0, b.length));
-    Assert.assertNotNull(address);
-    storage.flush();
-    b = "cb".getBytes();
-    byte[] addr = storage.store(new Slice(b, 0, b.length));
-    storage = getStorage("1", true);
-    Assert.assertNull(storage.retrieve(addr));
-    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    match(storage.retrieve(address), "cb");
-  }
-
-  private void match(byte[] data, String match)
-  {
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", match, new String(tempData));
-  }
-
-  @Test
-  public void testStorage() throws IOException
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[200];
-    byte[] identifier;
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    byte[] data = storage.retrieve(new byte[8]);
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    identifier = storage.store(new Slice(b, 0, b.length));
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData));
-    Assert.assertNull(storage.retrieve(identifier));
-  }
-
-  @Test
-  public void testStorageWithRestore() throws IOException
-  {
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = new byte[200];
-    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
-    storage.flush();
-    storage.teardown();
-
-    storage = getStorage("1", true);
-    storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-    boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/1/" + "1"));
-    Assert.assertEquals("file should exist", true, exists);
-  }
-
-  @Test
-  public void testCleanup() throws IOException
-  {
-    RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r");
-    r.seek(0);
-    byte[] b = r.readLine().getBytes();
-    storage.store(new Slice(b, 0, b.length));
-    byte[] val = storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    storage.clean(val);
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-    boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/" + "0"));
-    Assert.assertEquals("file should not exist", false, exists);
-    r.close();
-  }
-
-  @Test
-  public void testNext() throws IOException
-  {
-    RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r");
-    r.seek(0);
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    byte[] b = r.readLine().getBytes();
-    storage.store(new Slice(b, 0, b.length));
-    byte[] b1 = r.readLine().getBytes();
-    storage.store(new Slice(b1, 0, b1.length));
-    storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    storage.store(new Slice(b1, 0, b1.length));
-    storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    byte[] data = storage.retrieve(new byte[8]);
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData));
-    data = storage.retrieveNext();
-    tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", new String(b1), new String(tempData));
-    data = storage.retrieveNext();
-    tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData));
-    r.close();
-  }
-
-  @Test
-  public void testFailure() throws IOException
-  {
-    byte[] address;
-    byte[] b = new byte[200];
-    storage.retrieve(new byte[8]);
-    for (int i = 0; i < 5; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      address = storage.store(new Slice(b, 0, b.length));
-      storage.flush();
-      storage.clean(address);
-    }
-    storage.teardown();
-
-    byte[] identifier = new byte[8];
-    storage = getStorage("1", true);
-
-    storage.retrieve(identifier);
-
-    storage.store(new Slice(b, 0, b.length));
-    storage.store(new Slice(b, 0, b.length));
-    storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    byte[] data = storage.retrieve(identifier);
-    byte[] tempData = new byte[data.length - 8];
-    System.arraycopy(data, 8, tempData, 0, tempData.length);
-    Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData));
-  }
-
-  /**
-   * This test case tests the clean call before any flush is called.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCleanUnflushedData() throws IOException
-  {
-    for (int i = 0; i < 5; i++) {
-      final byte[] bytes = (i + "").getBytes();
-      storage.store(new Slice(bytes, 0, bytes.length));
-    }
-    storage.clean(new byte[8]);
-    storage.flush();
-    match(storage.retrieve(new byte[8]), "0");
-    match(storage.retrieveNext(), "1");
-  }
-
-  @Test
-  public void testCleanForUnflushedData() throws IOException
-  {
-    byte[] address = null;
-    byte[] b = new byte[200];
-    storage.retrieve(new byte[8]);
-    for (int i = 0; i < 5; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      address = storage.store(new Slice(b, 0, b.length));
-      storage.flush();
-      // storage.clean(address);
-    }
-    byte[] lastWrittenAddress = null;
-    for (int i = 0; i < 5; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
-    }
-    storage.clean(lastWrittenAddress);
-    byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile"));
-    Assert.assertArrayEquals(address, cleanedOffset);
-
-  }
-
-  @Test
-  public void testCleanForFlushedData() throws IOException
-  {
-    byte[] b = new byte[200];
-    storage.retrieve(new byte[8]);
-    for (int i = 0; i < 5; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      storage.store(new Slice(b, 0, b.length));
-      storage.flush();
-      // storage.clean(address);
-    }
-    byte[] lastWrittenAddress = null;
-    for (int i = 0; i < 5; i++) {
-      storage.store(new Slice(b, 0, b.length));
-      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
-    }
-    storage.flush();
-    storage.clean(lastWrittenAddress);
-    byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile"));
-    Assert.assertArrayEquals(lastWrittenAddress, cleanedOffset);
-
-  }
-
-  @Test
-  public void testCleanForPartialFlushedData() throws IOException
-  {
-    byte[] b = new byte[8];
-    storage.retrieve(new byte[8]);
-
-    storage.store(new Slice(b, 0, b.length));
-    byte[] bytes = "1a".getBytes();
-    byte[] address = storage.store(new Slice(bytes, 0, bytes.length));
-    storage.flush();
-    storage.clean(address);
-
-    byte[] lastWrittenAddress = null;
-    for (int i = 0; i < 5; i++) {
-      final byte[] bytes1 = (i + "").getBytes();
-      storage.store(new Slice(bytes1, 0, bytes1.length));
-      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
-    }
-    Assert.assertNull(storage.retrieve(new byte[8]));
-    Assert.assertNull(storage.retrieve(lastWrittenAddress));
-    storage.store(new Slice(b, 0, b.length));
-    storage.flush();
-    Assert.assertNull(storage.retrieve(lastWrittenAddress));
-  }
-
-  @Test
-  public void testRandomSequence() throws IOException
-  {
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    byte[] bytes = new byte[]{48, 48, 48, 51, 101, 100, 55, 56, 55, 49, 53, 99, 52, 101, 55, 50, 97, 52, 48, 49, 51,
-        99, 97, 54, 102, 57, 55, 53, 57, 100, 49, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51,
-        45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 52, 54, 1, 52, 50, 49, 50, 51, 1, 50, 1, 49, 53, 49, 49,
-        52, 50, 54, 53, 1, 49, 53, 49, 49, 57, 51, 53, 49, 1, 49, 53, 49, 50, 57, 56, 50, 52, 1, 49, 53, 49, 50, 49,
-        55, 48, 55, 1, 49, 48, 48, 55, 55, 51, 57, 51, 1, 49, 57, 49, 52, 55, 50, 53, 52, 54, 49, 1, 49, 1, 48, 1, 48,
-        46, 48, 1, 48, 46, 48, 1, 48, 46, 48};
-    storage.store(new Slice(bytes, 0, bytes.length));
-    storage.flush();
-    storage.clean(new byte[]{-109, 0, 0, 0, 0, 0, 0, 0});
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 2555; i++) {
-      byte[] bytes1 = new byte[]{48, 48, 48, 55, 56, 51, 98, 101, 50, 54, 50, 98, 52, 102, 50, 54, 56, 97, 55, 56, 102,
-          48, 54, 54, 50, 49, 49, 54, 99, 98, 101, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51,
-          45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 53, 49, 1, 49, 49, 49, 49, 54, 51, 57, 1, 50, 1, 49, 53,
-          49, 48, 57, 57, 56, 51, 1, 49, 53, 49, 49, 49, 55, 48, 52, 1, 49, 53, 49, 50, 49, 51, 55, 49, 1, 49, 53, 49,
-          49, 52, 56, 51, 49, 1, 49, 48, 48, 55, 49, 57, 56, 49, 1, 49, 50, 48, 50, 55, 54, 49, 54, 56, 53, 1, 49, 1,
-          48, 1, 48, 46, 48, 1, 48, 46, 48, 1, 48, 46, 48};
-      storage.store(new Slice(bytes1, 0, bytes1.length));
-      storage.flush();
-    }
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 1297; i++) {
-      storage.retrieveNext();
-    }
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 1302; i++) {
-      storage.retrieveNext();
-    }
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 1317; i++) {
-      storage.retrieveNext();
-    }
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 2007; i++) {
-      storage.retrieveNext();
-    }
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 2556; i++) {
-      storage.retrieveNext();
-    }
-    byte[] bytes1 = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
-        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
-        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
-        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
-        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
-        1, 48, 46, 48, 1, 48, 46, 48};
-    storage.store(new Slice(bytes1, 0, bytes1.length));
-    storage.flush();
-    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
-    for (int i = 0; i < 2062; i++) {
-      storage.retrieveNext();
-
-    }
-  }
-
-  @SuppressWarnings("unused")
-  private static final Logger logger = LoggerFactory.getLogger(HDFSStorageTest.class);
-}
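
Condensed from testPartialFlush above, the partial-flush contract the suite checks can be sketched as follows. The fragment assumes storage is configured as in getStorage(), and the comments restate the test's assertions rather than documented guarantees:

  byte[] b = "ab".getBytes();
  byte[] address = storage.store(new Slice(b, 0, b.length)); // non-null address
  storage.flush();

  byte[] unflushed = storage.store(new Slice("cb".getBytes(), 0, 2));
  // retrieving an address that has not been flushed yet returns null ...
  assert storage.retrieve(unflushed) == null;
  // ... and subsequent stores return null until the next flush()
  assert storage.store(new Slice(b, 0, b.length)) == null;

  storage.flush();
  // after the flush, the earlier address resolves to the record stored after it ("cb")
  assert storage.retrieve(address) != null;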

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
new file mode 100644
index 0000000..6503357
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.discovery;
+
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+
+import com.datatorrent.flume.discovery.Discovery.Service;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ *
+ */
+@Ignore
+public class ZKAssistedDiscoveryTest
+{
+  public ZKAssistedDiscoveryTest()
+  {
+  }
+
+  @Test
+  public void testSerialization() throws Exception
+  {
+    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+    discovery.setServiceName("DTFlumeTest");
+    discovery.setConnectionString("localhost:2181");
+    discovery.setBasePath("/HelloDT");
+    discovery.setup(null);
+    ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>()
+    {
+      @Override
+      public String getHost()
+      {
+        return "localhost";
+      }
+
+      @Override
+      public int getPort()
+      {
+        return 8080;
+      }
+
+      @Override
+      public byte[] getPayload()
+      {
+        return null;
+      }
+
+      @Override
+      public String getId()
+      {
+        return "localhost8080";
+      }
+
+    });
+    InstanceSerializer<byte[]> instanceSerializer =
+        discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>()
+        {
+        });
+    byte[] serialize = instanceSerializer.serialize(instance);
+    logger.debug("serialized json = {}", new String(serialize));
+    ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize);
+    assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload());
+  }
+
+  @Test
+  public void testDiscover()
+  {
+    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+    discovery.setServiceName("DTFlumeTest");
+    discovery.setConnectionString("localhost:2181");
+    discovery.setBasePath("/HelloDT");
+    discovery.setup(null);
+    assertNotNull("Discovered Sinks", discovery.discover());
+    discovery.teardown();
+  }
+
+  @Test
+  public void testAdvertize()
+  {
+    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
+    discovery.setServiceName("DTFlumeTest");
+    discovery.setConnectionString("localhost:2181");
+    discovery.setBasePath("/HelloDT");
+    discovery.setup(null);
+
+    Service<byte[]> service = new Service<byte[]>()
+    {
+      @Override
+      public String getHost()
+      {
+        return "chetan";
+      }
+
+      @Override
+      public int getPort()
+      {
+        return 5033;
+      }
+
+      @Override
+      public byte[] getPayload()
+      {
+        return new byte[] {3, 2, 1};
+      }
+
+      @Override
+      public String getId()
+      {
+        return "uniqueId";
+      }
+
+    };
+    discovery.advertise(service);
+    discovery.teardown();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class);
+}
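
All three tests share the same lifecycle; a compact sketch follows, using the connection parameters from the tests and assuming a ZooKeeper instance reachable at localhost:2181 (presumably why the class is marked @Ignore). Here `service` stands for a Discovery.Service<byte[]> such as the one built in testAdvertize:

  ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
  discovery.setServiceName("DTFlumeTest");
  discovery.setConnectionString("localhost:2181"); // assumes a reachable ZooKeeper
  discovery.setBasePath("/HelloDT");
  discovery.setup(null);
  discovery.advertise(service); // register this instance as a sink
  discovery.discover();         // returns the currently advertised sinks; non-null in testDiscover
  discovery.teardown();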

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
new file mode 100644
index 0000000..10153bc
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.integration;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.flume.operator.AbstractFlumeInputOperator;
+import com.datatorrent.flume.storage.EventCodec;
+
+/**
+ *
+ */
+@Ignore
+public class ApplicationTest implements StreamingApplication
+{
+  public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event>
+  {
+    @Override
+    public Event convert(Event event)
+    {
+      return event;
+    }
+  }
+
+  public static class Counter implements Operator
+  {
+    private int count;
+    private transient Event event;
+    public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>()
+    {
+      @Override
+      public void process(Event tuple)
+      {
+        count++;
+        event = tuple;
+      }
+
+    };
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+      logger.debug("total count = {}, tuple = {}", count, event);
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(Counter.class);
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000);
+    FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator());
+    flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"});
+    flume.setCodec(new EventCodec());
+    Counter counter = dag.addOperator("Counter", new Counter());
+
+    dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  @Test
+  public void test()
+  {
+    try {
+      LocalMode.runApp(this, Integer.MAX_VALUE);
+    } catch (Exception ex) {
+      logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex);
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+}
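
Outside JUnit, the same application can be run in local mode with a bounded run time instead of Integer.MAX_VALUE. A sketch using the LocalMode.runApp(StreamingApplication, int) call from the test, with an arbitrary ten-second timeout:

  public static void main(String[] args) throws Exception
  {
    // Run the DAG in local mode for roughly ten seconds, then shut down.
    LocalMode.runApp(new ApplicationTest(), 10000);
  }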

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
new file mode 100644
index 0000000..47bcacf
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link ColumnFilteringFormattingInterceptor}
+ */
+public class ColumnFilteringFormattingInterceptorTest
+{
+  private static InterceptorTestHelper helper;
+
+  @BeforeClass
+  public static void startUp()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001");
+
+    helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap);
+  }
+
+  @Test
+  public void testInterceptEvent()
+  {
+    helper.testIntercept_Event();
+  }
+
+  @Test
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    helper.testFiles();
+  }
+
+  @Test
+  public void testInterceptEventWithPrefix()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Six Fields",
+        "\001\001Second\001\001".getBytes(),
+        interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
+  }
+
+  @Test
+  public void testInterceptEventWithLongSeparator()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+    byte[] body = interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body);
+  }
+
+  @Test
+  public void testInterceptEventWithTerminatingSeparator()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+    byte[] body = interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body);
+  }
+
+  @Test
+  public void testInterceptEventWithColumnZero()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Empty Bytes",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
+
+    assertArrayEquals("One Field",
+        "First\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
+  }
+}
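
The formatter semantics these tests pin down can be traced by hand; a worked example using the separator and format string from testInterceptEventWithLongSeparator:

  // SRC_SEPARATOR = 0x02, COLUMNS_FORMATTER = "a{1}bc{2}def{3}ghi"
  // body "First\002\002Second\002\002\002" splits into columns
  //   [0]="First", [1]="", [2]="Second", [3]="", [4]="", [5]=""
  // each {i} is replaced by column i and the literals are kept:
  //   "a" + "" + "bc" + "Second" + "def" + "" + "ghi"  ==  "abcSeconddefghi"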

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java
new file mode 100644
index 0000000..b001b21
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ *
+ */
+public class ColumnFilteringInterceptorTest
+{
+  private static InterceptorTestHelper helper;
+
+  @BeforeClass
+  public static void startUp()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3");
+
+    helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap);
+  }
+
+  @Test
+  public void testInterceptEvent()
+  {
+    helper.testIntercept_Event();
+  }
+
+  @Test
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    helper.testFiles();
+  }
+
+  @Test
+  public void testInterceptEventWithColumnZero()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1));
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0");
+
+    ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Empty Bytes",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
+
+    assertArrayEquals("One Field",
+        "First\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
+  }
+}
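
For comparison, the plain filtering variant emits the selected columns joined by DST_SEPARATOR; a worked example using the configuration from startUp above and an input/output pair asserted in InterceptorTestHelper below:

  // COLUMNS = "1 2 3", SRC_SEPARATOR = 0x02, DST_SEPARATOR = 0x01
  // body "First\002Second" splits into [0]="First", [1]="Second"
  // columns 1..3 are emitted in order, each terminated by 0x01, and
  // missing columns contribute empty strings:
  //   "Second" + \001 + "" + \001 + "" + \001  ==  "Second\001\001\001"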

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java
new file mode 100644
index 0000000..b8dfbe0
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.datatorrent.netlet.util.Slice;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ *
+ */
+public class InterceptorTestHelper
+{
+  private static final byte FIELD_SEPARATOR = 1;
+
+  static class MyEvent implements Event
+  {
+    byte[] body;
+
+    MyEvent(byte[] bytes)
+    {
+      body = bytes;
+    }
+
+    @Override
+    public Map<String, String> getHeaders()
+    {
+      return null;
+    }
+
+    @Override
+    public void setHeaders(Map<String, String> map)
+    {
+    }
+
+    @Override
+    @SuppressWarnings("ReturnOfCollectionOrArrayField")
+    public byte[] getBody()
+    {
+      return body;
+    }
+
+    @Override
+    @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+    public void setBody(byte[] bytes)
+    {
+      body = bytes;
+    }
+  }
+
+  private final Interceptor.Builder builder;
+  private final Map<String, String> context;
+
+  InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context)
+  {
+    this.builder = builder;
+    this.context = context;
+  }
+
+  public void testIntercept_Event()
+  {
+    builder.configure(new Context(context));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Empty Bytes",
+        "\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("".getBytes())).getBody());
+
+    assertArrayEquals("One Separator",
+        "\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("\002".getBytes())).getBody());
+
+    assertArrayEquals("Two Separators",
+        "\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody());
+
+    assertArrayEquals("One Field",
+        "\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "First\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("\002First".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\001".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "Second\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody());
+
+    assertArrayEquals("Three Fields",
+        "Second\001\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody());
+
+    assertArrayEquals("Three Fields",
+        "\001Second\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody());
+
+    assertArrayEquals("Four Fields",
+        "\001Second\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody());
+
+    assertArrayEquals("Five Fields",
+        "\001Second\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody());
+
+    assertArrayEquals("Six Fields",
+        "\001Second\001\001".getBytes(),
+        interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
+  }
+
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    Properties properties = new Properties();
+    properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties"));
+
+    String interceptor = null;
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      logger.debug("{} => {}", entry.getKey(), entry.getValue());
+
+      if (builder.getClass().getName().equals(entry.getValue().toString())) {
+        String key = entry.getKey().toString();
+        if (key.endsWith(".type")) {
+          interceptor = key.substring(0, key.length() - "type".length());
+          break;
+        }
+      }
+    }
+
+    assertNotNull(builder.getClass().getName(), interceptor);
+    @SuppressWarnings({"null", "ConstantConditions"})
+    final int interceptorLength = interceptor.length();
+
+    HashMap<String, String> map = new HashMap<String, String>();
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String key = entry.getKey().toString();
+      if (key.startsWith(interceptor)) {
+        map.put(key.substring(interceptorLength), entry.getValue().toString());
+      }
+    }
+
+    builder.configure(new Context(map));
+    Interceptor interceptorInstance = builder.build();
+
+    URL url = getClass().getResource("/test_data/gentxns/");
+    assertNotNull("Generated Transactions", url);
+
+    int records = 0;
+    File dir = new File(url.toURI());
+    for (File file : dir.listFiles()) {
+      records += processFile(file, interceptorInstance);
+    }
+
+    Assert.assertEquals("Total Records", 2200, records);
+  }
+
+  private int processFile(File file, Interceptor interceptor) throws IOException
+  {
+    InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName());
+    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+
+    String line;
+    int i = 0;
+    while ((line = br.readLine()) != null) {
+      byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody();
+      RawEvent event = RawEvent.from(body, FIELD_SEPARATOR);
+      Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid);
+      logger.debug("guid = {}, time = {}", event.guid, event.time);
+      i++;
+    }
+
+    br.close();
+    return i;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class);
+}
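For reference, a concrete interceptor test would hand this helper a builder and a context map and invoke both checks. A minimal sketch, assuming a hypothetical MyInterceptor with a nested Builder and illustrative context keys (neither the class name nor the keys appear in this patch):

    import java.util.HashMap;
    import java.util.Map;

    import org.junit.Test;

    public class MyInterceptorTest
    {
      @Test
      public void testInterceptor() throws Exception
      {
        Map<String, String> context = new HashMap<String, String>();
        // hypothetical keys: which columns to keep and the source field separator
        context.put("columns", "1 2 3");
        context.put("srcSeparator", "2");
        InterceptorTestHelper helper =
            new InterceptorTestHelper(new MyInterceptor.Builder(), context);
        helper.testIntercept_Event();
        helper.testFiles();
      }
    }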

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java
new file mode 100644
index 0000000..cf6a823
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.io.Serializable;
+
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Test-side event record: a GUID slice, an event time, and a dimensions offset parsed from a separator-delimited row.
+ */
+public class RawEvent implements Serializable
+{
+  public Slice guid;
+  public long time;
+  public int dimensionsOffset;
+
+  public Slice getGUID()
+  {
+    return guid;
+  }
+
+  public long getTime()
+  {
+    return time;
+  }
+
+  RawEvent()
+  {
+    /* needed for Kryo serialization */
+  }
+
+  public static RawEvent from(byte[] row, byte separator)
+  {
+    final int rowsize = row.length;
+
+    /*
+     * Let's get the GUID out of the current record.
+     */
+    int sliceLength = -1;
+    while (++sliceLength < rowsize) {
+      if (row[sliceLength] == separator) {
+        break;
+      }
+    }
+
+    int i = sliceLength + 1;
+
+    /* let's parse the date */
+    int dateStart = i;
+    while (i < rowsize) {
+      if (row[i++] == separator) {
+        long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1));
+        RawEvent event = new RawEvent();
+        event.guid = new Slice(row, 0, sliceLength);
+        event.time = time;
+        event.dimensionsOffset = i;
+        return event;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int hash = 5;
+    hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0);
+    hash = 61 * hash + (int)(this.time ^ (this.time >>> 32));
+    return hash;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "RawEvent{" + "guid=" + guid + ", time=" + time + '}';
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final RawEvent other = (RawEvent)obj;
+    if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) {
+      return false;
+    }
+    return this.time == other.time;
+  }
+
+  private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+  private static final Logger logger = LoggerFactory.getLogger(RawEvent.class);
+  private static final long serialVersionUID = 201312191312L;
+}
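The record layout from() expects is GUID, separator, a "yyyy-MM-dd HH:mm:ss" timestamp, separator, then the dimensions section. A worked example (field values are made up; separator byte 1 matches FIELD_SEPARATOR in the helper above):

    byte separator = 1;
    String record = "0123456789abcdef0123456789abcdef" + (char)1
        + "2013-12-19 13:12:00" + (char)1 + "dims";
    RawEvent event = RawEvent.from(record.getBytes(), separator);
    // event.guid             -> Slice over the 32 GUID bytes
    // event.time             -> millis for 2013-12-19 13:12:00 in the default time zone
    // event.dimensionsOffset -> 53, the index just past the second separator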

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java
new file mode 100644
index 0000000..d7c4c30
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.operator;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for AbstractFlumeInputOperator; currently covers ThreadLocal initial-value semantics.
+ */
+public class AbstractFlumeInputOperatorTest
+{
+  public AbstractFlumeInputOperatorTest()
+  {
+  }
+
+  @Test
+  public void testThreadLocal()
+  {
+    ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>()
+    {
+      @Override
+      protected Set<Integer> initialValue()
+      {
+        return new HashSet<Integer>();
+      }
+
+    };
+    Set<Integer> get1 = tl.get();
+    get1.add(1);
+    assertTrue("Just Added Value", get1.contains(1));
+
+    Set<Integer> get2 = tl.get();
+    assertTrue("Previously added value", get2.contains(1));
+  }
+
+}
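On Java 8 and later the same initial-value behavior can be written more compactly with ThreadLocal.withInitial; a side note only, since the code above is written against the pre-Java-8 idiom:

    ThreadLocal<Set<Integer>> tl = ThreadLocal.withInitial(HashSet::new);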

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
new file mode 100644
index 0000000..9bc69e8
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.channel.MemoryChannel;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for DTFlumeSink: starts the sink, connects a netlet client, and verifies the echo round trip.
+ */
+public class DTFlumeSinkTest
+{
+  static final String hostname = "localhost";
+  int port = 0;
+
+  @Test
+  @SuppressWarnings("SleepWhileInLoop")
+  public void testServer() throws InterruptedException, IOException
+  {
+    Discovery<byte[]> discovery = new Discovery<byte[]>()
+    {
+      @Override
+      public synchronized void unadvertise(Service<byte[]> service)
+      {
+        notify();
+      }
+
+      @Override
+      public synchronized void advertise(Service<byte[]> service)
+      {
+        port = service.getPort();
+        logger.debug("listening at {}", service);
+        notify();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public synchronized Collection<Service<byte[]>> discover()
+      {
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        return Collections.EMPTY_LIST;
+      }
+
+    };
+    DTFlumeSink sink = new DTFlumeSink();
+    sink.setName("TeskSink");
+    sink.setHostname(hostname);
+    sink.setPort(0);
+    sink.setAcceptedTolerance(2000);
+    sink.setChannel(new MemoryChannel());
+    sink.setDiscovery(discovery);
+    sink.start();
+    AbstractLengthPrependerClient client = new AbstractLengthPrependerClient()
+    {
+      private byte[] array;
+      private int offset = 2;
+
+      @Override
+      public void onMessage(byte[] buffer, int offset, int size)
+      {
+        Slice received = new Slice(buffer, offset, size);
+        logger.debug("Client Received = {}", received);
+        Assert.assertEquals(received,
+            new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE));
+        synchronized (DTFlumeSinkTest.this) {
+          DTFlumeSinkTest.this.notify();
+        }
+      }
+
+      @Override
+      public void connected()
+      {
+        super.connected();
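+        // Build an ECHO request in place: only [offset, offset + FIXED_SIZE) goes on
+        // the wire, holding the command ordinal, eight payload bytes, and the current
+        // time at TIME_OFFSET; onMessage() above asserts the sink echoes it back.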
+        array = new byte[Server.Request.FIXED_SIZE + offset];
+        array[offset] = Server.Command.ECHO.getOrdinal();
+        array[offset + 1] = 1;
+        array[offset + 2] = 2;
+        array[offset + 3] = 3;
+        array[offset + 4] = 4;
+        array[offset + 5] = 5;
+        array[offset + 6] = 6;
+        array[offset + 7] = 7;
+        array[offset + 8] = 8;
+        Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis());
+        write(array, offset, Server.Request.FIXED_SIZE);
+      }
+
+    };
+
+    DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient");
+    eventloop.start();
+    discovery.discover();
+    try {
+      eventloop.connect(new InetSocketAddress(hostname, port), client);
+      try {
+        synchronized (this) {
+          this.wait();
+        }
+      } finally {
+        eventloop.disconnect(client);
+      }
+    } finally {
+      eventloop.stop();
+    }
+
+    sink.stop();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java
new file mode 100644
index 0000000..a893ebd
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for Server's fixed-offset int and long read/write helpers.
+ */
+public class ServerTest
+{
+  byte[] array;
+
+  public ServerTest()
+  {
+    array = new byte[1024];
+  }
+
+  @Test
+  public void testInt()
+  {
+    Server.writeInt(array, 0, Integer.MAX_VALUE);
+    Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0));
+
+    Server.writeInt(array, 0, Integer.MIN_VALUE);
+    Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0));
+
+    Server.writeInt(array, 0, 0);
+    Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0));
+
+    Random rand = new Random();
+    for (int i = 0; i < 128; i++) {
+      int n = rand.nextInt();
+      if (rand.nextBoolean()) {
+        n = -n;
+      }
+      Server.writeInt(array, 0, n);
+      Assert.assertEquals("Random Integer", n, Server.readInt(array, 0));
+    }
+  }
+
+  @Test
+  public void testLong()
+  {
+    Server.writeLong(array, 0, Integer.MAX_VALUE);
+    Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0));
+
+    Server.writeLong(array, 0, Integer.MIN_VALUE);
+    Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0));
+
+    Server.writeLong(array, 0, 0);
+    Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0));
+
+    Server.writeLong(array, 0, Long.MAX_VALUE);
+    Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0));
+
+    Server.writeLong(array, 0, Long.MIN_VALUE);
+    Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0));
+
+    Server.writeLong(array, 0, 0L);
+    Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0));
+
+    Random rand = new Random();
+    for (int i = 0; i < 128; i++) {
+      long n = rand.nextLong();
+      if (rand.nextBoolean()) {
+        n = -n;
+      }
+      Server.writeLong(array, 0, n);
+      Assert.assertEquals("Random Long", n, Server.readLong(array, 0));
+    }
+  }
+
+}
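These tests pin down a symmetric read/write round trip. A stand-alone sketch of the same property using java.nio.ByteBuffer; whether Server shares ByteBuffer's big-endian layout is not visible in this patch, so the equivalence is an assumption:

    byte[] scratch = new byte[8];
    ByteBuffer.wrap(scratch).putLong(Long.MIN_VALUE);
    long restored = ByteBuffer.wrap(scratch).getLong();
    // restored == Long.MIN_VALUE, the same round trip testLong() asserts for Server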

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java
new file mode 100644
index 0000000..4a714fe
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Standalone driver that writes sequential integers into HDFSStorage and verifies ordered retrieval.
+ */
+public class HDFSStorageMatching
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(args[0]);
+    storage.setId(args[1]);
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 100000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    int index = 10000;
+    byte[] b = Ints.toByteArray(index);
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    b = storage.retrieve(new byte[8]);
+    int orgIndex = index;
+    index = 10000;
+    if (!match(b, index)) {
+      throw new RuntimeException("failed : " + index);
+    }
+    while (true) {
+      index++;
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index);
+        return;
+      } else {
+        if (!match(b, index)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+    }
+
+  }
+
+  public static boolean match(byte[] data, int match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    int dataR = Ints.fromByteArray(tempData);
+    //logger.debug("input: {}, output: {}",match,dataR);
+    return match == dataR;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class);
+}
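The matcher implies that every block handed back by retrieve()/retrieveNext() carries an eight-byte prefix ahead of the stored payload; the prefix width is inferred from the arraycopy in match(), not documented here. The decode step in isolation (java.util.Arrays and Guava's Ints assumed imported):

    // strip the 8-byte prefix, then read the big-endian int written by Ints.toByteArray
    static int decodePayload(byte[] retrieved)
    {
      return Ints.fromByteArray(Arrays.copyOfRange(retrieved, 8, retrieved.length));
    }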
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java
new file mode 100644
index 0000000..6ed3892
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Standalone driver that measures HDFSStorage write, flush, and retrieve times with fixed 1 KB payloads.
+ */
+public class HDFSStoragePerformance
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(".");
+    storage.setId("gaurav_flume_1");
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 1000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    byte[] b = new byte[1024];
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    storage.retrieve(new byte[8]);
+    String inputData = new String(b);
+    int index = 1;
+    while (true) {
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}", System.currentTimeMillis());
+        return;
+      } else {
+        if (!match(b, inputData)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+
+      index++;
+    }
+
+  }
+
+  public static boolean match(byte[] data, String match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+//    logger.debug("input: {}, output: {}",match,new String(tempData));
+    return (match.equals(new String(tempData)));
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class);
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java
new file mode 100644
index 0000000..72f03fc
--- /dev/null
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>HDFSStoragePerformanceTest class.</p>
+ *
+ * @since 1.0.1
+ */
+public class HDFSStoragePerformanceTest
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(args[0]);
+    storage.setId(args[1]);
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 100000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    int index = 10000;
+    byte[] b = Ints.toByteArray(index);
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    b = storage.retrieve(new byte[8]);
+    int orgIndex = index;
+    index = 10000;
+    if (!match(b, index)) {
+      throw new RuntimeException("failed : " + index);
+    }
+    while (true) {
+      index++;
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index);
+        return;
+      } else {
+        if (!match(b, index)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+    }
+
+  }
+
+  public static boolean match(byte[] data, int match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    int dataR = Ints.fromByteArray(tempData);
+    //logger.debug("input: {}, output: {}",match,dataR);
+    return match == dataR;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class);
+}
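Like HDFSStorageMatching above, this driver takes the storage base directory and a storage id as its two positional arguments, e.g. java org.apache.apex.malhar.flume.storage.HDFSStoragePerformanceTest /tmp/flume-perf storage1 (paths illustrative), with the test classes and their dependencies on the classpath.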
+

