apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/3] apex-malhar git commit: APEXMALHAR-2220 Move the FunctionOperator to Malhar library
Date Sun, 19 Mar 2017 14:54:12 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master a1c319ca7 -> a017dfaa4


APEXMALHAR-2220 Move the FunctionOperator to Malhar library


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/65488fd6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/65488fd6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/65488fd6

Branch: refs/heads/master
Commit: 65488fd6e585ee18d084d05f5a44329eb62753f4
Parents: b43818b
Author: Dongming Liang <d9liang@apache.org>
Authored: Fri Nov 4 18:57:21 2016 -0700
Committer: Dongming Liang <d9liang@apache.org>
Committed: Fri Nov 4 18:57:21 2016 -0700

----------------------------------------------------------------------
 .../malhar/stream/sample/MinimalWordCount.java  |   2 +-
 .../malhar/stream/sample/WindowedWordCount.java |   2 +-
 .../stream/sample/complete/AutoComplete.java    |   2 +-
 .../sample/complete/StreamingWordExtract.java   |   2 +-
 .../sample/complete/TopWikipediaSessions.java   |   2 +-
 .../stream/sample/complete/TrafficRoutes.java   |   2 +-
 .../sample/complete/TwitterAutoComplete.java    |   2 +-
 .../sample/cookbook/CombinePerKeyExamples.java  |   2 +-
 .../stream/sample/cookbook/DeDupExample.java    |   2 +-
 .../sample/cookbook/MaxPerKeyExamples.java      |   2 +-
 .../stream/sample/cookbook/TriggerExample.java  |   2 +-
 .../lib/function/AnnonymousClassModifier.java   | 134 +++++++
 .../apex/malhar/lib/function/Function.java      |  87 +++++
 .../malhar/lib/function/FunctionOperator.java   | 378 +++++++++++++++++++
 .../malhar/lib/utils/ByteArrayClassLoader.java  |  54 +++
 .../apache/apex/malhar/lib/utils/TupleUtil.java |  46 +++
 .../apex/malhar/stream/api/ApexStream.java      |   2 +-
 .../apex/malhar/stream/api/WindowedStream.java  |   5 +-
 .../malhar/stream/api/function/Function.java    |  88 -----
 .../malhar/stream/api/impl/ApexStreamImpl.java  |   6 +-
 .../stream/api/impl/ApexWindowedStreamImpl.java |   2 +-
 .../api/operator/AnnonymousClassModifier.java   | 134 -------
 .../api/operator/ByteArrayClassLoader.java      |  54 ---
 .../stream/api/operator/FunctionOperator.java   | 378 -------------------
 .../apex/malhar/stream/api/util/TupleUtil.java  |  46 ---
 .../FunctionOperator/FunctionOperatorTest.java  |   4 +-
 .../stream/sample/ApplicationWithStreamAPI.java |   2 +-
 .../LocalTestWithoutStreamApplication.java      |   2 +-
 .../apex/malhar/stream/sample/MyStream.java     |   2 +-
 .../apex/malhar/stream/sample/MyStreamTest.java |   2 +-
 .../stream/sample/WordCountWithStreamAPI.java   |   2 +-
 31 files changed, 723 insertions(+), 727 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
index 327c882..160175f 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -22,10 +22,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index 5b83bd0..6e57bfd 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -27,12 +27,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 2db59b6..571a25f 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
@@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
index 07f01d0..b5e491e 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.Option;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index 68ec733..b2b9ae4 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
@@ -34,7 +35,6 @@ import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index e6a53d6..431263a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
@@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.accumulation.Group;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index 4fc80ea..f6829da 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
@@ -36,7 +37,6 @@ import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index bfdb268..937476e 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -21,6 +21,7 @@ package org.apache.apex.malhar.stream.sample.cookbook;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
@@ -28,7 +29,6 @@ import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 4df5fe7..ab6b28b 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -23,12 +23,12 @@ import java.util.List;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
 import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 9fd9495..f28b96a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -20,6 +20,7 @@ package org.apache.apex.malhar.stream.sample.cookbook;
 
 import java.util.List;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.Window;
@@ -28,7 +29,6 @@ import org.apache.apex.malhar.lib.window.accumulation.Max;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index 962faa5..2fa7619 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -23,13 +23,13 @@ import java.util.Objects;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
 import com.datatorrent.lib.util.KeyValPair;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java b/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java
