apex-commits mailing list archives

From t..@apache.org
Subject [07/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.
Date Tue, 23 May 2017 01:24:05 GMT
Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d200737b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d200737b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d200737b

Branch: refs/heads/master
Commit: d200737b631d9678a72e9e4fc817493de9b77ac0
Parents: bb89fe9
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Mon Feb 20 00:26:00 2017 +0530
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/README.md                                 |   6 +
 flume/pom.xml                                   |  28 +-
 .../datatorrent/flume/discovery/Discovery.java  |  69 --
 .../flume/discovery/ZKAssistedDiscovery.java    | 430 ---------
 .../ColumnFilteringFormattingInterceptor.java   | 229 -----
 .../interceptor/ColumnFilteringInterceptor.java | 205 ----
 .../operator/AbstractFlumeInputOperator.java    | 761 ---------------
 .../com/datatorrent/flume/sink/DTFlumeSink.java | 572 -----------
 .../java/com/datatorrent/flume/sink/Server.java | 420 --------
 .../flume/source/HdfsTestSource.java            | 224 -----
 .../datatorrent/flume/source/TestSource.java    | 250 -----
 .../datatorrent/flume/storage/DebugWrapper.java | 132 ---
 .../flume/storage/ErrorMaskingEventCodec.java   |  62 --
 .../datatorrent/flume/storage/EventCodec.java   |  92 --
 .../datatorrent/flume/storage/HDFSStorage.java  | 947 -------------------
 .../com/datatorrent/flume/storage/Storage.java  |  74 --
 .../apex/malhar/flume/discovery/Discovery.java  |  69 ++
 .../flume/discovery/ZKAssistedDiscovery.java    | 430 +++++++++
 .../ColumnFilteringFormattingInterceptor.java   | 227 +++++
 .../interceptor/ColumnFilteringInterceptor.java | 205 ++++
 .../operator/AbstractFlumeInputOperator.java    | 759 +++++++++++++++
 .../apex/malhar/flume/sink/DTFlumeSink.java     | 572 +++++++++++
 .../apache/apex/malhar/flume/sink/Server.java   | 419 ++++++++
 .../malhar/flume/source/HdfsTestSource.java     | 224 +++++
 .../apex/malhar/flume/source/TestSource.java    | 250 +++++
 .../apex/malhar/flume/storage/DebugWrapper.java | 132 +++
 .../flume/storage/ErrorMaskingEventCodec.java   |  62 ++
 .../apex/malhar/flume/storage/EventCodec.java   |  92 ++
 .../apex/malhar/flume/storage/HDFSStorage.java  | 947 +++++++++++++++++++
 .../apex/malhar/flume/storage/Storage.java      |  74 ++
 .../flume-conf/flume-conf.sample.properties     |   4 +-
 .../discovery/ZKAssistedDiscoveryTest.java      | 143 ---
 .../flume/integration/ApplicationTest.java      | 117 ---
 ...olumnFilteringFormattingInterceptorTest.java | 134 ---
 .../ColumnFilteringInterceptorTest.java         |  87 --
 .../interceptor/InterceptorTestHelper.java      | 216 -----
 .../datatorrent/flume/interceptor/RawEvent.java | 120 ---
 .../AbstractFlumeInputOperatorTest.java         |  57 --
 .../datatorrent/flume/sink/DTFlumeSinkTest.java | 145 ---
 .../com/datatorrent/flume/sink/ServerTest.java  |  93 --
 .../flume/storage/HDFSStorageMatching.java      | 111 ---
 .../flume/storage/HDFSStoragePerformance.java   |  87 --
 .../storage/HDFSStoragePerformanceTest.java     | 113 ---
 .../flume/storage/HDFSStorageTest.java          | 695 --------------
 .../discovery/ZKAssistedDiscoveryTest.java      | 143 +++
 .../flume/integration/ApplicationTest.java      | 117 +++
 ...olumnFilteringFormattingInterceptorTest.java | 134 +++
 .../ColumnFilteringInterceptorTest.java         |  87 ++
 .../interceptor/InterceptorTestHelper.java      | 216 +++++
 .../apex/malhar/flume/interceptor/RawEvent.java | 120 +++
 .../AbstractFlumeInputOperatorTest.java         |  57 ++
 .../apex/malhar/flume/sink/DTFlumeSinkTest.java | 145 +++
 .../apex/malhar/flume/sink/ServerTest.java      |  93 ++
 .../flume/storage/HDFSStorageMatching.java      | 111 +++
 .../flume/storage/HDFSStoragePerformance.java   |  87 ++
 .../storage/HDFSStoragePerformanceTest.java     | 113 +++
 .../malhar/flume/storage/HDFSStorageTest.java   | 695 ++++++++++++++
 .../resources/flume/conf/flume-conf.properties  |   6 +-
 flume/src/test/resources/log4j.properties       |  12 +-
 .../test/resources/test_data/gentxns/2013121500 | Bin 225010 -> 0 bytes
 .../resources/test_data/gentxns/2013121500.txt  | Bin 0 -> 225010 bytes
 .../test/resources/test_data/gentxns/2013121501 | Bin 224956 -> 0 bytes
 .../resources/test_data/gentxns/2013121501.txt  | Bin 0 -> 224956 bytes
 .../test/resources/test_data/gentxns/2013121502 | Bin 225028 -> 0 bytes
 .../resources/test_data/gentxns/2013121502.txt  | Bin 0 -> 225028 bytes
 .../test/resources/test_data/gentxns/2013121503 | Bin 225068 -> 0 bytes
 .../resources/test_data/gentxns/2013121503.txt  | Bin 0 -> 225068 bytes
 .../test/resources/test_data/gentxns/2013121504 | Bin 224845 -> 0 bytes
 .../resources/test_data/gentxns/2013121504.txt  | Bin 0 -> 224845 bytes
 .../test/resources/test_data/gentxns/2013121505 | Bin 225004 -> 0 bytes
 .../resources/test_data/gentxns/2013121505.txt  | Bin 0 -> 225004 bytes
 .../test/resources/test_data/gentxns/2013121506 | Bin 224929 -> 0 bytes
 .../resources/test_data/gentxns/2013121506.txt  | Bin 0 -> 224929 bytes
 .../test/resources/test_data/gentxns/2013121507 | Bin 224879 -> 0 bytes
 .../resources/test_data/gentxns/2013121507.txt  | Bin 0 -> 224879 bytes
 .../test/resources/test_data/gentxns/2013121508 | Bin 224963 -> 0 bytes
 .../resources/test_data/gentxns/2013121508.txt  | Bin 0 -> 224963 bytes
 .../test/resources/test_data/gentxns/2013121509 | Bin 224963 -> 0 bytes
 .../resources/test_data/gentxns/2013121509.txt  | Bin 0 -> 224963 bytes
 .../test/resources/test_data/gentxns/2013121510 | Bin 225007 -> 0 bytes
 .../resources/test_data/gentxns/2013121510.txt  | Bin 0 -> 225007 bytes
 .../test/resources/test_data/gentxns/2013121511 | Bin 224913 -> 0 bytes
 .../resources/test_data/gentxns/2013121511.txt  | Bin 0 -> 224913 bytes
 .../test/resources/test_data/gentxns/2013121512 | Bin 224929 -> 0 bytes
 .../resources/test_data/gentxns/2013121512.txt  | Bin 0 -> 224929 bytes
 .../test/resources/test_data/gentxns/2013121513 | Bin 225078 -> 0 bytes
 .../resources/test_data/gentxns/2013121513.txt  | Bin 0 -> 225078 bytes
 .../test/resources/test_data/gentxns/2013121514 | Bin 224882 -> 0 bytes
 .../resources/test_data/gentxns/2013121514.txt  | Bin 0 -> 224882 bytes
 .../test/resources/test_data/gentxns/2013121515 | Bin 224958 -> 0 bytes
 .../resources/test_data/gentxns/2013121515.txt  | Bin 0 -> 224958 bytes
 .../test/resources/test_data/gentxns/2013121516 | Bin 225032 -> 0 bytes
 .../resources/test_data/gentxns/2013121516.txt  | Bin 0 -> 225032 bytes
 .../test/resources/test_data/gentxns/2013121517 | Bin 225059 -> 0 bytes
 .../resources/test_data/gentxns/2013121517.txt  | Bin 0 -> 225059 bytes
 .../test/resources/test_data/gentxns/2013121518 | Bin 224890 -> 0 bytes
 .../resources/test_data/gentxns/2013121518.txt  | Bin 0 -> 224890 bytes
 .../test/resources/test_data/gentxns/2013121519 | Bin 225000 -> 0 bytes
 .../resources/test_data/gentxns/2013121519.txt  | Bin 0 -> 225000 bytes
 .../test/resources/test_data/gentxns/2013121520 | Bin 225064 -> 0 bytes
 .../resources/test_data/gentxns/2013121520.txt  | Bin 0 -> 225064 bytes
 .../test/resources/test_data/gentxns/2013121521 | Bin 225091 -> 0 bytes
 .../resources/test_data/gentxns/2013121521.txt  | Bin 0 -> 225091 bytes
 pom.xml                                         |   1 +
 104 files changed, 6613 insertions(+), 6609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/README.md
----------------------------------------------------------------------
diff --git a/flume/README.md b/flume/README.md
new file mode 100644
index 0000000..1d0b2d9
--- /dev/null
+++ b/flume/README.md
@@ -0,0 +1,6 @@
+Flume
+===============================
+
+This folder contains support for using Flume with Apex. It consists mainly of two components. The first is an agent that sits on the Flume side, receives data from Flume, and makes it available via a socket server; in effect it converts a push model into a pull model. The second component is the input operator that reads from the agent.
+
+The project starts from the latest code at the time the sub-module was created. For older history, see the flume sub-module in the earlier project Megh (git@github.com:DataTorrent/Megh).

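For readers of the README above: the input operator it refers to is AbstractFlumeInputOperator, which this commit moves under the org.apache.apex.malhar.flume.operator package. As a rough sketch only (not part of this commit, and the class name is hypothetical), a concrete operator merely implements convert(Event), the single abstract method visible in the operator source later in this diff:

import org.apache.flume.Event;

import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator;

// Hypothetical subclass for illustration: emits each Flume event body as a String.
public class StringFlumeInputOperator extends AbstractFlumeInputOperator<String>
{
  @Override
  public String convert(Event event)
  {
    byte[] body = event.getBody();
    // Returning null routes the raw slice to the operator's drop port instead of the output port.
    return body == null ? null : new String(body);
  }
}

Before it can pull anything from the agent, the operator also needs a StreamCodec&lt;Event&gt; via setCodec() and the sink addresses (sinkid:host:port) via setConnectAddresses(), both shown in the operator source below.
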
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/pom.xml
----------------------------------------------------------------------
diff --git a/flume/pom.xml b/flume/pom.xml
index 6522148..735a13b 100644
--- a/flume/pom.xml
+++ b/flume/pom.xml
@@ -23,14 +23,14 @@
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
-    <artifactId>dt-megh</artifactId>
-    <groupId>com.datatorrent</groupId>
-    <version>3.6.0-SNAPSHOT</version>
+    <artifactId>malhar</artifactId>
+    <groupId>org.apache.apex</groupId>
+    <version>3.8.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>dt-flume</artifactId>
+  <artifactId>malhar-flume</artifactId>
   <packaging>jar</packaging>
-  <name>DataTorrent Flume Integration</name>
+  <name>Apache Apex Malhar Flume Support</name>
 
   <profiles>
     <profile>
@@ -57,16 +57,16 @@
                   <goal>attached-rpm</goal>
                 </goals>
                 <configuration>
-                  <license>Copyright &copy; 2014 DataTorrent, Inc.</license>
+                  <license>Apache License, Version 2.0</license>
                   <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version>
                   <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release>
                   <workarea>target/sink-rpm</workarea>
                   <classifier>sink</classifier>
-                  <name>datatorrent-flume-sink</name>
-                  <distribution>DataTorrent Enterprise ${project.version}</distribution>
+                  <name>apex-malhar-flume-sink</name>
+                  <distribution>Apache Apex Malhar ${project.version}</distribution>
                   <group>Messaging Client Support</group>
                   <icon>src/main/resources/logo.gif</icon>
-                  <packager>DataTorrent Build System</packager>
+                  <packager>Apex Build System</packager>
                   <prefix>${package.prefix}</prefix>
                   <changelogFile>src/changelog</changelogFile>
                   <defineStatements>
@@ -82,7 +82,7 @@
                       <dependency>
                         <includes>
                           <include>org.apache.apex:apex-api:jar:${apex.core.version}</include>
-                          <include>com.datatorrent:dt-netlet:jar:1.2.0</include>
+                          <include>com.datatorrent:netlet:jar</include>
                           <include>org.apache.apex:apex-common:jar:${apex.core.version}</include>
                           <include>com.esotericsoftware.kryo:kryo:jar:2.24.0</include>
                           <include>com.esotericsoftware.minlog:minlog:jar:1.2</include>
@@ -120,15 +120,15 @@
                 </goals>
                 <configuration>
                   <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version>
-                  <license>Copyright &copy; 2014 DataTorrent, Inc.</license>
+                  <license>Apache License, Version 2.0</license>
                   <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release>
                   <workarea>target/operator-rpm</workarea>
                   <classifier>operator</classifier>
-                  <name>datatorrent-flume-operator</name>
-                  <distribution>DataTorrent Enterprise ${project.version}</distribution>
+                  <name>apex-malhar-flume-operator</name>
+                  <distribution>Apache Apex Malhar ${project.version}</distribution>
                   <group>Messaging Client Support</group>
                   <icon>src/main/resources/logo.gif</icon>
-                  <packager>DataTorrent Build System</packager>
+                  <packager>Apex Build System</packager>
                   <prefix>${package.prefix}</prefix>
                   <changelogFile>src/changelog</changelogFile>
                   <description>${rpm.release}</description>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java
deleted file mode 100644
index 72a1440..0000000
--- a/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.discovery;
-
-import java.util.Collection;
-
-/**
- * When DTFlumeSink server instance binds to the network interface, it can publish
- * its whereabouts by invoking advertise method on the Discovery object. Similarly
- * when it ceases accepting any more connections, it can publish its intent to do
- * so by invoking unadvertise.<p />
- * Interesting parties can call discover method to get the list of addresses where
- * they can find an available DTFlumeSink server instance.
- *
- * @param <T> - Type of the objects which can be discovered
- * @since 0.9.3
- */
-public interface Discovery<T>
-{
-  /**
-   * Recall the previously published address as it's no longer valid.
-   *
-   * @param service
-   */
-  void unadvertise(Service<T> service);
-
-  /**
-   * Advertise the host/port address where DTFlumeSink is accepting a client connection.
-   *
-   * @param service
-   */
-  void advertise(Service<T> service);
-
-  /**
-   * Discover all the addresses which are actively accepting the client connections.
-   *
-   * @return - Active server addresses which can accept the connections.
-   */
-  Collection<Service<T>> discover();
-
-  interface Service<T>
-  {
-    String getHost();
-
-    int getPort();
-
-    T getPayload();
-
-    String getId();
-
-  }
-
-}

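The Javadoc above describes a simple advertise/unadvertise/discover contract: a DTFlumeSink publishes its address while it accepts connections, and clients query for the currently available addresses. Purely to illustrate that contract (this class is not part of the commit), a trivial in-memory implementation against the relocated org.apache.apex.malhar.flume.discovery package could look like this:

import java.util.ArrayList;
import java.util.Collection;

import org.apache.apex.malhar.flume.discovery.Discovery;

// Hypothetical example: keeps advertised services in memory so callers can discover them.
public class InMemoryDiscovery implements Discovery<byte[]>
{
  private final Collection<Service<byte[]>> services = new ArrayList<Service<byte[]>>();

  @Override
  public synchronized void advertise(Service<byte[]> service)
  {
    services.add(service);
  }

  @Override
  public synchronized void unadvertise(Service<byte[]> service)
  {
    services.remove(service);
  }

  @Override
  public synchronized Collection<Service<byte[]>> discover()
  {
    return new ArrayList<Service<byte[]>>(services);
  }
}

ZKAssistedDiscovery, next in this diff, is the production implementation of the same interface, backed by ZooKeeper through Curator's service discovery.
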
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java
deleted file mode 100644
index 97ad8f0..0000000
--- a/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.discovery;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.curator.x.discovery.ServiceDiscovery;
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.flume.conf.Configurable;
-
-import com.google.common.base.Throwables;
-
-import com.datatorrent.api.Component;
-
-/**
- * <p>ZKAssistedDiscovery class.</p>
- *
- * @since 0.9.3
- */
-public class ZKAssistedDiscovery implements Discovery<byte[]>,
-    Component<com.datatorrent.api.Context>, Configurable, Serializable
-{
-  @NotNull
-  private String serviceName;
-  @NotNull
-  private String connectionString;
-  @NotNull
-  private String basePath;
-  private int connectionTimeoutMillis;
-  private int connectionRetryCount;
-  private int conntectionRetrySleepMillis;
-  private transient InstanceSerializerFactory instanceSerializerFactory;
-  private transient CuratorFramework curatorFramework;
-  private transient ServiceDiscovery<byte[]> discovery;
-
-  public ZKAssistedDiscovery()
-  {
-    this.serviceName = "DTFlume";
-    this.conntectionRetrySleepMillis = 500;
-    this.connectionRetryCount = 10;
-    this.connectionTimeoutMillis = 1000;
-  }
-
-  @Override
-  public void unadvertise(Service<byte[]> service)
-  {
-    doAdvertise(service, false);
-  }
-
-  @Override
-  public void advertise(Service<byte[]> service)
-  {
-    doAdvertise(service, true);
-  }
-
-  public void doAdvertise(Service<byte[]> service, boolean flag)
-  {
-    try {
-      new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
-
-      ServiceInstance<byte[]> instance = getInstance(service);
-      if (flag) {
-        discovery.registerService(instance);
-      } else {
-        discovery.unregisterService(instance);
-      }
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public Collection<Service<byte[]>> discover()
-  {
-    try {
-      new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient());
-
-      Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName);
-      ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size());
-      for (final ServiceInstance<byte[]> service : services) {
-        returnable.add(new Service<byte[]>()
-        {
-          @Override
-          public String getHost()
-          {
-            return service.getAddress();
-          }
-
-          @Override
-          public int getPort()
-          {
-            return service.getPort();
-          }
-
-          @Override
-          public byte[] getPayload()
-          {
-            return service.getPayload();
-          }
-
-          @Override
-          public String getId()
-          {
-            return service.getId();
-          }
-
-          @Override
-          public String toString()
-          {
-            return "{" + getId() + " => " + getHost() + ':' + getPort() + '}';
-          }
-
-        });
-      }
-      return returnable;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public String toString()
-  {
-    return "ZKAssistedDiscovery{" + "serviceName=" + serviceName + ", connectionString=" + connectionString +
-        ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" +
-        connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}';
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int hash = 7;
-    hash = 47 * hash + this.serviceName.hashCode();
-    hash = 47 * hash + this.connectionString.hashCode();
-    hash = 47 * hash + this.basePath.hashCode();
-    hash = 47 * hash + this.connectionTimeoutMillis;
-    hash = 47 * hash + this.connectionRetryCount;
-    hash = 47 * hash + this.conntectionRetrySleepMillis;
-    return hash;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj;
-    if (!this.serviceName.equals(other.serviceName)) {
-      return false;
-    }
-    if (!this.connectionString.equals(other.connectionString)) {
-      return false;
-    }
-    if (!this.basePath.equals(other.basePath)) {
-      return false;
-    }
-    if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) {
-      return false;
-    }
-    if (this.connectionRetryCount != other.connectionRetryCount) {
-      return false;
-    }
-    if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) {
-      return false;
-    }
-    return true;
-  }
-
-  ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception
-  {
-    return ServiceInstance.<byte[]>builder()
-            .name(serviceName)
-            .address(service.getHost())
-            .port(service.getPort())
-            .id(service.getId())
-            .payload(service.getPayload())
-            .build();
-  }
-
-  private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework)
-  {
-    return ServiceDiscoveryBuilder.builder(byte[].class)
-            .basePath(basePath)
-            .client(curatorFramework)
-            .serializer(instanceSerializerFactory.getInstanceSerializer(
-            new TypeReference<ServiceInstance<byte[]>>()
-              {})).build();
-  }
-
-  /**
-   * @return the instanceSerializerFactory
-   */
-  InstanceSerializerFactory getInstanceSerializerFactory()
-  {
-    return instanceSerializerFactory;
-  }
-
-  /**
-   * @return the connectionString
-   */
-  public String getConnectionString()
-  {
-    return connectionString;
-  }
-
-  /**
-   * @param connectionString the connectionString to set
-   */
-  public void setConnectionString(String connectionString)
-  {
-    this.connectionString = connectionString;
-  }
-
-  /**
-   * @return the basePath
-   */
-  public String getBasePath()
-  {
-    return basePath;
-  }
-
-  /**
-   * @param basePath the basePath to set
-   */
-  public void setBasePath(String basePath)
-  {
-    this.basePath = basePath;
-  }
-
-  /**
-   * @return the connectionTimeoutMillis
-   */
-  public int getConnectionTimeoutMillis()
-  {
-    return connectionTimeoutMillis;
-  }
-
-  /**
-   * @param connectionTimeoutMillis the connectionTimeoutMillis to set
-   */
-  public void setConnectionTimeoutMillis(int connectionTimeoutMillis)
-  {
-    this.connectionTimeoutMillis = connectionTimeoutMillis;
-  }
-
-  /**
-   * @return the connectionRetryCount
-   */
-  public int getConnectionRetryCount()
-  {
-    return connectionRetryCount;
-  }
-
-  /**
-   * @param connectionRetryCount the connectionRetryCount to set
-   */
-  public void setConnectionRetryCount(int connectionRetryCount)
-  {
-    this.connectionRetryCount = connectionRetryCount;
-  }
-
-  /**
-   * @return the conntectionRetrySleepMillis
-   */
-  public int getConntectionRetrySleepMillis()
-  {
-    return conntectionRetrySleepMillis;
-  }
-
-  /**
-   * @param conntectionRetrySleepMillis the conntectionRetrySleepMillis to set
-   */
-  public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis)
-  {
-    this.conntectionRetrySleepMillis = conntectionRetrySleepMillis;
-  }
-
-  /**
-   * @return the serviceName
-   */
-  public String getServiceName()
-  {
-    return serviceName;
-  }
-
-  /**
-   * @param serviceName the serviceName to set
-   */
-  public void setServiceName(String serviceName)
-  {
-    this.serviceName = serviceName;
-  }
-
-  @Override
-  public void configure(org.apache.flume.Context context)
-  {
-    serviceName = context.getString("serviceName", "DTFlume");
-    connectionString = context.getString("connectionString");
-    basePath = context.getString("basePath");
-
-    connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000);
-    connectionRetryCount = context.getInteger("connectionRetryCount", 10);
-    conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500);
-  }
-
-  @Override
-  public void setup(com.datatorrent.api.Context context)
-  {
-    ObjectMapper om = new ObjectMapper();
-    instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
-
-    curatorFramework = CuratorFrameworkFactory.builder()
-            .connectionTimeoutMs(connectionTimeoutMillis)
-            .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis))
-            .connectString(connectionString)
-            .build();
-    curatorFramework.start();
-
-    discovery = getDiscovery(curatorFramework);
-    try {
-      discovery.start();
-    } catch (Exception ex) {
-      Throwables.propagate(ex);
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      discovery.close();
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    } finally {
-      curatorFramework.close();
-      curatorFramework = null;
-    }
-  }
-
-  public class InstanceSerializerFactory
-  {
-    private final ObjectReader objectReader;
-    private final ObjectWriter objectWriter;
-
-    InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter)
-    {
-      this.objectReader = objectReader;
-      this.objectWriter = objectWriter;
-    }
-
-    public <T> InstanceSerializer<T> getInstanceSerializer(
-        TypeReference<ServiceInstance<T>> typeReference)
-    {
-      return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference);
-    }
-
-    final class JacksonInstanceSerializer<T> implements InstanceSerializer<T>
-    {
-      private final TypeReference<ServiceInstance<T>> typeRef;
-      private final ObjectWriter objectWriter;
-      private final ObjectReader objectReader;
-
-      JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter,
-          TypeReference<ServiceInstance<T>> typeRef)
-      {
-        this.objectReader = objectReader;
-        this.objectWriter = objectWriter;
-        this.typeRef = typeRef;
-      }
-
-      @Override
-      public ServiceInstance<T> deserialize(byte[] bytes) throws Exception
-      {
-        return objectReader.withType(typeRef).readValue(bytes);
-      }
-
-      @Override
-      public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception
-      {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        objectWriter.writeValue(out, serviceInstance);
-        return out.toByteArray();
-      }
-
-    }
-
-  }
-
-  private static final long serialVersionUID = 201401221145L;
-  private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);
-}

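For context, the class above can be configured either through Flume's Configurable.configure(Context), which reads the serviceName, connectionString and basePath keys shown in configure(), or through its bean setters. A minimal usage sketch follows; it is illustrative only, and the ZooKeeper connect string and base path are placeholder values:

import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;

// Hypothetical wiring for illustration; connect string and base path are placeholders.
public class DiscoveryExample
{
  public static void main(String[] args)
  {
    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
    discovery.setConnectionString("localhost:2181");  // placeholder ZooKeeper ensemble
    discovery.setBasePath("/flume/discovery");        // placeholder registration path
    discovery.setServiceName("DTFlume");               // same default the constructor uses

    discovery.setup(null);  // setup() above does not use its context argument
    try {
      for (Discovery.Service<byte[]> service : discovery.discover()) {
        System.out.println(service.getId() + " => " + service.getHost() + ':' + service.getPort());
      }
    } finally {
      discovery.teardown();
    }
  }
}
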
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
deleted file mode 100644
index fd20f99..0000000
--- a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.interceptor.Interceptor;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Ints;
-
-import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER;
-import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR;
-import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR_DFLT;
-
-/**
- * <p>ColumnFilteringFormattingInterceptor class.</p>
- *
- * @since 0.9.4
- */
-public class ColumnFilteringFormattingInterceptor implements Interceptor
-{
-  private final byte srcSeparator;
-  private final byte[][] dstSeparators;
-  private final byte[] prefix;
-  private final int maxIndex;
-  private final int maxColumn;
-  private final int[] columns;
-  private final int[] positions;
-
-  private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix)
-  {
-    this.columns = columns;
-
-    int tempMaxColumn = Integer.MIN_VALUE;
-    for (int column : columns) {
-      if (column > tempMaxColumn) {
-        tempMaxColumn = column;
-      }
-    }
-    maxIndex = tempMaxColumn;
-    maxColumn = tempMaxColumn + 1;
-    positions = new int[maxColumn + 1];
-    this.srcSeparator = srcSeparator;
-    this.dstSeparators = dstSeparators;
-    this.prefix = prefix;
-  }
-
-  @Override
-  public void initialize()
-  {
-    /* no-op */
-  }
-
-  @Override
-  public Event intercept(Event event)
-  {
-    byte[] body = event.getBody();
-    if (body == null) {
-      return event;
-    }
-
-    final int length = body.length;
-
-    /* store positions of character after the separators */
-    int i = 0;
-    int index = 0;
-    while (i < length) {
-      if (body[i++] == srcSeparator) {
-        positions[++index] = i;
-        if (index >= maxIndex) {
-          break;
-        }
-      }
-    }
-
-    int nextVirginIndex;
-    boolean separatorAtEnd = true;
-    if (i == length && index < maxColumn) {
-      nextVirginIndex = index + 2;
-      positions[nextVirginIndex - 1] = length;
-      separatorAtEnd = length > 0 ? body[length - 1] == srcSeparator : false;
-    } else {
-      nextVirginIndex = index + 1;
-    }
-
-    int newArrayLen = prefix.length;
-    for (i = columns.length; i-- > 0; ) {
-      int column = columns[i];
-      int len = positions[column + 1] - positions[column];
-      if (len > 0) {
-        if (positions[column + 1] == length && !separatorAtEnd) {
-          newArrayLen += len;
-        } else {
-          newArrayLen += len - 1;
-        }
-      }
-      newArrayLen += dstSeparators[i].length;
-    }
-
-    byte[] newBody = new byte[newArrayLen];
-    int newOffset = 0;
-    if (prefix.length > 0) {
-      System.arraycopy(prefix, 0, newBody, 0, prefix.length);
-      newOffset += prefix.length;
-    }
-    int dstSeparatorsIdx = 0;
-    for (int column : columns) {
-      int len = positions[column + 1] - positions[column];
-      byte[] separator = dstSeparators[dstSeparatorsIdx++];
-      if (len > 0) {
-        System.arraycopy(body, positions[column], newBody, newOffset, len);
-        newOffset += len;
-        if (newBody[newOffset - 1] == srcSeparator) {
-          newOffset--;
-        }
-      }
-      System.arraycopy(separator, 0, newBody, newOffset, separator.length);
-      newOffset += separator.length;
-    }
-    event.setBody(newBody);
-    Arrays.fill(positions, 1, nextVirginIndex, 0);
-    return event;
-  }
-
-  @Override
-  public List<Event> intercept(List<Event> events)
-  {
-    for (Event event : events) {
-      intercept(event);
-    }
-    return events;
-  }
-
-  @Override
-  public void close()
-  {
-  }
-
-  public static class Builder implements Interceptor.Builder
-  {
-    private int[] columns;
-    private byte srcSeparator;
-    private byte[][] dstSeparators;
-    private byte[] prefix;
-
-    @Override
-    public Interceptor build()
-    {
-      return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix);
-    }
-
-    @Override
-    public void configure(Context context)
-    {
-      String formatter = context.getString(COLUMNS_FORMATTER);
-      if (Strings.isNullOrEmpty(formatter)) {
-        throw new IllegalArgumentException("This interceptor requires columns format to be specified!");
-      }
-      List<String> lSeparators = Lists.newArrayList();
-      List<Integer> lColumns = Lists.newArrayList();
-      Pattern colPat = Pattern.compile("\\{\\d+?\\}");
-      Matcher matcher = colPat.matcher(formatter);
-      int separatorStart = 0;
-      String lPrefix = "";
-      while (matcher.find()) {
-        String col = matcher.group();
-        lColumns.add(Integer.parseInt(col.substring(1, col.length() - 1)));
-        if (separatorStart == 0 && matcher.start() > 0) {
-          lPrefix = formatter.substring(0, matcher.start());
-        } else if (separatorStart > 0) {
-          lSeparators.add(formatter.substring(separatorStart, matcher.start()));
-        }
-
-        separatorStart = matcher.end();
-      }
-      if (separatorStart < formatter.length()) {
-        lSeparators.add(formatter.substring(separatorStart, formatter.length()));
-      }
-      columns = Ints.toArray(lColumns);
-      byte[] emptyStringBytes = "".getBytes();
-
-      dstSeparators = new byte[columns.length][];
-
-      for (int i = 0; i < columns.length; i++) {
-        if (i < lSeparators.size()) {
-          dstSeparators[i] = lSeparators.get(i).getBytes();
-        } else {
-          dstSeparators[i] = emptyStringBytes;
-        }
-      }
-      srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
-      this.prefix = lPrefix.getBytes();
-    }
-  }
-
-  public static class Constants extends ColumnFilteringInterceptor.Constants
-  {
-    public static final String COLUMNS_FORMATTER = "columnsFormatter";
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class);
-
-}

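The Builder above derives its whole configuration from a single columnsFormatter string: each {n} placeholder selects 0-based input column n, any literal text before the first placeholder becomes a prefix, and the literal text between and after placeholders becomes the destination separators. As an illustration only (the column layout chosen here is a made-up assumption), the interceptor can be configured programmatically through Flume's Context:

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

import org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor;

// Hypothetical configuration: keep input columns 0 and 3, each terminated by '|' in the output.
public class FormattingInterceptorExample
{
  public static void main(String[] args)
  {
    Context context = new Context();
    context.put("columnsFormatter", "{0}|{3}|");
    context.put("srcSeparator", "2");  // byte value separating columns in the incoming body (default 2)

    Interceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
    builder.configure(context);
    Interceptor interceptor = builder.build();
    interceptor.initialize();
  }
}

In a Flume agent the same keys would normally be supplied through the interceptor's properties rather than in code.
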
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java
deleted file mode 100644
index a2f598f..0000000
--- a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.interceptor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.interceptor.Interceptor;
-
-import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.COLUMNS;
-import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR;
-import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR_DFLT;
-import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR;
-import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT;
-
-/**
- * <p>ColumnFilteringInterceptor class.</p>
- *
- * @since 0.9.4
- */
-public class ColumnFilteringInterceptor implements Interceptor
-{
-  private final byte srcSeparator;
-  private final byte dstSeparator;
-
-  private final int maxIndex;
-  private final int maxColumn;
-  private final int[] columns;
-  private final int[] positions;
-
-  private ColumnFilteringInterceptor(int[] columns, byte srcSeparator, byte dstSeparator)
-  {
-    this.columns = columns;
-
-    int tempMaxColumn = Integer.MIN_VALUE;
-    for (int column: columns) {
-      if (column > tempMaxColumn) {
-        tempMaxColumn = column;
-      }
-    }
-    maxIndex = tempMaxColumn;
-    maxColumn = tempMaxColumn + 1;
-    positions = new int[maxColumn + 1];
-
-    this.srcSeparator = srcSeparator;
-    this.dstSeparator = dstSeparator;
-  }
-
-  @Override
-  public void initialize()
-  {
-    /* no-op */
-  }
-
-  @Override
-  public Event intercept(Event event)
-  {
-    byte[] body = event.getBody();
-    if (body == null) {
-      return event;
-    }
-
-    final int length = body.length;
-
-    /* store positions of character after the separators */
-    int i = 0;
-    int index = 0;
-    while (i < length) {
-      if (body[i++] == srcSeparator) {
-        positions[++index] = i;
-        if (index >= maxIndex) {
-          break;
-        }
-      }
-    }
-
-    int nextVirginIndex;
-    boolean separatorTerminated;
-    if (i == length && index < maxColumn) {
-      nextVirginIndex = index + 2;
-      positions[nextVirginIndex - 1] = length;
-      separatorTerminated = length > 0 ? body[length - 1]  != srcSeparator : false;
-    } else {
-      nextVirginIndex = index + 1;
-      separatorTerminated = true;
-    }
-
-    int newArrayLen = 0;
-    for (i = columns.length; i-- > 0;) {
-      int column = columns[i];
-      int len = positions[column + 1] - positions[column];
-      if (len <= 0) {
-        newArrayLen++;
-      } else {
-        if (separatorTerminated && positions[column + 1] == length) {
-          newArrayLen++;
-        }
-        newArrayLen += len;
-      }
-    }
-
-    byte[] newbody = new byte[newArrayLen];
-    int newoffset = 0;
-    for (int column: columns) {
-      int len = positions[column + 1] - positions[column];
-      if (len > 0) {
-        System.arraycopy(body, positions[column], newbody, newoffset, len);
-        newoffset += len;
-        if (newbody[newoffset - 1] == srcSeparator) {
-          newbody[newoffset - 1] = dstSeparator;
-        } else {
-          newbody[newoffset++] = dstSeparator;
-        }
-      } else {
-        newbody[newoffset++] = dstSeparator;
-      }
-    }
-
-    event.setBody(newbody);
-    Arrays.fill(positions, 1, nextVirginIndex, 0);
-    return event;
-  }
-
-  @Override
-  public List<Event> intercept(List<Event> events)
-  {
-    for (Event event: events) {
-      intercept(event);
-    }
-    return events;
-  }
-
-  @Override
-  public void close()
-  {
-  }
-
-  public static class Builder implements Interceptor.Builder
-  {
-    private int[] columns;
-    private byte srcSeparator;
-    private byte dstSeparator;
-
-    @Override
-    public Interceptor build()
-    {
-      return new ColumnFilteringInterceptor(columns, srcSeparator, dstSeparator);
-    }
-
-    @Override
-    public void configure(Context context)
-    {
-      String sColumns = context.getString(COLUMNS);
-      if (sColumns == null || sColumns.trim().isEmpty()) {
-        throw new Error("This interceptor requires filtered columns to be specified!");
-      }
-
-      String[] parts = sColumns.split(" ");
-      columns = new int[parts.length];
-      for (int i = parts.length; i-- > 0;) {
-        columns[i] = Integer.parseInt(parts[i]);
-      }
-
-      srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
-      dstSeparator = context.getInteger(DST_SEPARATOR, (int)DST_SEPARATOR_DFLT).byteValue();
-    }
-
-  }
-
-  @SuppressWarnings("ClassMayBeInterface") /* adhering to flume until i understand it completely */
-
-  public static class Constants
-  {
-    public static final String SRC_SEPARATOR = "srcSeparator";
-    public static final byte SRC_SEPARATOR_DFLT = 2;
-
-    public static final String DST_SEPARATOR = "dstSeparator";
-    public static final byte DST_SEPARATOR_DFLT = 1;
-
-    public static final String COLUMNS = "columns";
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringInterceptor.class);
-}

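Unlike the formatting variant earlier in this diff, this interceptor is configured with a plain space-separated list of 0-based column indices under the columns key, plus single-byte srcSeparator and dstSeparator values (defaults 2 and 1 per the Constants class above). A brief, purely illustrative sketch of the same programmatic configuration style:

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

import org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor;

// Hypothetical configuration: forward columns 1, 2 and 5, re-separated by byte value 1.
public class FilteringInterceptorExample
{
  public static void main(String[] args)
  {
    Context context = new Context();
    context.put("columns", "1 2 5");    // space-separated 0-based column indices
    context.put("srcSeparator", "2");   // byte separating columns in the incoming body (default 2)
    context.put("dstSeparator", "1");   // byte separating columns in the outgoing body (default 1)

    Interceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
    builder.configure(context);
    Interceptor interceptor = builder.build();
    interceptor.initialize();
  }
}
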
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java
deleted file mode 100644
index d772ff5..0000000
--- a/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java
+++ /dev/null
@@ -1,761 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.operator;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Event;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.Stats.OperatorStats;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.flume.discovery.Discovery.Service;
-import com.datatorrent.flume.discovery.ZKAssistedDiscovery;
-import com.datatorrent.flume.sink.Server;
-import com.datatorrent.flume.sink.Server.Command;
-import com.datatorrent.flume.sink.Server.Request;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.util.Slice;
-
-import static java.lang.Thread.sleep;
-
-/**
- * <p>
- * Abstract AbstractFlumeInputOperator class.</p>
- *
- * @param <T> Type of the output payload.
- * @since 0.9.2
- */
-public abstract class AbstractFlumeInputOperator<T>
-    implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler,
-    Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>>
-{
-  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
-  public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>();
-  @NotNull
-  private String[] connectionSpecs;
-  @NotNull
-  private StreamCodec<Event> codec;
-  private final ArrayList<RecoveryAddress> recoveryAddresses;
-  @SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow
-  private transient ArrayBlockingQueue<Slice> handoverBuffer;
-  private transient int idleCounter;
-  private transient int eventCounter;
-  private transient DefaultEventLoop eventloop;
-  private transient volatile boolean connected;
-  private transient OperatorContext context;
-  private transient Client client;
-  private transient long windowId;
-  private transient byte[] address;
-  @Min(0)
-  private long maxEventsPerSecond;
-  //This is calculated from maxEventsPerSecond, App window count and streaming window size
-  private transient long maxEventsPerWindow;
-
-  public AbstractFlumeInputOperator()
-  {
-    handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5);
-    connectionSpecs = new String[0];
-    recoveryAddresses = new ArrayList<RecoveryAddress>();
-    maxEventsPerSecond = Long.MAX_VALUE;
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) *
-        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
-    maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond);
-    logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow);
-
-    try {
-      eventloop = new DefaultEventLoop("EventLoop-" + context.getId());
-      eventloop.start();
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Override
-  @SuppressWarnings({"unchecked"})
-  public void activate(OperatorContext ctx)
-  {
-    if (connectionSpecs.length == 0) {
-      logger.info("Discovered zero DTFlumeSink");
-    } else if (connectionSpecs.length == 1) {
-      for (String connectAddresse: connectionSpecs) {
-        logger.debug("Connection spec is {}", connectAddresse);
-        String[] parts = connectAddresse.split(":");
-        eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client = new Client(parts[0]));
-      }
-    } else {
-      throw new IllegalArgumentException(
-          String.format("A physical %s operator cannot connect to more than 1 addresses!",
-              this.getClass().getSimpleName()));
-    }
-
-    context = ctx;
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    this.windowId = windowId;
-    idleCounter = 0;
-    eventCounter = 0;
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    int i = handoverBuffer.size();
-    if (i > 0 && eventCounter < maxEventsPerWindow) {
-
-      while (--i > 0 && eventCounter < maxEventsPerWindow - 1) {
-        final Slice slice = handoverBuffer.poll();
-        slice.offset += 8;
-        slice.length -= 8;
-        T convert = convert((Event)codec.fromByteArray(slice));
-        if (convert == null) {
-          drop.emit(slice);
-        } else {
-          output.emit(convert);
-        }
-        eventCounter++;
-      }
-
-      final Slice slice = handoverBuffer.poll();
-      slice.offset += 8;
-      slice.length -= 8;
-      T convert = convert((Event)codec.fromByteArray(slice));
-      if (convert == null) {
-        drop.emit(slice);
-      } else {
-        output.emit(convert);
-      }
-      eventCounter++;
-
-      address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset);
-    }
-  }
-
-  @Override
-  public void endWindow()
-  {
-    if (connected) {
-      byte[] array = new byte[Request.FIXED_SIZE];
-
-      array[0] = Command.WINDOWED.getOrdinal();
-      Server.writeInt(array, 1, eventCounter);
-      Server.writeInt(array, 5, idleCounter);
-      Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis());
-
-      logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Command.WINDOWED, eventCounter, idleCounter);
-      client.write(array);
-    }
-
-    if (address != null) {
-      RecoveryAddress rAddress = new RecoveryAddress();
-      rAddress.address = address;
-      address = null;
-      rAddress.windowId = windowId;
-      recoveryAddresses.add(rAddress);
-    }
-  }
-
-  @Override
-  public void deactivate()
-  {
-    if (connected) {
-      eventloop.disconnect(client);
-    }
-    context = null;
-  }
-
-  @Override
-  public void teardown()
-  {
-    eventloop.stop();
-    eventloop = null;
-  }
-
-  @Override
-  public void handleIdleTime()
-  {
-    idleCounter++;
-    try {
-      sleep(context.getValue(OperatorContext.SPIN_MILLIS));
-    } catch (InterruptedException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  public abstract T convert(Event event);
-
-  /**
-   * @return the connectAddress
-   */
-  public String[] getConnectAddresses()
-  {
-    return connectionSpecs.clone();
-  }
-
-  /**
-   * @param specs - sinkid:host:port specification of all the sinks.
-   */
-  public void setConnectAddresses(String[] specs)
-  {
-    this.connectionSpecs = specs.clone();
-  }
-
-  /**
-   * @return the codec
-   */
-  public StreamCodec<Event> getCodec()
-  {
-    return codec;
-  }
-
-  /**
-   * @param codec the codec to set
-   */
-  public void setCodec(StreamCodec<Event> codec)
-  {
-    this.codec = codec;
-  }
-
-  private static class RecoveryAddress implements Serializable
-  {
-    long windowId;
-    byte[] address;
-
-    @Override
-    public String toString()
-    {
-      return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}';
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof RecoveryAddress)) {
-        return false;
-      }
-
-      RecoveryAddress that = (RecoveryAddress)o;
-
-      if (windowId != that.windowId) {
-        return false;
-      }
-      return Arrays.equals(address, that.address);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      int result = (int)(windowId ^ (windowId >>> 32));
-      result = 31 * result + (address != null ? Arrays.hashCode(address) : 0);
-      return result;
-    }
-
-    private static final long serialVersionUID = 201312021432L;
-  }
-
-  @Override
-  public void checkpointed(long windowId)
-  {
-    /* dont do anything */
-  }
-
-  @Override
-  public void committed(long windowId)
-  {
-    if (!connected) {
-      return;
-    }
-
-    synchronized (recoveryAddresses) {
-      byte[] addr = null;
-
-      Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator();
-      while (iterator.hasNext()) {
-        RecoveryAddress ra = iterator.next();
-        if (ra.windowId > windowId) {
-          break;
-        }
-
-        iterator.remove();
-        if (ra.address != null) {
-          addr = ra.address;
-        }
-      }
-
-      if (addr != null) {
-        /*
-         * Make sure that we store the last valid address processed
-         */
-        if (recoveryAddresses.isEmpty()) {
-          RecoveryAddress ra = new RecoveryAddress();
-          ra.address = addr;
-          recoveryAddresses.add(ra);
-        }
-
-        int arraySize = 1/* for the type of the message */
-            + 8 /* for the location to commit */
-            + 8 /* for storing the current time stamp*/;
-        byte[] array = new byte[arraySize];
-
-        array[0] = Command.COMMITTED.getOrdinal();
-        System.arraycopy(addr, 0, array, 1, 8);
-        Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis());
-        logger.debug("wrote {} with recoveryOffset = {}", Command.COMMITTED, Arrays.toString(addr));
-        client.write(array);
-      }
-    }
-  }
-
-  @Override
-  public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions(
-      Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context)
-  {
-    Collection<Service<byte[]>> discovered = discoveredFlumeSinks.get();
-    if (discovered == null) {
-      return partitions;
-    }
-
-    HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get();
-    ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size());
-    for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) {
-      String[] lAddresses = partition.getPartitionedInstance().connectionSpecs;
-      allConnectAddresses.addAll(Arrays.asList(lAddresses));
-      for (int i = lAddresses.length; i-- > 0;) {
-        String[] parts = lAddresses[i].split(":", 2);
-        allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses);
-      }
-    }
-
-    HashMap<String, String> connections = new HashMap<String, String>(discovered.size());
-    for (Service<byte[]> service: discovered) {
-      String previousSpec = connections.get(service.getId());
-      String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort();
-      if (previousSpec == null) {
-        connections.put(service.getId(), newspec);
-      } else {
-        boolean found = false;
-        for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) {
-          if (previousSpec.equals(cs.spec) && !cs.connected) {
-            connections.put(service.getId(), newspec);
-            found = true;
-            break;
-          }
-        }
-
-        if (!found) {
-          logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", previousSpec, newspec);
-          connections.put(service.getId(), newspec);
-        }
-      }
-    }
-
-    for (int i = allConnectAddresses.size(); i-- > 0;) {
-      String[] parts = allConnectAddresses.get(i).split(":");
-      String connection = connections.remove(parts[0]);
-      if (connection == null) {
-        allConnectAddresses.remove(i);
-      } else {
-        allConnectAddresses.set(i, connection);
-      }
-    }
-
-    allConnectAddresses.addAll(connections.values());
-
-    partitions.clear();
-    try {
-      if (allConnectAddresses.isEmpty()) {
-        /* return at least one of them; otherwise stram becomes grumpy */
-        @SuppressWarnings("unchecked")
-        AbstractFlumeInputOperator<T> operator = getClass().newInstance();
-        operator.setCodec(codec);
-        operator.setMaxEventsPerSecond(maxEventsPerSecond);
-        for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) {
-          operator.recoveryAddresses.addAll(lRecoveryAddresses);
-        }
-        operator.connectionSpecs = new String[allConnectAddresses.size()];
-        for (int i = operator.connectionSpecs.length; i-- > 0;) {
-          operator.connectionSpecs[i] = allConnectAddresses.get(i);
-        }
-
-        partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
-      } else {
-        long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size();
-        for (int i = allConnectAddresses.size(); i-- > 0;) {
-          @SuppressWarnings("unchecked")
-          AbstractFlumeInputOperator<T> operator = getClass().newInstance();
-          operator.setCodec(codec);
-          operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator);
-          String connectAddress = allConnectAddresses.get(i);
-          operator.connectionSpecs = new String[] {connectAddress};
-
-          String[] parts = connectAddress.split(":", 2);
-          ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]);
-          if (remove != null) {
-            operator.recoveryAddresses.addAll(remove);
-          }
-
-          partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
-        }
-      }
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    } catch (InstantiationException ex) {
-      throw new RuntimeException(ex);
-    }
-
-    logger.debug("Requesting partitions: {}", partitions);
-    return partitions;
-  }
-
-  @Override
-  public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions)
-  {
-    logger.debug("Partitioned Map: {}", partitions);
-    HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
-    map.clear();
-    for (Entry<Integer, Partition<AbstractFlumeInputOperator<T>>> entry: partitions.entrySet()) {
-      if (map.containsKey(entry.getKey())) {
-        // what can be done here?
-      } else {
-        map.put(entry.getKey(), null);
-      }
-    }
-  }
-
-  @Override
-  public String toString()
-  {
-    return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" +
-        (connectionSpecs.length == 0 ? "empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}';
-  }
-
-  class Client extends AbstractLengthPrependerClient
-  {
-    private final String id;
-
-    Client(String id)
-    {
-      this.id = id;
-    }
-
-    @Override
-    public void onMessage(byte[] buffer, int offset, int size)
-    {
-      try {
-        handoverBuffer.put(new Slice(buffer, offset, size));
-      } catch (InterruptedException ex) {
-        handleException(ex, eventloop);
-      }
-    }
-
-    @Override
-    public void connected()
-    {
-      super.connected();
-
-      byte[] address;
-      synchronized (recoveryAddresses) {
-        if (recoveryAddresses.size() > 0) {
-          address = recoveryAddresses.get(recoveryAddresses.size() - 1).address;
-        } else {
-          address = new byte[8];
-        }
-      }
-
-      int len = 1 /* for the message type SEEK */
-          + 8 /* for the address */
-          + 8 /* for storing the current time stamp*/;
-
-      byte[] array = new byte[len];
-      array[0] = Command.SEEK.getOrdinal();
-      System.arraycopy(address, 0, array, 1, 8);
-      Server.writeLong(array, 9, System.currentTimeMillis());
-      write(array);
-
-      connected = true;
-      ConnectionStatus connectionStatus = new ConnectionStatus();
-      connectionStatus.connected = true;
-      connectionStatus.spec = connectionSpecs[0];
-      OperatorContext ctx = context;
-      synchronized (ctx) {
-        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
-        context.setCounters(connectionStatus);
-      }
-    }
-
-    @Override
-    public void disconnected()
-    {
-      connected = false;
-      ConnectionStatus connectionStatus = new ConnectionStatus();
-      connectionStatus.connected = false;
-      connectionStatus.spec = connectionSpecs[0];
-      OperatorContext ctx = context;
-      synchronized (ctx) {
-        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
-        context.setCounters(connectionStatus);
-      }
-      super.disconnected();
-    }
-
-  }
-
-  public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener,
-      Serializable
-  {
-    /*
-     * In the current design, one input operator is able to connect
-     * to only one flume adapter. Sometime in the future, we should support
-     * any number of input operators connecting to any number of flume
-     * sinks and vice versa.
-     *
-     * Until that happens the following map should be sufficient to
-     * keep track of which input operator is connected to which flume sink.
-     */
-    long intervalMillis;
-    private final Response response;
-    private transient long nextMillis;
-
-    public ZKStatsListner()
-    {
-      intervalMillis = 60 * 1000L;
-      response = new Response();
-    }
-
-    @Override
-    public Response processStats(BatchedOperatorStats stats)
-    {
-      final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
-      response.repartitionRequired = false;
-
-      Object lastStat = null;
-      List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
-      for (OperatorStats os: lastWindowedStats) {
-        if (os.counters != null) {
-          lastStat = os.counters;
-          logger.debug("Received custom stats = {}", lastStat);
-        }
-      }
-
-      if (lastStat instanceof ConnectionStatus) {
-        ConnectionStatus cs = (ConnectionStatus)lastStat;
-        map.put(stats.getOperatorId(), cs);
-        if (!cs.connected) {
-          logger.debug("setting repatitioned = true because of lastStat = {}", lastStat);
-          response.repartitionRequired = true;
-        }
-      }
-
-      if (System.currentTimeMillis() >= nextMillis) {
-        logger.debug("nextMillis = {}", nextMillis);
-        try {
-          super.setup(null);
-          Collection<Service<byte[]>> addresses;
-          try {
-            addresses = discover();
-          } finally {
-            super.teardown();
-          }
-          AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses);
-          logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses);
-          switch (addresses.size()) {
-            case 0:
-              response.repartitionRequired = map.size() != 1;
-              break;
-
-            default:
-              if (addresses.size() == map.size()) {
-                for (ConnectionStatus value: map.values()) {
-                  if (value == null || !value.connected) {
-                    response.repartitionRequired = true;
-                    break;
-                  }
-                }
-              } else {
-                response.repartitionRequired = true;
-              }
-              break;
-          }
-        } catch (Error er) {
-          throw er;
-        } catch (Throwable cause) {
-          logger.warn("Unable to discover services, using values from last successful discovery", cause);
-        } finally {
-          nextMillis = System.currentTimeMillis() + intervalMillis;
-          logger.debug("Proposed NextMillis = {}", nextMillis);
-        }
-      }
-
-      return response;
-    }
-
-    /**
-     * @return the intervalMillis
-     */
-    public long getIntervalMillis()
-    {
-      return intervalMillis;
-    }
-
-    /**
-     * @param intervalMillis the intervalMillis to set
-     */
-    public void setIntervalMillis(long intervalMillis)
-    {
-      this.intervalMillis = intervalMillis;
-    }
-
-    private static final long serialVersionUID = 201312241646L;
-  }
-
-  public static class ConnectionStatus implements Serializable
-  {
-    int id;
-    String spec;
-    boolean connected;
-
-    @Override
-    public int hashCode()
-    {
-      return spec.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      final ConnectionStatus other = (ConnectionStatus)obj;
-      return spec == null ? other.spec == null : spec.equals(other.spec);
-    }
-
-    @Override
-    public String toString()
-    {
-      return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}';
-    }
-
-    private static final long serialVersionUID = 201312261615L;
-  }
-
-  private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus =
-      new ThreadLocal<HashMap<Integer, ConnectionStatus>>()
-    {
-      @Override
-      protected HashMap<Integer, ConnectionStatus> initialValue()
-      {
-        return new HashMap<Integer, ConnectionStatus>();
-      }
-
-    };
-  /**
-   * When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated
-   * with the sink in the hope that a replacement sink will show up in the near future.
-   */
-  private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses =
-      new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>()
-  {
-    @Override
-    protected HashMap<String, ArrayList<RecoveryAddress>> initialValue()
-    {
-      return new HashMap<String, ArrayList<RecoveryAddress>>();
-    }
-
-  };
-  private static final transient ThreadLocal<Collection<Service<byte[]>>> discoveredFlumeSinks =
-      new ThreadLocal<Collection<Service<byte[]>>>();
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof AbstractFlumeInputOperator)) {
-      return false;
-    }
-
-    AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o;
-
-    if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) {
-      return false;
-    }
-    return recoveryAddresses.equals(that.recoveryAddresses);
-
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0;
-    result = 31 * result + (recoveryAddresses.hashCode());
-    return result;
-  }
-
-  public void setMaxEventsPerSecond(long maxEventsPerSecond)
-  {
-    this.maxEventsPerSecond = maxEventsPerSecond;
-  }
-
-  public long getMaxEventsPerSecond()
-  {
-    return maxEventsPerSecond;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);
-}
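
For reference, the SEEK message assembled in Client.connected() above (and the COMMITTED message written earlier in this class) uses a fixed 17-byte layout: one command byte, an 8-byte storage address, and an 8-byte wall-clock timestamp. Below is a minimal, self-contained sketch of that layout; writeLongAt is a hypothetical stand-in for Server.writeLong (defined in Server.java, not part of this hunk), and the little-endian byte order is an assumption, not taken from this commit.

// Sketch of the 17-byte control message: [0] command ordinal,
// [1..8] recovery/storage address, [9..16] current timestamp.
public class SeekMessageSketch
{
  // Hypothetical stand-in for Server.writeLong; byte order assumed little-endian.
  static void writeLongAt(byte[] array, int offset, long value)
  {
    for (int i = 0; i < 8; i++) {
      array[offset + i] = (byte)(value >>> (8 * i));
    }
  }

  static byte[] buildMessage(byte commandOrdinal, byte[] address)
  {
    byte[] array = new byte[1 + 8 + 8];        // command + address + timestamp
    array[0] = commandOrdinal;                 // e.g. Command.SEEK.getOrdinal()
    System.arraycopy(address, 0, array, 1, 8); // last recovery address, or new byte[8] when none
    writeLongAt(array, 9, System.currentTimeMillis());
    return array;
  }

  public static void main(String[] args)
  {
    byte[] message = buildMessage((byte)0, new byte[8]); // 0 stands in for a Command ordinal
    System.out.println("message length = " + message.length); // prints 17
  }
}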

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java
deleted file mode 100644
index 55d3d61..0000000
--- a/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.flume.sink;
-
-import java.io.IOError;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ServiceConfigurationError;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.AbstractSink;
-
-import com.datatorrent.api.Component;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.flume.discovery.Discovery;
-import com.datatorrent.flume.sink.Server.Client;
-import com.datatorrent.flume.sink.Server.Request;
-import com.datatorrent.flume.storage.EventCodec;
-import com.datatorrent.flume.storage.Storage;
-import com.datatorrent.netlet.DefaultEventLoop;
-import com.datatorrent.netlet.NetletThrowable;
-import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * DTFlumeSink is a Flume sink developed to ingest data from Flume into a DataTorrent DAG.
- * It is essentially a Flume sink that acts as a server capable of talking to one client
- * at a time. The client for this server is AbstractFlumeInputOperator.
- * <p />
- * &lt;experimental&gt;DTFlumeSink auto-adjusts the rate at which it consumes data from the channel to
- * match the throughput of the DAG.&lt;/experimental&gt;
- * <p />
- * The properties you can set on the DTFlumeSink are: <br />
- * id - string unique value identifying this sink <br />
- * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br />
- * port - integer value indicating the numeric port to which the server should bind <br />
- * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events
- * before checking for the next event again <br />
- * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be
- * adjusted upward or downward at a time <br />
- * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br />
- * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value
- * cannot be more than the channel's transaction capacity.<br />
- *
- * @since 0.9.2
- */
-public class DTFlumeSink extends AbstractSink implements Configurable
-{
-  private static final String HOSTNAME_STRING = "hostname";
-  private static final String HOSTNAME_DEFAULT = "localhost";
-  private static final long ACCEPTED_TOLERANCE = 20000;
-  private DefaultEventLoop eventloop;
-  private Server server;
-  private int outstandingEventsCount;
-  private int lastConsumedEventsCount;
-  private int idleCount;
-  private byte[] playback;
-  private Client client;
-  private String hostname;
-  private int port;
-  private String id;
-  private long acceptedTolerance;
-  private long sleepMillis;
-  private double throughputAdjustmentFactor;
-  private int minimumEventsPerTransaction;
-  private int maximumEventsPerTransaction;
-  private long commitEventTimeoutMillis;
-  private transient long lastCommitEventTimeMillis;
-  private Storage storage;
-  Discovery<byte[]> discovery;
-  StreamCodec<Event> codec;
-  /* Begin implementing Flume Sink interface */
-
-  @Override
-  @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"})
-  public Status process() throws EventDeliveryException
-  {
-    Slice slice;
-    synchronized (server.requests) {
-      for (Request r : server.requests) {
-        logger.debug("found {}", r);
-        switch (r.type) {
-          case SEEK:
-            lastCommitEventTimeMillis = System.currentTimeMillis();
-            slice = r.getAddress();
-            playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
-            client = r.client;
-            break;
-
-          case COMMITTED:
-            lastCommitEventTimeMillis = System.currentTimeMillis();
-            slice = r.getAddress();
-            storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
-            break;
-
-          case CONNECTED:
-            logger.debug("Connected received, ignoring it!");
-            break;
-
-          case DISCONNECTED:
-            if (r.client == client) {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-            break;
-
-          case WINDOWED:
-            lastConsumedEventsCount = r.getEventCount();
-            idleCount = r.getIdleCount();
-            outstandingEventsCount -= lastConsumedEventsCount;
-            break;
-
-          case SERVER_ERROR:
-            throw new IOError(null);
-
-          default:
-            logger.debug("Cannot understand the request {}", r);
-            break;
-        }
-      }
-
-      server.requests.clear();
-    }
-
-    if (client == null) {
-      logger.info("No client expressed interest yet to consume the events.");
-      return Status.BACKOFF;
-    } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) {
-      logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.",
-          System.currentTimeMillis() - lastCommitEventTimeMillis);
-      return Status.BACKOFF;
-    }
-
-    int maxTuples;
-    // the following logic needs to be fixed... this is a quick, rough version.
-    if (outstandingEventsCount < 0) {
-      if (idleCount > 1) {
-        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
-      } else {
-        maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount);
-      }
-    } else if (outstandingEventsCount > lastConsumedEventsCount) {
-      maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount);
-    } else {
-      if (idleCount > 0) {
-        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
-        if (maxTuples <= 0) {
-          maxTuples = minimumEventsPerTransaction;
-        }
-      } else {
-        maxTuples = lastConsumedEventsCount;
-      }
-    }
-
-    if (maxTuples >= maximumEventsPerTransaction) {
-      maxTuples = maximumEventsPerTransaction;
-    } else if (maxTuples <= 0) {
-      maxTuples = minimumEventsPerTransaction;
-    }
-
-    if (maxTuples > 0) {
-      if (playback != null) {
-        try {
-          int i = 0;
-          do {
-            if (!client.write(playback)) {
-              retryWrite(playback, null);
-            }
-            outstandingEventsCount++;
-            playback = storage.retrieveNext();
-          }
-          while (++i < maxTuples && playback != null);
-        } catch (Exception ex) {
-          logger.warn("Playback Failed", ex);
-          if (ex instanceof NetletThrowable) {
-            try {
-              eventloop.disconnect(client);
-            } finally {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-          }
-          return Status.BACKOFF;
-        }
-      } else {
-        int storedTuples = 0;
-
-        Transaction t = getChannel().getTransaction();
-        try {
-          t.begin();
-
-          Event e;
-          while (storedTuples < maxTuples && (e = getChannel().take()) != null) {
-            Slice event = codec.toByteArray(e);
-            byte[] address = storage.store(event);
-            if (address != null) {
-              if (!client.write(address, event)) {
-                retryWrite(address, event);
-              }
-              outstandingEventsCount++;
-            } else {
-              logger.debug("Detected the condition of recovery from flume crash!");
-            }
-            storedTuples++;
-          }
-
-          if (storedTuples > 0) {
-            storage.flush();
-          }
-
-          t.commit();
-
-          if (storedTuples > 0) { /* log less frequently */
-            logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}",
-                maxTuples, storedTuples, outstandingEventsCount);
-          }
-        } catch (Error er) {
-          t.rollback();
-          throw er;
-        } catch (Exception ex) {
-          logger.error("Transaction Failed", ex);
-          if (ex instanceof NetletRuntimeException && client != null) {
-            try {
-              eventloop.disconnect(client);
-            } finally {
-              client = null;
-              outstandingEventsCount = 0;
-            }
-          }
-          t.rollback();
-          return Status.BACKOFF;
-        } finally {
-          t.close();
-        }
-
-        if (storedTuples == 0) {
-          sleep();
-        }
-      }
-    }
-
-    return Status.READY;
-  }
-
-  private void sleep()
-  {
-    try {
-      Thread.sleep(sleepMillis);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public void start()
-  {
-    try {
-      if (storage instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
-        component.setup(null);
-      }
-      if (discovery instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
-        component.setup(null);
-      }
-      if (codec instanceof Component) {
-        @SuppressWarnings("unchecked")
-        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
-        component.setup(null);
-      }
-      eventloop = new DefaultEventLoop("EventLoop-" + id);
-      server = new Server(id, discovery, acceptedTolerance);
-    } catch (Error error) {
-      throw error;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-
-    eventloop.start();
-    eventloop.start(hostname, port, server);
-    super.start();
-  }
-
-  @Override
-  public void stop()
-  {
-    try {
-      super.stop();
-    } finally {
-      try {
-        if (client != null) {
-          eventloop.disconnect(client);
-          client = null;
-        }
-
-        eventloop.stop(server);
-        eventloop.stop();
-
-        if (codec instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
-          component.teardown();
-        }
-        if (discovery instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
-          component.teardown();
-        }
-        if (storage instanceof Component) {
-          @SuppressWarnings("unchecked")
-          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
-          component.teardown();
-        }
-      } catch (Throwable cause) {
-        throw new ServiceConfigurationError("Failed Stop", cause);
-      }
-    }
-  }
-
-  /* End implementing Flume Sink interface */
-
-  /* Begin Configurable Interface */
-  @Override
-  public void configure(Context context)
-  {
-    hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
-    port = context.getInteger("port", 0);
-    id = context.getString("id");
-    if (id == null) {
-      id = getName();
-    }
-    acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
-    sleepMillis = context.getLong("sleepMillis", 5L);
-    throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0;
-    maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
-    minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
-    commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);
-
-    @SuppressWarnings("unchecked")
-    Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context);
-    if (ldiscovery == null) {
-      logger.warn("Discovery agent not configured for the sink!");
-      discovery = new Discovery<byte[]>()
-      {
-        @Override
-        public void unadvertise(Service<byte[]> service)
-        {
-          logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort());
-        }
-
-        @Override
-        public void advertise(Service<byte[]> service)
-        {
-          logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort());
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public Collection<Service<byte[]>> discover()
-        {
-          return Collections.EMPTY_SET;
-        }
-
-      };
-    } else {
-      discovery = ldiscovery;
-    }
-
-    storage = configure("storage", Storage.class, context);
-    if (storage == null) {
-      logger.warn("storage key missing... DTFlumeSink may lose data!");
-      storage = new Storage()
-      {
-        @Override
-        public byte[] store(Slice slice)
-        {
-          return null;
-        }
-
-        @Override
-        public byte[] retrieve(byte[] identifier)
-        {
-          return null;
-        }
-
-        @Override
-        public byte[] retrieveNext()
-        {
-          return null;
-        }
-
-        @Override
-        public void clean(byte[] identifier)
-        {
-        }
-
-        @Override
-        public void flush()
-        {
-        }
-
-      };
-    }
-
-    @SuppressWarnings("unchecked")
-    StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context);
-    if (lCodec == null) {
-      codec = new EventCodec();
-    } else {
-      codec = lCodec;
-    }
-
-  }
-
-  /* End Configurable Interface */
-
-  @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"})
-  private static <T> T configure(String key, Class<T> clazz, Context context)
-  {
-    String classname = context.getString(key);
-    if (classname == null) {
-      return null;
-    }
-
-    try {
-      Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
-      if (clazz.isAssignableFrom(loadClass)) {
-        @SuppressWarnings("unchecked")
-        T object = (T)loadClass.newInstance();
-        if (object instanceof Configurable) {
-          Context context1 = new Context(context.getSubProperties(key + '.'));
-          String id = context1.getString(Storage.ID);
-          if (id == null) {
-            id = context.getString(Storage.ID);
-            logger.debug("{} inherited id={} from sink", key, id);
-            context1.put(Storage.ID, id);
-          }
-          ((Configurable)object).configure(context1);
-        }
-
-        return object;
-      } else {
-        logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName());
-        throw new Error("Invalid storage " + classname);
-      }
-    } catch (Error error) {
-      throw error;
-    } catch (RuntimeException re) {
-      throw re;
-    } catch (Throwable t) {
-      throw new RuntimeException(t);
-    }
-  }
-
-  /**
-   * @return the hostname
-   */
-  String getHostname()
-  {
-    return hostname;
-  }
-
-  /**
-   * @param hostname the hostname to set
-   */
-  void setHostname(String hostname)
-  {
-    this.hostname = hostname;
-  }
-
-  /**
-   * @return the port
-   */
-  int getPort()
-  {
-    return port;
-  }
-
-  public long getAcceptedTolerance()
-  {
-    return acceptedTolerance;
-  }
-
-  public void setAcceptedTolerance(long acceptedTolerance)
-  {
-    this.acceptedTolerance = acceptedTolerance;
-  }
-
-  /**
-   * @param port the port to set
-   */
-  void setPort(int port)
-  {
-    this.port = port;
-  }
-
-  /**
-   * @return the discovery
-   */
-  Discovery<byte[]> getDiscovery()
-  {
-    return discovery;
-  }
-
-  /**
-   * @param discovery the discovery to set
-   */
-  void setDiscovery(Discovery<byte[]> discovery)
-  {
-    this.discovery = discovery;
-  }
-
-  /**
-   * Retry the write, sleeping between attempts, for as long as the client remains connected.
-   * If the client disconnects before a write succeeds, assume the connection is broken and
-   * fail with an IOException.
-   *
-   * @param address storage address of the event
-   * @param event serialized event; null during playback, where address and event are sent as a single object
-   * @throws IOException if the client disconnects before the write succeeds
-   */
-  private void retryWrite(byte[] address, Slice event) throws IOException
-  {
-    if (event == null) {  /* this happens for playback where address and event are sent as single object */
-      while (client.isConnected()) {
-        sleep();
-        if (client.write(address)) {
-          return;
-        }
-      }
-    } else {  /* this happens when the events are taken from the flume channel and writing first time failed */
-      while (client.isConnected()) {
-        sleep();
-        if (client.write(address, event)) {
-          return;
-        }
-      }
-    }
-
-    throw new IOException("Client disconnected!");
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class);
-}
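
Reading the class javadoc together with configure() above: a DTFlumeSink is wired up entirely through ordinary Flume agent properties. The scalar settings (id, hostname, port, sleepMillis, throughputAdjustmentPercent, minimumEventsPerTransaction, maximumEventsPerTransaction, commitEventTimeoutMillis, acceptedTolerance) are read directly from the sink's context, while the discovery, storage, and codec keys name classes that are instantiated reflectively and may carry their own sub-properties under the matching prefix, inheriting the sink's id when they do not define one. A hedged configuration sketch follows; the agent name "agent1", sink name "dt", channel name "ch1", and all values are illustrative placeholders rather than settings taken from this commit, and the class names are the pre-move com.datatorrent packages that this diff deletes.

# Illustrative DTFlumeSink entry for a Flume agent properties file.
agent1.sinks = dt
agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
agent1.sinks.dt.channel = ch1
agent1.sinks.dt.id = sink1
agent1.sinks.dt.hostname = flume-host.example.com
agent1.sinks.dt.port = 8080
agent1.sinks.dt.sleepMillis = 5
agent1.sinks.dt.throughputAdjustmentPercent = 5
agent1.sinks.dt.minimumEventsPerTransaction = 100
agent1.sinks.dt.maximumEventsPerTransaction = 10000
# Optional components resolved reflectively by configure(); sub-properties, if any,
# would go under the matching prefix, e.g. agent1.sinks.dt.storage.<key> = <value>.
agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery
agent1.sinks.dt.codec = com.datatorrent.flume.storage.EventCodec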

