beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: [BEAM-13] Add JmsIO
Date Wed, 27 Jul 2016 20:49:48 GMT
[BEAM-13] Add JmsIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2cf75568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2cf75568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2cf75568

Branch: refs/heads/master
Commit: 2cf755686f2518ed9575ec5c087884be0e6ea678
Parents: 76928d3
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Thu May 5 19:14:37 2016 +0200
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jul 27 13:49:28 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/jms/pom.xml                        | 136 +++++
 .../beam/sdk/io/jms/JmsCheckpointMark.java      |  82 +++
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 518 +++++++++++++++++++
 .../org/apache/beam/sdk/io/jms/JmsRecord.java   | 153 ++++++
 .../apache/beam/sdk/io/jms/package-info.java    |  22 +
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java   | 145 ++++++
 sdks/java/io/pom.xml                            |   1 +
 7 files changed, 1057 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
new file mode 100644
index 0000000..e0e0f36
--- /dev/null
+++ b/sdks/java/io/jms/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.2.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>jms</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: JMS</name>
+  <description>IO to read from and write to JMS (Java Message Service)
+    destinations (queues and topics).</description>
+
+  <properties>
+    <activemq.version>5.13.1</activemq.version>
+    <geronimo-jms.version>1.1.1</geronimo-jms.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <version>${geronimo-jms.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-client</artifactId>
+      <version>${activemq.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
new file mode 100644
index 0000000..81c2b82
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Message;
+
+/**
+ * Checkpoint for an unbounded JmsIO.Read. Holds the JMS messages that have been added but not
+ * yet acknowledged, together with the timestamp exposed by
+ * {@link #getOldestPendingTimestamp()}.
+ */
+@DefaultCoder(AvroCoder.class)
+public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
+
+  // Messages registered through addMessage() and not yet handled by finalizeCheckpoint().
+  private final List<Message> messages = new ArrayList<>();
+  // Starts at the minimum timestamp; adjusted by addMessage() and finalizeCheckpoint().
+  private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Creates a checkpoint mark with no pending messages. */
+  public JmsCheckpointMark() {
+  }
+
+  /** Returns the live (not copied) list of messages awaiting acknowledgement. */
+  protected List<Message> getMessages() {
+    return this.messages;
+  }
+
+  /**
+   * Registers a message as pending acknowledgement and lowers the tracked timestamp when the
+   * message is older than the current value.
+   *
+   * <p>NOTE(review): the tracked timestamp starts at {@code TIMESTAMP_MIN_VALUE}, so the
+   * {@code isBefore} check below presumably cannot succeed until {@link #finalizeCheckpoint()}
+   * has raised it -- confirm that this is intended.
+   *
+   * @param message the message to track
+   * @throws Exception if the JMS timestamp of the message cannot be read
+   */
+  protected void addMessage(Message message) throws Exception {
+    Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+    if (currentMessageTimestamp.isBefore(oldestPendingTimestamp)) {
+      oldestPendingTimestamp = currentMessageTimestamp;
+    }
+    messages.add(message);
+  }
+
+  /** Returns the timestamp currently tracked by this checkpoint mark. */
+  protected Instant getOldestPendingTimestamp() {
+    return oldestPendingTimestamp;
+  }
+
+  /**
+   * Acknowledges all outstanding messages. Since we believe that messages will be delivered in
+   * timestamp order, and acknowledged messages will not be retried, the newest message in this
+   * batch is a good bound for future messages.
+   */
+  @Override
+  public void finalizeCheckpoint() {
+    for (Message message : messages) {
+      try {
+        message.acknowledge();
+        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+        if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) {
+          oldestPendingTimestamp = currentMessageTimestamp;
+        }
+      } catch (Exception e) {
+        // Best effort: a failure on one message is ignored so that the remaining messages
+        // are still processed.
+      }
+    }
+    messages.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
new file mode 100644
index 0000000..2de933c
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * An unbounded source for JMS destinations (queues or topics).
+ *
+ * <h3>Reading from a JMS destination</h3>
+ *
+ * <p>JmsIO source returns an unbounded collection of JMS records as {@code PCollection<JmsRecord>}.
+ * A {@link JmsRecord} includes JMS headers and properties, along with the JMS message payload.</p>
+ *
+ * <p>To configure a JMS source, you have to provide a {@link javax.jms.ConnectionFactory}
+ * and the destination (queue or topic) where to consume. The following example
+ * illustrates various options for configuring the source:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(JmsIO.read()
+ *    .withConnectionFactory(myConnectionFactory)
+ *    .withQueue("my-queue")
+ *    // above two are required configuration, returns PCollection<JmsRecord>
+ *
+ *    // rest of the settings are optional
+ *
+ * }</pre>
+ *
+ * <h3>Writing to a JMS destination</h3>
+ *
+ * JmsIO sink supports writing text messages to a JMS destination on a broker.
+ * To configure a JMS sink, you must specify a {@link javax.jms.ConnectionFactory} and a
+ * {@link javax.jms.Destination} name.
+ * For instance:
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(...) // returns PCollection<String>
+ *    .apply(JmsIO.write()
+ *        .withConnectionFactory(myConnectionFactory)
+ *        .withQueue("my-queue")
+ *
+ * }</pre>
+ */
+public class JmsIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+
+  /**
+   * Creates a {@link Read} transform with no connection factory or destination configured.
+   *
+   * <p>{@code maxNumRecords} is initialised to {@link Long#MAX_VALUE} because
+   * {@code Read.apply} treats every other value as an explicit record limit; the no-arg
+   * constructor would leave it at 0 and make a default read ask for at most 0 records.
+   */
+  public static Read read() {
+    return new Read(null, null, null, Long.MAX_VALUE, null);
+  }
+
+  /** Creates a {@link Write} transform with no connection factory or destination configured. */
+  public static Write write() {
+    return new Write(null, null, null);
+  }
+
+  /**
+   * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more
+   * information on usage and configuration.
+   *
+   * <p>Every {@code with*} method returns a new {@link Read} carrying the updated setting and
+   * leaves this instance untouched.
+   */
+  public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
+
+    /** Returns a copy of this transform that uses the given connection factory. */
+    public Read withConnectionFactory(ConnectionFactory connectionFactory) {
+      return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+    }
+
+    /** Returns a copy of this transform that consumes from the named queue. */
+    public Read withQueue(String queue) {
+      return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+    }
+
+    /** Returns a copy of this transform that consumes from the named topic. */
+    public Read withTopic(String topic) {
+      return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a copy of this transform limited to {@code maxNumRecords} records.
+     * {@link Long#MAX_VALUE} means "no limit".
+     */
+    public Read withMaxNumRecords(long maxNumRecords) {
+      return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a copy of this transform limited to reading for {@code maxReadTime}. The limit is
+     * only applied when no record limit has been set.
+     */
+    public Read withMaxReadTime(Duration maxReadTime) {
+      return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
+    }
+
+    @Override
+    public PCollection<JmsRecord> apply(PBegin input) {
+      // handles unbounded source to bounded conversion if maxNumRecords is set.
+      Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
+
+      PTransform<PInput, PCollection<JmsRecord>> transform = unbounded;
+
+      // A record limit takes precedence over a read-time limit.
+      if (maxNumRecords != Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(maxNumRecords);
+      } else if (maxReadTime != null) {
+        transform = unbounded.withMaxReadTime(maxReadTime);
+      }
+
+      return input.getPipeline().apply(transform);
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(connectionFactory, "ConnectionFactory not specified");
+      checkArgument((queue != null || topic != null), "Either queue or topic not specified");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.addIfNotNull(DisplayData.item("queue", queue));
+      builder.addIfNotNull(DisplayData.item("topic", topic));
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
+     * "It is expected that JMS providers will provide the tools an administrator needs to
+     * create and configure administered objects in a JNDI namespace. JMS provider
+     * implementations of administered objects should be both javax.jndi.Referenceable and
+     * java.io.Serializable so that they can be stored in all JNDI naming contexts. In addition,
+     * it is recommended that these implementations follow the JavaBeansTM design patterns."
+     *
+     * So, a {@link ConnectionFactory} implementation is serializable.
+     */
+    protected ConnectionFactory connectionFactory;
+    @Nullable
+    protected String queue;
+    @Nullable
+    protected String topic;
+    protected long maxNumRecords;
+    protected Duration maxReadTime;
+
+    /**
+     * Creates an unconfigured transform. {@code maxNumRecords} is set to
+     * {@link Long#MAX_VALUE}, the only value {@link #apply} treats as "no record limit";
+     * leaving the field at its default of 0 would make {@link #apply} request a limit of 0
+     * records.
+     */
+    private Read() {
+      this(null, null, null, Long.MAX_VALUE, null);
+    }
+
+    private Read(
+        ConnectionFactory connectionFactory,
+        String queue,
+        String topic,
+        long maxNumRecords,
+        Duration maxReadTime) {
+      super("JmsIO.Read");
+
+      this.connectionFactory = connectionFactory;
+      this.queue = queue;
+      this.topic = topic;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Creates an {@link UnboundedSource} of {@link JmsRecord}s with the configuration in
+     * {@link Read}. Primary use case is unit tests, should not be used in an
+     * application.
+     */
+    @VisibleForTesting
+    UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
+      return new UnboundedJmsSource(
+          connectionFactory,
+          queue,
+          topic);
+    }
+
+  }
+
+  // Holder for the static read()/write() factories; not meant to be instantiated.
+  private JmsIO() {}
+
+  private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark>
{
+
+    private final ConnectionFactory connectionFactory;
+    private final String queue;
+    private final String topic;
+
+    public UnboundedJmsSource(
+        ConnectionFactory connectionFactory,
+        String queue,
+        String topic) {
+      this.connectionFactory = connectionFactory;
+      this.queue = queue;
+      this.topic = topic;
+    }
+
+    @Override
+    public List<UnboundedJmsSource> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      List<UnboundedJmsSource> sources = new ArrayList<>();
+      for (int i = 0; i < desiredNumSplits; i++) {
+        sources.add(new UnboundedJmsSource(connectionFactory, queue, topic));
+      }
+      return sources;
+    }
+
+    @Override
+    public UnboundedJmsReader createReader(PipelineOptions options,
+                                           JmsCheckpointMark checkpointMark) {
+      return new UnboundedJmsReader(this, checkpointMark);
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(connectionFactory, "ConnectionFactory is not defined");
+      checkArgument((queue != null || topic != null), "Either queue or topic is not defined");
+    }
+
+    @Override
+    public Coder getCheckpointMarkCoder() {
+      return AvroCoder.of(JmsCheckpointMark.class);
+    }
+
+    @Override
+    public Coder<JmsRecord> getDefaultOutputCoder() {
+      return SerializableCoder.of(JmsRecord.class);
+    }
+
+  }
+
+  private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {
+
+    private UnboundedJmsSource source;
+    private JmsCheckpointMark checkpointMark;
+    private Connection connection;
+    private Session session;
+    private MessageConsumer consumer;
+
+    private JmsRecord currentRecord;
+    private Instant currentTimestamp;
+
+    public UnboundedJmsReader(
+        UnboundedJmsSource source,
+        JmsCheckpointMark checkpointMark) {
+      this.source = source;
+      if (checkpointMark != null) {
+        this.checkpointMark = checkpointMark;
+      } else {
+        this.checkpointMark = new JmsCheckpointMark();
+      }
+      this.currentRecord = null;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      ConnectionFactory connectionFactory = source.connectionFactory;
+      try {
+        this.connection = connectionFactory.createConnection();
+        this.connection.start();
+        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        if (source.topic != null) {
+          this.consumer = this.session.createConsumer(this.session.createTopic(source.topic));
+        } else {
+          this.consumer = this.session.createConsumer(this.session.createQueue(source.queue));
+        }
+
+        return advance();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        TextMessage message = (TextMessage) this.consumer.receiveNoWait();
+
+        if (message == null) {
+          currentRecord = null;
+          return false;
+        }
+
+        Map<String, Object> properties = new HashMap<>();
+        Enumeration propertyNames = message.getPropertyNames();
+        while (propertyNames.hasMoreElements()) {
+          String propertyName = (String) propertyNames.nextElement();
+          properties.put(propertyName, message.getObjectProperty(propertyName));
+        }
+
+        JmsRecord jmsRecord = new JmsRecord(
+            message.getJMSMessageID(),
+            message.getJMSTimestamp(),
+            message.getJMSCorrelationID(),
+            message.getJMSReplyTo(),
+            message.getJMSDestination(),
+            message.getJMSDeliveryMode(),
+            message.getJMSRedelivered(),
+            message.getJMSType(),
+            message.getJMSExpiration(),
+            message.getJMSPriority(),
+            properties,
+            message.getText());
+
+        checkpointMark.addMessage(message);
+
+        currentRecord = jmsRecord;
+        currentTimestamp = new Instant(message.getJMSTimestamp());
+
+        return true;
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public JmsRecord getCurrent() throws NoSuchElementException {
+      if (currentRecord == null) {
+        throw new NoSuchElementException();
+      }
+      return currentRecord;
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return checkpointMark.getOldestPendingTimestamp();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      if (currentRecord == null) {
+        throw new NoSuchElementException();
+      }
+      return currentTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return checkpointMark;
+    }
+
+    @Override
+    public UnboundedSource<JmsRecord, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (consumer != null) {
+          consumer.close();
+          consumer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
+        }
+        if (connection != null) {
+          connection.stop();
+          connection.close();
+          connection = null;
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+  }
+
+  /**
+   * A {@link PTransform} to write to a JMS destination (queue or topic). See {@link JmsIO} for
+   * more information on usage and configuration.
+   *
+   * <p>When both a queue and a topic are configured, messages are sent to the queue.
+   */
+  public static class Write extends PTransform<PCollection<String>, PDone> {
+
+    protected ConnectionFactory connectionFactory;
+    protected String queue;
+    protected String topic;
+
+    /** Returns a copy of this transform that uses the given connection factory. */
+    public Write withConnectionFactory(ConnectionFactory connectionFactory) {
+      return new Write(connectionFactory, queue, topic);
+    }
+
+    /** Returns a copy of this transform that sends to the named queue. */
+    public Write withQueue(String queue) {
+      return new Write(connectionFactory, queue, topic);
+    }
+
+    /** Returns a copy of this transform that sends to the named topic. */
+    public Write withTopic(String topic) {
+      return new Write(connectionFactory, queue, topic);
+    }
+
+    private Write() {}
+
+    private Write(ConnectionFactory connectionFactory, String queue, String topic) {
+      this.connectionFactory = connectionFactory;
+      this.queue = queue;
+      this.topic = topic;
+    }
+
+    @Override
+    public PDone apply(PCollection<String> input) {
+      input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<String> input) {
+      checkNotNull(connectionFactory, "ConnectionFactory is not defined");
+      checkArgument((queue != null || topic != null), "Either queue or topic is required");
+    }
+
+    /**
+     * {@link DoFn} that sends every input element as a {@link TextMessage}. The JMS connection,
+     * session and producer are opened in {@link #startBundle} and released in
+     * {@link #finishBundle}.
+     */
+    private static class JmsWriter extends DoFn<String, Void> {
+
+      private ConnectionFactory connectionFactory;
+      private String queue;
+      private String topic;
+
+      private Connection connection;
+      private Session session;
+      private MessageProducer producer;
+
+      public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) {
+        this.connectionFactory = connectionFactory;
+        this.queue = queue;
+        this.topic = topic;
+      }
+
+      @Override
+      public void startBundle(Context c) throws Exception {
+        if (producer == null) {
+          this.connection = connectionFactory.createConnection();
+          this.connection.start();
+          /**
+           * false means we don't use JMS transaction.
+           */
+          this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          Destination destination;
+          if (queue != null) {
+            destination = session.createQueue(queue);
+          } else {
+            destination = session.createTopic(topic);
+          }
+          this.producer = this.session.createProducer(destination);
+        }
+      }
+
+      @Override
+      public void processElement(ProcessContext ctx) throws Exception {
+        String value = ctx.element();
+
+        try {
+          TextMessage message = session.createTextMessage(value);
+          producer.send(message);
+        } catch (Exception t) {
+          // Release the JMS resources, but never let a cleanup failure hide the send failure.
+          try {
+            finishBundle(null);
+          } catch (Exception cleanupFailure) {
+            t.addSuppressed(cleanupFailure);
+          }
+          throw t;
+        }
+      }
+
+      /**
+       * Releases the producer, session and connection. Safe to call when nothing is open (for
+       * example a second time after a failed {@link #processElement}); the connection is
+       * closed even when closing the producer or session fails.
+       */
+      @Override
+      public void finishBundle(Context c) throws Exception {
+        try {
+          if (producer != null) {
+            producer.close();
+          }
+          if (session != null) {
+            session.close();
+          }
+          if (connection != null) {
+            connection.stop();
+          }
+        } finally {
+          producer = null;
+          session = null;
+          Connection openConnection = connection;
+          connection = null;
+          if (openConnection != null) {
+            // Closing a connection also closes the sessions and producers created from it.
+            openConnection.close();
+          }
+        }
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
new file mode 100644
index 0000000..aa0c472
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.jms.Destination;
+
+/**
+ * JmsRecord contains message payload of the record
+ * as well as metadata (JMS headers and properties).
+ *
+ * <p>Two records are equal when their message ID, destination, delivery mode, redelivered flag,
+ * expiration, priority, properties and payload are equal; {@link #hashCode()} is computed from
+ * exactly the same fields.
+ */
+public class JmsRecord implements Serializable {
+
+  private final String jmsMessageID;
+  private final long jmsTimestamp;
+  private final String jmsCorrelationID;
+  private final Destination jmsReplyTo;
+  private final Destination jmsDestination;
+  private final int jmsDeliveryMode;
+  private final boolean jmsRedelivered;
+  private final String jmsType;
+  private final long jmsExpiration;
+  private final int jmsPriority;
+  private final Map<String, Object> properties;
+  private final String text;
+
+  /**
+   * Creates a record from the headers, properties and text payload of a JMS message. The
+   * arguments are stored as given; the properties map is not copied.
+   */
+  public JmsRecord(
+      String jmsMessageID,
+      long jmsTimestamp,
+      String jmsCorrelationID,
+      Destination jmsReplyTo,
+      Destination jmsDestination,
+      int jmsDeliveryMode,
+      boolean jmsRedelivered,
+      String jmsType,
+      long jmsExpiration,
+      int jmsPriority,
+      Map<String, Object> properties,
+      String text) {
+    this.jmsMessageID = jmsMessageID;
+    this.jmsTimestamp = jmsTimestamp;
+    this.jmsCorrelationID = jmsCorrelationID;
+    this.jmsReplyTo = jmsReplyTo;
+    this.jmsDestination = jmsDestination;
+    this.jmsDeliveryMode = jmsDeliveryMode;
+    this.jmsRedelivered = jmsRedelivered;
+    this.jmsType = jmsType;
+    this.jmsExpiration = jmsExpiration;
+    this.jmsPriority = jmsPriority;
+    this.properties = properties;
+    this.text = text;
+  }
+
+  public String getJmsMessageID() {
+    return jmsMessageID;
+  }
+
+  public long getJmsTimestamp() {
+    return jmsTimestamp;
+  }
+
+  public String getJmsCorrelationID() {
+    return jmsCorrelationID;
+  }
+
+  public Destination getJmsReplyTo() {
+    return jmsReplyTo;
+  }
+
+  public Destination getJmsDestination() {
+    return jmsDestination;
+  }
+
+  public int getJmsDeliveryMode() {
+    return jmsDeliveryMode;
+  }
+
+  public boolean getJmsRedelivered() {
+    return jmsRedelivered;
+  }
+
+  public String getJmsType() {
+    return jmsType;
+  }
+
+  public long getJmsExpiration() {
+    return jmsExpiration;
+  }
+
+  public int getJmsPriority() {
+    return jmsPriority;
+  }
+
+  public Map<String, Object> getProperties() {
+    return this.properties;
+  }
+
+  /** Returns the text payload of the message. */
+  public String getPayload() {
+    return this.text;
+  }
+
+  /**
+   * Hashes exactly the fields compared by {@link #equals(Object)}, so that equal records
+   * always have equal hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(jmsMessageID,
+        jmsDestination,
+        jmsDeliveryMode,
+        jmsRedelivered,
+        jmsExpiration,
+        jmsPriority,
+        properties,
+        text);
+  }
+
+  /**
+   * Compares the same fields as before, but tolerates {@code null} values (for example a
+   * message without a payload) instead of throwing a {@link NullPointerException}.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof JmsRecord)) {
+      return false;
+    }
+    JmsRecord other = (JmsRecord) obj;
+    return Objects.equals(jmsMessageID, other.jmsMessageID)
+        && Objects.equals(jmsDestination, other.jmsDestination)
+        && jmsDeliveryMode == other.jmsDeliveryMode
+        && jmsRedelivered == other.jmsRedelivered
+        && jmsExpiration == other.jmsExpiration
+        && jmsPriority == other.jmsPriority
+        && Objects.equals(properties, other.properties)
+        && Objects.equals(text, other.text);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java
new file mode 100644
index 0000000..3845b07
--- /dev/null
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to JMS (Java Message Service).
+ */
+package org.apache.beam.sdk.io.jms;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
new file mode 100644
index 0000000..020794c
--- /dev/null
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Tests of {@link JmsIO}.
+ *
+ * <p>Each test runs against an embedded ActiveMQ broker using in-memory persistence; the broker
+ * is started before and stopped after every test method.
+ */
+@RunWith(JUnit4.class)
+public class JmsIOTest {
+
+  private static final String BROKER_URL = "vm://localhost";
+
+  /** Name of the queue shared by the producing and the consuming side of each test. */
+  private static final String QUEUE = "test";
+
+  private BrokerService broker;
+  private ConnectionFactory connectionFactory;
+
+  /**
+   * Starts the embedded broker and creates the connection factory used by the tests.
+   */
+  @Before
+  public void startBroker() throws Exception {
+    broker = new BrokerService();
+    broker.setUseJmx(false);
+    broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+    broker.addConnector(BROKER_URL);
+    broker.setBrokerName("localhost");
+    broker.start();
+
+    // create JMS connection factory
+    connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
+  }
+
+  /**
+   * Stops the embedded broker.
+   */
+  @After
+  public void stopBroker() throws Exception {
+    broker.stop();
+  }
+
+  /**
+   * Publishes six text messages to the queue, reads at most five of them with
+   * {@link JmsIO#read()} and checks that the pipeline counted exactly five records.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadMessages() throws Exception {
+
+    // produce messages: one more than the pipeline is allowed to read
+    Connection connection = connectionFactory.createConnection();
+    try {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+      TextMessage message = session.createTextMessage("This Is A Test");
+      for (int i = 0; i < 6; i++) {
+        producer.send(message);
+      }
+      producer.close();
+      session.close();
+    } finally {
+      // Release the connection even when creating the session or sending fails.
+      connection.close();
+    }
+
+    Pipeline pipeline = TestPipeline.create();
+
+    // read from the queue
+    PCollection<JmsRecord> output = pipeline.apply(
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withQueue(QUEUE)
+            .withMaxNumRecords(5));
+
+    PAssert
+        .thatSingleton(output.apply("Count", Count.<JmsRecord>globally()))
+        .isEqualTo(5L);
+    pipeline.run();
+
+    // NOTE(review): unlike in testWriteMessage, this connection is never started before
+    // receiving; confirm whether receiveNoWait() can return a message at all here, i.e.
+    // whether this assertion really verifies that the queue was drained.
+    connection = connectionFactory.createConnection();
+    try {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
+      Message msg = consumer.receiveNoWait();
+      Assert.assertNull(msg);
+    } finally {
+      // Closing the connection also releases its session and consumer.
+      connection.close();
+    }
+  }
+
+  /**
+   * Writes 100 strings to the queue with {@link JmsIO#write()} and checks that exactly 100
+   * messages can be consumed from it afterwards.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteMessage() throws Exception {
+
+    Pipeline pipeline = TestPipeline.create();
+
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    pipeline.apply(Create.of(data))
+        .apply(JmsIO.write().withConnectionFactory(connectionFactory).withQueue(QUEUE));
+
+    pipeline.run();
+
+    Connection connection = connectionFactory.createConnection();
+    try {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
+      int count = 0;
+      // Drain the queue; a receive that times out after one second ends the loop.
+      while (consumer.receive(1000) != null) {
+        count++;
+      }
+      Assert.assertEquals(100, count);
+    } finally {
+      // Closing the connection also releases its session and consumer.
+      connection.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index e55f08a..fce1f65 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>google-cloud-platform</module>
     <module>hdfs</module>
+    <module>jms</module>
     <module>kafka</module>
   </modules>
 



Mime
View raw message