new file mode 100644
index 0000000..c84d4bd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/AnnonymousClassModifier.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.function;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+/**
+ * Most serialization libraries cannot serialize anonymous classes by default, so this visitor
+ * rewrites the bytecode of an anonymous class at runtime: the class is made public, its link to the
+ * enclosing class is dropped, and a public no-arg constructor is guaranteed to exist.
+ * The anonymous class being modified must be stateless. Only one level of nesting is handled.
+ *
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public class AnnonymousClassModifier extends ClassVisitor
+{
+  private String className;
+  private String superName = "java/lang/Object"; // superclass internal name, refreshed in visit()
+  private boolean hasDefaultConstructor = false;
+
+  public AnnonymousClassModifier(int i)
+  {
+    super(i);
+  }
+
+  public AnnonymousClassModifier(int i, ClassVisitor classVisitor)
+  {
+    super(i, classVisitor);
+  }
+
+  @Override
+  public void visit(int i, int i1, String s, String s1, String s2, String[] strings)
+  {
+    className = s;
+    superName = (s2 == null) ? superName : s2;
+    // Force the class public (ACC_PUBLIC | ACC_SUPER == 33) whatever its original access flags were.
+    super.visit(i, Opcodes.ACC_PUBLIC | Opcodes.ACC_SUPER, s, s1, s2, strings);
+  }
+
+  @Override
+  public void visitSource(String s, String s1)
+  {
+    super.visitSource(s, s1);
+  }
+
+  @Override
+  public void visitOuterClass(String s, String s1, String s2)
+  {
+    // skip outer class, make it top level. For now only one level of anonymous class
+  }
+
+  @Override
+  public AnnotationVisitor visitAnnotation(String s, boolean b)
+  {
+    return super.visitAnnotation(s, b);
+  }
+
+  @Override
+  public void visitAttribute(Attribute attribute)
+  {
+    super.visitAttribute(attribute);
+  }
+
+  @Override
+  public void visitInnerClass(String s, String s1, String s2, int i)
+  {
+    // Drop the class's own InnerClasses entry so it no longer presents itself as an inner class.
+    if (s.equals(className)) {
+      return;
+    }
+    super.visitInnerClass(s, s1, s2, i);
+  }
+
+  @Override
+  public FieldVisitor visitField(int i, String s, String s1, String s2, Object o)
+  {
+    return super.visitField(i, s, s1, s2, o);
+  }
+
+  @Override
+  public MethodVisitor visitMethod(int i, String s, String s1, String s2, String[] strings)
+  {
+    // Make constructors public: the widened flags (not the original ones) are what must be passed
+    // on, and only a real "<init>()V" counts as a default constructor, not any void no-arg method.
+    boolean isConstructor = "<init>".equals(s);
+    int j = isConstructor ? (i & ~(Opcodes.ACC_PRIVATE | Opcodes.ACC_PROTECTED)) | Opcodes.ACC_PUBLIC : i;
+    if (isConstructor && "()V".equals(s1)) {
+      hasDefaultConstructor = true;
+    }
+    return super.visitMethod(j, s, s1, s2, strings);
+  }
+
+  @Override
+  public void visitEnd()
+  {
+    // If there is no default constructor, create a public one that only calls the superclass's.
+    if (!hasDefaultConstructor) {
+      MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
+      if (mv != null) { // null when this visitor was built without a delegate ClassVisitor
+        mv.visitCode();
+        mv.visitVarInsn(Opcodes.ALOAD, 0);
+        mv.visitMethodInsn(Opcodes.INVOKESPECIAL, superName, "<init>", "()V");
+        mv.visitInsn(Opcodes.RETURN);
+        mv.visitMaxs(5, 1);
+        mv.visitEnd();
+      }
+    }
+    super.visitEnd();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
new file mode 100644
index 0000000..0d43cd2
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.function;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * The top level function interface <br>
+ * A function is wrapped by a {@link FunctionOperator} <br>
+ * It takes its input from an input port of the {@link FunctionOperator}, e.g. {@link FunctionOperator.MapFunctionOperator#input} <br>
+ * and its result is emitted through an output port such as {@link FunctionOperator#tupleOutput} <br>
+ * Anonymous functions are not fully supported: they must be <b>stateless</b> and should not be defined in any static context<br>
+ * If an anonymous function does not work, use a top level function class instead<br>
+ * A top level function class should have a public no-arg constructor
+ *
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public interface Function
+{
+  /**
+   * Marker interface: when a {@link Function} implements it,
+   * the state of the function will be checkpointed
+   */
+  interface Stateful
+  {
+
+  }
+
+  /**
+   * A transformation that turns one input into one output
+   * @param <I> input type
+   * @param <O> output type
+   */
+  interface MapFunction<I, O> extends Function
+  {
+    O f(I input);
+  }
+
+  /**
+   * A special map function that converts any POJO into a key value pair wrapped in a {@link Tuple}
+   * @param <T> input type
+   * @param <K> key type
+   * @param <V> value type
+   */
+  interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>>
+  {
+
+  }
+
+  /**
+   * A flatmap transformation: one input is mapped to an {@link Iterable} of outputs
+   * @param <I> input type
+   * @param <O> element type of the returned {@link Iterable}
+   */
+  interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>>
+  {
+  }
+
+  /**
+   * A filter transformation: a boolean decision is computed for every input
+   * @param <T> type of the tested input
+   */
+  interface FilterFunction<T> extends Function
+  {
+    boolean f(T input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
new file mode 100644
index 0000000..b6190a0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.function;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader;
+import org.apache.apex.malhar.lib.utils.TupleUtil;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+/**
+ * Operators that wrap the functions
+ *
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
+{
+  private byte[] annonymousFunctionClass;
+
+  protected transient FUNCTION statelessF;
+
+  protected FUNCTION statefulF;
+
+  protected boolean stateful = false;
+
+  protected boolean isAnnonymous = false;
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>();
+
+  public FunctionOperator(FUNCTION f)
+  {
+    isAnnonymous = f.getClass().isAnonymousClass();
+    if (isAnnonymous) {
+      annonymousFunctionClass = functionClassData(f);
+    } else {
+      // statelessF is transient, so a named function parked there (as Function.Stateful ones were)
+      // leaves readFunction() with nothing to restore once that field is lost; keep it in statefulF.
+      statefulF = f;
+      stateful = true;
+    }
+  }
+
+  /**
+   * Captures the class of an anonymous function as bytes. Every occurrence of the class's internal
+   * name in the class file gets '$' replaced by '_', the result is passed through
+   * {@link AnnonymousClassModifier}, and the output is laid out as
+   * [int name length][name with '$' -> '_'][int class length][class bytes].
+   *
+   * @param f the function whose class file is read through its class loader
+   * @return the length-prefixed class name followed by the length-prefixed rewritten class bytes
+   */
+  private byte[] functionClassData(Function f)
+  {
+    Class<? extends Function> classT = f.getClass();
+    String className = classT.getName();
+    String resource = className.replace('.', '/') + ".class";
+    byte[] classNameBytes = className.replace('.', '/').getBytes();
+    byte[] classBytes;
+    // try-with-resources: the class-file stream used to be left open, and a missing resource
+    // surfaced as a bare NullPointerException.
+    try (java.io.InputStream classStream = classT.getClassLoader().getResourceAsStream(resource)) {
+      if (classStream == null) {
+        throw new IOException("Cannot find class file " + resource);
+      }
+      classBytes = IOUtils.toByteArray(classStream);
+      // Compare a whole window at each offset. The old single-cursor scan reset on a mismatch
+      // without re-testing that byte, so it could miss a name right after a partial match.
+      for (int start = 0; start + classNameBytes.length <= classBytes.length; start++) {
+        int matched = 0;
+        while (matched < classNameBytes.length && classBytes[start + matched] == classNameBytes[matched]) {
+          matched++;
+        }
+        if (matched == classNameBytes.length) {
+          for (int p = start; p < start + matched; p++) {
+            if (classBytes[p] == '$') {
+              classBytes[p] = '_';
+            }
+          }
+          start += matched - 1; // resume right after the match, so matches never overlap
+        }
+      }
+      ClassWriter cw = new ClassWriter(0);
+      new ClassReader(classBytes).accept(new AnnonymousClassModifier(Opcodes.ASM4, cw), 0);
+      classBytes = cw.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    byte[] renamed = className.replace('$', '_').getBytes();
+    // Presize for both length prefixes, the name and the class bytes.
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(8 + renamed.length + classBytes.length);
+    try (DataOutputStream data = new DataOutputStream(byteArrayOutputStream)) {
+      data.writeInt(renamed.length);
+      data.write(renamed);
+      data.writeInt(classBytes.length);
+      data.write(classBytes);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Default constructor to make kryo happy; it performs no initialization of its own
+   */
+  public FunctionOperator()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+    // intentionally empty: this operator does nothing at the start of a window
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // intentionally empty: this operator does nothing at the end of a window
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    readFunction(); // rebuild the anonymous function (if any) up front; the context is not used
+  }
+
+
+  @SuppressWarnings("unchecked")
+  private void readFunction()
+  {
+    // No-op once a function exists; otherwise rebuild the anonymous function from its class bytes.
+    if (statelessF != null || statefulF != null) {
+      return;
+    }
+    try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass))) {
+      byte[] classNameBytes = new byte[input.readInt()];
+      input.readFully(classNameBytes); // read() may stop short; readFully fails on truncated data
+      String className = new String(classNameBytes);
+      byte[] classData = new byte[input.readInt()];
+      input.readFully(classData);
+      Map<String, byte[]> classBin = new HashMap<>();
+      classBin.put(className, classData);
+      ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader());
+      statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    // no-op: nothing is released here
+  }
+
+  public FUNCTION getFunction()
+  {
+    // Re-creates an anonymous function from its class bytes when no function is set yet.
+    readFunction();
+    if (!stateful) {
+      return statelessF;
+    }
+    return statefulF;
+  }
+
+  public FUNCTION getStatelessF()
+  {
+    return statelessF; // instance handed out by getFunction() while stateful is false
+  }
+
+  public void setStatelessF(FUNCTION statelessF)
+  {
+    this.statelessF = statelessF; // a non-null value also makes readFunction() skip class loading
+  }
+
+  public FUNCTION getStatefulF()
+  {
+    return statefulF; // instance handed out by getFunction() while stateful is true
+  }
+
+  public void setStatefulF(FUNCTION statefulF)
+  {
+    this.statefulF = statefulF; // a non-null value also makes readFunction() skip class loading
+  }
+
+  public boolean isStateful()
+  {
+    return stateful; // when true, getFunction() returns statefulF instead of statelessF
+  }
+
+  public void setStateful(boolean stateful)
+  {
+    this.stateful = stateful; // switches getFunction() between statefulF (true) and statelessF (false)
+  }
+
+  public boolean isAnnonymous()
+  {
+    return isAnnonymous; // NOTE(review): presumably true when the wrapped function is an anonymous class - confirm
+  }
+
+  public void setIsAnnonymous(boolean isAnnonymous)
+  {
+    this.isAnnonymous = isAnnonymous; // NOTE(review): flag is not read by any code visible here - confirm its consumers
+  }
+
+  public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>>
+  {
+    // Applies the wrapped MapFunction to each value: plain values arrive on input, Tuple-wrapped ones on tupleInput.
+    public MapFunctionOperator()
+    {
+      // intentionally empty
+    }
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.MapFunction<IN, OUT> f = getFunction();
+        output.emit(f.f(t));
+      }
+    };
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
+    {
+      @Override
+      public void process(Tuple<IN> t)
+      {
+        // A PlainTuple input must yield a rebuilt tuple on tupleOutput, mirroring FlatMapFunctionOperator.
+        if (t instanceof Tuple.PlainTuple) {
+          tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, getFunction().f(t.getValue())));
+        } else {
+          output.emit(getFunction().f(t.getValue()));
+        }
+      }
+    };
+
+    public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+  public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>>
+  {
+    // Applies the wrapped FlatMapFunction to each value and emits every element of the returned Iterable.
+    public FlatMapFunctionOperator()
+    {
+      // intentionally empty
+    }
+
+    // Plain values: each produced element goes to output.
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Iterable<OUT> results = getFunction().f(t);
+        for (OUT result : results) {
+          output.emit(result);
+        }
+      }
+    };
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>()
+    {
+      @Override
+      public void process(Tuple<IN> t)
+      {
+        // PlainTuple inputs are re-wrapped per element onto tupleOutput; any other Tuple falls back to output.
+        Iterable<OUT> results = getFunction().f(t.getValue());
+        boolean plain = t instanceof Tuple.PlainTuple;
+        for (OUT result : results) {
+          if (plain) {
+            tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, result));
+          } else {
+            output.emit(result);
+          }
+        }
+      }
+    };
+
+    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+
+  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
+  {
+    // Forwards only the values for which the wrapped FilterFunction returns true.
+    public FilterFunctionOperator()
+    {
+      // intentionally empty
+    }
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        // drop the value unless the predicate accepts it
+        if (!getFunction().f(t)) {
+          return;
+        }
+        output.emit(t);
+      }
+    };
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>()
+    {
+      @Override
+      public void process(Tuple<IN> t)
+      {
+        // the tuple wrapper is passed through unchanged when its value is accepted
+        if (!getFunction().f(t.getValue())) {
+          return;
+        }
+        tupleOutput.emit(t);
+      }
+    };
+
+    public FilterFunctionOperator(Function.FilterFunction<IN> f)
+    {
+      super(f);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java
new file mode 100644
index 0000000..9c1aa96
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/ByteArrayClassLoader.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public class ByteArrayClassLoader extends ClassLoader
+{
+  private final Map<String, byte[]> classes;
+
+  public ByteArrayClassLoader(Map<String, byte[]> classes)
+  {
+    this.classes = classes;
+  }
+
+  public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent)
+  {
+    super(parent);
+    this.classes = classes;
+  }
+
+  public Class findClass(String name) throws ClassNotFoundException
+  {
+    // class bytes come from the name -> bytes map given to the constructor
+    byte[] bytecode = classes.get(name);
+    if (bytecode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, bytecode, 0, bytecode.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java
new file mode 100644
index 0000000..d2c25fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/TupleUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+
+/**
+ * The tuple util will be used to extract fields that are used as key or value<br>
+ * Or converting from data tuples to display tuples <br>
+ * Or generating watermark tuples <br>
+ *
+ *
+ * @since 3.4.0
+ */
+public class TupleUtil
+{
+  // Re-wraps newValue in the same tuple flavour as t, carrying over the timestamp and windows where present.
+  public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue)
+  {
+    // WindowedTuple is tested first - NOTE(review): assumes it is the more specific of the two types; confirm
+    if (t instanceof Tuple.WindowedTuple) {
+      Tuple.WindowedTuple source = (Tuple.WindowedTuple)t;
+      return new Tuple.WindowedTuple<>(source.getWindows(), source.getTimestamp(), newValue);
+    }
+    if (t instanceof Tuple.TimestampedTuple) {
+      return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
+    }
+    return new Tuple.PlainTuple<>(newValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
index 47f358f..c09efa5 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -22,9 +22,9 @@ import java.util.concurrent.Callable;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Attribute;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index 0f5ce1e..554f5d6 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
@@ -29,7 +30,6 @@ import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
 import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.lib.util.KeyValPair;
@@ -79,7 +79,6 @@ public interface WindowedStream<T> extends ApexStream<T>
 
   /**
    * Count tuples by the key<br>
-   * @param name name of the operator
    * @param convertToKeyValue The function convert plain tuple to k,v pair
    * @return new stream of Key Value Pair
    */
@@ -88,7 +87,6 @@ public interface WindowedStream<T> extends ApexStream<T>
   /**
    * Return top N tuples by the selected key
    * @param N how many tuples you want to keep
-   * @param name name of the operator
    * @param convertToKeyVal The function convert plain tuple to k,v pair
    * @return new stream of Key and top N tuple of the key
    */
@@ -97,7 +95,6 @@ public interface WindowedStream<T> extends ApexStream<T>
   /**
    * Return top N tuples of all tuples in the window
    * @param N
-   * @param name name of the operator
    * @return new stream of topN
    */
   <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
deleted file mode 100644
index d516064..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.function;
-
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * The top level function interface <br>
- * The function is wrapped by {@link FunctionOperator} <br>
- * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br>
- * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br>
- * Anonymous function is not fully supported. It must be <b>stateless</b> should not be defined in any static context<br>
- * If anonymous function does not working, you can should use top level function class<br>
- * Top level function class should have public non-arg constructor
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public interface Function
-{
-  /**
-   * If the {@link Function} implements this interface.
-   * The state of the function will be checkpointed
-   */
-  public static interface Stateful
-  {
-
-  }
-
-  /**
-   * An interface defines a one input one output transformation
-   * @param <I>
-   * @param <O>
-   */
-  public static interface MapFunction<I, O> extends Function
-  {
-    O f(I input);
-  }
-
-  /**
-   * A special map function to convert any pojo to key value pair datastructure
-   * @param <T>
-   * @param <K>
-   * @param <V>
-   */
-  public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>>
-  {
-
-  }
-
-  /**
-   * An interface that defines flatmap transformation
-   * @param <I>
-   * @param <O>
-   */
-  public static interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>>
-  {
-  }
-
-  /**
-   * An interface that defines filter transformation
-   * @param <T>
-   */
-  public static interface FilterFunction<T> extends Function
-  {
-    boolean f(T input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
index ba399de..bb0f781 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -29,6 +29,9 @@ import java.util.concurrent.Callable;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.function.Function.FlatMapFunction;
+import org.apache.apex.malhar.lib.function.FunctionOperator;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.WindowOption;
 
@@ -36,9 +39,6 @@ import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 import org.apache.apex.malhar.stream.api.Option;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.function.Function.FlatMapFunction;
-import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index 5866a4c..3f08d8c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
@@ -40,7 +41,6 @@ import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.Option;
 import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
deleted file mode 100644
index b0fe3c5..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.operator;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
-
-/**
- * Because annonymous class serialization is not supported by default in most serialization library
- * This class is used to modify the bytecode of annonymous at runtime.
- * The limit for this is the annonymous class that is being modified must by stateless
- *
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class AnnonymousClassModifier extends ClassVisitor
-{
-  private String className;
-
-  private boolean hasDefaultConstructor = false;
-
-  public AnnonymousClassModifier(int i)
-  {
-    super(i);
-  }
-
-  public AnnonymousClassModifier(int i, ClassVisitor classVisitor)
-  {
-    super(i, classVisitor);
-  }
-
-  @Override
-  public void visit(int i, int i1, String s, String s1, String s2, String[] strings)
-  {
-    className = s;
-    super.visit(i, 33, s, s1, s2, strings);
-  }
-
-  @Override
-  public void visitSource(String s, String s1)
-  {
-    super.visitSource(s, s1);
-  }
-
-  @Override
-  public void visitOuterClass(String s, String s1, String s2)
-  {
-    // skip outer class, make it top level. For now only one level annonymous class
-    return;
-  }
-
-  @Override
-  public AnnotationVisitor visitAnnotation(String s, boolean b)
-  {
-    return super.visitAnnotation(s, b);
-  }
-
-
-  @Override
-  public void visitAttribute(Attribute attribute)
-  {
-    super.visitAttribute(attribute);
-  }
-
-  @Override
-  public void visitInnerClass(String s, String s1, String s2, int i)
-  {
-    if (s.equals(className)) {
-      return;
-    }
-    super.visitInnerClass(s, s1, s2, i);
-  }
-
-  @Override
-  public FieldVisitor visitField(int i, String s, String s1, String s2, Object o)
-  {
-    return super.visitField(i, s, s1, s2, o);
-  }
-
-  @Override
-  public MethodVisitor visitMethod(int i, String s, String s1, String s2, String[] strings)
-  {
-    //make the constructor public
-    int j = s.equals("<init>") ? i | Opcodes.ACC_PUBLIC : i;
-    if (s1.contains("()V")) {
-      hasDefaultConstructor = true;
-    }
-
-    return super.visitMethod(i, s, s1, s2, strings);
-  }
-
-  @Override
-  public void visitEnd()
-  {
-
-    // If there is no default constructor, create one
-    if (!hasDefaultConstructor) {
-      MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
-      mv.visitVarInsn(Opcodes.ALOAD, 0);
-      mv.visitMethodInsn(Opcodes.INVOKESPECIAL,
-          "java/lang/Object",
-          "<init>",
-          "()V");
-      mv.visitInsn(Opcodes.RETURN);
-      mv.visitMaxs(5, 1);
-      mv.visitEnd();
-    }
-
-    super.visitEnd();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
deleted file mode 100644
index 05a791c..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.operator;
-
-
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class ByteArrayClassLoader extends ClassLoader
-{
-  private final Map<String, byte[]> classes;
-
-  public ByteArrayClassLoader(Map<String, byte[]> classes)
-  {
-    this.classes = classes;
-  }
-
-  public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent)
-  {
-    super(parent);
-    this.classes = classes;
-  }
-
-  protected Class findClass(String name) throws ClassNotFoundException
-  {
-    byte[] data = (byte[])((byte[])this.classes.get(name));
-    if (data == null) {
-      throw new ClassNotFoundException(name);
-    } else {
-      return super.defineClass(name, data, 0, data.length);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
deleted file mode 100644
index 1e2066c..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.operator;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.util.TupleUtil;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
-import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
-/**
- * Operators that wrap the functions
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
-{
-  private byte[] annonymousFunctionClass;
-
-  protected transient FUNCTION statelessF;
-
-  protected FUNCTION statefulF;
-
-  protected boolean stateful = false;
-
-  protected boolean isAnnonymous = false;
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>();
-
-  public FunctionOperator(FUNCTION f)
-  {
-    isAnnonymous = f.getClass().isAnonymousClass();
-    if (isAnnonymous) {
-      annonymousFunctionClass = functionClassData(f);
-    } else if (f instanceof Function.Stateful) {
-      statelessF = f;
-    } else {
-      statefulF = f;
-      stateful = true;
-    }
-  }
-
-  private byte[] functionClassData(Function f)
-  {
-    Class<? extends Function> classT = f.getClass();
-
-    byte[] classBytes = null;
-    byte[] classNameBytes = null;
-    String className = classT.getName();
-    try {
-      classNameBytes = className.replace('.', '/').getBytes();
-      classBytes = IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.', '/') + ".class"));
-      int cursor = 0;
-      for (int j = 0; j < classBytes.length; j++) {
-        if (classBytes[j] != classNameBytes[cursor]) {
-          cursor = 0;
-        } else {
-          cursor++;
-        }
-
-        if (cursor == classNameBytes.length) {
-          for (int p = 0; p < classNameBytes.length; p++) {
-            if (classBytes[j - p] == '$') {
-              classBytes[j - p] = '_';
-            }
-          }
-          cursor = 0;
-        }
-      }
-      ClassReader cr = new ClassReader(new ByteArrayInputStream(classBytes));
-      ClassWriter cw = new ClassWriter(0);
-      AnnonymousClassModifier annonymousClassModifier = new AnnonymousClassModifier(Opcodes.ASM4, cw);
-      cr.accept(annonymousClassModifier, 0);
-      classBytes = cw.toByteArray();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    int dataLength = classNameBytes.length + 4 + 4;
-
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataLength);
-    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
-
-    try {
-      output.writeInt(classNameBytes.length);
-      output.write(className.replace('$', '_').getBytes());
-      output.writeInt(classBytes.length);
-      output.write(classBytes);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        output.flush();
-        output.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    return byteArrayOutputStream.toByteArray();
-
-  }
-
-  /**
-   * Default constructor to make kryo happy
-   */
-  public FunctionOperator()
-  {
-
-  }
-
-  @Override
-  public void beginWindow(long l)
-  {
-
-  }
-
-  @Override
-  public void endWindow()
-  {
-
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    readFunction();
-  }
-
-
-  @SuppressWarnings("unchecked")
-  private void readFunction()
-  {
-    try {
-      if (statelessF != null || statefulF != null) {
-        return;
-      }
-      DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
-      byte[] classNameBytes = new byte[input.readInt()];
-      input.read(classNameBytes);
-      String className = new String(classNameBytes);
-      byte[] classData = new byte[input.readInt()];
-      input.read(classData);
-      Map<String, byte[]> classBin = new HashMap<>();
-      classBin.put(className, classData);
-      ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader());
-      statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-
-  }
-
-  public FUNCTION getFunction()
-  {
-    readFunction();
-    if (stateful) {
-      return statefulF;
-    } else {
-      return statelessF;
-    }
-  }
-
-  public FUNCTION getStatelessF()
-  {
-    return statelessF;
-  }
-
-  public void setStatelessF(FUNCTION statelessF)
-  {
-    this.statelessF = statelessF;
-  }
-
-  public FUNCTION getStatefulF()
-  {
-    return statefulF;
-  }
-
-  public void setStatefulF(FUNCTION statefulF)
-  {
-    this.statefulF = statefulF;
-  }
-
-  public boolean isStateful()
-  {
-    return stateful;
-  }
-
-  public void setStateful(boolean stateful)
-  {
-    this.stateful = stateful;
-  }
-
-  public boolean isAnnonymous()
-  {
-    return isAnnonymous;
-  }
-
-  public void setIsAnnonymous(boolean isAnnonymous)
-  {
-    this.isAnnonymous = isAnnonymous;
-  }
-
-  public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>>
-  {
-
-    public MapFunctionOperator()
-    {
-
-    }
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
-    {
-      @Override
-      public void process(IN t)
-      {
-        Function.MapFunction<IN, OUT> f = getFunction();
-        output.emit(f.f(t));
-      }
-    };
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
-    {
-      @Override
-      public void process(Tuple<IN> t)
-      {
-        Function.MapFunction<IN, OUT> f = getFunction();
-        if (t instanceof Tuple.PlainTuple) {
-          TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, f.f(t.getValue()));
-        } else {
-          output.emit(f.f(t.getValue()));
-        }
-      }
-    };
-
-    public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
-    {
-      super(f);
-    }
-  }
-
-  public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>>
-  {
-
-    public FlatMapFunctionOperator()
-    {
-
-    }
-
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
-    {
-      @Override
-      public void process(IN t)
-      {
-        Function.FlatMapFunction<IN, OUT> f = getFunction();
-        for (OUT out : f.f(t)) {
-          output.emit(out);
-        }
-      }
-    };
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
-    {
-      @Override
-      public void process(Tuple<IN> t)
-      {
-        Function.FlatMapFunction<IN, OUT> f = getFunction();
-        if (t instanceof Tuple.PlainTuple) {
-          for (OUT out : f.f(t.getValue())) {
-            tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out));
-          }
-        } else {
-          for (OUT out : f.f(t.getValue())) {
-            output.emit(out);
-          }
-        }
-      }
-    };
-
-    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
-    {
-      super(f);
-    }
-  }
-
-
-  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
-  {
-
-    public FilterFunctionOperator()
-    {
-
-    }
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
-    {
-      @Override
-      public void process(IN t)
-      {
-        Function.FilterFunction<IN> f = getFunction();
-        // fold the value
-        if (f.f(t)) {
-          output.emit(t);
-        }
-      }
-    };
-
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
-    {
-      @Override
-      public void process(Tuple<IN> t)
-      {
-        Function.FilterFunction<IN> f = getFunction();
-        if (f.f(t.getValue())) {
-          tupleOutput.emit(t);
-        }
-
-      }
-    };
-
-    public FilterFunctionOperator(Function.FilterFunction<IN> f)
-    {
-      super(f);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
deleted file mode 100644
index f9a4ed8..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.util;
-
-import org.apache.apex.malhar.lib.window.Tuple;
-
-/**
- * The tuple util will be used to extract fields that are used as key or value<br>
- * Or converting from data tuples to display tuples <br>
- * Or generating watermark tuples <br>
- *
- *
- * @since 3.4.0
- */
-public class TupleUtil
-{
-
-  public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue)
-  {
-
-    if (t instanceof Tuple.WindowedTuple) {
-      Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple)t;
-      return new Tuple.WindowedTuple<>(windowedTuple.getWindows(), windowedTuple.getTimestamp(), newValue);
-    } else if (t instanceof Tuple.TimestampedTuple) {
-      return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
-    } else {
-      return new Tuple.PlainTuple<>(newValue);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
index a5da669..c33d9af 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
@@ -26,10 +26,10 @@ import java.util.concurrent.Callable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.function.FunctionOperator;
 import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
index a39ff35..c64126c 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -22,11 +22,11 @@ import java.util.Arrays;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
index f46fb14..45a2363 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
@@ -27,10 +27,10 @@ import java.util.concurrent.Callable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
 import com.datatorrent.lib.util.KeyValPair;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
index 20d7aed..ef6a88e 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
@@ -18,7 +18,7 @@
  */
 package org.apache.apex.malhar.stream.sample;
 
-import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
 
 import com.datatorrent.api.DAG;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
index d912117..893f546 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
@@ -28,10 +28,10 @@ import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/65488fd6/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
index 11dabe4..c476055 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
@@ -22,11 +22,11 @@ import java.util.Arrays;
 
 import org.joda.time.Duration;
 
+import org.apache.apex.malhar.lib.function.Function;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 import org.apache.hadoop.conf.Configuration;
 


Mime
View raw message