Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B4640200B49 for ; Wed, 3 Aug 2016 13:18:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B3088160AAA; Wed, 3 Aug 2016 11:18:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D0A0E160A64 for ; Wed, 3 Aug 2016 13:18:25 +0200 (CEST) Received: (qmail 51449 invoked by uid 500); 3 Aug 2016 11:18:24 -0000 Mailing-List: contact dev-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list dev@apex.apache.org Received: (qmail 51438 invoked by uid 99); 3 Aug 2016 11:18:24 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2016 11:18:24 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6BCE61A0638 for ; Wed, 3 Aug 2016 11:18:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id nT4Rw0-ie5qw for ; Wed, 3 Aug 2016 11:18:22 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at 
mx2-lw-eu.apache.org) with SMTP id 77B095FB37 for ; Wed, 3 Aug 2016 11:18:21 +0000 (UTC) Received: (qmail 50865 invoked by uid 99); 3 Aug 2016 11:18:20 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2016 11:18:20 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 7F74C2C0059 for ; Wed, 3 Aug 2016 11:18:20 +0000 (UTC) Date: Wed, 3 Aug 2016 11:18:20 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@apex.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Wed, 03 Aug 2016 11:18:26 -0000 [ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405746#comment-15405746 ] ASF GitHub Bot commented on APEXMALHAR-2100: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73320743 --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java --- @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.lang.reflect.Array; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams. 
+ * + * @displayName POJO Inner Join Operator + * @tags join + */ +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator implements Operator.ActivationListener +{ + private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class); + private transient long timeIncrement; + private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2); + protected Class outputClass; + private long time = System.currentTimeMillis(); + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort outputPort = new DefaultOutputPort() + { + @Override + public void setup(Context.PortContext context) + { + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input1 = new DefaultInputPort() + { + @Override + public void setup(Context.PortContext context) + { + inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object tuple) + { + processTuple(tuple,true); + } + + @Override + public StreamCodec getStreamCodec() + { + return getInnerJoinStreamCodec(true); + } + }; + + @InputPortFieldAnnotation(schemaRequired = true) + public transient DefaultInputPort input2 = new DefaultInputPort() + { + @Override + public void setup(Context.PortContext context) + { + inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object tuple) + { + processTuple(tuple,false); + } + + @Override + public StreamCodec getStreamCodec() + { + return getInnerJoinStreamCodec(false); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + 
super.setup(context); + for (int i = 0; i < 2; i++) { + inputFieldObjects[i] = new FieldObjectMap(); + } + } + + /** + * Extract the time value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public long extractTime(Object tuple, boolean isStream1Data) + { + return timeFields == null ? time : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple) : + inputFieldObjects[1].timeFieldGet.get(tuple)); + } + + /** + * Create getters for the key and time fields and setters for the include fields. + */ + protected void generateSettersAndGetters() + { + for (int i = 0; i < 2; i++) { + Class inputClass = inputFieldObjects[i].inputClass; + try { + Class c = ClassUtils.primitiveToWrapper(inputClass.getField(keyFields.get(i)).getType()); + inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass, keyFields.get(i), c); + if (timeFields != null && timeFields.size() != 0) { + Class tc = ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType()); + inputFieldObjects[i].timeFieldGet = PojoUtils.createGetter(inputClass, timeFields.get(i), tc); + } + for (int j = 0; j < includeFields[i].length; j++) { + Class ic = ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType()); + Class oc = ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType()); + inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass, includeFields[i][j], ic), + PojoUtils.createSetter(outputClass, includeFields[i][j], oc)); + } + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Extract the key value from the given tuple + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + * @return + */ + @Override + public Object extractKey(Object tuple, boolean isStream1Data) + { + return isStream1Data ? 
inputFieldObjects[0].keyGet.get(tuple) : + inputFieldObjects[1].keyGet.get(tuple); + } + + /** + * Merge the given tuples + * @param tuple1 tuple belongs to stream1 + * @param tuple2 tuple belongs to stream1 + * @return + */ + @Override + public Object mergeTuples(Object tuple1, Object tuple2) + { + Object o; + try { + o = outputClass.newInstance(); + for (Map.Entry g: inputFieldObjects[0].fieldMap.entrySet()) { + g.getValue().set(o, g.getKey().get(tuple1)); + } + for (Map.Entry g: inputFieldObjects[1].fieldMap.entrySet()) { + g.getValue().set(o, g.getKey().get(tuple2)); + } + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + return o; + } + + /** + * Emit the given tuple through the outputPort + * @param tuple given tuple + */ + @Override + public void emitTuple(Object tuple) + { + outputPort.emit(tuple); + } + + @Override + public void activate(Context context) + { + generateSettersAndGetters(); + } + + @Override + public void deactivate() + { + } + + @Override + public void endWindow() + { + super.endWindow(); + time += timeIncrement; + } + + /** + * Returns the streamcodec for the streams + * @param isStream1data Specifies whether the codec needs for stream1 or stream2. + * @return the object of InnerJoinStreamCodec + */ + public StreamCodec getInnerJoinStreamCodec(boolean isStream1data) + { + if (isStream1data) { + return new InnerJoinStreamCodec(getKeyFieldsStr().split(",")[0]); + } + return new InnerJoinStreamCodec(getKeyFieldsStr().split(",")[1]); + } + + private class FieldObjectMap + { + public Class inputClass; + public PojoUtils.Getter keyGet; + public PojoUtils.Getter timeFieldGet; + public Map fieldMap; --- End diff -- Why do you want to relate it? Shouldn't there be a one-to-one mapping of output POJO fields to input POJO fields from either one of the streams? 
> Development of Inner Join Operator using Spillable Datastructures > ----------------------------------------------------------------- > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Chaitanya > Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)