apex-commits mailing list archives

From t..@apache.org
Subject [05/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar.
Date Tue, 23 May 2017 01:24:03 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
new file mode 100644
index 0000000..bd7e5e0
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER;
+
+/**
+ * <p>ColumnFilteringFormattingInterceptor class.</p>
+ *
+ * @since 0.9.4
+ */
+public class ColumnFilteringFormattingInterceptor implements Interceptor
+{
+  private final byte srcSeparator;
+  private final byte[][] dstSeparators;
+  private final byte[] prefix;
+  private final int maxIndex;
+  private final int maxColumn;
+  private final int[] columns;
+  private final int[] positions;
+
+  private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix)
+  {
+    this.columns = columns;
+
+    int tempMaxColumn = Integer.MIN_VALUE;
+    for (int column : columns) {
+      if (column > tempMaxColumn) {
+        tempMaxColumn = column;
+      }
+    }
+    maxIndex = tempMaxColumn;
+    maxColumn = tempMaxColumn + 1;
+    positions = new int[maxColumn + 1];
+    this.srcSeparator = srcSeparator;
+    this.dstSeparators = dstSeparators;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public void initialize()
+  {
+    /* no-op */
+  }
+
+  @Override
+  public Event intercept(Event event)
+  {
+    byte[] body = event.getBody();
+    if (body == null) {
+      return event;
+    }
+
+    final int length = body.length;
+
+    /* store the position of the character following each separator */
+    int i = 0;
+    int index = 0;
+    while (i < length) {
+      if (body[i++] == srcSeparator) {
+        positions[++index] = i;
+        if (index >= maxIndex) {
+          break;
+        }
+      }
+    }
+
+    int nextVirginIndex;
+    boolean separatorAtEnd = true;
+    if (i == length && index < maxColumn) {
+      nextVirginIndex = index + 2;
+      positions[nextVirginIndex - 1] = length;
+      separatorAtEnd = length > 0 ? body[length - 1] == srcSeparator : false;
+    } else {
+      nextVirginIndex = index + 1;
+    }
+
+    int newArrayLen = prefix.length;
+    for (i = columns.length; i-- > 0; ) {
+      int column = columns[i];
+      int len = positions[column + 1] - positions[column];
+      if (len > 0) {
+        if (positions[column + 1] == length && !separatorAtEnd) {
+          newArrayLen += len;
+        } else {
+          newArrayLen += len - 1;
+        }
+      }
+      newArrayLen += dstSeparators[i].length;
+    }
+
+    byte[] newBody = new byte[newArrayLen];
+    int newOffset = 0;
+    if (prefix.length > 0) {
+      System.arraycopy(prefix, 0, newBody, 0, prefix.length);
+      newOffset += prefix.length;
+    }
+    int dstSeparatorsIdx = 0;
+    for (int column : columns) {
+      int len = positions[column + 1] - positions[column];
+      byte[] separator = dstSeparators[dstSeparatorsIdx++];
+      if (len > 0) {
+        System.arraycopy(body, positions[column], newBody, newOffset, len);
+        newOffset += len;
+        if (newBody[newOffset - 1] == srcSeparator) {
+          newOffset--;
+        }
+      }
+      System.arraycopy(separator, 0, newBody, newOffset, separator.length);
+      newOffset += separator.length;
+    }
+    event.setBody(newBody);
+    Arrays.fill(positions, 1, nextVirginIndex, 0);
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events)
+  {
+    for (Event event : events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  public static class Builder implements Interceptor.Builder
+  {
+    private int[] columns;
+    private byte srcSeparator;
+    private byte[][] dstSeparators;
+    private byte[] prefix;
+
+    @Override
+    public Interceptor build()
+    {
+      return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix);
+    }
+
+    @Override
+    public void configure(Context context)
+    {
+      String formatter = context.getString(COLUMNS_FORMATTER);
+      if (Strings.isNullOrEmpty(formatter)) {
+        throw new IllegalArgumentException("This interceptor requires columns format to be specified!");
+      }
+      List<String> lSeparators = Lists.newArrayList();
+      List<Integer> lColumns = Lists.newArrayList();
+      Pattern colPat = Pattern.compile("\\{\\d+?\\}");
+      Matcher matcher = colPat.matcher(formatter);
+      int separatorStart = 0;
+      String lPrefix = "";
+      while (matcher.find()) {
+        String col = matcher.group();
+        lColumns.add(Integer.parseInt(col.substring(1, col.length() - 1)));
+        if (separatorStart == 0 && matcher.start() > 0) {
+          lPrefix = formatter.substring(0, matcher.start());
+        } else if (separatorStart > 0) {
+          lSeparators.add(formatter.substring(separatorStart, matcher.start()));
+        }
+
+        separatorStart = matcher.end();
+      }
+      if (separatorStart < formatter.length()) {
+        lSeparators.add(formatter.substring(separatorStart, formatter.length()));
+      }
+      columns = Ints.toArray(lColumns);
+      byte[] emptyStringBytes = "".getBytes();
+
+      dstSeparators = new byte[columns.length][];
+
+      for (int i = 0; i < columns.length; i++) {
+        if (i < lSeparators.size()) {
+          dstSeparators[i] = lSeparators.get(i).getBytes();
+        } else {
+          dstSeparators[i] = emptyStringBytes;
+        }
+      }
+      srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, (int) ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue();
+      this.prefix = lPrefix.getBytes();
+    }
+  }
+
+  public static class Constants extends ColumnFilteringInterceptor.Constants
+  {
+    public static final String COLUMNS_FORMATTER = "columnsFormatter";
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class);
+
+}
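
For readers skimming the archive, here is a minimal usage sketch of the interceptor added above. It is not part of this commit; it assumes flume-ng-core on the classpath, and the formatter string, column numbers and event body are made up for illustration.

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class FormattingInterceptorSketch
{
  public static void main(String[] args)
  {
    // "{n}" placeholders select input columns; the text before the first placeholder
    // becomes the prefix, the text between/after placeholders becomes per-column separators.
    Context context = new Context(Collections.singletonMap(
        ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "id={1}|name={2}|"));

    Interceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
    builder.configure(context);
    Interceptor interceptor = builder.build();

    // Input columns are delimited by the source separator, byte value 2 by default.
    byte[] body = "alpha\u0002beta\u0002gamma\u0002".getBytes(StandardCharsets.US_ASCII);
    Event event = interceptor.intercept(EventBuilder.withBody(body));

    // Prints the reformatted body produced by the interceptor.
    System.out.println(new String(event.getBody(), StandardCharsets.US_ASCII));
  }
}

The columnsFormatter key shown here is the same property a Flume agent configuration would set for this interceptor.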

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java
new file mode 100644
index 0000000..f0de5e0
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.interceptor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.COLUMNS;
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR;
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR_DFLT;
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR;
+import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT;
+
+/**
+ * <p>ColumnFilteringInterceptor class.</p>
+ *
+ * @since 0.9.4
+ */
+public class ColumnFilteringInterceptor implements Interceptor
+{
+  private final byte srcSeparator;
+  private final byte dstSeparator;
+
+  private final int maxIndex;
+  private final int maxColumn;
+  private final int[] columns;
+  private final int[] positions;
+
+  private ColumnFilteringInterceptor(int[] columns, byte srcSeparator, byte dstSeparator)
+  {
+    this.columns = columns;
+
+    int tempMaxColumn = Integer.MIN_VALUE;
+    for (int column: columns) {
+      if (column > tempMaxColumn) {
+        tempMaxColumn = column;
+      }
+    }
+    maxIndex = tempMaxColumn;
+    maxColumn = tempMaxColumn + 1;
+    positions = new int[maxColumn + 1];
+
+    this.srcSeparator = srcSeparator;
+    this.dstSeparator = dstSeparator;
+  }
+
+  @Override
+  public void initialize()
+  {
+    /* no-op */
+  }
+
+  @Override
+  public Event intercept(Event event)
+  {
+    byte[] body = event.getBody();
+    if (body == null) {
+      return event;
+    }
+
+    final int length = body.length;
+
+    /* store the position of the character following each separator */
+    int i = 0;
+    int index = 0;
+    while (i < length) {
+      if (body[i++] == srcSeparator) {
+        positions[++index] = i;
+        if (index >= maxIndex) {
+          break;
+        }
+      }
+    }
+
+    int nextVirginIndex;
+    boolean separatorTerminated;
+    if (i == length && index < maxColumn) {
+      nextVirginIndex = index + 2;
+      positions[nextVirginIndex - 1] = length;
+      separatorTerminated = length > 0 ? body[length - 1]  != srcSeparator : false;
+    } else {
+      nextVirginIndex = index + 1;
+      separatorTerminated = true;
+    }
+
+    int newArrayLen = 0;
+    for (i = columns.length; i-- > 0;) {
+      int column = columns[i];
+      int len = positions[column + 1] - positions[column];
+      if (len <= 0) {
+        newArrayLen++;
+      } else {
+        if (separatorTerminated && positions[column + 1] == length) {
+          newArrayLen++;
+        }
+        newArrayLen += len;
+      }
+    }
+
+    byte[] newbody = new byte[newArrayLen];
+    int newoffset = 0;
+    for (int column: columns) {
+      int len = positions[column + 1] - positions[column];
+      if (len > 0) {
+        System.arraycopy(body, positions[column], newbody, newoffset, len);
+        newoffset += len;
+        if (newbody[newoffset - 1] == srcSeparator) {
+          newbody[newoffset - 1] = dstSeparator;
+        } else {
+          newbody[newoffset++] = dstSeparator;
+        }
+      } else {
+        newbody[newoffset++] = dstSeparator;
+      }
+    }
+
+    event.setBody(newbody);
+    Arrays.fill(positions, 1, nextVirginIndex, 0);
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events)
+  {
+    for (Event event: events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  public static class Builder implements Interceptor.Builder
+  {
+    private int[] columns;
+    private byte srcSeparator;
+    private byte dstSeparator;
+
+    @Override
+    public Interceptor build()
+    {
+      return new ColumnFilteringInterceptor(columns, srcSeparator, dstSeparator);
+    }
+
+    @Override
+    public void configure(Context context)
+    {
+      String sColumns = context.getString(COLUMNS);
+      if (sColumns == null || sColumns.trim().isEmpty()) {
+        throw new Error("This interceptor requires filtered columns to be specified!");
+      }
+
+      String[] parts = sColumns.split(" ");
+      columns = new int[parts.length];
+      for (int i = parts.length; i-- > 0;) {
+        columns[i] = Integer.parseInt(parts[i]);
+      }
+
+      srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
+      dstSeparator = context.getInteger(DST_SEPARATOR, (int)DST_SEPARATOR_DFLT).byteValue();
+    }
+
+  }
+
+  @SuppressWarnings("ClassMayBeInterface") /* adhering to flume until i understand it completely */
+
+  public static class Constants
+  {
+    public static final String SRC_SEPARATOR = "srcSeparator";
+    public static final byte SRC_SEPARATOR_DFLT = 2;
+
+    public static final String DST_SEPARATOR = "dstSeparator";
+    public static final byte DST_SEPARATOR_DFLT = 1;
+
+    public static final String COLUMNS = "columns";
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringInterceptor.class);
+}
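
Similarly, a hypothetical configuration sketch for the plain filtering interceptor above; the column numbers and separator byte values are illustrative only.

import java.util.HashMap;
import java.util.Map;

import org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor;
import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

public class FilteringInterceptorSketch
{
  public static void main(String[] args)
  {
    // Space-separated column numbers to keep; source and destination separators are
    // given as integer byte values (the defaults are 2 and 1 respectively).
    Map<String, String> props = new HashMap<String, String>();
    props.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0 2");
    props.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, "9");   // tab-separated input
    props.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, "44");  // comma-separated output

    Interceptor.Builder builder = new ColumnFilteringInterceptor.Builder();
    builder.configure(new Context(props));
    Interceptor interceptor = builder.build();
    // interceptor.intercept(event) now rewrites each event body to contain only the
    // selected columns, joined by the destination separator.
  }
}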

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
new file mode 100644
index 0000000..da1a8aa
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java
@@ -0,0 +1,759 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.operator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.sink.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Event;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StreamCodec;
+import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * <p>
+ * Abstract AbstractFlumeInputOperator class.</p>
+ *
+ * @param <T> Type of the output payload.
+ * @since 0.9.2
+ */
+public abstract class AbstractFlumeInputOperator<T>
+    implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler,
+    Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>>
+{
+  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>();
+  @NotNull
+  private String[] connectionSpecs;
+  @NotNull
+  private StreamCodec<Event> codec;
+  private final ArrayList<RecoveryAddress> recoveryAddresses;
+  @SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow
+  private transient ArrayBlockingQueue<Slice> handoverBuffer;
+  private transient int idleCounter;
+  private transient int eventCounter;
+  private transient DefaultEventLoop eventloop;
+  private transient volatile boolean connected;
+  private transient OperatorContext context;
+  private transient Client client;
+  private transient long windowId;
+  private transient byte[] address;
+  @Min(0)
+  private long maxEventsPerSecond;
+  //This is calculated from maxEventsPerSecond, App window count and streaming window size
+  private transient long maxEventsPerWindow;
+
+  public AbstractFlumeInputOperator()
+  {
+    handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5);
+    connectionSpecs = new String[0];
+    recoveryAddresses = new ArrayList<RecoveryAddress>();
+    maxEventsPerSecond = Long.MAX_VALUE;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond);
+    logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow);
+
+    try {
+      eventloop = new DefaultEventLoop("EventLoop-" + context.getId());
+      eventloop.start();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  @SuppressWarnings({"unchecked"})
+  public void activate(OperatorContext ctx)
+  {
+    if (connectionSpecs.length == 0) {
+      logger.info("Discovered zero DTFlumeSink");
+    } else if (connectionSpecs.length == 1) {
+      for (String connectAddresse: connectionSpecs) {
+        logger.debug("Connection spec is {}", connectAddresse);
+        String[] parts = connectAddresse.split(":");
+        eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client = new Client(parts[0]));
+      }
+    } else {
+      throw new IllegalArgumentException(
+          String.format("A physical %s operator cannot connect to more than one address!",
+              this.getClass().getSimpleName()));
+    }
+
+    context = ctx;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+    idleCounter = 0;
+    eventCounter = 0;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    int i = handoverBuffer.size();
+    if (i > 0 && eventCounter < maxEventsPerWindow) {
+
+      while (--i > 0 && eventCounter < maxEventsPerWindow - 1) {
+        final Slice slice = handoverBuffer.poll();
+        slice.offset += 8;
+        slice.length -= 8;
+        T convert = convert((Event)codec.fromByteArray(slice));
+        if (convert == null) {
+          drop.emit(slice);
+        } else {
+          output.emit(convert);
+        }
+        eventCounter++;
+      }
+
+      final Slice slice = handoverBuffer.poll();
+      slice.offset += 8;
+      slice.length -= 8;
+      T convert = convert((Event)codec.fromByteArray(slice));
+      if (convert == null) {
+        drop.emit(slice);
+      } else {
+        output.emit(convert);
+      }
+      eventCounter++;
+
+      address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (connected) {
+      byte[] array = new byte[Server.Request.FIXED_SIZE];
+
+      array[0] = Server.Command.WINDOWED.getOrdinal();
+      Server.writeInt(array, 1, eventCounter);
+      Server.writeInt(array, 5, idleCounter);
+      Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis());
+
+      logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Server.Command.WINDOWED, eventCounter, idleCounter);
+      client.write(array);
+    }
+
+    if (address != null) {
+      RecoveryAddress rAddress = new RecoveryAddress();
+      rAddress.address = address;
+      address = null;
+      rAddress.windowId = windowId;
+      recoveryAddresses.add(rAddress);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+    if (connected) {
+      eventloop.disconnect(client);
+    }
+    context = null;
+  }
+
+  @Override
+  public void teardown()
+  {
+    eventloop.stop();
+    eventloop = null;
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    idleCounter++;
+    try {
+      sleep(context.getValue(OperatorContext.SPIN_MILLIS));
+    } catch (InterruptedException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  public abstract T convert(Event event);
+
+  /**
+   * @return the connectAddress
+   */
+  public String[] getConnectAddresses()
+  {
+    return connectionSpecs.clone();
+  }
+
+  /**
+   * @param specs - sinkid:host:port specification of all the sinks.
+   */
+  public void setConnectAddresses(String[] specs)
+  {
+    this.connectionSpecs = specs.clone();
+  }
+
+  /**
+   * @return the codec
+   */
+  public StreamCodec<Event> getCodec()
+  {
+    return codec;
+  }
+
+  /**
+   * @param codec the codec to set
+   */
+  public void setCodec(StreamCodec<Event> codec)
+  {
+    this.codec = codec;
+  }
+
+  private static class RecoveryAddress implements Serializable
+  {
+    long windowId;
+    byte[] address;
+
+    @Override
+    public String toString()
+    {
+      return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof RecoveryAddress)) {
+        return false;
+      }
+
+      RecoveryAddress that = (RecoveryAddress)o;
+
+      if (windowId != that.windowId) {
+        return false;
+      }
+      return Arrays.equals(address, that.address);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = (int)(windowId ^ (windowId >>> 32));
+      result = 31 * result + (address != null ? Arrays.hashCode(address) : 0);
+      return result;
+    }
+
+    private static final long serialVersionUID = 201312021432L;
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    /* dont do anything */
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    if (!connected) {
+      return;
+    }
+
+    synchronized (recoveryAddresses) {
+      byte[] addr = null;
+
+      Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator();
+      while (iterator.hasNext()) {
+        RecoveryAddress ra = iterator.next();
+        if (ra.windowId > windowId) {
+          break;
+        }
+
+        iterator.remove();
+        if (ra.address != null) {
+          addr = ra.address;
+        }
+      }
+
+      if (addr != null) {
+        /*
+         * Make sure that we store the last valid address processed
+         */
+        if (recoveryAddresses.isEmpty()) {
+          RecoveryAddress ra = new RecoveryAddress();
+          ra.address = addr;
+          recoveryAddresses.add(ra);
+        }
+
+        int arraySize = 1/* for the type of the message */
+            + 8 /* for the location to commit */
+            + 8 /* for storing the current time stamp*/;
+        byte[] array = new byte[arraySize];
+
+        array[0] = Server.Command.COMMITTED.getOrdinal();
+        System.arraycopy(addr, 0, array, 1, 8);
+        Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis());
+        logger.debug("wrote {} with recoveryOffset = {}", Server.Command.COMMITTED, Arrays.toString(addr));
+        client.write(array);
+      }
+    }
+  }
+
+  @Override
+  public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions(
+      Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context)
+  {
+    Collection<Discovery.Service<byte[]>> discovered = discoveredFlumeSinks.get();
+    if (discovered == null) {
+      return partitions;
+    }
+
+    HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get();
+    ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size());
+    for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) {
+      String[] lAddresses = partition.getPartitionedInstance().connectionSpecs;
+      allConnectAddresses.addAll(Arrays.asList(lAddresses));
+      for (int i = lAddresses.length; i-- > 0;) {
+        String[] parts = lAddresses[i].split(":", 2);
+        allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses);
+      }
+    }
+
+    HashMap<String, String> connections = new HashMap<String, String>(discovered.size());
+    for (Discovery.Service<byte[]> service: discovered) {
+      String previousSpec = connections.get(service.getId());
+      String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort();
+      if (previousSpec == null) {
+        connections.put(service.getId(), newspec);
+      } else {
+        boolean found = false;
+        for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) {
+          if (previousSpec.equals(cs.spec) && !cs.connected) {
+            connections.put(service.getId(), newspec);
+            found = true;
+            break;
+          }
+        }
+
+        if (!found) {
+          logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", previousSpec, newspec);
+          connections.put(service.getId(), newspec);
+        }
+      }
+    }
+
+    for (int i = allConnectAddresses.size(); i-- > 0;) {
+      String[] parts = allConnectAddresses.get(i).split(":");
+      String connection = connections.remove(parts[0]);
+      if (connection == null) {
+        allConnectAddresses.remove(i);
+      } else {
+        allConnectAddresses.set(i, connection);
+      }
+    }
+
+    allConnectAddresses.addAll(connections.values());
+
+    partitions.clear();
+    try {
+      if (allConnectAddresses.isEmpty()) {
+        /* return at least one of them; otherwise stram becomes grumpy */
+        @SuppressWarnings("unchecked")
+        AbstractFlumeInputOperator<T> operator = getClass().newInstance();
+        operator.setCodec(codec);
+        operator.setMaxEventsPerSecond(maxEventsPerSecond);
+        for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) {
+          operator.recoveryAddresses.addAll(lRecoveryAddresses);
+        }
+        operator.connectionSpecs = new String[allConnectAddresses.size()];
+        for (int i = connectionSpecs.length; i-- > 0;) {
+          connectionSpecs[i] = allConnectAddresses.get(i);
+        }
+
+        partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
+      } else {
+        long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size();
+        for (int i = allConnectAddresses.size(); i-- > 0;) {
+          @SuppressWarnings("unchecked")
+          AbstractFlumeInputOperator<T> operator = getClass().newInstance();
+          operator.setCodec(codec);
+          operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator);
+          String connectAddress = allConnectAddresses.get(i);
+          operator.connectionSpecs = new String[] {connectAddress};
+
+          String[] parts = connectAddress.split(":", 2);
+          ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]);
+          if (remove != null) {
+            operator.recoveryAddresses.addAll(remove);
+          }
+
+          partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
+        }
+      }
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    } catch (InstantiationException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    logger.debug("Requesting partitions: {}", partitions);
+    return partitions;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions)
+  {
+    logger.debug("Partitioned Map: {}", partitions);
+    HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
+    map.clear();
+    for (Entry<Integer, Partition<AbstractFlumeInputOperator<T>>> entry: partitions.entrySet()) {
+      if (map.containsKey(entry.getKey())) {
+        // what can be done here?
+      } else {
+        map.put(entry.getKey(), null);
+      }
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" +
+        (connectionSpecs.length == 0 ? "empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}';
+  }
+
+  class Client extends AbstractLengthPrependerClient
+  {
+    private final String id;
+
+    Client(String id)
+    {
+      this.id = id;
+    }
+
+    @Override
+    public void onMessage(byte[] buffer, int offset, int size)
+    {
+      try {
+        handoverBuffer.put(new Slice(buffer, offset, size));
+      } catch (InterruptedException ex) {
+        handleException(ex, eventloop);
+      }
+    }
+
+    @Override
+    public void connected()
+    {
+      super.connected();
+
+      byte[] address;
+      synchronized (recoveryAddresses) {
+        if (recoveryAddresses.size() > 0) {
+          address = recoveryAddresses.get(recoveryAddresses.size() - 1).address;
+        } else {
+          address = new byte[8];
+        }
+      }
+
+      int len = 1 /* for the message type SEEK */
+          + 8 /* for the address */
+          + 8 /* for storing the current time stamp*/;
+
+      byte[] array = new byte[len];
+      array[0] = Server.Command.SEEK.getOrdinal();
+      System.arraycopy(address, 0, array, 1, 8);
+      Server.writeLong(array, 9, System.currentTimeMillis());
+      write(array);
+
+      connected = true;
+      ConnectionStatus connectionStatus = new ConnectionStatus();
+      connectionStatus.connected = true;
+      connectionStatus.spec = connectionSpecs[0];
+      OperatorContext ctx = context;
+      synchronized (ctx) {
+        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
+        context.setCounters(connectionStatus);
+      }
+    }
+
+    @Override
+    public void disconnected()
+    {
+      connected = false;
+      ConnectionStatus connectionStatus = new ConnectionStatus();
+      connectionStatus.connected = false;
+      connectionStatus.spec = connectionSpecs[0];
+      OperatorContext ctx = context;
+      synchronized (ctx) {
+        logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
+        context.setCounters(connectionStatus);
+      }
+      super.disconnected();
+    }
+
+  }
+
+  public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener,
+      Serializable
+  {
+    /*
+     * In the current design, one input operator is able to connect
+     * to only one flume adapter. Sometime in the future, we should support
+     * any number of input operators connecting to any number of flume
+     * sinks and vice versa.
+     *
+     * Until that happens, the following map should be sufficient to
+     * keep track of which input operator is connected to which flume sink.
+     */
+    long intervalMillis;
+    private final Response response;
+    private transient long nextMillis;
+
+    public ZKStatsListner()
+    {
+      intervalMillis = 60 * 1000L;
+      response = new Response();
+    }
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
+      response.repartitionRequired = false;
+
+      Object lastStat = null;
+      List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
+      for (OperatorStats os: lastWindowedStats) {
+        if (os.counters != null) {
+          lastStat = os.counters;
+          logger.debug("Received custom stats = {}", lastStat);
+        }
+      }
+
+      if (lastStat instanceof ConnectionStatus) {
+        ConnectionStatus cs = (ConnectionStatus)lastStat;
+        map.put(stats.getOperatorId(), cs);
+        if (!cs.connected) {
+          logger.debug("setting repatitioned = true because of lastStat = {}", lastStat);
+          response.repartitionRequired = true;
+        }
+      }
+
+      if (System.currentTimeMillis() >= nextMillis) {
+        logger.debug("nextMillis = {}", nextMillis);
+        try {
+          super.setup(null);
+          Collection<Discovery.Service<byte[]>> addresses;
+          try {
+            addresses = discover();
+          } finally {
+            super.teardown();
+          }
+          AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses);
+          logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses);
+          switch (addresses.size()) {
+            case 0:
+              response.repartitionRequired = map.size() != 1;
+              break;
+
+            default:
+              if (addresses.size() == map.size()) {
+                for (ConnectionStatus value: map.values()) {
+                  if (value == null || !value.connected) {
+                    response.repartitionRequired = true;
+                    break;
+                  }
+                }
+              } else {
+                response.repartitionRequired = true;
+              }
+              break;
+          }
+        } catch (Error er) {
+          throw er;
+        } catch (Throwable cause) {
+          logger.warn("Unable to discover services, using values from last successful discovery", cause);
+        } finally {
+          nextMillis = System.currentTimeMillis() + intervalMillis;
+          logger.debug("Proposed NextMillis = {}", nextMillis);
+        }
+      }
+
+      return response;
+    }
+
+    /**
+     * @return the intervalMillis
+     */
+    public long getIntervalMillis()
+    {
+      return intervalMillis;
+    }
+
+    /**
+     * @param intervalMillis the intervalMillis to set
+     */
+    public void setIntervalMillis(long intervalMillis)
+    {
+      this.intervalMillis = intervalMillis;
+    }
+
+    private static final long serialVersionUID = 201312241646L;
+  }
+
+  public static class ConnectionStatus implements Serializable
+  {
+    int id;
+    String spec;
+    boolean connected;
+
+    @Override
+    public int hashCode()
+    {
+      return spec.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      final ConnectionStatus other = (ConnectionStatus)obj;
+      return spec == null ? other.spec == null : spec.equals(other.spec);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}';
+    }
+
+    private static final long serialVersionUID = 201312261615L;
+  }
+
+  private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus =
+      new ThreadLocal<HashMap<Integer, ConnectionStatus>>()
+    {
+      @Override
+      protected HashMap<Integer, ConnectionStatus> initialValue()
+      {
+        return new HashMap<Integer, ConnectionStatus>();
+      }
+
+    };
+  /**
+   * When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated
+   * with the sink in the hope that a replacement sink will show up in the near future.
+   */
+  private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses =
+      new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>()
+  {
+    @Override
+    protected HashMap<String, ArrayList<RecoveryAddress>> initialValue()
+    {
+      return new HashMap<String, ArrayList<RecoveryAddress>>();
+    }
+
+  };
+  private static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks =
+      new ThreadLocal<Collection<Discovery.Service<byte[]>>>();
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof AbstractFlumeInputOperator)) {
+      return false;
+    }
+
+    AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o;
+
+    if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) {
+      return false;
+    }
+    return recoveryAddresses.equals(that.recoveryAddresses);
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0;
+    result = 31 * result + (recoveryAddresses.hashCode());
+    return result;
+  }
+
+  public void setMaxEventsPerSecond(long maxEventsPerSecond)
+  {
+    this.maxEventsPerSecond = maxEventsPerSecond;
+  }
+
+  public long getMaxEventsPerSecond()
+  {
+    return maxEventsPerSecond;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);
+}
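
A minimal concrete subclass sketch for the abstract operator above, showing the one conversion method a user has to supply; the class name and charset choice are illustrative.

import java.nio.charset.StandardCharsets;

import org.apache.apex.malhar.flume.operator.AbstractFlumeInputOperator;
import org.apache.flume.Event;

/**
 * Emits every Flume event body downstream as a String.
 */
public class StringFlumeInputOperator extends AbstractFlumeInputOperator<String>
{
  @Override
  public String convert(Event event)
  {
    byte[] body = event.getBody();
    // Returning null routes the raw slice to the "drop" port instead of "output".
    return body == null ? null : new String(body, StandardCharsets.UTF_8);
  }
}

In a DAG, setConnectAddresses() and setCodec() would also be called on this operator so that it knows which DTFlumeSink instances to read from.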

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
new file mode 100644
index 0000000..306ce13
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ServiceConfigurationError;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.apache.apex.malhar.flume.storage.EventCodec;
+import org.apache.apex.malhar.flume.storage.Storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.StreamCodec;
+import org.apache.apex.malhar.flume.sink.Server.Client;
+import org.apache.apex.malhar.flume.sink.Server.Request;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.NetletThrowable;
+import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG
+ * from flume. It's essentially a flume sink which acts as a server capable of
+ * talking to one client at a time. The client for this server is AbstractFlumeInputOperator.
+ * <p />
+ * &lt;experimental&gt;DTFlumeSink auto adjusts the rate at which it consumes the data from channel to
+ * match the throughput of the DAG.&lt;/experimental&gt;
+ * <p />
+ * The properties you can set on the DTFlumeSink are: <br />
+ * id - string unique value identifying this sink <br />
+ * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br />
+ * port - integer value indicating the numeric port to which the server should bind <br />
+ * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events
+ * before checking for the next event again <br />
+ * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be
+ * adjusted upward or downward at a time <br />
+ * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br />
+ * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. This value
+ * cannot be more than the channel's transaction capacity.<br />
+ *
+ * @since 0.9.2
+ */
+public class DTFlumeSink extends AbstractSink implements Configurable
+{
+  private static final String HOSTNAME_STRING = "hostname";
+  private static final String HOSTNAME_DEFAULT = "localhost";
+  private static final long ACCEPTED_TOLERANCE = 20000;
+  private DefaultEventLoop eventloop;
+  private Server server;
+  private int outstandingEventsCount;
+  private int lastConsumedEventsCount;
+  private int idleCount;
+  private byte[] playback;
+  private Client client;
+  private String hostname;
+  private int port;
+  private String id;
+  private long acceptedTolerance;
+  private long sleepMillis;
+  private double throughputAdjustmentFactor;
+  private int minimumEventsPerTransaction;
+  private int maximumEventsPerTransaction;
+  private long commitEventTimeoutMillis;
+  private transient long lastCommitEventTimeMillis;
+  private Storage storage;
+  Discovery<byte[]> discovery;
+  StreamCodec<Event> codec;
+  /* Begin implementing Flume Sink interface */
+
+  @Override
+  @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"})
+  public Status process() throws EventDeliveryException
+  {
+    Slice slice;
+    synchronized (server.requests) {
+      for (Request r : server.requests) {
+        logger.debug("found {}", r);
+        switch (r.type) {
+          case SEEK:
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            client = r.client;
+            break;
+
+          case COMMITTED:
+            lastCommitEventTimeMillis = System.currentTimeMillis();
+            slice = r.getAddress();
+            storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length));
+            break;
+
+          case CONNECTED:
+            logger.debug("Connected received, ignoring it!");
+            break;
+
+          case DISCONNECTED:
+            if (r.client == client) {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+            break;
+
+          case WINDOWED:
+            lastConsumedEventsCount = r.getEventCount();
+            idleCount = r.getIdleCount();
+            outstandingEventsCount -= lastConsumedEventsCount;
+            break;
+
+          case SERVER_ERROR:
+            throw new IOError(null);
+
+          default:
+            logger.debug("Cannot understand the request {}", r);
+            break;
+        }
+      }
+
+      server.requests.clear();
+    }
+
+    if (client == null) {
+      logger.info("No client expressed interest yet to consume the events.");
+      return Status.BACKOFF;
+    } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) {
+      logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.",
+          System.currentTimeMillis() - lastCommitEventTimeMillis);
+      return Status.BACKOFF;
+    }
+
+    int maxTuples;
+    // the following logic needs to be fixed... this is a quick put together.
+    if (outstandingEventsCount < 0) {
+      if (idleCount > 1) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+      } else {
+        maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount);
+      }
+    } else if (outstandingEventsCount > lastConsumedEventsCount) {
+      maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount);
+    } else {
+      if (idleCount > 0) {
+        maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount);
+        if (maxTuples <= 0) {
+          maxTuples = minimumEventsPerTransaction;
+        }
+      } else {
+        maxTuples = lastConsumedEventsCount;
+      }
+    }
+
+    if (maxTuples >= maximumEventsPerTransaction) {
+      maxTuples = maximumEventsPerTransaction;
+    } else if (maxTuples <= 0) {
+      maxTuples = minimumEventsPerTransaction;
+    }
+
+    if (maxTuples > 0) {
+      if (playback != null) {
+        try {
+          int i = 0;
+          do {
+            if (!client.write(playback)) {
+              retryWrite(playback, null);
+            }
+            outstandingEventsCount++;
+            playback = storage.retrieveNext();
+          }
+          while (++i < maxTuples && playback != null);
+        } catch (Exception ex) {
+          logger.warn("Playback Failed", ex);
+          if (ex instanceof NetletThrowable) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          return Status.BACKOFF;
+        }
+      } else {
+        int storedTuples = 0;
+
+        Transaction t = getChannel().getTransaction();
+        try {
+          t.begin();
+
+          Event e;
+          while (storedTuples < maxTuples && (e = getChannel().take()) != null) {
+            Slice event = codec.toByteArray(e);
+            byte[] address = storage.store(event);
+            if (address != null) {
+              if (!client.write(address, event)) {
+                retryWrite(address, event);
+              }
+              outstandingEventsCount++;
+            } else {
+              logger.debug("Detected the condition of recovery from flume crash!");
+            }
+            storedTuples++;
+          }
+
+          if (storedTuples > 0) {
+            storage.flush();
+          }
+
+          t.commit();
+
+          if (storedTuples > 0) { /* log less frequently */
+            logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}",
+                maxTuples, storedTuples, outstandingEventsCount);
+          }
+        } catch (Error er) {
+          t.rollback();
+          throw er;
+        } catch (Exception ex) {
+          logger.error("Transaction Failed", ex);
+          if (ex instanceof NetletRuntimeException && client != null) {
+            try {
+              eventloop.disconnect(client);
+            } finally {
+              client = null;
+              outstandingEventsCount = 0;
+            }
+          }
+          t.rollback();
+          return Status.BACKOFF;
+        } finally {
+          t.close();
+        }
+
+        if (storedTuples == 0) {
+          sleep();
+        }
+      }
+    }
+
+    return Status.READY;
+  }
+
+  private void sleep()
+  {
+    try {
+      Thread.sleep(sleepMillis);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void start()
+  {
+    try {
+      if (storage instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+        component.setup(null);
+      }
+      if (discovery instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+        component.setup(null);
+      }
+      if (codec instanceof Component) {
+        @SuppressWarnings("unchecked")
+        Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+        component.setup(null);
+      }
+      eventloop = new DefaultEventLoop("EventLoop-" + id);
+      server = new Server(id, discovery, acceptedTolerance);
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    eventloop.start();
+    eventloop.start(hostname, port, server);
+    super.start();
+  }
+
+  @Override
+  public void stop()
+  {
+    try {
+      super.stop();
+    } finally {
+      try {
+        if (client != null) {
+          eventloop.disconnect(client);
+          client = null;
+        }
+
+        eventloop.stop(server);
+        eventloop.stop();
+
+        if (codec instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec;
+          component.teardown();
+        }
+        if (discovery instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery;
+          component.teardown();
+        }
+        if (storage instanceof Component) {
+          @SuppressWarnings("unchecked")
+          Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage;
+          component.teardown();
+        }
+      } catch (Throwable cause) {
+        throw new ServiceConfigurationError("Failed Stop", cause);
+      }
+    }
+  }
+
+  /* End implementing Flume Sink interface */
+
+  /* Begin Configurable Interface */
+  @Override
+  public void configure(Context context)
+  {
+    hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT);
+    port = context.getInteger("port", 0);
+    id = context.getString("id");
+    if (id == null) {
+      id = getName();
+    }
+    acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE);
+    sleepMillis = context.getLong("sleepMillis", 5L);
+    throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0;
+    maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000);
+    minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100);
+    commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE);
+
+    @SuppressWarnings("unchecked")
+    Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context);
+    if (ldiscovery == null) {
+      logger.warn("Discovery agent not configured for the sink!");
+      discovery = new Discovery<byte[]>()
+      {
+        @Override
+        public void unadvertise(Service<byte[]> service)
+        {
+          logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort());
+        }
+
+        @Override
+        public void advertise(Service<byte[]> service)
+        {
+          logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort());
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Collection<Service<byte[]>> discover()
+        {
+          return Collections.EMPTY_SET;
+        }
+
+      };
+    } else {
+      discovery = ldiscovery;
+    }
+
+    storage = configure("storage", Storage.class, context);
+    if (storage == null) {
+      logger.warn("storage key missing... DTFlumeSink may lose data!");
+      storage = new Storage()
+      {
+        @Override
+        public byte[] store(Slice slice)
+        {
+          return null;
+        }
+
+        @Override
+        public byte[] retrieve(byte[] identifier)
+        {
+          return null;
+        }
+
+        @Override
+        public byte[] retrieveNext()
+        {
+          return null;
+        }
+
+        @Override
+        public void clean(byte[] identifier)
+        {
+        }
+
+        @Override
+        public void flush()
+        {
+        }
+
+      };
+    }
+
+    @SuppressWarnings("unchecked")
+    StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context);
+    if (lCodec == null) {
+      codec = new EventCodec();
+    } else {
+      codec = lCodec;
+    }
+
+  }
+
+  /* End Configurable Interface */
+
+  @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"})
+  private static <T> T configure(String key, Class<T> clazz, Context context)
+  {
+    String classname = context.getString(key);
+    if (classname == null) {
+      return null;
+    }
+
+    try {
+      Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname);
+      if (clazz.isAssignableFrom(loadClass)) {
+        @SuppressWarnings("unchecked")
+        T object = (T)loadClass.newInstance();
+        if (object instanceof Configurable) {
+          Context context1 = new Context(context.getSubProperties(key + '.'));
+          String id = context1.getString(Storage.ID);
+          if (id == null) {
+            id = context.getString(Storage.ID);
+            logger.debug("{} inherited id={} from sink", key, id);
+            context1.put(Storage.ID, id);
+          }
+          ((Configurable)object).configure(context1);
+        }
+
+        return object;
+      } else {
+        logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName());
+        throw new Error("Invalid storage " + classname);
+      }
+    } catch (Error error) {
+      throw error;
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
+  /**
+   * @return the hostname
+   */
+  String getHostname()
+  {
+    return hostname;
+  }
+
+  /**
+   * @param hostname the hostname to set
+   */
+  void setHostname(String hostname)
+  {
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return the port
+   */
+  int getPort()
+  {
+    return port;
+  }
+
+  public long getAcceptedTolerance()
+  {
+    return acceptedTolerance;
+  }
+
+  public void setAcceptedTolerance(long acceptedTolerance)
+  {
+    this.acceptedTolerance = acceptedTolerance;
+  }
+
+  /**
+   * @param port the port to set
+   */
+  void setPort(int port)
+  {
+    this.port = port;
+  }
+
+  /**
+   * @return the discovery
+   */
+  Discovery<byte[]> getDiscovery()
+  {
+    return discovery;
+  }
+
+  /**
+   * @param discovery the discovery to set
+   */
+  void setDiscovery(Discovery<byte[]> discovery)
+  {
+    this.discovery = discovery;
+  }
+
+  /**
+   * Retry the write after sleeping, repeating while the client stays connected;
+   * if the client disconnects before a retry succeeds, give up and report the failure.
+   *
+   * @param address address to write, or the combined address and event during playback
+   * @param event event to write, or null during playback
+   * @throws IOException if the client disconnects before the write succeeds
+   */
+  private void retryWrite(byte[] address, Slice event) throws IOException
+  {
+    if (event == null) {  /* during playback the address and event arrive as a single object */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address)) {
+          return;
+        }
+      }
+    } else {  /* the events came from the flume channel and the first write attempt failed */
+      while (client.isConnected()) {
+        sleep();
+        if (client.write(address, event)) {
+          return;
+        }
+      }
+    }
+
+    throw new IOException("Client disconnected!");
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class);
+}
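
The configure() helper above wires the sink's pluggable pieces (discovery, storage, codec) from the Flume agent properties: the value of each key names a class to instantiate reflectively, and any sub-properties under "key." are handed to that component's own configure(), with the sink-level "id" inherited when the sub-context omits it. A minimal sketch of that contract, using a hypothetical baseDir sub-property and assuming Storage.ID is the literal "id":

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;

public class ConfigureContextSketch
{
  public static void main(String[] args)
  {
    Map<String, String> props = new HashMap<String, String>();
    // the value of the key names the implementation class to load reflectively
    props.put("storage", "org.apache.apex.malhar.flume.storage.HDFSStorage");
    // sub-properties under "storage." become the component's own Context
    props.put("storage.baseDir", "/tmp/flume-sink");   // hypothetical property name
    // the sink-level id is copied into the sub-context when it is missing there
    props.put("id", "sink1");

    Context context = new Context(props);
    System.out.println(context.getString("storage"));           // class name to instantiate
    System.out.println(context.getSubProperties("storage."));   // {baseDir=/tmp/flume-sink}
  }
}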

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
new file mode 100644
index 0000000..a771cb3
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.sink;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.apex.malhar.flume.discovery.Discovery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.AbstractServer;
+import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>
+ * Server class.</p>
+ *
+ * @since 0.9.2
+ */
+public class Server extends AbstractServer
+{
+  private final String id;
+  private final Discovery<byte[]> discovery;
+  private final long acceptedTolerance;
+
+  public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
+  {
+    this.id = id;
+    this.discovery = discovery;
+    this.acceptedTolerance = acceptedTolerance;
+  }
+
+  @Override
+  public void handleException(Exception cce, EventLoop el)
+  {
+    logger.error("Server Error", cce);
+    Request r = new Request(Command.SERVER_ERROR, null)
+    {
+      @Override
+      public Slice getAddress()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public int getEventCount()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public int getIdleCount()
+      {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+    };
+    synchronized (requests) {
+      requests.add(r);
+    }
+  }
+
+  private final Discovery.Service<byte[]> service = new Discovery.Service<byte[]>()
+  {
+    @Override
+    public String getHost()
+    {
+      return ((InetSocketAddress)getServerAddress()).getHostName();
+    }
+
+    @Override
+    public int getPort()
+    {
+      return ((InetSocketAddress)getServerAddress()).getPort();
+    }
+
+    @Override
+    public byte[] getPayload()
+    {
+      return null;
+    }
+
+    @Override
+    public String getId()
+    {
+      return id;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
+          Arrays.toString(getPayload()) + '}';
+    }
+
+  };
+
+  @Override
+  public void unregistered(final SelectionKey key)
+  {
+    discovery.unadvertise(service);
+    super.unregistered(key);
+  }
+
+  @Override
+  public void registered(final SelectionKey key)
+  {
+    super.registered(key);
+    discovery.advertise(service);
+  }
+
+  public enum Command
+  {
+    ECHO((byte)0),
+    SEEK((byte)1),
+    COMMITTED((byte)2),
+    CHECKPOINTED((byte)3),
+    CONNECTED((byte)4),
+    DISCONNECTED((byte)5),
+    WINDOWED((byte)6),
+    SERVER_ERROR((byte)7);
+
+    Command(byte b)
+    {
+      this.ord = b;
+    }
+
+    public byte getOrdinal()
+    {
+      return ord;
+    }
+
+    public static Command getCommand(byte b)
+    {
+      Command c;
+      switch (b) {
+        case 0:
+          c = ECHO;
+          break;
+
+        case 1:
+          c = SEEK;
+          break;
+
+        case 2:
+          c = COMMITTED;
+          break;
+
+        case 3:
+          c = CHECKPOINTED;
+          break;
+
+        case 4:
+          c = CONNECTED;
+          break;
+
+        case 5:
+          c = DISCONNECTED;
+          break;
+
+        case 6:
+          c = WINDOWED;
+          break;
+
+        case 7:
+          c = SERVER_ERROR;
+          break;
+
+        default:
+          throw new IllegalArgumentException(String.format("No Command defined for ordinal %d", b));
+      }
+
+      assert (b == c.ord);
+      return c;
+    }
+
+    private final byte ord;
+  }
+
+  public final ArrayList<Request> requests = new ArrayList<Request>(4);
+
+  @Override
+  public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
+  {
+    Client lClient = new Client();
+    lClient.connected();
+    return lClient;
+  }
+
+  public class Client extends AbstractLengthPrependerClient
+  {
+
+    @Override
+    public void onMessage(byte[] buffer, int offset, int size)
+    {
+      if (size != Request.FIXED_SIZE) {
+        logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel());
+        return;
+      }
+
+      long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
+      if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
+        logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel());
+        return;
+      }
+
+      try {
+        if (Command.getCommand(buffer[offset]) == Command.ECHO) {
+          write(buffer, offset, size);
+          return;
+        }
+      } catch (IllegalArgumentException ex) {
+        logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
+            key.channel(), ex);
+        return;
+      }
+
+      Request r = Request.getRequest(buffer, offset, this);
+      synchronized (requests) {
+        requests.add(r);
+      }
+    }
+
+    @Override
+    public void disconnected()
+    {
+      synchronized (requests) {
+        requests.add(Request.getRequest(
+            new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
+      }
+      super.disconnected();
+    }
+
+    public boolean write(byte[] address, Slice event)
+    {
+      if (event.offset == 0 && event.length == event.buffer.length) {
+        return write(address, event.buffer);
+      }
+
+      // a better method would be to replace the write implementation and allow it to natively support writing slices
+      return write(address, event.toByteArray());
+    }
+
+  }
+
+  public abstract static class Request
+  {
+    public static final int FIXED_SIZE = 17;
+    public static final int TIME_OFFSET = 9;
+    public final Command type;
+    public final Client client;
+
+    public Request(Command type, Client client)
+    {
+      this.type = type;
+      this.client = client;
+    }
+
+    public abstract Slice getAddress();
+
+    public abstract int getEventCount();
+
+    public abstract int getIdleCount();
+
+    @Override
+    public String toString()
+    {
+      return "Request{" + "type=" + type + '}';
+    }
+
+    public static Request getRequest(final byte[] buffer, final int offset, Client client)
+    {
+      Command command = Command.getCommand(buffer[offset]);
+      switch (command) {
+        case WINDOWED:
+          return new Request(Command.WINDOWED, client)
+          {
+            final int eventCount;
+            final int idleCount;
+
+            {
+              eventCount = Server.readInt(buffer, offset + 1);
+              idleCount = Server.readInt(buffer, offset + 5);
+            }
+
+            @Override
+            public Slice getAddress()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getEventCount()
+            {
+              return eventCount;
+            }
+
+            @Override
+            public int getIdleCount()
+            {
+              return idleCount;
+            }
+
+            @Override
+            public String toString()
+            {
+              return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
+            }
+
+          };
+
+        default:
+          return new Request(command, client)
+          {
+            final Slice address;
+
+            {
+              address = new Slice(buffer, offset + 1, 8);
+            }
+
+            @Override
+            public Slice getAddress()
+            {
+              return address;
+            }
+
+            @Override
+            public int getEventCount()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int getIdleCount()
+            {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String toString()
+            {
+              return "Request{" + "type=" + type + ", address=" + address + '}';
+            }
+
+          };
+
+      }
+
+    }
+
+  }
+
+  public static int readInt(byte[] buffer, int offset)
+  {
+    return buffer[offset++] & 0xff
+           | (buffer[offset++] & 0xff) << 8
+           | (buffer[offset++] & 0xff) << 16
+           | (buffer[offset++] & 0xff) << 24;
+  }
+
+  public static void writeInt(byte[] buffer, int offset, int i)
+  {
+    buffer[offset++] = (byte)i;
+    buffer[offset++] = (byte)(i >>> 8);
+    buffer[offset++] = (byte)(i >>> 16);
+    buffer[offset++] = (byte)(i >>> 24);
+  }
+
+  public static long readLong(byte[] buffer, int offset)
+  {
+    return (long)buffer[offset++] & 0xff
+           | (long)(buffer[offset++] & 0xff) << 8
+           | (long)(buffer[offset++] & 0xff) << 16
+           | (long)(buffer[offset++] & 0xff) << 24
+           | (long)(buffer[offset++] & 0xff) << 32
+           | (long)(buffer[offset++] & 0xff) << 40
+           | (long)(buffer[offset++] & 0xff) << 48
+           | (long)(buffer[offset++] & 0xff) << 56;
+  }
+
+  public static void writeLong(byte[] buffer, int offset, long l)
+  {
+    buffer[offset++] = (byte)l;
+    buffer[offset++] = (byte)(l >>> 8);
+    buffer[offset++] = (byte)(l >>> 16);
+    buffer[offset++] = (byte)(l >>> 24);
+    buffer[offset++] = (byte)(l >>> 32);
+    buffer[offset++] = (byte)(l >>> 40);
+    buffer[offset++] = (byte)(l >>> 48);
+    buffer[offset++] = (byte)(l >>> 56);
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(Server.class);
+}
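
Server.Client.onMessage() only accepts buffers of exactly Request.FIXED_SIZE (17) bytes: one command byte, an 8-byte address, and an 8-byte little-endian timestamp starting at Request.TIME_OFFSET (9) that must fall within acceptedTolerance of the server clock. A small sketch (not part of this commit) that assembles such a buffer with the writeLong/readLong helpers defined above; the address value is illustrative:

import org.apache.apex.malhar.flume.sink.Server;

public class RequestLayoutSketch
{
  public static void main(String[] args)
  {
    byte[] request = new byte[Server.Request.FIXED_SIZE];   // 17 bytes
    request[0] = Server.Command.COMMITTED.getOrdinal();     // command byte at offset 0
    Server.writeLong(request, 1, 0x1234L);                  // 8-byte address (illustrative value)
    Server.writeLong(request, Server.Request.TIME_OFFSET, System.currentTimeMillis());

    // writeLong and readLong share the same little-endian layout, so the fields round-trip
    long sentAt = Server.readLong(request, Server.Request.TIME_OFFSET);
    System.out.println("command=" + Server.Command.getCommand(request[0]) + ", sentAt=" + sentAt);
  }
}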

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java
new file mode 100644
index 0000000..6160bd5
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+/**
+ * <p>HdfsTestSource class.</p>
+ *
+ * @since 0.9.4
+ */
+public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+  public static final String SOURCE_DIR = "sourceDir";
+  public static final String RATE = "rate";
+  public static final String INIT_DATE = "initDate";
+
+  static byte FIELD_SEPARATOR = 2;
+  public Timer emitTimer;
+  @Nonnull
+  String directory;
+  Path directoryPath;
+  int rate;
+  String initDate;
+  long initTime;
+  List<String> dataFiles;
+  long oneDayBack;
+
+  private transient BufferedReader br = null;
+  protected transient FileSystem fs;
+  private transient Configuration configuration;
+
+  private transient int currentFile = 0;
+  private transient boolean finished;
+  private List<Event> events;
+
+  public HdfsTestSource()
+  {
+    super();
+    this.rate = 2500;
+    dataFiles = Lists.newArrayList();
+    Calendar calendar = Calendar.getInstance();
+    calendar.add(Calendar.DATE, -1);
+    oneDayBack = calendar.getTimeInMillis();
+    configuration = new Configuration();
+    events = Lists.newArrayList();
+  }
+
+  @Override
+  public void configure(Context context)
+  {
+    directory = context.getString(SOURCE_DIR);
+    rate = context.getInteger(RATE, rate);
+    initDate = context.getString(INIT_DATE);
+
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(directory));
+    directoryPath = new Path(directory);
+
+    String[] parts = initDate.split("-");
+    Preconditions.checkArgument(parts.length == 3);
+    Calendar calendar = Calendar.getInstance();
+    calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
+    initTime = calendar.getTimeInMillis();
+
+    try {
+      List<String> files = findFiles();
+      for (String file : files) {
+        dataFiles.add(file);
+      }
+      if (logger.isDebugEnabled()) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+        logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack),
+            dateFormat.format(new Date(initTime)), currentFile);
+        for (String file : dataFiles) {
+          logger.debug("settings add file {}", file);
+        }
+      }
+
+      fs = FileSystem.newInstance(new Path(directory).toUri(), configuration);
+      Path filePath = new Path(dataFiles.get(currentFile));
+      br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    finished = false;
+
+  }
+
+  private List<String> findFiles() throws IOException
+  {
+    List<String> files = Lists.newArrayList();
+    Path directoryPath = new Path(directory);
+    FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration);
+    try {
+      logger.debug("checking for new files in {}", directoryPath);
+      RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true);
+      while (statuses.hasNext()) {
+        FileStatus status = statuses.next();
+        Path path = status.getPath();
+        String filePathStr = path.toString();
+        if (!filePathStr.endsWith(".gz")) {
+          continue;
+        }
+        logger.debug("new file {}", filePathStr);
+        files.add(path.toString());
+      }
+    } catch (FileNotFoundException e) {
+      logger.warn("Failed to list directory {}", directoryPath, e);
+      throw new RuntimeException(e);
+    } finally {
+      lfs.close();
+    }
+    return files;
+  }
+
+  @Override
+  public void start()
+  {
+    super.start();
+    emitTimer = new Timer();
+
+    final ChannelProcessor channelProcessor = getChannelProcessor();
+    emitTimer.scheduleAtFixedRate(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        int lineCount = 0;
+        events.clear();
+        try {
+          while (lineCount < rate && !finished) {
+            String line = br.readLine();
+
+            if (line == null) {
+              logger.debug("completed file {}", currentFile);
+              br.close();
+              currentFile++;
+              if (currentFile == dataFiles.size()) {
+                logger.info("finished all files");
+                finished = true;
+                break;
+              }
+              Path filePath = new Path(dataFiles.get(currentFile));
+              br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+              logger.info("opening file {}. {}", currentFile, filePath);
+              continue;
+            }
+            lineCount++;
+            Event flumeEvent = EventBuilder.withBody(line.getBytes());
+            events.add(flumeEvent);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        if (events.size() > 0) {
+          channelProcessor.processEventBatch(events);
+        }
+        if (finished) {
+          emitTimer.cancel();
+        }
+      }
+
+    }, 0, 1000);
+  }
+
+  @Override
+  public void stop()
+  {
+    emitTimer.cancel();
+    super.stop();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
+}
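
HdfsTestSource is driven entirely by three context keys: sourceDir (a directory of .gz files to replay), rate (lines emitted per second), and initDate (a yyyy-MM-dd date used to compute initTime). A hedged sketch of wiring it up programmatically; the path and date below are illustrative, and configure() only succeeds against a reachable directory containing .gz files:

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;

import org.apache.apex.malhar.flume.source.HdfsTestSource;

public class HdfsTestSourceSketch
{
  public static void main(String[] args)
  {
    Map<String, String> props = new HashMap<String, String>();
    props.put(HdfsTestSource.SOURCE_DIR, "hdfs:///data/flume-test"); // directory of .gz files (illustrative)
    props.put(HdfsTestSource.RATE, "1000");                          // lines per second
    props.put(HdfsTestSource.INIT_DATE, "2017-05-01");               // yyyy-MM-dd

    HdfsTestSource source = new HdfsTestSource();
    source.configure(new Context(props));
    // in a real agent, source.start() then pushes batches to the configured channel once per second
  }
}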

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java
new file mode 100644
index 0000000..87c118f
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * <p>TestSource class.</p>
+ *
+ * @since 0.9.4
+ */
+public class TestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+  public static final String SOURCE_FILE = "sourceFile";
+  public static final String LINE_NUMBER = "lineNumber";
+  public static final String RATE = "rate";
+  public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
+  static byte FIELD_SEPARATOR = 1;
+  static int DEF_PERCENT_PAST_EVENTS = 5;
+  public Timer emitTimer;
+  @Nonnull
+  String filePath;
+  int rate;
+  int numberOfPastEvents;
+  transient List<Row> cache;
+  private transient int startIndex;
+  private transient Random random;
+  private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+  private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public TestSource()
+  {
+    super();
+    this.rate = 2500;
+    this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
+    this.random = new Random();
+
+  }
+
+  @Override
+  public void configure(Context context)
+  {
+    filePath = context.getString(SOURCE_FILE);
+    rate = context.getInteger(RATE, rate);
+    int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath));
+    try {
+      BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
+      try {
+        buildCache(lineReader);
+      } finally {
+        lineReader.close();
+      }
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
+      numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size());
+    }
+  }
+
+  @Override
+  public void start()
+  {
+    super.start();
+    emitTimer = new Timer();
+
+    final ChannelProcessor channel = getChannelProcessor();
+    final int cacheSize = cache.size();
+    emitTimer.scheduleAtFixedRate(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        int lastIndex = startIndex + rate;
+        if (lastIndex > cacheSize) {
+          lastIndex -= cacheSize;
+          processBatch(channel, cache.subList(startIndex, cacheSize));
+          startIndex = 0;
+          while (lastIndex > cacheSize) {
+            processBatch(channel, cache);
+            lastIndex -= cacheSize;
+          }
+          processBatch(channel, cache.subList(0, lastIndex));
+        } else {
+          processBatch(channel, cache.subList(startIndex, lastIndex));
+        }
+        startIndex = lastIndex;
+      }
+
+    }, 0, 1000);
+  }
+
+  private void processBatch(ChannelProcessor channelProcessor, List<Row> rows)
+  {
+    if (rows.isEmpty()) {
+      return;
+    }
+
+    int noise = random.nextInt(numberOfPastEvents + 1);
+    Set<Integer> pastIndices = Sets.newHashSet();
+    for (int i = 0; i < noise; i++) {
+      pastIndices.add(random.nextInt(rows.size()));
+    }
+
+    Calendar calendar = Calendar.getInstance();
+    long high = calendar.getTimeInMillis();
+    calendar.add(Calendar.DATE, -2);
+    long low = calendar.getTimeInMillis();
+
+
+
+    List<Event> events = Lists.newArrayList();
+    for (int i = 0; i < rows.size(); i++) {
+      Row eventRow = rows.get(i);
+      if (pastIndices.contains(i)) {
+        long pastTime = (long)((Math.random() * (high - low)) + low);
+        byte[] pastDateField = dateFormat.format(pastTime).getBytes();
+        byte[] pastTimeField = timeFormat.format(pastTime).getBytes();
+
+        System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
+        System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
+      } else {
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes();
+        byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes();
+
+        System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
+        System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
+      }
+
+      HashMap<String, String> headers = new HashMap<String, String>(2);
+      headers.put(SOURCE_FILE, filePath);
+      headers.put(LINE_NUMBER, String.valueOf(startIndex + i));
+      events.add(EventBuilder.withBody(eventRow.bytes, headers));
+    }
+    channelProcessor.processEventBatch(events);
+  }
+
+  @Override
+  public void stop()
+  {
+    emitTimer.cancel();
+    super.stop();
+  }
+
+  private void buildCache(BufferedReader lineReader) throws IOException
+  {
+    cache = Lists.newArrayListWithCapacity(rate);
+
+    String line;
+    while ((line = lineReader.readLine()) != null) {
+      byte[] row = line.getBytes();
+      Row eventRow = new Row(row);
+      final int rowsize = row.length;
+
+      /* skip past the guid, which ends at the first field separator */
+      int sliceLength = -1;
+      while (++sliceLength < rowsize) {
+        if (row[sliceLength] == FIELD_SEPARATOR) {
+          break;
+        }
+      }
+      /* the date field follows the guid */
+      int recordStart = sliceLength + 1;
+      int pointer = sliceLength + 1;
+      while (pointer < rowsize) {
+        if (row[pointer++] == FIELD_SEPARATOR) {
+          eventRow.dateFieldStart = recordStart;
+          break;
+        }
+      }
+
+      /* the time field follows the date field */
+      int timeStart = pointer;
+      while (pointer < rowsize) {
+        if (row[pointer++] == FIELD_SEPARATOR) {
+          eventRow.timeFieldStart = timeStart;
+          break;
+        }
+      }
+
+      cache.add(eventRow);
+    }
+  }
+
+  private static class Row
+  {
+    final byte[] bytes;
+    int dateFieldStart;
+    int timeFieldStart;
+//    boolean past;
+
+    Row(byte[] bytes)
+    {
+      this.bytes = bytes;
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(TestSource.class);
+}
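
TestSource.buildCache() assumes each input line carries fields separated by the 0x01 byte, with a guid first, a yyyy-MM-dd date second, and a yyyy-MM-dd HH:mm:ss timestamp third; processBatch() later overwrites those two fields in place, back-dating a random subset of rows by up to two days to simulate late events. A standalone sketch (hypothetical sample data) of that layout and of the offsets the parser records:

public class TestSourceRowSketch
{
  private static final byte FIELD_SEPARATOR = 1;

  public static void main(String[] args)
  {
    String sep = new String(new byte[] {FIELD_SEPARATOR});
    String line = "event-42" + sep + "2017-05-22" + sep + "2017-05-22 18:30:00" + sep + "payload";

    // the offsets buildCache() records are where the date and time fields begin;
    // processBatch() rewrites exactly those bytes with fresh (or back-dated) timestamps
    int dateFieldStart = line.indexOf(sep) + 1;
    int timeFieldStart = line.indexOf(sep, dateFieldStart) + 1;
    System.out.println("dateFieldStart=" + dateFieldStart + ", timeFieldStart=" + timeFieldStart);
  }
}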

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java
new file mode 100644
index 0000000..ae1ed23
--- /dev/null
+++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>DebugWrapper class.</p>
+ *
+ * @since 0.9.4
+ */
+public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+  HDFSStorage storage = new HDFSStorage();
+
+  @Override
+  public byte[] store(Slice bytes)
+  {
+    byte[] ret = null;
+
+    try {
+      ret = storage.store(bytes);
+    } finally {
+      logger.debug("storage.store(new byte[]{{}});", bytes);
+    }
+
+    return ret;
+  }
+
+  @Override
+  public byte[] retrieve(byte[] identifier)
+  {
+    byte[] ret = null;
+
+    try {
+      ret = storage.retrieve(identifier);
+    } finally {
+      logger.debug("storage.retrieve(new byte[]{{}});", identifier);
+    }
+
+    return ret;
+  }
+
+  @Override
+  public byte[] retrieveNext()
+  {
+    byte[] ret = null;
+    try {
+      ret = storage.retrieveNext();
+    } finally {
+      logger.debug("storage.retrieveNext();");
+    }
+
+    return ret;
+  }
+
+  @Override
+  public void clean(byte[] identifier)
+  {
+    try {
+      storage.clean(identifier);
+    } finally {
+      logger.debug("storage.clean(new byte[]{{}});", identifier);
+    }
+  }
+
+  @Override
+  public void flush()
+  {
+    try {
+      storage.flush();
+    } finally {
+      logger.debug("storage.flush();");
+    }
+  }
+
+  @Override
+  public void configure(Context cntxt)
+  {
+    try {
+      storage.configure(cntxt);
+    } finally {
+      logger.debug("storage.configure({});", cntxt);
+    }
+  }
+
+  @Override
+  public void setup(com.datatorrent.api.Context t1)
+  {
+    try {
+      storage.setup(t1);
+    } finally {
+      logger.debug("storage.setup({});", t1);
+    }
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      storage.teardown();
+    } finally {
+      logger.debug("storage.teardown();");
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class);
+}

