apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/2] incubator-apex-malhar git commit: APEXMALHAR-1720 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types
Date Fri, 26 Feb 2016 23:30:29 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8331f56da -> fd2f42bd9


APEXMALHAR-1720 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fc547d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fc547d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fc547d87

Branch: refs/heads/devel-3
Commit: fc547d871da868c73ed5554f6a26708916202bae
Parents: b7199b7
Author: Chaitanya <chaitanya@datatorrent.com>
Authored: Thu Jan 7 22:19:06 2016 +0530
Committer: Chaitanya <chaitanya@datatorrent.com>
Committed: Thu Jan 7 22:19:06 2016 +0530

----------------------------------------------------------------------
 .../lib/join/AbstractJoinOperator.java          | 435 +++++++++++++++++++
 .../java/com/datatorrent/lib/join/Bucket.java   |  91 ++++
 .../com/datatorrent/lib/join/InMemoryStore.java | 107 +++++
 .../com/datatorrent/lib/join/JoinStore.java     |  91 ++++
 .../datatorrent/lib/join/MapJoinOperator.java   | 100 +++++
 .../datatorrent/lib/join/POJOJoinOperator.java  | 266 ++++++++++++
 .../datatorrent/lib/join/TimeBasedStore.java    | 333 ++++++++++++++
 .../com/datatorrent/lib/join/TimeEvent.java     |  50 +++
 .../com/datatorrent/lib/join/TimeEventImpl.java | 121 ++++++
 .../lib/join/MapTimeBasedJoinOperator.java      | 118 +++++
 .../lib/join/POJOTimeBasedJoinOperatorTest.java | 385 ++++++++++++++++
 11 files changed, 2097 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
