From: csingh@apache.org
To: commits@apex.incubator.apache.org
Date: Fri, 26 Feb 2016 23:30:29 -0000
Message-Id: <2439beb25b7e41b3b7b7d621834515d2@git.apache.org>
Subject: [1/2] incubator-apex-malhar git commit: APEXMALHAR-1720 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8331f56da -> fd2f42bd9

APEXMALHAR-1720 Implemented Inmemory Join Operator. Supported Inner, LeftOuter, RightOuter and FullOuter join types

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fc547d87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fc547d87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fc547d87

Branch: refs/heads/devel-3
Commit: fc547d871da868c73ed5554f6a26708916202bae
Parents: b7199b7
Author: Chaitanya
Authored: Thu Jan 7 22:19:06 2016 +0530
Committer: Chaitanya
Committed: Thu Jan 7 22:19:06 2016 +0530

----------------------------------------------------------------------
 .../lib/join/AbstractJoinOperator.java | 435 +++++++++++++++++++
 .../java/com/datatorrent/lib/join/Bucket.java | 91 ++++
 .../com/datatorrent/lib/join/InMemoryStore.java | 107 +++++
 .../com/datatorrent/lib/join/JoinStore.java | 91 ++++
 .../datatorrent/lib/join/MapJoinOperator.java | 100 +++++
 .../datatorrent/lib/join/POJOJoinOperator.java | 266 ++++++++++++
.../datatorrent/lib/join/TimeBasedStore.java | 333 ++++++++++++++ .../com/datatorrent/lib/join/TimeEvent.java | 50 +++ .../com/datatorrent/lib/join/TimeEventImpl.java | 121 ++++++ .../lib/join/MapTimeBasedJoinOperator.java | 118 +++++ .../lib/join/POJOTimeBasedJoinOperatorTest.java | 385 ++++++++++++++++ 11 files changed, 2097 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java new file mode 100644 index 0000000..a3f43b5 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.classification.InterfaceStability; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + *

+ This is the base implementation of the join operator. The operator receives tuples from two streams, + applies the join operation based on the join constraint and emits the joined values. + Subclasses must implement the createOutputTuple, copyValue, getKeyValue and getTime methods. + * + Properties:
+ * expiryTime: Expiry time for stored tuples
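+ includeFieldStr: Fields to copy into the output tuple: a comma separated list for the left stream and one for the right stream, separated by a semicolon. Values are not trimmed, so do not add spaces. + * Ex: Field1,Field2;Field3,Field4
+ keyFields: Comma separated key field names, left stream first, then right stream. Ex: Field1,Field2
+ timeFields: Comma separated time field names, left stream first, then right stream. If not set, the current system time at which a tuple is processed is used. Ex: Field1,Field2
+ bucketSpanInMillis: Span of each bucket in milliseconds.
+ strategy: Type of join operation: INNER_JOIN (default), LEFT_OUTER_JOIN, RIGHT_OUTER_JOIN or OUTER_JOIN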
+ *
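The field-list properties above are plain strings that the operator splits without trimming: `includeFieldStr` is split on `;` into a left and a right part and each part on `,`, while `keyFields` and `timeFields` are split on `,` with element 0 going to the left stream and element 1 to the right. The following sketch mirrors that parsing outside the operator so the resulting shapes are easy to check. `JoinFieldSpec` is a hypothetical helper, not a Malhar class, and it rejects a malformed `includeFieldStr` with an exception where the operator only uses an `assert`.

```java
import java.util.Arrays;

/**
 * Hypothetical helper mirroring how AbstractJoinOperator splits its
 * field-list properties. Like the operator, it does not trim whitespace.
 */
final class JoinFieldSpec {
  final String[] leftIncludeFields;
  final String[] rightIncludeFields;
  final String leftKey;
  final String rightKey;

  JoinFieldSpec(String includeFieldStr, String keyFieldStr) {
    // "a,b;c,d" -> left = [a, b], right = [c, d]
    String[] portFields = includeFieldStr.split(";");
    if (portFields.length != 2) {
      // Assumption: fail fast here; the operator itself only asserts.
      throw new IllegalArgumentException("expected exactly one ';' in " + includeFieldStr);
    }
    leftIncludeFields = portFields[0].split(",");
    rightIncludeFields = portFields[1].split(",");
    // "ID,CID" -> left key = ID, right key = CID
    String[] keys = keyFieldStr.split(",");
    leftKey = keys[0];
    rightKey = keys[1];
  }

  public static void main(String[] args) {
    JoinFieldSpec spec = new JoinFieldSpec(
        "timestamp,customerId,amount;productCategory,productId", "productId,productId");
    System.out.println(Arrays.toString(spec.leftIncludeFields));  // [timestamp, customerId, amount]
    System.out.println(Arrays.toString(spec.rightIncludeFields)); // [productCategory, productId]
    // With a space after the comma, the second field name keeps a leading blank.
    JoinFieldSpec spaced = new JoinFieldSpec("a, b;c, d", "k,k");
    System.out.println("[" + spaced.leftIncludeFields[1] + "]");   // [ b]
  }
}
```

The last line shows why the property values should be written without spaces after the separators: a value such as `a, b` yields the field name ` b`, which will not match a field called `b`.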
+ * + * Example:
+ The left input port receives customer details and the right input port receives order details. + The Customer schema is of the form + The Order schema is of the form + Now, join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint that + matched tuples must have timestamps within 5 minutes of each other. + Here, keyFields = ID,CID, timeFields = RTime,OTime and expiryTime = 5 minutes
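The join constraint in the example can be illustrated with a simplified, batch-style sketch: two tuples match when their keys are equal and their timestamps are at most `expiryTime` apart, and for the outer strategies any tuple that never matched is emitted paired with `null`. This is not how the operator executes (it stores tuples in time buckets as they arrive and emits unmatched tuples in `endWindow`); the `Event` class, the string output format and treating the boundary as inclusive are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class WindowJoinSketch {
  enum Strategy { INNER_JOIN, LEFT_OUTER_JOIN, RIGHT_OUTER_JOIN, OUTER_JOIN }

  /** A keyed, timestamped tuple (hypothetical stand-in for TimeEvent). */
  static final class Event {
    final Object key;
    final long time;
    final String value;

    Event(Object key, long time, String value) {
      this.key = key;
      this.time = time;
      this.value = value;
    }
  }

  /** Returns joined pairs as "left|right"; a missing side is written as "null". */
  static List<String> join(List<Event> left, List<Event> right, long expiryMillis, Strategy strategy) {
    List<String> out = new ArrayList<>();
    boolean[] rightMatched = new boolean[right.size()];
    for (Event l : left) {
      boolean leftMatched = false;
      for (int i = 0; i < right.size(); i++) {
        Event r = right.get(i);
        // Equal keys and timestamps at most expiryMillis apart (inclusive bound is an assumption).
        if (Objects.equals(l.key, r.key) && Math.abs(l.time - r.time) <= expiryMillis) {
          out.add(l.value + "|" + r.value);
          leftMatched = true;
          rightMatched[i] = true;
        }
      }
      if (!leftMatched && (strategy == Strategy.LEFT_OUTER_JOIN || strategy == Strategy.OUTER_JOIN)) {
        out.add(l.value + "|null");
      }
    }
    if (strategy == Strategy.RIGHT_OUTER_JOIN || strategy == Strategy.OUTER_JOIN) {
      for (int i = 0; i < right.size(); i++) {
        if (!rightMatched[i]) {
          out.add("null|" + right.get(i).value);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> customers = List.of(new Event(1, 0, "c1"), new Event(2, 0, "c2"));
    List<Event> orders = List.of(new Event(1, 60_000, "o1"), new Event(3, 0, "o3"));
    long fiveMinutes = 5 * 60 * 1000L;
    System.out.println(join(customers, orders, fiveMinutes, Strategy.INNER_JOIN)); // [c1|o1]
    System.out.println(join(customers, orders, fiveMinutes, Strategy.OUTER_JOIN)); // [c1|o1, c2|null, null|o3]
  }
}
```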
+ + + * @displayName Abstract Join Operator + * @tags join + */ +@InterfaceStability.Unstable +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener + { + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + protected int tuplesCount; + public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>(); + + // Strategy of the join operation; inner join by default + protected JoinStrategy strategy = JoinStrategy.INNER_JOIN; + // Whether the tuple being processed arrived on the left port + protected boolean isLeft; + + @InputPortFieldAnnotation + public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + isLeft = true; + processTuple(tuple); + } + }; + @InputPortFieldAnnotation + public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + isLeft = false; + processTuple(tuple); + } + }; + + // Store for each of the input ports + @NotNull + protected StoreContext leftStore; + @NotNull + protected StoreContext rightStore; + private String includeFieldStr; + private String keyFieldStr; + private String timeFieldStr; + + @Override + public void setup(Context.OperatorContext context) + { + // Tell each store whether it must keep track of unmatched tuples (outer join on its side) + boolean isOuter = strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN); + leftStore.getStore().isOuterJoin(isOuter); + isOuter = strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN); + rightStore.getStore().isOuterJoin(isOuter); + // Set up the stores + leftStore.getStore().setup(context); + rightStore.getStore().setup(context); + populateFields(); + windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + 
context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0; +
+ } + + /** + * Create an event from the given tuple. If the event is successfully inserted into the store, + * join it with the tuples of the other store. + * + * @param tuple Tuple to process + */ + protected void processTuple(T tuple) + { + JoinStore store = isLeft ? leftStore.getStore() : rightStore.getStore(); + TimeEvent t = createEvent(tuple); + if (store.put(t)) { + join(t, isLeft); + } + } + + private void populateFields() + { + populateIncludeFields(); + populateKeyFields(); + if (timeFieldStr != null) { + populateTimeFields(); + } + } + + /** + * Populate the include fields of both stores from includeFieldStr + */ + private void populateIncludeFields() + { + String[] portFields = includeFieldStr.split(";"); + assert (portFields.length == 2); + leftStore.setIncludeFields(portFields[0].split(",")); + rightStore.setIncludeFields(portFields[1].split(",")); + } + + /** + * Get the tuples from the other store that match the given tuple on key and satisfy the join constraint, + * and emit the joined tuples + * + * @param tuple input tuple, or null to emit the unmatched tuples of the other store (outer join) + * @param isLeft whether the given tuple is from the first (left) port + */ + private void join(TimeEvent tuple, boolean isLeft) + { + // Get the valid tuples from the other store based on the key. + // A null tuple means the join type is outer: fetch the unmatched tuples from the other store instead. + + ArrayList<TimeEvent> value; + JoinStore store = isLeft ?
rightStore.getStore() : leftStore.getStore(); + + if (tuple != null) { + value = (ArrayList)store.getValidTuples(tuple); + } else { + value = (ArrayList)store.getUnMatchedTuples(); + } + + // Join the input tuple with the joined tuples + if (value != null) { + List result = new ArrayList<>(); + for (TimeEvent joinedValue : value) { + T output = createOutputTuple(); + Object tupleValue = null; + if (tuple != null) { + tupleValue = tuple.getValue(); + } + copyValue(output, tupleValue, isLeft); + copyValue(output, joinedValue.getValue(), !isLeft); + result.add(output); + joinedValue.setMatch(true); + } + if (tuple != null) { + tuple.setMatch(true); + } + if (result.size() != 0) { + outputPort.emit(result); + tuplesCount += result.size(); + } + } + } + + // Emit the unmatched tuples, if the strategy is outer join + @Override + public void endWindow() + { + if (strategy.equals(JoinStrategy.LEFT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) { + join(null, false); + } + if (strategy.equals(JoinStrategy.RIGHT_OUTER_JOIN) || strategy.equals(JoinStrategy.OUTER_JOIN)) { + join(null, true); + } + leftStore.getStore().endWindow(); + rightStore.getStore().endWindow(); + tuplesJoinedPerSec = (long)(tuplesCount / windowTimeSec); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + tuplesJoinedPerSec = 0; + tuplesCount = 0; + } + + @Override + public void checkpointed(long windowId) + { + leftStore.getStore().checkpointed(windowId); + rightStore.getStore().checkpointed(windowId); + } + + @Override + public void committed(long windowId) + { + leftStore.getStore().committed(windowId); + rightStore.getStore().committed(windowId); + } + + /** + * Convert the given tuple to event + * + * @param tuple Given tuple to convert into event + * @return event + */ + protected TimeEvent createEvent(Object tuple) + { + String key = leftStore.getKeys(); + String timeField = leftStore.getTimeFields(); + if (!isLeft) { + key = 
rightStore.getKeys(); + timeField = rightStore.getTimeFields(); + } + if (timeField != null) { + return new TimeEventImpl(getKeyValue(key, tuple), (Long)getTime(timeField, tuple), tuple); + } else { + return new TimeEventImpl(getKeyValue(key, tuple), Calendar.getInstance().getTimeInMillis(), tuple); + } + } + + private void populateKeyFields() + { + leftStore.setKeys(keyFieldStr.split(",")[0]); + rightStore.setKeys(keyFieldStr.split(",")[1]); + } + + public JoinStrategy getStrategy() + { + return strategy; + } + + public void setStrategy(JoinStrategy strategy) + { + this.strategy = strategy; + } + + public void setLeftStore(@NotNull JoinStore lStore) + { + leftStore = new StoreContext(lStore); + } + + public void setRightStore(@NotNull JoinStore rStore) + { + rightStore = new StoreContext(rStore); + } + + public void setKeyFields(String keyFieldStr) + { + this.keyFieldStr = keyFieldStr; + } + + public void setTimeFieldStr(String timeFieldStr) + { + this.timeFieldStr = timeFieldStr; + } + + public void setIncludeFields(String includeFieldStr) + { + this.includeFieldStr = includeFieldStr; + } + + public StoreContext getLeftStore() + { + return leftStore; + } + + public StoreContext getRightStore() + { + return rightStore; + } + + public String getIncludeFieldStr() + { + return includeFieldStr; + } + + public String getKeyFieldStr() + { + return keyFieldStr; + } + + public String getTimeFieldStr() + { + return timeFieldStr; + } + + /** + * Set the time field of each stream from the comma separated timeFieldStr + */ + private void populateTimeFields() + { + leftStore.setTimeFields(timeFieldStr.split(",")[0]); + rightStore.setTimeFields(timeFieldStr.split(",")[1]); + } + + public void setStrategy(String policy) + { + this.strategy = JoinStrategy.valueOf(policy.toUpperCase()); + } + + /** + * Create the output object + * + * @return output tuple + */ + protected abstract T createOutputTuple(); + + /** + * Get the values from extractTuple and set these values to the output + * + * @param
output output tuple + * @param extractTuple Extract the values from this tuple + * @param isLeft Whether the extracted tuple belongs to the left stream or not + */ + protected abstract void copyValue(T output, Object extractTuple, boolean isLeft); + + /** + * Get the value of the key field from the given tuple + * + * @param keyField Name of the key field to extract from the given tuple + * @param tuple Given tuple + * @return the value of the field from the given tuple + */ + protected abstract Object getKeyValue(String keyField, Object tuple); + + /** + * Get the value of the time field from the given tuple + * + * @param field Time field + * @param tuple given tuple + * @return the value of the time field from the given tuple + */ + protected abstract Object getTime(String field, Object tuple); + + public static enum JoinStrategy + { + INNER_JOIN, + LEFT_OUTER_JOIN, + RIGHT_OUTER_JOIN, + OUTER_JOIN + } + + public static class StoreContext + { + private transient String timeFields; + private transient String[] includeFields; + private transient String keys; + private JoinStore store; + + public StoreContext(JoinStore store) + { + this.store = store; + } + + public String getTimeFields() + { + return timeFields; + } + + public void setTimeFields(String timeFields) + { + this.timeFields = timeFields; + } + + public String[] getIncludeFields() + { + return includeFields; + } + + public void setIncludeFields(String[] includeFields) + { + this.includeFields = includeFields; + } + + public String getKeys() + { + return keys; + } + + public void setKeys(String keys) + { + this.keys = keys; + } + + public JoinStore getStore() + { + return store; + } + + public void setStore(JoinStore store) + { + this.store = store; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/Bucket.java ---------------------------------------------------------------------- diff --git
a/library/src/main/java/com/datatorrent/lib/join/Bucket.java b/library/src/main/java/com/datatorrent/lib/join/Bucket.java new file mode 100644 index 0000000..13ea496 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/Bucket.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + *

+ This is the base implementation of a bucket, which holds all the events that belong to the same bucket, grouped by key.
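A bucket groups its events by key and creates the per-key list on first insert, as `addNewEvent` does. The sketch below expresses the same grouping with `Map.computeIfAbsent`. It also shows one plausible way to derive a bucket key from an event time, `time / bucketSpanInMillis`; that formula is an assumption, since `TimeBasedStore`, which assigns events to buckets, is not part of this excerpt. `KeyedBucket` is a hypothetical name, and unlike `Bucket.get` it returns an empty list instead of `null` for an unknown key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bucket that groups events by key, like Bucket.addNewEvent. */
final class KeyedBucket<T> {
  final long bucketKey;
  private final Map<Object, List<T>> events = new HashMap<>();

  KeyedBucket(long bucketKey) {
    this.bucketKey = bucketKey;
  }

  /** Assumed bucketing rule: consecutive, non-overlapping spans of bucketSpanInMillis. */
  static long bucketKeyFor(long timeMillis, long bucketSpanInMillis) {
    return timeMillis / bucketSpanInMillis;
  }

  void addNewEvent(Object eventKey, T event) {
    // Create the per-key list on first insert, then append.
    events.computeIfAbsent(eventKey, k -> new ArrayList<>()).add(event);
  }

  /** Returns the events stored for key, or an empty list if there are none. */
  List<T> get(Object key) {
    return events.getOrDefault(key, List.of());
  }

  public static void main(String[] args) {
    long span = 500;
    KeyedBucket<String> bucket = new KeyedBucket<>(bucketKeyFor(5000, span)); // bucket 10
    bucket.addNewEvent(3, "order-a");
    bucket.addNewEvent(3, "order-b");
    System.out.println(bucket.bucketKey + " " + bucket.get(3) + " " + bucket.get(4));
    // 10 [order-a, order-b] []
  }
}
```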

+ + * @param <T> type of bucket events + */ +@InterfaceStability.Unstable +public class Bucket<T> + { + public final long bucketKey; + protected Map<Object, List<T>> unwrittenEvents; + + public Bucket() + { + bucketKey = -1L; + } + + protected Bucket(long bucketKey) + { + this.bucketKey = bucketKey; + } + + /** + * Add the given event to the unwrittenEvents map under the given key + * + * @param eventKey event key + * @param event event to add + */ + protected void addNewEvent(Object eventKey, T event) + { + if (unwrittenEvents == null) { + unwrittenEvents = Maps.newHashMap(); + } + List<T> listEvents = unwrittenEvents.get(eventKey); + if (listEvents == null) { + unwrittenEvents.put(eventKey, Lists.newArrayList(event)); + } else { + listEvents.add(event); + } + } + + /** + * Return the unwritten events in the bucket + * + * @return the unwritten events + */ + public Map<Object, List<T>> getEvents() + { + return unwrittenEvents; + } + + /** + * Return the list of events for the given key + * + * @param key given key + * @return the list of events + */ + public List<T> get(Object key) + { + return unwrittenEvents.get(key); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java new file mode 100644 index 0000000..7161faa --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/InMemoryStore.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +/** + * Wrapper class for TimeBased Store. + */ +@InterfaceStability.Unstable +public class InMemoryStore extends TimeBasedStore implements JoinStore +{ + public InMemoryStore() + { + } + + public InMemoryStore(long spanTimeInMillis, int bucketSpanInMillis) + { + super(); + setSpanTimeInMillis(spanTimeInMillis); + setBucketSpanInMillis(bucketSpanInMillis); + } + + @Override + public void committed(long windowId) + { + + } + + @Override + public void checkpointed(long windowId) + { + + } + + @Override + public void beginWindow(long windowId) + { + + } + + @Override + public void endWindow() + { + + } + + @Override + public List getUnMatchedTuples() + { + return super.getUnmatchedEvents(); + } + + @Override + public void isOuterJoin(boolean isOuter) + { + super.isOuterJoin(isOuter); + } + + @Override + public List getValidTuples(Object tuple) + { + return super.getValidTuples((TimeEvent)tuple); + } + + @Override + public boolean put(Object tuple) + { + return super.put((TimeEvent)tuple); + } + + private static final Logger logger = LoggerFactory.getLogger(InMemoryStore.class); + + @Override + public void setup(Context context) + { + super.setup(); + } + + @Override + public void teardown() + { + super.shutdown(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/JoinStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/JoinStore.java b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java new file mode 100644 index 0000000..7b95acc --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/JoinStore.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Component; + +/** + *

+ Interface of the store used by the join operator.

+ */ +@InterfaceStability.Unstable +public interface JoinStore extends Component + { + /** + * Generate the store + */ + + /** + * Perform the committed operation + * @param windowId id of the committed window + */ + void committed(long windowId); + + /** + * Save the state of the store + * @param windowId id of the checkpointed window + */ + void checkpointed(long windowId); + + /** + * Perform any operations the store needs before the window begins + * @param windowId id of the window + */ + void beginWindow(long windowId); + + /** + * Perform any operations the store needs at the end of the window + */ + void endWindow(); + + /** + * Get the key from the given tuple and use it to get the tuples from the store that satisfy the join + * constraint. + * + * @param tuple Given tuple + * @return the valid tuples that satisfy the join constraint + */ + List getValidTuples(Object tuple); + + /** + * Insert the given tuple + * + * @param tuple Given tuple + * @return true if the tuple was added to the store + */ + boolean put(Object tuple); + + /** + * Return the unmatched events from the store + * + * @return the unmatched events + */ + List getUnMatchedTuples(); + + /** + * Set whether the join type is an outer join + * + * @param isOuter whether the join type is an outer join + */ + void isOuterJoin(boolean isOuter); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java new file mode 100644 index 0000000..3e23f73 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/MapJoinOperator.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Calendar; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This class takes a HashMap tuple as input from each of the input ports. The operator joins the input tuples + * based on the join constraint and emits the result. + * + *
+ * Ports :
+ * input1 : Input port for stream 1, expects HashMap<String, Object>
+ * input2 : Input port for stream 2, expects HashMap<String, Object>
+ * outputPort: Output port emits ArrayList<HashMap<String, Object>>
+ *
+ * Example: + * Input tuple from port1 is + * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 } + * + * Input tuple from port2 is + * { timestamp = 5500, productCategory = 8, productId=3 } + * + * Properties: + * expiryTime: 1000
+ includeFieldStr: timestamp,customerId,amount;productCategory,productId
+ keyFields: productId,productId
+ timeFields: timestamp,timestamp
+ * bucketSpanInMillis: 500
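The example output can be checked by hand. `copyValue` puts every include field of a stream into the output map (storing `null` when the tuple is `null`, which is the unmatched side of an outer join), the left fields first and then the right ones. The sketch below applies the same logic to the two example tuples, assuming they have already matched on `productId` and fall within `expiryTime`. `MapJoinExample` and `copyFields` are hypothetical names, and a `LinkedHashMap` is used only so the printed order is predictable; the operator itself creates a `HashMap`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapJoinExample {
  /** Mirrors MapJoinOperator.copyValue: copies the include fields of one tuple into output. */
  static void copyFields(Map<String, Object> output, Map<String, Object> tuple, String[] fields) {
    for (String field : fields) {
      // A null tuple (unmatched side of an outer join) yields null values.
      output.put(field, tuple == null ? null : tuple.get(field));
    }
  }

  public static void main(String[] args) {
    Map<String, Object> left = new HashMap<>();
    left.put("timestamp", 5000L);
    left.put("productId", 3);
    left.put("customerId", 108);
    left.put("regionId", 4);
    left.put("amount", "$560");

    Map<String, Object> right = new HashMap<>();
    right.put("timestamp", 5500L);
    right.put("productCategory", 8);
    right.put("productId", 3);

    // includeFieldStr "timestamp,customerId,amount;productCategory,productId"
    Map<String, Object> output = new LinkedHashMap<>(); // predictable print order only
    copyFields(output, left, new String[] {"timestamp", "customerId", "amount"});
    copyFields(output, right, new String[] {"productCategory", "productId"});
    System.out.println(output);
    // {timestamp=5000, customerId=108, amount=$560, productCategory=8, productId=3}
  }
}
```

Because only the left stream lists `timestamp` as an include field, the output carries the left tuple's timestamp (5000), and `regionId` is dropped because neither stream includes it.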
+ * + * Output + * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3} + * + * @displayName MapJoin Operator + * @category join + * @tags join + */ +@InterfaceStability.Unstable +public class MapJoinOperator extends AbstractJoinOperator> +{ + @Override + protected Map createOutputTuple() + { + return new HashMap(); + } + + @Override + protected void copyValue(Map output, Object extractTuple, boolean isLeft) + { + String[] fields; + if (isLeft) { + fields = leftStore.getIncludeFields(); + } else { + fields = rightStore.getIncludeFields(); + } + for (int i = 0; i < fields.length; i++) { + Object value = null; + if (extractTuple != null) { + value = ((Map)extractTuple).get(fields[i]); + } + output.put(fields[i], value); + } + } + + public Object getKeyValue(String keyField, Object tuple) + { + Map o = (Map)tuple; + return o.get(keyField); + } + + @Override + protected Object getTime(String field, Object tuple) + { + if (getTimeFieldStr() != null) { + Map o = (Map)tuple; + return o.get(field); + } + return Calendar.getInstance().getTimeInMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java new file mode 100644 index 0000000..34101ff --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/POJOJoinOperator.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.Calendar; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.PojoUtils; + +/** + * This class takes a POJO as input from each of the input ports. The operator joins the input tuples + * based on the join constraint and emits the result. + * + *
+ * Ports :
+ * input1 : Input port for stream 1, expects POJO
+ * input2 : Input port for stream 2, expects POJO
+ outputPort: Output port emits a List of POJOs
+ *
+ * Example: + * Input tuple from port1 is + * {timestamp = 5000, productId = 3, customerId = 108, regionId = 4, amount = $560 } + * + * Input tuple from port2 is + * { timestamp = 5500, productCategory = 8, productId=3 } + * + * Properties: + * expiryTime: 1000
+ includeFieldStr: timestamp,customerId,amount;productCategory,productId
+ keyFields: productId,productId
+ timeFields: timestamp,timestamp
+ * bucketSpanInMillis: 500
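`POJOJoinOperator` looks up the key, time and include fields as public fields of each input class (`getField`) and copies the include fields into an instance of the output class through `PojoUtils` getters and setters, so the output class needs a matching field (or property) for each include field. The sketch below performs the same copy step with plain `java.lang.reflect.Field` instead of `PojoUtils`; that substitution and the `Order`, `Product` and `Joined` classes are assumptions for illustration.

```java
import java.lang.reflect.Field;

final class PojoCopySketch {
  // Hypothetical input and output POJOs with public fields.
  public static class Order {
    public long timestamp = 5000;
    public int productId = 3;
    public int customerId = 108;
  }

  public static class Product {
    public long timestamp = 5500;
    public int productCategory = 8;
    public int productId = 3;
  }

  public static class Joined {
    public long timestamp;
    public int customerId;
    public int productCategory;
    public int productId;
  }

  /** Copies each named public field from source to target (reflection stand-in for PojoUtils). */
  static void copyFields(Object target, Object source, String... fields) {
    if (source == null) {
      return; // like POJOJoinOperator.copyValue: nothing to copy for the unmatched side
    }
    try {
      for (String name : fields) {
        Field from = source.getClass().getField(name);
        Field to = target.getClass().getField(name);
        to.set(target, from.get(source)); // primitives are boxed and unboxed automatically
      }
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Failed to copy fields", e);
    }
  }

  public static void main(String[] args) {
    Joined out = new Joined();
    copyFields(out, new Order(), "timestamp", "customerId");
    copyFields(out, new Product(), "productCategory", "productId");
    System.out.println(out.timestamp + " " + out.customerId + " " + out.productCategory + " " + out.productId);
    // 5000 108 8 3
  }
}
```

A production version would resolve the `Field` objects once per input class and reuse them, as the operator does with its getters in `populateGettersFromInput`, rather than looking them up for every tuple.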
+ * + * Output + * { timestamp = 5000, customerId = 108, amount = $560, productCategory = 8, productId=3} + * + * @displayName BeanJoin Operator + * @category join + * @tags join + */ +@InterfaceStability.Unstable +public class POJOJoinOperator extends AbstractJoinOperator +{ + protected Class outputClass; + protected transient Class leftClass; + protected transient Class rightClass; + private String outputClassStr; + private transient List[] fieldMap = (List[])Array.newInstance( + (new LinkedList()).getClass(), 2); + private transient PojoUtils.Getter[] keyGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2); + private transient PojoUtils.Getter[] timeGetters = (PojoUtils.Getter[])Array.newInstance(PojoUtils.Getter.class, 2); + + // Populate the getters from the input tuple + @Override + protected void processTuple(Object tuple) + { + setAndPopulateGetters(tuple, isLeft); + super.processTuple(tuple); + } + + /** + * Populate the class and getters from the given tuple + * + * @param tuple Given tuple + * @param isLeft Whether the given tuple belongs to left stream or not + */ + private void setAndPopulateGetters(Object tuple, boolean isLeft) + { + if (isLeft && leftClass == null) { + leftClass = tuple.getClass(); + populateGettersFromInput(isLeft); + } + if (!isLeft && rightClass == null) { + rightClass = tuple.getClass(); + populateGettersFromInput(isLeft); + } + } + + /** + * Populate the getters from the input class + * + * @param isLeft isLeft specifies whether the class is left or right + */ + private void populateGettersFromInput(boolean isLeft) + { + Class inputClass; + int idx; + StoreContext store; + if (isLeft) { + idx = 0; + inputClass = leftClass; + store = leftStore; + } else { + idx = 1; + inputClass = rightClass; + store = rightStore; + } + String key = store.getKeys(); + String timeField = store.getTimeFields(); + String[] fields = store.getIncludeFields(); + + // Create getter for the key field + try { + Class c = 
ClassUtils.primitiveToWrapper(inputClass.getField(key).getType()); + keyGetters[idx] = PojoUtils.createGetter(inputClass, key, c); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + + // Create getter for time field + if (timeField != null) { + try { + Class c = ClassUtils.primitiveToWrapper(inputClass.getField(timeField).getType()); + timeGetters[idx] = PojoUtils.createGetter(inputClass, timeField, c); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + + fieldMap[idx] = new LinkedList(); + List fieldsMap = fieldMap[idx]; + // Create getters for the include fields + for (String f : fields) { + try { + Field field = inputClass.getField(f); + Class c; + if (field.getType().isPrimitive()) { + c = ClassUtils.primitiveToWrapper(field.getType()); + } else { + c = field.getType(); + } + FieldObjectMap fm = new FieldObjectMap(); + fm.get = PojoUtils.createGetter(inputClass, f, c); + fm.set = PojoUtils.createSetter(outputClass, f, c); + fieldsMap.add(fm); + } catch (Throwable e) { + throw new RuntimeException("Failed to populate getter for field: " + f, e); + } + } + } + + /** + * Create the output class object + * + * @return the new output object + */ + @Override + protected Object createOutputTuple() + { + try { + return outputClass.newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + /** + * Copy the field values of extractTuple to output object + * + * @param output + * @param extractTuple + * @param isLeft + */ + @Override + protected void copyValue(Object output, Object extractTuple, boolean isLeft) + { + if (extractTuple == null) { + return; + } + + setAndPopulateGetters(extractTuple, isLeft); + + List fieldsMap; + if (isLeft) { + fieldsMap = fieldMap[0]; + } else { + fieldsMap = fieldMap[1]; + } + + for (FieldObjectMap map : fieldsMap) { + map.set.set(output, map.get.get(extractTuple)); + } + }
+ + /** + * Return the keyField value of tuple object + * + * @param keyField + * @param tuple + * @return the tuple value for the given keyfield + */ + public Object getKeyValue(String keyField, Object tuple) + { + if (isLeft) { + return keyGetters[0].get(tuple); + } + return keyGetters[1].get(tuple); + } + + @Override + protected Object getTime(String field, Object tuple) + { + if (getTimeFieldStr() != null) { + if (isLeft) { + return timeGetters[0].get(tuple); + } + return timeGetters[1].get(tuple); + } + return Calendar.getInstance().getTimeInMillis(); + } + + public void populateOutputClass() + { + try { + this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public String getOutputClass() + { + return outputClassStr; + } + + /** + * Load the output class + * + * @param outputClassStr + */ + public void setOutputClass(String outputClassStr) + { + this.outputClassStr = outputClassStr; + populateOutputClass(); + } + + private class FieldObjectMap + { + public PojoUtils.Getter get; + public PojoUtils.Setter set; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java new file mode 100644 index 0000000..88597f3 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/TimeBasedStore.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Base implementation of time based store for key-value pair tuples. 
+ * + * @param + */ +@InterfaceStability.Unstable +public class TimeBasedStore +{ + private static final Logger logger = LoggerFactory.getLogger(TimeBasedStore.class); + + private final transient Lock lock; + @Min(1) + protected int noOfBuckets; + protected Bucket[] buckets; + @Min(1) + protected long expiryTimeInMillis; + @Min(1) + protected long spanTimeInMillis; + protected int bucketSpanInMillis; + protected long startOfBucketsInMillis; + protected long endOBucketsInMillis; + protected transient Map dirtyBuckets = new HashMap(); + private boolean isOuter = false; + private List unmatchedEvents = new ArrayList(); + private Map> key2Buckets = new ConcurrentHashMap>(); + private transient Timer bucketSlidingTimer; + + public TimeBasedStore() + { + lock = new Lock(); + } + + /** + * Compute the number of buckets based on spantime and bucketSpanInMillis + */ + private void recomputeNumBuckets() + { + Calendar calendar = Calendar.getInstance(); + long now = calendar.getTimeInMillis(); + startOfBucketsInMillis = now - spanTimeInMillis; + expiryTimeInMillis = startOfBucketsInMillis; + endOBucketsInMillis = now; + noOfBuckets = (int)Math.ceil((now - startOfBucketsInMillis) / (bucketSpanInMillis * 1.0)); + buckets = (Bucket[])Array.newInstance(Bucket.class, noOfBuckets); + } + + /** + * Compute the buckets and start the service + */ + public void setup() + { + setBucketSpanInMillis((int)(spanTimeInMillis > (long)bucketSpanInMillis ? 
bucketSpanInMillis : spanTimeInMillis)); + if (buckets == null) { + recomputeNumBuckets(); + } + startService(); + } + + /** + * Return the tuples which satisfy the join constraint + * + * @param tuple + * @return the list of events + */ + public List getValidTuples(T tuple) + { + // Get the key from the given tuple + Object key = tuple.getEventKey(); + // Get the buckets where the key is present + Set keyBuckets = key2Buckets.get(key); + if (keyBuckets == null) { + return null; + } + List validTuples = new ArrayList(); + for (Long idx : keyBuckets) { + int bucketIdx = (int)(idx % noOfBuckets); + Bucket tb = buckets[bucketIdx]; + if (tb == null || tb.bucketKey != idx) { + continue; + } + List events = tb.get(key); + if (events != null) { + validTuples.addAll(events); + } + } + return validTuples; + } + + /** + * Insert the given tuple into the bucket + * + * @param tuple + */ + public boolean put(T tuple) + { + long bucketKey = getBucketKeyFor(tuple); + if (bucketKey < 0) { + return false; + } + newEvent(bucketKey, tuple); + return true; + } + + /** + * Calculates the bucket key for the given event + * + * @param event + * @return the bucket key + */ + public long getBucketKeyFor(T event) + { + long eventTime = event.getTime(); + // A negative key marks an expired (invalid) event + if (eventTime < expiryTimeInMillis) { + return -1; + } + long diffFromStart = eventTime - startOfBucketsInMillis; + long key = diffFromStart / bucketSpanInMillis; + synchronized (lock) { + if (eventTime > endOBucketsInMillis) { + long move = ((eventTime - endOBucketsInMillis) / bucketSpanInMillis + 1) * bucketSpanInMillis; + expiryTimeInMillis += move; + endOBucketsInMillis += move; + } + } + return key; + } + + /** + * Insert the event into the specified bucketKey + * + * @param bucketKey + * @param event + */ + public void newEvent(long bucketKey, T event) + { + int bucketIdx = (int)(bucketKey % noOfBuckets); + + Bucket bucket = buckets[bucketIdx]; + + if (bucket == null || bucket.bucketKey != 
bucketKey) { + // If the bucket is already present then the bucket is expirable + if (bucket != null) { + dirtyBuckets.put(bucket.bucketKey, bucket); + } + bucket = createBucket(bucketKey); + buckets[bucketIdx] = bucket; + } + + // Insert the key into the key2Buckets map + Object key = event.getEventKey(); + Set keyBuckets = key2Buckets.get(key); + if (keyBuckets == null) { + keyBuckets = new HashSet(); + keyBuckets.add(bucketKey); + key2Buckets.put(key, keyBuckets); + } else { + keyBuckets.add(bucketKey); + } + bucket.addNewEvent(key, event); + } + + /** + * Delete the expired buckets at every bucketSpanInMillis periodically + */ + public void startService() + { + bucketSlidingTimer = new Timer(); + endOBucketsInMillis = expiryTimeInMillis + (noOfBuckets * bucketSpanInMillis); + logger.debug("bucket properties {}, {}", spanTimeInMillis, bucketSpanInMillis); + logger.debug("bucket time params: start {}, end {}", startOfBucketsInMillis, endOBucketsInMillis); + + bucketSlidingTimer.scheduleAtFixedRate(new TimerTask() + { + @Override + public void run() + { + long time = 0; + synchronized (lock) { + time = (expiryTimeInMillis += bucketSpanInMillis); + endOBucketsInMillis += bucketSpanInMillis; + } + deleteExpiredBuckets(time); + } + }, bucketSpanInMillis, bucketSpanInMillis); + } + + /** + * Remove the expired buckets. 
+ * + * @param time + */ + void deleteExpiredBuckets(long time) + { + Iterator iterator = dirtyBuckets.keySet().iterator(); + for (; iterator.hasNext(); ) { + long key = iterator.next(); + Bucket t = dirtyBuckets.get(key); + if (startOfBucketsInMillis + (t.bucketKey * bucketSpanInMillis) < time) { + deleteBucket(t); + iterator.remove(); + } + } + } + + /** + * Return the unmatched events which are present in the expired buckets + * + * @return the list of unmatched events + */ + public List getUnmatchedEvents() + { + List copyEvents = new ArrayList(unmatchedEvents); + unmatchedEvents.clear(); + return copyEvents; + } + + /** + * Delete the given bucket + * + * @param bucket + */ + private void deleteBucket(Bucket bucket) + { + if (bucket == null) { + return; + } + Map> writtens = bucket.getEvents(); + if (writtens == null) { + return; + } + + for (Map.Entry> e : writtens.entrySet()) { + // Check the events which are unmatched and add those into the unmatchedEvents list + if (isOuter) { + for (T event : e.getValue()) { + if (!event.isMatch()) { + unmatchedEvents.add(event); + } + } + } + key2Buckets.get(e.getKey()).remove(bucket.bucketKey); + if (key2Buckets.get(e.getKey()).size() == 0) { + key2Buckets.remove(e.getKey()); + } + } + } + + /** + * Create the bucket with the given key + * + * @param bucketKey + * @return the bucket for the given key + */ + protected Bucket createBucket(long bucketKey) + { + return new Bucket(bucketKey); + } + + public void shutdown() + { + bucketSlidingTimer.cancel(); + } + + public void isOuterJoin(boolean isOuter) + { + this.isOuter = isOuter; + } + + public long getSpanTimeInMillis() + { + return spanTimeInMillis; + } + + public void setSpanTimeInMillis(long spanTimeInMillis) + { + this.spanTimeInMillis = spanTimeInMillis; + } + + public int getBucketSpanInMillis() + { + return bucketSpanInMillis; + } + + public void setBucketSpanInMillis(int bucketSpanInMillis) + { + this.bucketSpanInMillis = bucketSpanInMillis; + } + + private 
static class Lock + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java new file mode 100644 index 0000000..d1259d2 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/TimeEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceStability.Unstable +public interface TimeEvent +{ + /** + * Returns the time of the event + * + * @return the time of event + */ + long getTime(); + + /** + * Returns the key of the event + * + * @return the key + */ + Object getEventKey(); + + Object getValue(); + + /** + * Returns whether the event has matched tuples or not + * + * @return whether the event has matched tuples or not + */ + boolean isMatch(); + + void setMatch(boolean match); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java new file mode 100644 index 0000000..bafab29 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/join/TimeEventImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.join; + +import javax.annotation.Nonnull; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Time event Implementation. + */ +@InterfaceStability.Unstable +public class TimeEventImpl implements TimeEvent, Comparable +{ + protected Object key; + protected long time; + protected Object tuple; + protected boolean match; + + @SuppressWarnings("unused") + public TimeEventImpl() + { + } + + public TimeEventImpl(Object key, long time, Object tuple) + { + this.key = key; + this.time = time; + this.tuple = tuple; + this.match = false; + } + + @Override + public long getTime() + { + return time; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof TimeEventImpl)) { + return false; + } + + TimeEventImpl that = (TimeEventImpl)o; + + return time == that.time && !(key != null ? !key.equals(that.key) : that.key != null) && + !(tuple != null ? !tuple.equals(that.tuple) : that.tuple != null); + + } + + @Override + public int hashCode() + { + int result = key != null ? 
key.hashCode() : 0; + result = 31 * result + (int)(time ^ (time >>> 32)); + return result; + } + + @Override + public Object getEventKey() + { + return key; + } + + @Override + public int compareTo(@Nonnull TimeEventImpl dummyEvent) + { + if (key.equals(dummyEvent.key)) { + return 0; + } + return -1; + } + + public Object getValue() + { + return tuple; + } + + @Override + public String toString() + { + return "TimeEvent{" + + "key=" + key + + ", time=" + time + + ", tuple=" + tuple + + ", match=" + match + + '}'; + } + + public boolean isMatch() + { + return match; + } + + public void setMatch(boolean match) + { + this.match = match; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java new file mode 100644 index 0000000..391b37d --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/join/MapTimeBasedJoinOperator.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class MapTimeBasedJoinOperator +{ + @Rule + public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo(); + private static Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + public static final Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + + @Test + public void testJoinOperator() throws IOException, InterruptedException + { + + AbstractJoinOperator oper = new MapJoinOperator(); + oper.setLeftStore(new InMemoryStore(200, 200)); + oper.setRightStore(new InMemoryStore(200, 200)); + oper.setIncludeFields("ID,Name;OID,Amount"); + oper.setKeyFields("ID,CID"); + + oper.setup(context); + + CollectorTestSink>> sink = new CollectorTestSink>>(); + @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + Map tuple1 = Maps.newHashMap(); + tuple1.put("ID", 1); + tuple1.put("Name", "Anil"); + + oper.input1.process(tuple1); + + CountDownLatch latch = new CountDownLatch(1); + Map order1 = Maps.newHashMap(); + order1.put("OID", 102); + order1.put("CID", 1); + order1.put("Amount", 300); + + oper.input2.process(order1); + + Map order2 = Maps.newHashMap(); + order2.put("OID", 103); + order2.put("CID", 3); + 
order2.put("Amount", 300); + + oper.input2.process(order2); + latch.await(200, TimeUnit.MILLISECONDS); + oper.endWindow(); + + oper.beginWindow(1); + Map tuple2 = Maps.newHashMap(); + tuple2.put("ID", 4); + tuple2.put("Name", "DT"); + oper.input1.process(tuple2); + + Map order3 = Maps.newHashMap(); + order3.put("OID", 104); + order3.put("CID", 1); + order3.put("Amount", 300); + + oper.input2.process(order2); + + latch.await(200, TimeUnit.MILLISECONDS); + + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + List> emittedList = sink.collectedTuples.iterator().next(); + Assert.assertEquals("Size of Joined Tuple ", 1, emittedList.size()); + Map emitted = emittedList.get(0); + + /* The fields present in original event is kept as it is */ + Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size()); + Assert.assertEquals("value of ID :", tuple1.get("ID"), emitted.get("ID")); + Assert.assertEquals("value of Name :", tuple1.get("Name"), emitted.get("Name")); + + Assert.assertEquals("value of OID: ", order1.get("OID"), emitted.get("OID")); + Assert.assertEquals("value of Amount: ", order1.get("Amount"), emitted.get("Amount")); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fc547d87/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java new file mode 100644 index 0000000..6914da0 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/join/POJOTimeBasedJoinOperatorTest.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import com.esotericsoftware.kryo.Kryo; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class POJOTimeBasedJoinOperatorTest +{ + + @Rule + public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo(); + + public class Customer + { + public int ID; + public String Name; + + public Customer() + { + + } + + public Customer(int ID, String name) + { + this.ID = ID; + Name = name; + } + + @Override + public String toString() + { + return "Customer{" + + "ID=" + ID + + ", Name='" + Name + '\'' + + '}'; + } + } + + public class Order + { + public int OID; + public int CID; + public int Amount; + + public Order() + { + } + + public Order(int OID, int CID, int amount) + { + this.OID = OID; + this.CID = CID; + Amount = amount; + } + + @Override + public String toString() + { + return "Order{" + + "OID=" + OID + + ", CID=" + CID + + ", Amount=" + Amount + + '}'; + } + } + + public static class CustOrder + { + public int ID; + public 
String Name; + public int OID; + public int Amount; + + public CustOrder() + { + } + + @Override + public String toString() + { + return "{" + + "ID=" + ID + + ", Name='" + Name + '\'' + + ", OID=" + OID + + ", Amount=" + Amount + + '}'; + } + } + + @Test + public void testInnerJoinOperator() throws IOException, InterruptedException + { + Kryo kryo = new Kryo(); + POJOJoinOperator oper = new POJOJoinOperator(); + JoinStore store = new InMemoryStore(200, 200); + oper.setLeftStore(kryo.copy(store)); + oper.setRightStore(kryo.copy(store)); + oper.setIncludeFields("ID,Name;OID,Amount"); + oper.setKeyFields("ID,CID"); + oper.outputClass = CustOrder.class; + + oper.setup(MapTimeBasedJoinOperator.context); + + CollectorTestSink> sink = new CollectorTestSink>(); + @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + + Customer tuple = new Customer(1, "Anil"); + + oper.input1.process(tuple); + + CountDownLatch latch = new CountDownLatch(1); + + Order order = new Order(102, 1, 300); + + oper.input2.process(order); + + Order order2 = new Order(103, 3, 300); + oper.input2.process(order2); + + Order order3 = new Order(104, 7, 300); + oper.input2.process(order3); + + latch.await(3000, TimeUnit.MILLISECONDS); + + oper.endWindow(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + List emittedList = sink.collectedTuples.iterator().next(); + CustOrder emitted = emittedList.get(0); + + Assert.assertEquals("value of ID :", tuple.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple.Name, emitted.Name); + + Assert.assertEquals("value of OID: ", order.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order.Amount, emitted.Amount); + + } + + @Test + public void testLeftOuterJoinOperator() throws IOException, InterruptedException + { + Kryo kryo = new Kryo(); + POJOJoinOperator oper = new 
POJOJoinOperator(); + JoinStore store = new InMemoryStore(200, 200); + oper.setLeftStore(kryo.copy(store)); + oper.setRightStore(kryo.copy(store)); + oper.setIncludeFields("ID,Name;OID,Amount"); + oper.setKeyFields("ID,CID"); + oper.outputClass = CustOrder.class; + oper.setStrategy("left_outer_join"); + + oper.setup(MapTimeBasedJoinOperator.context); + + CollectorTestSink> sink = new CollectorTestSink>(); + @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + + Customer tuple1 = new Customer(1, "Anil"); + + oper.input1.process(tuple1); + + CountDownLatch latch = new CountDownLatch(1); + + Order order = new Order(102, 3, 300); + + oper.input2.process(order); + + Order order2 = new Order(103, 7, 300); + oper.input2.process(order2); + + oper.endWindow(); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.beginWindow(1); + Order order3 = new Order(104, 5, 300); + oper.input2.process(order3); + + Customer tuple2 = new Customer(5, "DT"); + + oper.input1.process(tuple2); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.endWindow(); + latch.await(500, TimeUnit.MILLISECONDS); + oper.beginWindow(2); + oper.endWindow(); + latch.await(5000, TimeUnit.MILLISECONDS); + + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size()); + Iterator> ite = sink.collectedTuples.iterator(); + List emittedList = ite.next(); + CustOrder emitted = emittedList.get(0); + + Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name); + + Assert.assertEquals("value of OID: ", order3.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount); + + emittedList = ite.next(); + emitted = emittedList.get(0); + Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emitted.toString()); + } + + @Test + public void 
testRightOuterJoinOperator() throws IOException, InterruptedException + { + Kryo kryo = new Kryo(); + POJOJoinOperator oper = new POJOJoinOperator(); + JoinStore store = new InMemoryStore(200, 200); + oper.setLeftStore(kryo.copy(store)); + oper.setRightStore(kryo.copy(store)); + oper.setIncludeFields("ID,Name;OID,Amount"); + oper.setKeyFields("ID,CID"); + oper.outputClass = CustOrder.class; + oper.setStrategy("right_outer_join"); + + oper.setup(MapTimeBasedJoinOperator.context); + + CollectorTestSink> sink = new CollectorTestSink>(); + @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + + Customer tuple1 = new Customer(1, "Anil"); + + oper.input1.process(tuple1); + + CountDownLatch latch = new CountDownLatch(1); + + Order order = new Order(102, 3, 300); + + oper.input2.process(order); + + Order order2 = new Order(103, 7, 300); + oper.input2.process(order2); + oper.endWindow(); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.beginWindow(1); + Order order3 = new Order(104, 5, 300); + oper.input2.process(order3); + + Customer tuple2 = new Customer(5, "DT"); + oper.input1.process(tuple2); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.endWindow(); + latch.await(500, TimeUnit.MILLISECONDS); + oper.beginWindow(2); + oper.endWindow(); + latch.await(5000, TimeUnit.MILLISECONDS); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 2, sink.collectedTuples.size()); + Iterator> ite = sink.collectedTuples.iterator(); + List emittedList = ite.next(); + CustOrder emitted = emittedList.get(0); + + Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name); + + Assert.assertEquals("value of OID: ", order3.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount); + + emittedList = ite.next(); + Assert.assertEquals("Joined Tuple ", 
"{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString()); + Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString()); + } + + @Test + public void testFullOuterJoinOperator() throws IOException, InterruptedException + { + Kryo kryo = new Kryo(); + POJOJoinOperator oper = new POJOJoinOperator(); + JoinStore store = new InMemoryStore(200, 200); + oper.setLeftStore(kryo.copy(store)); + oper.setRightStore(kryo.copy(store)); + oper.setIncludeFields("ID,Name;OID,Amount"); + oper.setKeyFields("ID,CID"); + oper.outputClass = CustOrder.class; + oper.setStrategy("outer_join"); + + oper.setup(MapTimeBasedJoinOperator.context); + + CollectorTestSink> sink = new CollectorTestSink>(); + @SuppressWarnings({"unchecked", "rawtypes"}) CollectorTestSink tmp = (CollectorTestSink)sink; + oper.outputPort.setSink(tmp); + + oper.beginWindow(0); + + Customer tuple1 = new Customer(1, "Anil"); + + oper.input1.process(tuple1); + + CountDownLatch latch = new CountDownLatch(1); + + Order order = new Order(102, 3, 300); + + oper.input2.process(order); + + Order order2 = new Order(103, 7, 300); + oper.input2.process(order2); + oper.endWindow(); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.beginWindow(1); + Order order3 = new Order(104, 5, 300); + oper.input2.process(order3); + + Customer tuple2 = new Customer(5, "DT"); + oper.input1.process(tuple2); + + latch.await(500, TimeUnit.MILLISECONDS); + + oper.endWindow(); + latch.await(500, TimeUnit.MILLISECONDS); + oper.beginWindow(2); + oper.endWindow(); + latch.await(5000, TimeUnit.MILLISECONDS); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 3, sink.collectedTuples.size()); + Iterator> ite = sink.collectedTuples.iterator(); + List emittedList = ite.next(); + CustOrder emitted = emittedList.get(0); + + Assert.assertEquals("value of ID :", tuple2.ID, emitted.ID); + Assert.assertEquals("value of Name :", tuple2.Name, emitted.Name); + + 
Assert.assertEquals("value of OID: ", order3.OID, emitted.OID); + Assert.assertEquals("value of Amount: ", order3.Amount, emitted.Amount); + + emittedList = ite.next(); + Assert.assertEquals("Joined Tuple ", "{ID=1, Name='Anil', OID=0, Amount=0}", emittedList.get(0).toString()); + + emittedList = ite.next(); + Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=102, Amount=300}", emittedList.get(0).toString()); + Assert.assertEquals("Joined Tuple ", "{ID=0, Name='null', OID=103, Amount=300}", emittedList.get(1).toString()); + } + +}
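The bucketing arithmetic in `TimeBasedStore` above can be shown in isolation. This is a minimal sketch, not part of the commit. `BucketMath` is a hypothetical class and the timestamps are made-up values. The sketch deliberately omits the `Timer`-driven sliding of `expiryTimeInMillis`/`endOBucketsInMillis`, the locking, and the `key2Buckets` index. It reproduces only three formulas from the store:

- `noOfBuckets = ceil(spanTimeInMillis / bucketSpanInMillis)`
- the bucket key `(eventTime - startOfBucketsInMillis) / bucketSpanInMillis`, or -1 for events older than `expiryTimeInMillis`
- the ring slot `bucketKey % noOfBuckets`

```java
/**
 * Hypothetical, standalone illustration of the bucket arithmetic in TimeBasedStore.
 * Not part of the commit: sliding, locking and the per-key index are omitted,
 * so expiryTimeInMillis stays fixed here.
 */
final class BucketMath {
  private final long startOfBucketsInMillis;
  private final long expiryTimeInMillis;
  private final long bucketSpanInMillis;
  private final int noOfBuckets;

  public BucketMath(long now, long spanTimeInMillis, long bucketSpanInMillis) {
    // Same initialisation as recomputeNumBuckets(): the window ends at "now".
    this.startOfBucketsInMillis = now - spanTimeInMillis;
    this.expiryTimeInMillis = startOfBucketsInMillis;
    this.bucketSpanInMillis = bucketSpanInMillis;
    this.noOfBuckets = (int) Math.ceil(spanTimeInMillis / (bucketSpanInMillis * 1.0));
  }

  public int numBuckets() {
    return noOfBuckets;
  }

  /** Absolute bucket key for an event time, or -1 if the event is already expired. */
  public long bucketKeyFor(long eventTime) {
    if (eventTime < expiryTimeInMillis) {
      return -1;
    }
    return (eventTime - startOfBucketsInMillis) / bucketSpanInMillis;
  }

  /** Slot of the fixed-size bucket array that an absolute bucket key maps to. */
  public int slotFor(long bucketKey) {
    return (int) (bucketKey % noOfBuckets);
  }

  public static void main(String[] args) {
    // Illustrative values: 1000 ms span, 300 ms buckets, "now" = 10000 ms.
    BucketMath m = new BucketMath(10_000L, 1_000L, 300L);
    for (long t : new long[] {8_999L, 9_000L, 9_950L, 10_250L}) {
      long key = m.bucketKeyFor(t);
      String slot = key < 0 ? "expired" : "slot " + m.slotFor(key);
      System.out.println("t=" + t + " key=" + key + " " + slot);
    }
  }
}
```

With a 1000 ms span and 300 ms buckets the ring has 4 slots, so bucket keys 0 and 4 share slot 0. In `newEvent`, a slot whose resident bucket has a different `bucketKey` is treated as stale. The old bucket is moved into `dirtyBuckets` and a fresh bucket replaces it. This is how the fixed-size array behaves as a sliding window.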
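`populateGettersFromInput` and `copyValue` in `POJOJoinOperator` map include fields by name. Each listed public field of the input class is paired with the same-named field of the output class, and `copyValue` copies the values across.

The sketch below shows the same name-based mapping using plain `java.lang.reflect.Field` in place of the `PojoUtils` getters and setters. This is a swapped-in technique and is typically slower than generated accessors. `ReflectiveFieldCopier` and its nested classes are hypothetical. The nested classes only mirror the shapes of `Customer`, `Order`, and `CustOrder` from the test above.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of name-based field copying; not the commit's implementation. */
final class ReflectiveFieldCopier {
  // Pairs a public input field with the same-named public output field.
  private static final class FieldPair {
    final Field from;
    final Field to;

    FieldPair(Field from, Field to) {
      this.from = from;
      this.to = to;
    }
  }

  private final List<FieldPair> pairs = new ArrayList<>();

  public ReflectiveFieldCopier(Class<?> inputClass, Class<?> outputClass, String[] includeFields) {
    for (String name : includeFields) {
      try {
        pairs.add(new FieldPair(inputClass.getField(name), outputClass.getField(name)));
      } catch (NoSuchFieldException e) {
        throw new RuntimeException("Failed to resolve field: " + name, e);
      }
    }
  }

  /** Copies every included field from input onto output; a null input is ignored, as in copyValue. */
  public void copy(Object output, Object input) {
    if (input == null) {
      return;
    }
    try {
      for (FieldPair p : pairs) {
        // Field.get boxes primitives; Field.set unboxes them for primitive target fields.
        p.to.set(output, p.from.get(input));
      }
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }

  public static class Customer { public int ID; public String Name; }
  public static class Order { public int OID; public int CID; public int Amount; }
  public static class CustOrder { public int ID; public String Name; public int OID; public int Amount; }

  public static void main(String[] args) {
    Customer c = new Customer();
    c.ID = 1;
    c.Name = "Anil";
    Order o = new Order();
    o.OID = 102;
    o.CID = 1;
    o.Amount = 300;

    CustOrder out = new CustOrder();
    new ReflectiveFieldCopier(Customer.class, CustOrder.class, new String[] {"ID", "Name"}).copy(out, c);
    new ReflectiveFieldCopier(Order.class, CustOrder.class, new String[] {"OID", "Amount"}).copy(out, o);
    System.out.println(out.ID + " " + out.Name + " " + out.OID + " " + out.Amount); // prints "1 Anil 102 300"
  }
}
```

Because reflection boxes and unboxes primitive values itself, this sketch needs no explicit wrapper-type lookup. The original code resolves a wrapper type with `ClassUtils.primitiveToWrapper` because it passes that type into `PojoUtils.createGetter`/`createSetter`. Fields that are never copied keep their defaults. This is why the outer-join tests expect strings such as `{ID=1, Name='Anil', OID=0, Amount=0}`.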