beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-1127] Create a unique source when using a JMS topic to avoid elements duplication
Date Fri, 03 Feb 2017 11:28:40 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8ee3572b4 -> cef31093f


[BEAM-1127] Create a unique source when using a JMS topic to avoid elements duplication


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eed4efc5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eed4efc5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eed4efc5

Branch: refs/heads/master
Commit: eed4efc537e5214d037665561c0e901931929c29
Parents: 8ee3572
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Thu Feb 2 16:17:25 2017 +0100
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Feb 3 03:28:33 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 15 +++++++-
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 39 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eed4efc5/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index c1f1cb4..270fe31 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -325,7 +325,11 @@ public class JmsIO {
 
   private JmsIO() {}
 
-  private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> {
+  /**
+   * An unbounded JMS source.
+   */
+  @VisibleForTesting
+  protected static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> {
 
     private final Read spec;
 
@@ -337,8 +341,15 @@ public class JmsIO {
     public List<UnboundedJmsSource> generateInitialSplits(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       List<UnboundedJmsSource> sources = new ArrayList<>();
-      for (int i = 0; i < desiredNumSplits; i++) {
+      if (spec.getTopic() != null) {
+        // in the case of a topic, we create a single source, so an unique subscriber, to avoid
+        // element duplication
         sources.add(new UnboundedJmsSource(spec));
+      } else {
+        // in the case of a queue, we allow concurrent consumers
+        for (int i = 0; i < desiredNumSplits; i++) {
+          sources.add(new UnboundedJmsSource(spec));
+        }
       }
       return sources;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/eed4efc5/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index c756cd0..a06bba3 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.jms;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,6 +36,8 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.security.AuthenticationUser;
 import org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -40,7 +45,6 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,6 +64,7 @@ public class JmsIOTest {
   private static final String USERNAME = "test_user";
   private static final String PASSWORD = "test_password";
   private static final String QUEUE = "test_queue";
+  private static final String TOPIC = "test_topic";
 
   private BrokerService broker;
   private ConnectionFactory connectionFactory;
@@ -167,7 +172,7 @@ public class JmsIOTest {
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
     Message msg = consumer.receiveNoWait();
-    Assert.assertNull(msg);
+    assertNull(msg);
   }
 
   @Test
@@ -195,7 +200,35 @@ public class JmsIOTest {
     while (consumer.receive(1000) != null) {
       count++;
     }
-    Assert.assertEquals(100, count);
+    assertEquals(100, count);
+  }
+
+  @Test
+  public void testSplitForQueue() throws Exception {
+    JmsIO.Read read = JmsIO.read().withQueue(QUEUE);
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    int desiredNumSplits = 5;
+    JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
+    List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits,
+        pipelineOptions);
+    // in the case of a queue, we have concurrent consumers by default, so the initial number
+    // splits is equal to the desired number of splits
+    assertEquals(desiredNumSplits, splits.size());
+  }
+
+  @Test
+  public void testSplitForTopic() throws Exception {
+    JmsIO.Read read = JmsIO.read().withTopic(TOPIC);
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    int desiredNumSplits = 5;
+    JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read);
+    List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits,
+        pipelineOptions);
+    // in the case of a topic, we can have only an unique subscriber on the topic per pipeline
+    // else it means we can have duplicate messages (all subscribers on the topic receive every
+    // message).
+    // So, whatever the desizedNumSplits is, the actual number of splits should be 1.
+    assertEquals(1, splits.size());
   }
 
 }


Mime
View raw message