apex-commits mailing list archives

From chin...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2185: Added Bounded Deduper
Date Wed, 24 Aug 2016 09:33:07 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 822323d02 -> 17f6c5523


APEXMALHAR-2185: Added Bounded Deduper


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cc62a5eb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cc62a5eb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cc62a5eb

Branch: refs/heads/master
Commit: cc62a5eb7d6e58a01bf3e5a32edc889ceb43a75b
Parents: 1700725
Author: bhupeshchawda <bhupeshchawda@gmail.com>
Authored: Thu Aug 11 15:40:07 2016 +0530
Committer: bhupeshchawda <bhupesh@apache.org>
Committed: Mon Aug 22 10:43:33 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  |  25 ++-
 .../malhar/lib/dedup/BoundedDedupOperator.java  | 206 +++++++++++++++++++
 .../lib/dedup/TimeBasedDedupOperator.java       |  31 ++-
 .../lib/dedup/DeduperBoundedPOJOImplTest.java   | 110 ++++++++++
 4 files changed, 359 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index d06acc3..13a3475 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -30,9 +30,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -48,7 +50,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
-import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -68,6 +69,9 @@ import com.datatorrent.netlet.util.Slice;
 public abstract class AbstractDeduper<T>
     implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener
 {
+
+  private static final String BUCKET_DIR = "bucket_data";
+
   /**
    * The input port on which events are received.
    */
@@ -102,7 +106,7 @@ public abstract class AbstractDeduper<T>
   private boolean preserveTupleOrder = true;
 
   @NotNull
-  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+  protected AbstractManagedStateImpl managedState;
 
   /**
    * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
@@ -123,9 +127,8 @@ public abstract class AbstractDeduper<T>
   @Override
   public void setup(OperatorContext context)
   {
-    FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
-    fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data");
-    managedState.setFileAccess(fAccessImpl);
+    ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
+        + Path.SEPARATOR + BUCKET_DIR);
     managedState.setup(context);
 
     if (preserveTupleOrder) {
@@ -155,9 +158,7 @@ public abstract class AbstractDeduper<T>
    */
   protected void processTuple(T tuple)
   {
-
-    long time = getTime(tuple);
-    Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+    Future<Slice> valFuture = getAsyncManagedState(tuple);
 
     if (valFuture.isDone()) {
       try {
@@ -211,7 +212,7 @@ public abstract class AbstractDeduper<T>
   {
     if (!preserveTupleOrder || waitingEvents.isEmpty()) {
       if (value == null) {
-        managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+        putManagedState(tuple);
         processUnique(tuple);
       } else {
         processDuplicate(tuple);
@@ -309,7 +310,7 @@ public abstract class AbstractDeduper<T>
         if (future.isDone() || finalize ) {
           try {
             if (future.get() == null && asyncEvents.get(tupleKey) == null) {
-              managedState.put(tupleTime, tupleKey, new Slice(new byte[0]));
+              putManagedState(tuple);
               asyncEvents.put(tupleKey, tupleTime);
               processUnique(tuple);
             } else {
@@ -339,6 +340,10 @@ public abstract class AbstractDeduper<T>
     managedState.endWindow();
   }
 
+  protected abstract Future<Slice> getAsyncManagedState(T tuple);
+
+  protected abstract void putManagedState(T tuple);
+
   /**
    * Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained.
    *

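With this change, setup() no longer hard-codes a TFileImpl.DTFileImpl: it only points the FileAccess already carried by the managed state at <APPLICATION_PATH>/bucket_data, and the concrete managed-state implementation is supplied by the subclass through the two new abstract methods above. A minimal sketch of what this enables, assuming the managed-state implementation exposes setFileAccess via AbstractManagedStateImpl (the class and method below are illustrative, not part of this commit):

import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;

import com.datatorrent.lib.fileaccess.TFileImpl;

public class FileAccessSketch
{
  // Sketch only: choose the storage backend for a deduper's managed state outside
  // AbstractDeduper. AbstractDeduper.setup() later points the base path at
  // <APPLICATION_PATH>/bucket_data.
  static ManagedTimeStateImpl newState()
  {
    ManagedTimeStateImpl state = new ManagedTimeStateImpl();
    TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl(); // or any other FileAccessFSImpl
    state.setFileAccess(fileAccess);
    return state;
  }
}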
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
new file mode 100644
index 0000000..e71762e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of {@link AbstractDeduper} which handles the case of a bounded data set.
+ * This implementation assumes that the incoming tuple does not have a time field, and that
+ * de-duplication is strictly based on the key of the tuple.
+ *
+ * This implementation uses {@link ManagedTimeStateImpl} for storing the tuple keys in
+ * persistent storage.
+ *
+ * The following properties need to be configured for the operator to function:
+ * 1. {@link #keyExpression}: The Java expression to extract the key fields from the incoming tuple (POJO)
+ * 2. {@link #numBuckets} (optional): The number of buckets to be used for storing the keys of the
+ * incoming tuples.
+ * NOTE: Users can decide on an appropriate value for this parameter by estimating the number of
+ * distinct keys in the application; sqrt(num distinct keys) is a reasonable choice. If the number
+ * of distinct keys is very large, leave it blank so that the default value of 46340 is used. The
+ * rationale for this number is that sqrt(max integer) = 46340, which makes the number of buckets
+ * roughly equal to the size of each bucket, thus spreading the load evenly across buckets.
+ *
+ */
+@Evolving
+public class BoundedDedupOperator extends AbstractDeduper<Object>
+{
+  private static final long DEFAULT_CONSTANT_TIME = 0;
+  private static final int DEFAULT_NUM_BUCKETS = 46340;
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  //Optional, but recommended to be provided by user
+  private int numBuckets = DEFAULT_NUM_BUCKETS;
+
+  private transient Class<?> pojoClass;
+  private transient Getter<Object, Object> keyGetter;
+  private transient StreamCodec<Object> streamCodec;
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+      streamCodec = getDeduperStreamCodec();
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return streamCodec;
+    }
+  };
+
+  public BoundedDedupOperator()
+  {
+    managedState = new ManagedTimeStateImpl();
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (numBuckets == 0) {
+      numBuckets = DEFAULT_NUM_BUCKETS;
+    }
+    ((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets);
+    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    managedState.setTimeBucketAssigner(timeBucketAssigner);
+    super.setup(context);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  protected long getTime(Object tuple)
+  {
+    return DEFAULT_CONSTANT_TIME;
+  }
+
+  @Override
+  protected Slice getKey(Object tuple)
+  {
+    Object key = keyGetter.get(tuple);
+    return streamCodec.toByteArray(key);
+  }
+
+  protected StreamCodec<Object> getDeduperStreamCodec()
+  {
+    return new DeduperStreamCodec(keyExpression);
+  }
+
+  @Override
+  protected Future<Slice> getAsyncManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    Future<Slice> valFuture = ((ManagedTimeStateImpl)managedState).getAsync(getBucketId(key), key);
+    return valFuture;
+  }
+
+  @Override
+  protected void putManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    ((ManagedTimeStateImpl)managedState).put(getBucketId(key), DEFAULT_CONSTANT_TIME,
+        key, new Slice(new byte[0]));
+  }
+
+  protected int getBucketId(Slice key)
+  {
+    // mask the sign bit so that a negative hash code cannot yield a negative bucket id
+    return (Arrays.hashCode(key.buffer) & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  /**
+   * Returns the key expression
+   * @return key expression
+   */
+  public String getKeyExpression()
+  {
+    return keyExpression;
+  }
+
+  /**
+   * Sets the key expression for the fields used for de-duplication
+   * @param keyExpression the expression
+   */
+  public void setKeyExpression(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  /**
+   * Returns the number of buckets
+   * @return number of buckets
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets
+   * NOTE: Users can decide on an appropriate value for this parameter by estimating the number
+   * of distinct keys in the application; sqrt(num distinct keys) is a reasonable choice. If the
+   * number of distinct keys is very large, leave it blank so that the default value of 46340 is
+   * used. The rationale for this number is that sqrt(max integer) = 46340, which makes the number
+   * of buckets roughly equal to the size of each bucket, thus spreading the load evenly across buckets.
+   * @param numBuckets the number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}

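The sizing guidance in the javadoc reduces to numBuckets ~ sqrt(number of distinct keys): with roughly one million distinct keys, numBuckets = 1000 yields about 1000 keys per bucket, and the default 46340 = floor(sqrt(Integer.MAX_VALUE)) covers the case where no estimate is available. A minimal, hypothetical wiring sketch for the new operator follows; OrderReader and Order stand in for a real input operator and POJO and are not part of this commit:

import org.apache.hadoop.conf.Configuration;

import org.apache.apex.malhar.lib.dedup.BoundedDedupOperator;

import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class DedupApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // OrderReader emits Order POJOs carrying a long "orderId" field (both hypothetical).
    OrderReader reader = dag.addOperator("Reader", new OrderReader());
    BoundedDedupOperator dedup = dag.addOperator("Deduper", new BoundedDedupOperator());
    dedup.setKeyExpression("orderId"); // getter expression evaluated against the incoming POJO
    dedup.setNumBuckets(1000);         // ~sqrt(expected distinct keys); ~1M keys -> ~1000 buckets
    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

    // The deduper needs the concrete tuple class in order to generate the key getter.
    dag.setInputPortAttribute(dedup.input, PortContext.TUPLE_CLASS, Order.class);
    dag.addStream("orders", reader.output, dedup.input);
    dag.addStream("uniques", dedup.unique, console.input);
  }
}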
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
index 6aebe6b..225c8a3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -18,11 +18,13 @@
  */
 package org.apache.apex.malhar.lib.dedup;
 
+import java.util.concurrent.Future;
+
 import javax.validation.constraints.NotNull;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
@@ -95,6 +97,13 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
 
   private transient Getter<Object, Object> keyGetter;
 
+  private transient StreamCodec<Object> streamCodec;
+
+  public TimeBasedDedupOperator()
+  {
+    managedState = new ManagedTimeUnifiedStateImpl();
+  }
+
   @InputPortFieldAnnotation(schemaRequired = true)
   public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
   {
@@ -102,6 +111,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
     public void setup(PortContext context)
     {
       pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+      streamCodec = getDeduperStreamCodec();
     }
 
     @Override
@@ -113,7 +123,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
     @Override
     public StreamCodec<Object> getStreamCodec()
     {
-      return getDeduperStreamCodec();
+      return streamCodec;
     }
   };
 
@@ -130,7 +140,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
   protected Slice getKey(Object tuple)
   {
     Object key = keyGetter.get(tuple);
-    return new Slice(key.toString().getBytes());
+    return streamCodec.toByteArray(key);
   }
 
   protected StreamCodec<Object> getDeduperStreamCodec()
@@ -165,6 +175,21 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
   {
   }
 
+  @Override
+  protected Future<Slice> getAsyncManagedState(Object tuple)
+  {
+    Future<Slice> valFuture = ((ManagedTimeUnifiedStateImpl)managedState).getAsync(getTime(tuple),
+        getKey(tuple));
+    return valFuture;
+  }
+
+  @Override
+  protected void putManagedState(Object tuple)
+  {
+    ((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+  }
+
+
   public String getKeyExpression()
   {
     return keyExpression;

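Two notes on the hunks above: the port's stream codec is now created once during port setup and reused, instead of constructing a new DeduperStreamCodec on every getStreamCodec() call, and getKey() now serializes the key through that codec rather than via key.toString().getBytes(). The old scheme collapses distinct keys that render to the same string; a small self-contained illustration (not from the commit):

public class KeyBytesDemo
{
  public static void main(String[] args)
  {
    // Distinct keys that the old toString()-based scheme could not tell apart:
    Object intKey = Integer.valueOf(1);
    Object longKey = Long.valueOf(1L);
    byte[] a = intKey.toString().getBytes();  // "1"
    byte[] b = longKey.toString().getBytes(); // "1" -> identical bytes, a false duplicate
    System.out.println(java.util.Arrays.equals(a, b)); // prints true
    // A Kryo-backed StreamCodec (which DeduperStreamCodec is assumed to be) encodes type
    // as well as value, so these keys serialize to different byte sequences.
  }
}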
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
new file mode 100644
index 0000000..448e76f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class DeduperBoundedPOJOImplTest
+{
+  private static String applicationPath;
+  private static final String APPLICATION_PATH_PREFIX = "target/DeduperBoundedPOJOImplTest";
+  private static final String APP_ID = "DeduperBoundedPOJOImplTest";
+  private static final int OPERATOR_ID = 0;
+  private static BoundedDedupOperator deduper;
+  private static final int NUM_BUCKETS = 10;
+
+  @Before
+  public void setup()
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    deduper = new BoundedDedupOperator();
+    deduper.setKeyExpression("key");
+    deduper.setNumBuckets(NUM_BUCKETS);
+  }
+
+  @Test
+  public void testDedup()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    deduper.input.setup(new PortContext(attributes, context));
+    deduper.activate(context);
+    CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.unique, uniqueSink);
+    CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.duplicate, duplicateSink);
+    CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.expired, expiredSink);
+
+    deduper.beginWindow(0);
+    Random r = new Random();
+    int k = 1;
+    for (int i = 1; i <= 1000; i++) {
+      TestPojo pojo = new TestPojo(i, new Date(), k++);
+      deduper.input.process(pojo);
+      if (i % 10 == 0) {
+        int dupId = r.nextInt(i);
+        TestPojo pojoDuplicate = new TestPojo(dupId == 0 ? 1 : dupId, new Date(), k++);
+        deduper.input.process(pojoDuplicate);
+      }
+    }
+    deduper.handleIdleTime();
+    deduper.endWindow();
+
+    Assert.assertTrue(uniqueSink.collectedTuples.size() == 1000);
+    Assert.assertTrue(duplicateSink.collectedTuples.size() == 100);
+
+    deduper.teardown();
+  }
+
+  @After
+  public void teardown()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