new file mode 100644
index 0000000..a3f43b5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * This is the base implementation of join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Subclasses should provide implementation to createOutputTuple,copyValue, getKeyValue, getTime methods.
+ *
+ * <b>Properties:</b><br>
+ * <b>expiryTime</b>: Expiry time for stored tuples<br>
+ * <b>includeFieldStr</b>: List of comma separated fields to be added to the output tuple.
+ *                         Ex: Field1,Field2;Field3,Field4<br>
+ * <b>keyFields</b>: List of comma separated key field for both the streams. Ex: Field1,Field2<br>
+ * <b>timeFields</b>: List of comma separated time field for both the streams. Ex: Field1,Field2<br>
+ * <b>bucketSpanInMillis</b>: Span of each bucket in milliseconds.<br>
+ * <b>strategy</b>: Type of join operation. Default type is inner join<br>
+ * <br>
+ *
+ * <b> Example: </b> <br>
+ *  Left input port receives customer details and right input port receives Order details.
+ *  Schema for the Customer be in the form of
+ *  Schema for the Order be in the form of
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = RTime, OTime, expiryTime = 5 minutes </b> <br>
+ *
+ *
+ * @displayName Abstract Join Operator
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener
+{
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  protected int tuplesCount;
+  public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
+
+  // Strategy of Join operation, by default the option is inner join
+  protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
+  // This represents whether the processing tuple is from left port or not
+  protected boolean isLeft;
+
+  @InputPortFieldAnnotation
+  public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      isLeft = true;
+      processTuple(tuple);
+    }
+  };
+  @InputPortFieldAnnotation
+  public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      isLeft = false;
+      processTuple(tuple);
+    }
+  };
+
+  // Stores for each of the input port
+  @NotNull
+  protected StoreContext leftStore;
+  @NotNull
+  protected StoreContext rightStore;
+  private String includeFieldStr;
+  private String keyFieldStr;
+  private String timeFieldStr;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // Checks whether the strategy is outer join and set it to store
+    boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
+    leftStore.getStore().isOuterJoin(isOuter);
+    isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN);
+    rightStore.getStore().isOuterJoin(isOuter);
+    // Setup the stores
+    leftStore.getStore().setup(context);
+    rightStore.getStore().setup(context);
+    populateFields();
+    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+      context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+  }
+
+  /**
+   * Create the event with the given tuple. If it successfully inserted it into the store
+   * then it does the join operation
+   *
+   * @param tuple Tuple to process
+   */
+  protected void processTuple(T tuple)
+  {
+    JoinStore store = isLeft ? leftStore.getStore() : rightStore.getStore();
+    TimeEvent t = createEvent(tuple);
+    if (store.put(t)) {
+      join(t, isLeft);
+    }
+  }
+
+  private void populateFields()
+  {
+    populateIncludeFields();
+    populateKeyFields();
+    if (timeFieldStr != null) {
+      populateTimeFields();
+    }
+  }
+
+  /**
+   * Populate the fields from the includeFiledStr
+   */
+  private void populateIncludeFields()
+  {
+    String[] portFields = includeFieldStr.split(";");
+    assert (portFields.length == 2);
+    leftStore.setIncludeFields(portFields[0].split(","));
+    rightStore.setIncludeFields(portFields[1].split(","));
+  }
+
+  /**
+   * Get the tuples from another store based on join constraint and key
+   *
+   * @param tuple  input
+   * @param isLeft whether the given tuple is from first port or not
+   */
+  private void join(TimeEvent tuple, boolean isLeft)
+  {
+    // Get the valid tuples from the store based on key
+    // If the tuple is null means the join type is outer and return unmatched tuples from store.
+
+    ArrayList<TimeEvent> value;
+    JoinStore store = isLeft ? rightStore.getStore() : leftStore.getStore();
+
+    if (tuple != null) {
+      value = (ArrayList<TimeEvent>)store.getValidTuples(tuple);
+    } else {
+      value = (ArrayList<TimeEvent>)store.getUnMatchedTuples();
+    }
+
+    // Join the input tuple with the joined tuples
+    if (value != null) {
+      List<T> result = new ArrayList<>();
+      for (TimeEvent joinedValue : value) {
+        T output = createOutputTuple();
+        Object tupleValue = null;
+        if (tuple != null) {
+          tupleValue = tuple.getValue();
+        }
+        copyValue(output, tupleValue, isLeft);
+        copyValue(output, joinedValue.getValue(), !isLeft);
+        result.add(output);
+        joinedValue.setMatch(true);
+      }
+      if (tuple != null) {
+        tuple.setMatch(true);
+      }
+      if (result.size() != 0) {
+        outputPort.emit(result);
+        tuplesCount += result.size();
+      }
+    }
+  }
+
+  // Emit the unmatched tuples, if the strategy is outer join
+  @Override
+  public void endWindow()
+  {
+    if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
+      join(null, false);
+    }
+    if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) {
+      join(null, true);
+    }
+    leftStore.getStore().endWindow();
+    rightStore.getStore().endWindow();
+    tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesJoinedPerSec = 0;
+    tuplesCount = 0;
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    leftStore.getStore().checkpointed(windowId);
+    rightStore.getStore().checkpointed(windowId);
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    leftStore.getStore().committed(windowId);
+    rightStore.getStore().committed(windowId);
+  }
+
+  /**
+   * Convert the given tuple to event
+   *
+   * @param tuple Given tuple to convert into event
+   * @return event
+   */
+  protected TimeEvent createEvent(Object tuple)
+  {
+    String key = leftStore.getKeys();
+    String timeField = leftStore.getTimeFields();
+    if (!isLeft) {
+      key = rightStore.getKeys();
+      timeField = rightStore.getTimeFields();
+    }
+    if (timeField != null) {
+      return new TimeEventImpl(getKeyValue(key, tuple), (Long)getTime(timeField, tuple), tuple);
+    } else {
+      return new TimeEventImpl(getKeyValue(key, tuple), Calendar.getInstance().getTimeInMillis(), tuple);
+    }
+  }
+
+  private void populateKeyFields()
+  {
+    leftStore.setKeys(keyFieldStr.split(",")[0]);
+    rightStore.setKeys(keyFieldStr.split(",")[1]);
+  }
+
+  public JoinStrategy getStrategy()
+  {
+    return strategy;
+  }
+
+  public void setStrategy(JoinStrategy strategy)
+  {
+    this.strategy = strategy;
+  }
+
+  public void setLeftStore(@NotNull JoinStore lStore)
+  {
+    leftStore = new StoreContext(lStore);
+  }
+
+  public void setRightStore(@NotNull JoinStore rStore)
+  {
+    rightStore = new StoreContext(rStore);
+  }
+
+  public void setKeyFields(String keyFieldStr)
+  {
+    this.keyFieldStr = keyFieldStr;
+  }
+
+  public void setTimeFieldStr(String timeFieldStr)
+  {
+    this.timeFieldStr = timeFieldStr;
+  }
+
+  public void setIncludeFields(String includeFieldStr)
+  {
+    this.includeFieldStr = includeFieldStr;
+  }
+
+  public StoreContext getLeftStore()
+  {
+    return leftStore;
+  }
+
+  public StoreContext getRightStore()
+  {
+    return rightStore;
+  }
+
+  public String getIncludeFieldStr()
+  {
+    return includeFieldStr;
+  }
+
+  public String getKeyFieldStr()
+  {
+    return keyFieldStr;
+  }
+
+  public String getTimeFieldStr()
+  {
+    return timeFieldStr;
+  }
+
+  /**
+   * Specify the comma separated time fields for both steams
+   */
+  private void populateTimeFields()
+  {
+    leftStore.setTimeFields(timeFieldStr.split(",")[0]);
+    rightStore.setTimeFields(timeFieldStr.split(",")[1]);
+  }
+
+  public void setStrategy(String policy)
+  {
+    this.strategy = JoinStrategy.valueOf(policy.toUpperCase());
+  }
+
+  /**
+   * Create the output object
+   *
+   * @return output tuple
+   */
+  protected abstract T createOutputTuple();
+
+  /**
+   * Get the values from extractTuple and set these values to the output
+   *
+   * @param output otuput tuple
+   * @param extractTuple Extract the values from this tuple
+   * @param isLeft Whether the extracted tuple belongs to left stream or not
+   */
+  protected abstract void copyValue(T output, Object extractTuple, boolean isLeft);
+
+  /**
+   * Get the value of the key field from the given tuple
+   *
+   * @param keyField Value of the field to extract from given tuple
+   * @param tuple Given tuple
+   * @return the value of field from given tuple
+   */
+  protected abstract Object getKeyValue(String keyField, Object tuple);
+
+  /**
+   * Get the value of the time field from the given tuple
+   *
+   * @param field Time field
+   * @param tuple given tuple
+   * @return the value of time field from given tuple
+   */
+  protected abstract Object getTime(String field, Object tuple);
+
+  public static enum JoinStrategy
+  {
+    INNER_JOIN,
+    LEFT_OUTER_JOIN,
+    RIGHT_OUTER_JOIN,
+    OUTER_JOIN
+  }
+
+  public static class StoreContext
+  {
+    private transient String timeFields;
+    private transient String[] includeFields;
+    private transient String keys;
+    private JoinStore store;
+
+    public StoreContext(JoinStore store)
+    {
+      this.store = store;
+    }
+
+    public String getTimeFields()
+    {
+      return timeFields;
+    }
+
+    public void setTimeFields(String timeFields)
+    {
+      this.timeFields = timeFields;
+    }
+
+    public String[] getIncludeFields()
+    {
+      return includeFields;
+    }
+
+    public void setIncludeFields(String[] includeFields)
+    {
+      this.includeFields = includeFields;
+    }
+
+    public String getKeys()
+    {
+      return keys;
+    }
+
+    public void setKeys(String keys)
+    {
+      this.keys = keys;
+    }
+
+    public JoinStore getStore()
+    {
+      return store;
+    }
+
+    public void setStore(JoinStore store)
+    {
+      this.store = store;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/Bucket.java b/library/src/main/java/com/datatorrent/lib/join/Bucket.java
new file mode 100644
index 0000000..13ea496
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/Bucket.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * <p>
+ * This is the base implementation of bucket which contains all the events which belong to the same bucket.
+ * </p>
+ *
+ * @param <T> type of bucket events
+ */
+@InterfaceStability.Unstable
+public class Bucket<T extends TimeEvent>
+{
+  public final long bucketKey;
+  protected Map<Object, List<T>> unwrittenEvents;
+
+  public Bucket()
+  {
+    bucketKey = -1L;
+  }
+
+  protected Bucket(long bucketKey)
+  {
+    this.bucketKey = bucketKey;
+  }
+
+  /**
+   * Add the given event into the unwritternEvents map
+   *
+   * @param eventKey event key
+   * @param event Given key
+   */
+  protected void addNewEvent(Object eventKey, T event)
+  {
+    if (unwrittenEvents == null) {
+      unwrittenEvents = Maps.newHashMap();
+    }
+    List<T> listEvents = unwrittenEvents.get(eventKey);
+    if (listEvents == null) {
+      unwrittenEvents.put(eventKey, Lists.newArrayList(event));
+    } else {
+      listEvents.add(event);
+    }
+  }
+
+  /**
+   * Return the unwritten events in the bucket
+   *
+   * @return the unwritten events
+   */
+  public Map<Object, List<T>> getEvents()
+  {
+    return unwrittenEvents;
+  }
+
+  /**
+   * Return the list of events for the given key
+   *
+   * @param key given key
+   * @return the list of events
+   */
+  public List<T> get(Object key)
+  {
+    return unwrittenEvents.get(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
new file mode 100644
index 0000000..7161faa
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Wrapper class for TimeBased Store.
+ */
+@InterfaceStability.Unstable
+public class InMemoryStore extends TimeBasedStore<TimeEvent> implements JoinStore
+{
+  public InMemoryStore()
+  {
+  }
+
+  public InMemoryStore(long spanTimeInMillis, int bucketSpanInMillis)
+  {
+    super();
+    setSpanTimeInMillis(spanTimeInMillis);
+    setBucketSpanInMillis(bucketSpanInMillis);
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  @Override
+  public List<TimeEvent> getUnMatchedTuples()
+  {
+    return super.getUnmatchedEvents();
+  }
+
+  @Override
+  public void isOuterJoin(boolean isOuter)
+  {
+    super.isOuterJoin(isOuter);
+  }
+
+  @Override
+  public List<TimeEvent> getValidTuples(Object tuple)
+  {
+    return super.getValidTuples((TimeEvent)tuple);
+  }
+
+  @Override
+  public boolean put(Object tuple)
+  {
+    return super.put((TimeEvent)tuple);
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(InMemoryStore.class);
+
+  @Override
+  public void setup(Context context)
+  {
+    super.setup();
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/JoinStore.java b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
new file mode 100644
index 0000000..7b95acc
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+
+/**
+ * <p>
+ * Interface of store for join operation.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface JoinStore extends Component
+{
+  /**
+   * Generate the store
+   */
+
+  /**
+   * Perform the committed operation
+   * @param windowId
+   */
+  void committed(long windowId);
+
+  /**
+   * Save the state of store
+   * @param windowId
+   */
+  void checkpointed(long windowId);
+
+  /**
+   * Add the operations, any needed for store before begin the window
+   * @param windowId
+   */
+  void beginWindow(long windowId);
+
+  /**
+   *
+   */
+  void endWindow();
+
+  /**
+   * Get the key from the given tuple and with that key, get the tuples which satisfies the join constraint
+   * from the store.
+   *
+   * @param tuple Given tuple
+   * @return the valid tuples which statisfies the join constraint
+   */
+  List<?> getValidTuples(Object tuple);
+
+  /**
+   * Insert the given tuple
+   *
+   * @param tuple Given tuple
+   */
+  boolean put(Object tuple);
+
+  /**
+   * Return the unmatched events from store
+   *
+   * @return the unmatched events
+   */
+  List<?> getUnMatchedTuples();
+
+  /**
+   * Set if the join type is outer
+   *
+   * @param isOuter Specifies the join type is outer join or not
+   */
+  void isOuterJoin(boolean isOuter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
new file mode 100644
index 0000000..3e23f73
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This class takes a HashMap tuple as input from each of the input port. Operator joines the input tuples
+ * based on join constraint and emit the result.
+ *
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> input1 : </b> Input port for stream 1, expects HashMap&lt;String, Object&gt; <br>
+ * <b> input2 : </b> Input port for stream 2, expects HashMap&lt;String, Object&gt; <br>
+ * <b> outputPort: </b> Output port emits ArrayList&lt;HashMap&lt;String, Object&gt;&gt; <br>
+ * <br>
+ * <b>Example:</b>
+ * Input tuple from port1 is
+ * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 }
+ *
+ * Input tuple from port2 is
+ * { timestamp = 5500, productCategory = 8, productId=3 }
+ *
+ * <b>Properties: </b>
+ * <b>expiryTime</b>: 1000<br>
+ * <b>includeFieldStr</b>: timestamp, customerId, amount; productCategory, productId<br>
+ * <b>keyFields</b>: productId, productId<br>
+ * <b>timeFields</b>: timestamp, timestamp<br>
+ * <b>bucketSpanInMillis</b>: 500<br>
+ *
+ * <b>Output</b>
+ * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3}
+ *
+ * @displayName MapJoin Operator
+ * @category join
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public class MapJoinOperator extends AbstractJoinOperator<Map<String, Object>>
+{
+  @Override
+  protected Map<String, Object> createOutputTuple()
+  {
+    return new HashMap<String, Object>();
+  }
+
+  @Override
+  protected void copyValue(Map<String, Object> output, Object extractTuple, boolean isLeft)
+  {
+    String[] fields;
+    if (isLeft) {
+      fields = leftStore.getIncludeFields();
+    } else {
+      fields = rightStore.getIncludeFields();
+    }
+    for (int i = 0; i < fields.length; i++) {
+      Object value = null;
+      if (extractTuple != null) {
+        value = ((Map<String, Object>)extractTuple).get(fields[i]);
+      }
+      output.put(fields[i], value);
+    }
+  }
+
+  public Object getKeyValue(String keyField, Object tuple)
+  {
+    Map<String, Object> o = (Map<String, Object>)tuple;
+    return o.get(keyField);
+  }
+
+  @Override
+  protected Object getTime(String field, Object tuple)
+  {
+    if (getTimeFieldStr() != null) {
+      Map<String, Object> o = (Map<String, Object>)tuple;
+      return o.get(field);
+    }
+    return Calendar.getInstance().getTimeInMillis();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
new file mode 100644
index 0000000..34101ff
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * This class takes a POJO as input from each of the input port. Operator joines the input tuples
+ * based on join constraint and emit the result.
+ *
+ * <br>
+ * <b>Ports : </b> <br>
+ * <b> input1 : </b> Input port for stream 1, expects POJO <br>
+ * <b> input2 : </b> Input port for stream 2, expects POJO <br>
+ * <b> outputPort: </b> Output port emits POJO <br>
+ * <br>
+ * <b>Example:</b>
+ * Input tuple from port1 is
+ * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 }
+ *
+ * Input tuple from port2 is
+ * { timestamp = 5500, productCategory = 8, productId=3 }
+ *
+ * <b>Properties: </b>
+ * <b>expiryTime</b>: 1000<br>
+ * <b>includeFieldStr</b>: timestamp, customerId, amount; productCategory, productId<br>
+ * <b>keyFields</b>: productId, productId<br>
+ * <b>timeFields</b>: timestamp, timestamp<br>
+ * <b>bucketSpanInMillis</b>: 500<br>
+ *
+ * <b>Output</b>
+ * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3}
+ *
+ * @displayName BeanJoin Operator
+ * @category join
+ * @tags join
+ */
+@InterfaceStability.Unstable
+public class POJOJoinOperator extends AbstractJoinOperator
+{
+  protected Class outputClass;
+  protected transient Class leftClass;
+  protected transient Class rightClass;
+  private String outputClassStr;
+  private transient List<FieldObjectMap>[] fieldMap = (List<FieldObjectMap>[])Array.newInstance(
+      (new LinkedList<FieldObjectMap>()).getClass(), 2);
+  private transient PojoUtils.Getter[] keyGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2);
+  private transient PojoUtils.Getter[] timeGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2);
+
+  // Populate the getters from the input tuple
+  @Override
+  protected void processTuple(Object tuple)
+  {
+    setAndPopulateGetters(tuple, isLeft);
+    super.processTuple(tuple);
+  }
+
+  /**
+   * Populate the class and getters from the given tuple
+   *
+   * @param tuple Given tuple
+   * @param isLeft Whether the given tuple belongs to left stream or not
+   */
+  private void setAndPopulateGetters(Object tuple, boolean isLeft)
+  {
+    if (isLeft && leftClass == null) {
+      leftClass = tuple.getClass();
+      populateGettersFromInput(isLeft);
+    }
+    if (!isLeft && rightClass == null) {
+      rightClass = tuple.getClass();
+      populateGettersFromInput(isLeft);
+    }
+  }
+
+  /**
+   * Populate the getters from the input class
+   *
+   * @param isLeft isLeft specifies whether the class is left or right
+   */
+  private void populateGettersFromInput(boolean isLeft)
+  {
+    Class inputClass;
+    int idx;
+    StoreContext store;
+    if (isLeft) {
+      idx = 0;
+      inputClass = leftClass;
+      store = leftStore;
+    } else {
+      idx = 1;
+      inputClass = rightClass;
+      store = rightStore;
+    }
+    String key = store.getKeys();
+    String timeField = store.getTimeFields();
+    String[] fields = store.getIncludeFields();
+
+    // Create getter for the key field
+    try {
+      Class c = ClassUtils.primitiveToWrapper(inputClass.getField(key).getType());
+      keyGetters[idx] = PojoUtils.createGetter(inputClass, key, c);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Create getter for time field
+    if (timeField != null) {
+      try {
+        Class c = ClassUtils.primitiveToWrapper(inputClass.getField(timeField).getType());
+        timeGetters[idx] = PojoUtils.createGetter(inputClass, timeField, c);
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    fieldMap[idx] = new LinkedList<FieldObjectMap>();
+    List<FieldObjectMap> fieldsMap = fieldMap[idx];
+    // Create getters for the include fields
+    for (String f : fields) {
+      try {
+        Field field = inputClass.getField(f);
+        Class c;
+        if (field.getType().isPrimitive()) {
+          c = ClassUtils.primitiveToWrapper(field.getType());
+        } else {
+          c = field.getType();
+        }
+        FieldObjectMap fm = new FieldObjectMap();
+        fm.get = PojoUtils.createGetter(inputClass, f, c);
+        fm.set = PojoUtils.createSetter(outputClass, f, c);
+        fieldsMap.add(fm);
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to populate gettter for field: " + f, e);
+      }
+    }
+  }
+
+  /**
+   * Create the output class object
+   *
+   * @return the new output object
+   */
+  @Override
+  protected Object createOutputTuple()
+  {
+    try {
+      return outputClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Copy the field values of extractTuple to output object
+   *
+   * @param output
+   * @param extractTuple
+   * @param isLeft
+   */
+  @Override
+  protected void copyValue(Object output, Object extractTuple, boolean isLeft)
+  {
+    if (extractTuple == null) {
+      return;
+    }
+
+    setAndPopulateGetters(extractTuple, isLeft);
+
+    List<FieldObjectMap> fieldsMap;
+    if (isLeft) {
+      fieldsMap = fieldMap[0];
+    } else {
+      fieldsMap = fieldMap[1];
+    }
+
+    for (FieldObjectMap map : fieldsMap) {
+      map.set.set(output, map.get.get(extractTuple));
+    }
+  }
+
+  /**
+   * Return the keyField value of tuple object
+   *
+   * @param keyField
+   * @param tuple
+   * @return the tuple value for the given keyfield
+   */
+  public Object getKeyValue(String keyField, Object tuple)
+  {
+    if (isLeft) {
+      return keyGetters[0].get(tuple);
+    }
+    return keyGetters[1].get(tuple);
+  }
+
+  @Override
+  protected Object getTime(String field, Object tuple)
+  {
+    if (getTimeFieldStr() != null) {
+      if (isLeft) {
+        return timeGetters[0].get(tuple);
+      }
+      return timeGetters[1].get(tuple);
+    }
+    return Calendar.getInstance().getTimeInMillis();
+  }
+
+  public void populateOutputClass()
+  {
+    try {
+      this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getOutputClass()
+  {
+    return outputClassStr;
+  }
+
+  /**
+   * Load the output class
+   *
+   * @param outputClassStr
+   */
+  public void setOutputClass(String outputClassStr)
+  {
+    this.outputClassStr = outputClassStr;
+    populateOutputClass();
+  }
+
+  private class FieldObjectMap
+  {
+    public PojoUtils.Getter get;
+    public PojoUtils.Setter set;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
new file mode 100644
index 0000000..88597f3
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Base implementation of time based store for key-value pair tuples.
+ *
+ * @param <T>
+ */
+@InterfaceStability.Unstable
+public class TimeBasedStore<T extends TimeEvent>
+{
+  private static final Logger logger = LoggerFactory.getLogger(TimeBasedStore.class);
+
+  private final transient Lock lock;
+  @Min(1)
+  protected int noOfBuckets;
+  protected Bucket<T>[] buckets;
+  @Min(1)
+  protected long expiryTimeInMillis;
+  @Min(1)
+  protected long spanTimeInMillis;
+  protected int bucketSpanInMillis;
+  protected long startOfBucketsInMillis;
+  protected long endOBucketsInMillis;
+  protected transient Map<Long, Bucket> dirtyBuckets = new HashMap<Long, Bucket>();
+  private boolean isOuter = false;
+  private List<T> unmatchedEvents = new ArrayList<T>();
+  private Map<Object, Set<Long>> key2Buckets = new ConcurrentHashMap<Object, Set<Long>>();
+  private transient Timer bucketSlidingTimer;
+
+  public TimeBasedStore()
+  {
+    lock = new Lock();
+  }
+
+  /**
+   * Compute the number of buckets based on spantime and bucketSpanInMillis
+   */
+  private void recomputeNumBuckets()
+  {
+    Calendar calendar = Calendar.getInstance();
+    long now = calendar.getTimeInMillis();
+    startOfBucketsInMillis = now - spanTimeInMillis;
+    expiryTimeInMillis = startOfBucketsInMillis;
+    endOBucketsInMillis = now;
+    noOfBuckets = (int)Math.ceil((now - startOfBucketsInMillis) / (bucketSpanInMillis * 1.0));
+    buckets = (Bucket<T>[])Array.newInstance(Bucket.class, noOfBuckets);
+  }
+
+  /**
+   * Compute the buckets and start the service
+   */
+  public void setup()
+  {
+    setBucketSpanInMillis((int)(spanTimeInMillis > (long)bucketSpanInMillis ? bucketSpanInMillis : spanTimeInMillis));
+    if (buckets == null) {
+      recomputeNumBuckets();
+    }
+    startService();
+  }
+
+  /**
+   * Return the tuples which satisfies the join constraint
+   *
+   * @param tuple
+   * @return the list of events
+   */
+  public List<TimeEvent> getValidTuples(T tuple)
+  {
+    // Get the key from the given tuple
+    Object key = tuple.getEventKey();
+    // Get the buckets where the key is present
+    Set<Long> keyBuckets = key2Buckets.get(key);
+    if (keyBuckets == null) {
+      return null;
+    }
+    List<TimeEvent> validTuples = new ArrayList<TimeEvent>();
+    for (Long idx : keyBuckets) {
+      int bucketIdx = (int)(idx % noOfBuckets);
+      Bucket tb = buckets[bucketIdx];
+      if (tb == null || tb.bucketKey != idx) {
+        continue;
+      }
+      List<T> events = tb.get(key);
+      if (events != null) {
+        validTuples.addAll(events);
+      }
+    }
+    return validTuples;
+  }
+
+  /**
+   * Insert the given tuple into the bucket
+   *
+   * @param tuple
+   */
+  public boolean put(T tuple)
+  {
+    long bucketKey = getBucketKeyFor(tuple);
+    if (bucketKey < 0) {
+      return false;
+    }
+    newEvent(bucketKey, tuple);
+    return true;
+  }
+
+  /**
+   * Calculates the bucket key for the given event
+   *
+   * @param event
+   * @return the bucket key
+   */
+  public long getBucketKeyFor(T event)
+  {
+    long eventTime = event.getTime();
+    // Negative indicates the invalid events
+    if (eventTime < expiryTimeInMillis) {
+      return -1;
+    }
+    long diffFromStart = eventTime - startOfBucketsInMillis;
+    long key = diffFromStart / bucketSpanInMillis;
+    synchronized (lock) {
+      if (eventTime > endOBucketsInMillis) {
+        long move = ((eventTime - endOBucketsInMillis) / bucketSpanInMillis + 1) * bucketSpanInMillis;
+        expiryTimeInMillis += move;
+        endOBucketsInMillis += move;
+      }
+    }
+    return key;
+  }
+
+  /**
+   * Insert the event into the specified bucketKey
+   *
+   * @param bucketKey
+   * @param event
+   */
+  public void newEvent(long bucketKey, T event)
+  {
+    int bucketIdx = (int)(bucketKey % noOfBuckets);
+
+    Bucket<T> bucket = buckets[bucketIdx];
+
+    if (bucket == null || bucket.bucketKey != bucketKey) {
+      // If the bucket is already present then the bucket is expirable
+      if (bucket != null) {
+        dirtyBuckets.put(bucket.bucketKey, bucket);
+      }
+      bucket = createBucket(bucketKey);
+      buckets[bucketIdx] = bucket;
+    }
+
+    // Insert the key into the key2Buckets map
+    Object key = event.getEventKey();
+    Set<Long> keyBuckets = key2Buckets.get(key);
+    if (keyBuckets == null) {
+      keyBuckets = new HashSet<Long>();
+      keyBuckets.add(bucketKey);
+      key2Buckets.put(key, keyBuckets);
+    } else {
+      keyBuckets.add(bucketKey);
+    }
+    bucket.addNewEvent(key, event);
+  }
+
+  /**
+   * Delete the expired buckets at every bucketSpanInMillis periodically
+   */
+  public void startService()
+  {
+    bucketSlidingTimer = new Timer();
+    endOBucketsInMillis = expiryTimeInMillis + (noOfBuckets * bucketSpanInMillis);
+    logger.debug("bucket properties {}, {}", spanTimeInMillis, bucketSpanInMillis);
+    logger.debug("bucket time params: start {}, end {}", startOfBucketsInMillis, endOBucketsInMillis);
+
+    bucketSlidingTimer.scheduleAtFixedRate(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        long time = 0;
+        synchronized (lock) {
+          time = (expiryTimeInMillis += bucketSpanInMillis);
+          endOBucketsInMillis += bucketSpanInMillis;
+        }
+        deleteExpiredBuckets(time);
+      }
+    }, bucketSpanInMillis, bucketSpanInMillis);
+  }
+
+  /**
+   * Remove the expired buckets.
+   *
+   * @param time
+   */
+  void deleteExpiredBuckets(long time)
+  {
+    Iterator<Long> iterator = dirtyBuckets.keySet().iterator();
+    for (; iterator.hasNext(); ) {
+      long key = iterator.next();
+      Bucket t = dirtyBuckets.get(key);
+      if (startOfBucketsInMillis + (t.bucketKey * bucketSpanInMillis) < time) {
+        deleteBucket(t);
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Return the unmatched events which are present in the expired buckets
+   *
+   * @return the list of unmatched events
+   */
+  public List<T> getUnmatchedEvents()
+  {
+    List<T> copyEvents = new ArrayList<T>(unmatchedEvents);
+    unmatchedEvents.clear();
+    return copyEvents;
+  }
+
+  /**
+   * Delete the given bucket
+   *
+   * @param bucket
+   */
+  private void deleteBucket(Bucket bucket)
+  {
+    if (bucket == null) {
+      return;
+    }
+    Map<Object, List<T>> writtens = bucket.getEvents();
+    if (writtens == null) {
+      return;
+    }
+
+    for (Map.Entry<Object, List<T>> e : writtens.entrySet()) {
+      // Check the events which are unmatched and add those into the unmatchedEvents list
+      if (isOuter) {
+        for (T event : e.getValue()) {
+          if (!event.isMatch()) {
+            unmatchedEvents.add(event);
+          }
+        }
+      }
+      key2Buckets.get(e.getKey()).remove(bucket.bucketKey);
+      if (key2Buckets.get(e.getKey()).size() == 0) {
+        key2Buckets.remove(e.getKey());
+      }
+    }
+  }
+
+  /**
+   * Create the bucket with the given key
+   *
+   * @param bucketKey
+   * @return the bucket for the given key
+   */
+  protected Bucket<T> createBucket(long bucketKey)
+  {
+    return new Bucket<T>(bucketKey);
+  }
+
+  public void shutdown()
+  {
+    bucketSlidingTimer.cancel();
+  }
+
+  public void isOuterJoin(boolean isOuter)
+  {
+    this.isOuter = isOuter;
+  }
+
+  public long getSpanTimeInMillis()
+  {
+    return spanTimeInMillis;
+  }
+
+  public void setSpanTimeInMillis(long spanTimeInMillis)
+  {
+    this.spanTimeInMillis = spanTimeInMillis;
+  }
+
+  public int getBucketSpanInMillis()
+  {
+    return bucketSpanInMillis;
+  }
+
+  public void setBucketSpanInMillis(int bucketSpanInMillis)
+  {
+    this.bucketSpanInMillis = bucketSpanInMillis;
+  }
+
+  private static class Lock
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
new file mode 100644
index 0000000..d1259d2
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceStability.Unstable
+public interface TimeEvent
+{
+  /**
+   * Returns the time of the event
+   *
+   * @return the time of event
+   */
+  long getTime();
+
+  /**
+   * Returns the key of the event
+   *
+   * @return the key
+   */
+  Object getEventKey();
+
+  Object getValue();
+
+  /**
+   * Returns whether the event has matched tuples or not
+   *
+   * @return whether the event has matched tuples or not
+   */
+  boolean isMatch();
+
+  void setMatch(boolean match);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
new file mode 100644
index 0000000..bafab29
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Time event Implementation.
+ */
+@InterfaceStability.Unstable
+public class TimeEventImpl implements TimeEvent, Comparable<TimeEventImpl>
+{
+  protected Object key;
+  protected long time;
+  protected Object tuple;
+  protected boolean match;
+
+  @SuppressWarnings("unused")
+  public TimeEventImpl()
+  {
+  }
+
+  public TimeEventImpl(Object key, long time, Object tuple)
+  {
+    this.key = key;
+    this.time = time;
+    this.tuple = tuple;
+    this.match = false;
+  }
+
+  @Override
+  public long getTime()
+  {
+    return time;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TimeEventImpl)) {
+      return false;
+    }
+
+    TimeEventImpl that = (TimeEventImpl)o;
+
+    return time == that.time && !(key != null ? !key.equals(that.key) : that.key != null) &&
+        !(tuple != null ? !tuple.equals(that.tuple) : that.tuple != null);
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = key != null ? key.hashCode() : 0;
+    result = 31 * result + (int)(time ^ (time >>> 32));
+    return result;
+  }
+
+  @Override
+  public Object getEventKey()
+  {
+    return key;
+  }
+
+  @Override
+  public int compareTo(@Nonnull TimeEventImpl dummyEvent)
+  {
+    if (key.equals(dummyEvent.key)) {
+      return 0;
+    }
+    return -1;
+  }
+
+  public Object getValue()
+  {
+    return tuple;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TimeEvent{" +
+      "key=" + key +
+      ", time=" + time +
+      ", tuple=" + tuple +
+      ", match=" + match +
+      '}';
+  }
+
+  public boolean isMatch()
+  {
+    return match;
+  }
+
+  public void setMatch(boolean match)
+  {
+    this.match = match;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
new file mode 100644
index 0000000..391b37d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class MapTimeBasedJoinOperator
+{
+  @Rule
+  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+  private static Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+  public static final Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+  @Test
+  public void testJoinOperator() throws IOException, InterruptedException
+  {
+
+    AbstractJoinOperator oper = new MapJoinOperator();
+    oper.setLeftStore(new InMemoryStore(200, 200));
+    oper.setRightStore(new InMemoryStore(200, 200));
+    oper.setIncludeFields("ID,Name;OID,Amount");
+    oper.setKeyFields("ID,CID");
+
+    oper.setup(context);
+
+    CollectorTestSink<List<Map<String, Object>>> sink = new CollectorTestSink<List<Map<String, Object>>>();
+    @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple1 = Maps.newHashMap();
+    tuple1.put("ID", 1);
+    tuple1.put("Name", "Anil");
+
+    oper.input1.process(tuple1);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Map<String, Object> order1 = Maps.newHashMap();
+    order1.put("OID", 102);
+    order1.put("CID", 1);
+    order1.put("Amount", 300);
+
+    oper.input2.process(order1);
+
+    Map<String, Object> order2 = Maps.newHashMap();
+    order2.put("OID", 103);
+    order2.put("CID", 3);
+    order2.put("Amount", 300);
+
+    oper.input2.process(order2);
+    latch.await(200, TimeUnit.MILLISECONDS);
+    oper.endWindow();
+
+    oper.beginWindow(1);
+    Map<String, Object> tuple2 = Maps.newHashMap();
+    tuple2.put("ID", 4);
+    tuple2.put("Name", "DT");
+    oper.input1.process(tuple2);
+
+    Map<String, Object> order3 = Maps.newHashMap();
+    order3.put("OID", 104);
+    order3.put("CID", 1);
+    order3.put("Amount", 300);
+
+    oper.input2.process(order2);
+
+    latch.await(200, TimeUnit.MILLISECONDS);
+
+    oper.endWindow();
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    List<Map<String, Object>> emittedList = sink.collectedTuples.iterator().next();
+    Assert.assertEquals("Size of Joined Tuple ", 1, emittedList.size());
+    Map<String, Object> emitted = emittedList.get(0);
+
+    /* The fields present in original event is kept as it is */
+    Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
+    Assert.assertEquals("value of ID :", tuple1.get("ID"), emitted.get("ID"));
+    Assert.assertEquals("value of Name :", tuple1.get("Name"), emitted.get("Name"));
+
+    Assert.assertEquals("value of OID: ", order1.get("OID"), emitted.get("OID"));
+    Assert.assertEquals("value of Amount: ", order1.get("Amount"), emitted.get("Amount"));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
new file mode 100644
index 0000000..6914da0
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class POJOTimeBasedJoinOperatorTest
+{
+
+  @Rule
+  public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+
+  public class Customer
+  {
+    public int ID;
+    public String Name;
+
+    public Customer()
+    {
+
+    }
+
+    public Customer(int ID, String name)
+    {
+      this.ID = ID;
+      Name = name;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Customer{" +
+        "ID=" + ID +
+        ", Name='" + Name + '\'' +
+        '}';
+    }
+  }
+
+  public class Order
+  {
+    public int OID;
+    public int CID;
+    public int Amount;
+
+    public Order()
+    {
+    }
+
+    public Order(int OID, int CID, int amount)
+    {
+      this.OID = OID;
+      this.CID = CID;
+      Amount = amount;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Order{" +
+        "OID=" + OID +
+        ", CID=" + CID +
+        ", Amount=" + Amount +
+        '}';
+    }
+  }
+
+  public static class CustOrder
+  {
+    public int ID;
+    public String Name;
+    public int OID;
+    public int Amount;
+
+    public CustOrder()
+    {
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+        "ID=" + ID +
+        ", Name='" + Name + '\'' +
+        ", OID=" + OID +
+        ", Amount=" + Amount +
+        '}';
+    }
+  }
+
+  @Test
+  public void testInnerJoinOperator() throws IOException, InterruptedException
+  {
+    Kryo kryo = new Kryo();
+    POJOJoinOperator oper = new POJOJoinOperator();
+    JoinStore store = new InMemoryStore(200, 200);
+    oper.setLeftStore(kryo.copy(store));
+    oper.setRightStore(kryo.copy(store));
+    oper.setIncludeFields("ID,Name;OID,Amount");
+    oper.setKeyFields("ID,CID");
+    oper.outputClass = CustOrder.class;
+
+    oper.setup(MapTimeBasedJoinOperator.context);
+
+    CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+    @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+
+    Customer tuple = new Customer(1, "Anil");
+
+    oper.input1.process(tuple);
+
+    CountDownLatch latch = new CountDownLatch(1);
+
+    Order order = new Order(102, 1, 300);
+
+    oper.input2.process(order);
+
+    Order order2 = new Order(103, 3, 300);
+    oper.input2.process(order2);
+
+    Order order3 = new Order(104, 7, 300);
+    oper.input2.process(order3);
+
+    latch.await(3000, TimeUnit.MILLISECONDS);
+
+    oper.endWindow();
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    List<CustOrder> emittedList = sink.collectedTuples.iterator().next();
+    CustOrder emitted = emittedList.get(0);
+
+    Assert.assertEquals("value of ID :", tuple.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple.Name, emitted.Name);
+
+    Assert.assertEquals("value of OID: ", order.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount);
+
+  }
+
+  @Test
+  public void testLeftOuterJoinOperator() throws IOException, InterruptedException
+  {
+    Kryo kryo = new Kryo();
+    POJOJoinOperator oper = new POJOJoinOperator();
+    JoinStore store = new InMemoryStore(200, 200);
+    oper.setLeftStore(kryo.copy(store));
+    oper.setRightStore(kryo.copy(store));
+    oper.setIncludeFields("ID,Name;OID,Amount");
+    oper.setKeyFields("ID,CID");
+    oper.outputClass = CustOrder.class;
+    oper.setStrategy("left_outer_join");
+
+    oper.setup(MapTimeBasedJoinOperator.context);
+
+    CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+    @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+
+    Customer tuple1 = new Customer(1, "Anil");
+
+    oper.input1.process(tuple1);
+
+    CountDownLatch latch = new CountDownLatch(1);
+
+    Order order = new Order(102, 3, 300);
+
+    oper.input2.process(order);
+
+    Order order2 = new Order(103, 7, 300);
+    oper.input2.process(order2);
+
+    oper.endWindow();
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.beginWindow(1);
+    Order order3 = new Order(104, 5, 300);
+    oper.input2.process(order3);
+
+    Customer tuple2 = new Customer(5, "DT");
+
+    oper.input1.process(tuple2);
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.endWindow();
+    latch.await(500, TimeUnit.MILLISECONDS);
+    oper.beginWindow(2);
+    oper.endWindow();
+    latch.await(5000, TimeUnit.MILLISECONDS);
+
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+    Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+    List<CustOrder> emittedList = ite.next();
+    CustOrder emitted = emittedList.get(0);
+
+    Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+    Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+    emittedList = ite.next();
+    emitted = emittedList.get(0);
+    Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emitted.toString());
+  }
+
+  @Test
+  public void testRightOuterJoinOperator() throws IOException, InterruptedException
+  {
+    Kryo kryo = new Kryo();
+    POJOJoinOperator oper = new POJOJoinOperator();
+    JoinStore store = new InMemoryStore(200, 200);
+    oper.setLeftStore(kryo.copy(store));
+    oper.setRightStore(kryo.copy(store));
+    oper.setIncludeFields("ID,Name;OID,Amount");
+    oper.setKeyFields("ID,CID");
+    oper.outputClass = CustOrder.class;
+    oper.setStrategy("right_outer_join");
+
+    oper.setup(MapTimeBasedJoinOperator.context);
+
+    CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+    @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+
+    Customer tuple1 = new Customer(1, "Anil");
+
+    oper.input1.process(tuple1);
+
+    CountDownLatch latch = new CountDownLatch(1);
+
+    Order order = new Order(102, 3, 300);
+
+    oper.input2.process(order);
+
+    Order order2 = new Order(103, 7, 300);
+    oper.input2.process(order2);
+    oper.endWindow();
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.beginWindow(1);
+    Order order3 = new Order(104, 5, 300);
+    oper.input2.process(order3);
+
+    Customer tuple2 = new Customer(5, "DT");
+    oper.input1.process(tuple2);
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.endWindow();
+    latch.await(500, TimeUnit.MILLISECONDS);
+    oper.beginWindow(2);
+    oper.endWindow();
+    latch.await(5000, TimeUnit.MILLISECONDS);
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size());
+    Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+    List<CustOrder> emittedList = ite.next();
+    CustOrder emitted = emittedList.get(0);
+
+    Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+    Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+    emittedList = ite.next();
+    Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString());
+    Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString());
+  }
+
+  @Test
+  public void testFullOuterJoinOperator() throws IOException, InterruptedException
+  {
+    Kryo kryo = new Kryo();
+    POJOJoinOperator oper = new POJOJoinOperator();
+    JoinStore store = new InMemoryStore(200, 200);
+    oper.setLeftStore(kryo.copy(store));
+    oper.setRightStore(kryo.copy(store));
+    oper.setIncludeFields("ID,Name;OID,Amount");
+    oper.setKeyFields("ID,CID");
+    oper.outputClass = CustOrder.class;
+    oper.setStrategy("outer_join");
+
+    oper.setup(MapTimeBasedJoinOperator.context);
+
+    CollectorTestSink<List<CustOrder>> sink = new CollectorTestSink<List<CustOrder>>();
+    @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.outputPort.setSink(tmp);
+
+    oper.beginWindow(0);
+
+    Customer tuple1 = new Customer(1, "Anil");
+
+    oper.input1.process(tuple1);
+
+    CountDownLatch latch = new CountDownLatch(1);
+
+    Order order = new Order(102, 3, 300);
+
+    oper.input2.process(order);
+
+    Order order2 = new Order(103, 7, 300);
+    oper.input2.process(order2);
+    oper.endWindow();
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.beginWindow(1);
+    Order order3 = new Order(104, 5, 300);
+    oper.input2.process(order3);
+
+    Customer tuple2 = new Customer(5, "DT");
+    oper.input1.process(tuple2);
+
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    oper.endWindow();
+    latch.await(500, TimeUnit.MILLISECONDS);
+    oper.beginWindow(2);
+    oper.endWindow();
+    latch.await(5000, TimeUnit.MILLISECONDS);
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 3, sink.collectedTuples.size());
+    Iterator<List<CustOrder>> ite = sink.collectedTuples.iterator();
+    List<CustOrder> emittedList = ite.next();
+    CustOrder emitted = emittedList.get(0);
+
+    Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID);
+    Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name);
+
+    Assert.assertEquals("value of OID: ", order3.OID, emitted.OID);
+    Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount);
+
+    emittedList = ite.next();
+    Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emittedList.get(0).toString());
+
+    emittedList = ite.next();
+    Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString());
+    Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString());
+  }
+
+}


Mime
View raw message