beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-1237] Create AmqpIO
Date Tue, 27 Jun 2017 13:33:57 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8036001da -> 9df865add


[BEAM-1237] Create AmqpIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbfcad82
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbfcad82
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbfcad82

Branch: refs/heads/master
Commit: cbfcad823972270171552e556b7fa8d8d4882f14
Parents: 8036001
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Wed Dec 28 15:06:09 2016 +0100
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue Jun 27 15:10:13 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/amqp/pom.xml                       | 100 +++++
 .../org/apache/beam/sdk/io/amqp/AmqpIO.java     | 397 +++++++++++++++++++
 .../beam/sdk/io/amqp/AmqpMessageCoder.java      |  79 ++++
 .../amqp/AmqpMessageCoderProviderRegistrar.java |  44 ++
 .../apache/beam/sdk/io/amqp/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 148 +++++++
 .../beam/sdk/io/amqp/AmqpMessageCoderTest.java  |  89 +++++
 sdks/java/io/pom.xml                            |   1 +
 8 files changed, 880 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml
new file mode 100644
index 0000000..45b295d
--- /dev/null
+++ b/sdks/java/io/amqp/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-amqp</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: AMQP</name>
+  <description>IO to read and write using AMQP 1.0 protocol (http://www.amqp.org).</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>proton-j</artifactId>
+      <version>0.13.1</version>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
new file mode 100644
index 0000000..b9a0be9
--- /dev/null
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.amqp;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Joiner;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * AmqpIO supports AMQP 1.0 protocol using the Apache QPid Proton-J library.
+ *
+ * <p>It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory
and the
+ * Apache Beam JmsIO.
+ *
+ * <h3>Binding AMQP and receive messages</h3>
+ *
+ * <p>The {@link AmqpIO} {@link Read} can bind an AMQP listener endpoint and receive messages. It
+ * can also connect to an AMQP broker (such as Apache Qpid or Apache ActiveMQ).
+ *
+ * <p>{@link AmqpIO} {@link Read} returns an unbounded {@link PCollection} of {@link
Message}
+ * containing the received messages.
+ *
+ * <p>To configure an AMQP source, you have to provide a list of addresses where it will receive
+ * messages. An address has the following form: {@code
+ * [amqp[s]://][user[:password]@]domain[/[name]]} where {@code domain} can be one of {@code
+ * host | host:port | ip | ip:port | name}. NB: the {@code ~} character binds an AMQP
+ * listener instead of connecting to a remote broker. For instance {@code amqp://~0.0.0.0:1234}
+ * will bind an AMQP listener on any network interface on port 1234.
+ *
+ * <p>The following example illustrates how to configure a AMQP source:
+ *
+ * <pre>{@code
+ *
+ *  pipeline.apply(AmqpIO.read()
+ *    .withAddresses(Collections.singletonList("amqp://host:1234")))
+ *
+ * }</pre>
+ *
+ * <h3>Sending messages to a AMQP endpoint</h3>
+ *
+ * <p>{@link AmqpIO} provides a sink to send {@link PCollection} elements as messages.
+ *
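+ * <p>Unlike the {@link Read}, the {@link AmqpIO} {@link Write} is not configured with a list of
+ * addresses: each {@link Message} is sent as-is, so it must carry its own destination address
+ * (set with {@code Message.setAddress(...)}). The following example illustrates how to use the
+ * {@link AmqpIO} {@link Write}: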
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(...) // provide PCollection<Message>
+ *    .apply(AmqpIO.write());
+ *
+ * }</pre>
+ */
+public class AmqpIO {
+
+  public static Read read() {
+    return new AutoValue_AmqpIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
+  }
+
+  public static Write write() {
+    return new AutoValue_AmqpIO_Write();
+  }
+
+  private AmqpIO() {
+  }
+
+  /**
+   * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<Message>>
{
+
+    @Nullable abstract List<String> addresses();
+    abstract long maxNumRecords();
+    @Nullable abstract Duration maxReadTime();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setAddresses(List<String> addresses);
+      abstract Builder setMaxNumRecords(long maxNumRecords);
+      abstract Builder setMaxReadTime(Duration maxReadTime);
+      abstract Read build();
+    }
+
+    /**
+     * Define the AMQP addresses where to receive messages.
+     */
+    public Read withAddresses(List<String> addresses) {
+      checkArgument(addresses != null, "AmqpIO.read().withAddresses(addresses) called with
null"
+          + " addresses");
+      checkArgument(!addresses.isEmpty(), "AmqpIO.read().withAddresses(addresses) called
with "
+          + "empty addresses list");
+      return builder().setAddresses(addresses).build();
+    }
+
+    /**
+     * Define the max number of records received by the {@link Read}.
+     * When the max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read}
will
+     * provide a bounded {@link PCollection}.
+     */
+    public Read withMaxNumRecords(long maxNumRecords) {
+      checkArgument(maxReadTime() == null,
+          "maxNumRecord and maxReadTime are exclusive");
+      return builder().setMaxNumRecords(maxNumRecords).build();
+    }
+
+    /**
+     * Define the max read time (duration) while the {@link Read} will receive messages.
+     * When this max read time is not null, the {@link Read} will provide a bounded
+     * {@link PCollection}.
+     */
+    public Read withMaxReadTime(Duration maxReadTime) {
+      checkArgument(maxNumRecords() == Long.MAX_VALUE,
+          "maxNumRecord and maxReadTime are exclusive");
+      return builder().setMaxReadTime(maxReadTime).build();
+    }
+
+    /**
+     * Checks that a non-empty list of addresses has been provided via
+     * {@link #withAddresses(List)}.
+     *
+     * @param pipelineOptions the pipeline options; not used by this check
+     * @throws IllegalStateException if the addresses list is unset or empty
+     */
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(addresses() != null,
+          "AmqpIO.read() requires addresses list to be set via withAddresses(addresses)");
+      checkState(!addresses().isEmpty(),
+          "AmqpIO.read() requires a non-empty addresses list to be set via "
+              + "withAddresses(addresses)");
+    }
+
+    /**
+     * Registers the configured addresses, joined with single spaces, as display data under the
+     * {@code addresses} key. Nothing is registered for that key while no addresses are set.
+     *
+     * @param builder the display data builder to populate
+     */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      // addresses() is @Nullable until withAddresses(...) is called; Joiner would throw on null.
+      List<String> addresses = addresses();
+      if (addresses != null) {
+        builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses)));
+      }
+    }
+
+    @Override
+    public PCollection<Message> expand(PBegin input) {
+      org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
+          org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this));
+
+      PTransform<PBegin, PCollection<Message>> transform = unbounded;
+
+      if (maxNumRecords() != Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(maxNumRecords());
+      } else if (maxReadTime() != null) {
+        transform = unbounded.withMaxReadTime(maxReadTime());
+      }
+
+      return input.getPipeline().apply(transform);
+    }
+
+  }
+
+  /**
+   * Checkpoint mark that remembers the {@link Tracker}s of received messages so they can be
+   * accepted on the {@link Messenger} when the checkpoint is finalized.
+   *
+   * <p>Both fields are {@code transient}: a deserialized mark has an empty tracker list and
+   * no messenger until a reader assigns one.
+   */
+  private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
{
+
+    // Messenger used to accept the trackers; assigned by the reader when it starts.
+    private transient Messenger messenger;
+    // Trackers of the messages received since the last finalized checkpoint.
+    private transient List<Tracker> trackers = new ArrayList<>();
+
+    public AmqpCheckpointMark() {
+    }
+
+    /**
+     * Accepts every recorded tracker on the messenger, then clears the tracker list.
+     */
+    @Override
+    public void finalizeCheckpoint() {
+      for (Tracker tracker : trackers) {
+        // flags = 0: the accept is not cumulative, it applies to this tracker only
+        messenger.accept(tracker, 0);
+      }
+      trackers.clear();
+    }
+
+    // Deserialization hook: nothing is read from the stream; the tracker list is simply reset
+    // to an empty list (the transient field would otherwise be null).
+    private void readObject(java.io.ObjectInputStream stream)
+        throws java.io.IOException, ClassNotFoundException {
+      trackers = new ArrayList<>();
+    }
+
+  }
+
+  private static class UnboundedAmqpSource
+      extends UnboundedSource<Message, AmqpCheckpointMark> {
+
+    private final Read spec;
+
+    public UnboundedAmqpSource(Read spec) {
+      this.spec = spec;
+    }
+
+    @Override
+    public List<UnboundedAmqpSource> split(int desiredNumSplits,
+                                                           PipelineOptions pipelineOptions)
{
+      // amqp is a queue system, so, it's possible to have multiple concurrent sources, even
if
+      // they bind the listener
+      List<UnboundedAmqpSource> sources = new ArrayList<>();
+      for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) {
+        sources.add(new UnboundedAmqpSource(spec));
+      }
+      return sources;
+    }
+
+    @Override
+    public UnboundedReader<Message> createReader(PipelineOptions pipelineOptions,
+                                                AmqpCheckpointMark checkpointMark) {
+      return new UnboundedAmqpReader(this, checkpointMark);
+    }
+
+    @Override
+    public Coder<Message> getDefaultOutputCoder() {
+      return new AmqpMessageCoder();
+    }
+
+    @Override
+    public Coder<AmqpCheckpointMark> getCheckpointMarkCoder() {
+      return SerializableCoder.of(AmqpCheckpointMark.class);
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+  }
+
+  private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader<Message>
{
+
+    private final UnboundedAmqpSource source;
+
+    private Messenger messenger;
+    private Message current;
+    private Instant currentTimestamp;
+    private Instant watermark = new Instant(Long.MIN_VALUE);
+    private AmqpCheckpointMark checkpointMark;
+
+    public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark)
{
+      this.source = source;
+      this.current = null;
+      if (checkpointMark != null) {
+        this.checkpointMark = checkpointMark;
+      } else {
+        this.checkpointMark = new AmqpCheckpointMark();
+      }
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return currentTimestamp;
+    }
+
+    @Override
+    public Message getCurrent() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public UnboundedSource.CheckpointMark getCheckpointMark() {
+      return checkpointMark;
+    }
+
+    @Override
+    public UnboundedAmqpSource getCurrentSource() {
+      return source;
+    }
+
+    /**
+     * Creates and starts a {@link Messenger}, subscribes it to every address of the read spec,
+     * hands it to the checkpoint mark, then tries to read a first message.
+     *
+     * @return the result of the first {@link #advance()} call
+     */
+    @Override
+    public boolean start() throws IOException {
+      Read spec = source.spec;
+      messenger = Messenger.Factory.create();
+      messenger.start();
+      for (String address : spec.addresses()) {
+        messenger.subscribe(address);
+      }
+      // The checkpoint mark needs the messenger to accept trackers on finalization.
+      checkpointMark.messenger = messenger;
+      return advance();
+    }
+
+    /**
+     * Receives from the messenger and, if a message is available, makes it the current element.
+     *
+     * <p>The message's tracker is recorded in the checkpoint mark, and both the current
+     * timestamp and the watermark are set to the message creation time.
+     *
+     * @return {@code true} if a message was read, {@code false} if none is incoming
+     */
+    @Override
+    public boolean advance() {
+      messenger.recv();
+      if (messenger.incoming() <= 0) {
+        current = null;
+        return false;
+      }
+      Message message = messenger.get();
+      // Remember the tracker so the message can be accepted when the checkpoint is finalized.
+      Tracker tracker = messenger.incomingTracker();
+      checkpointMark.trackers.add(tracker);
+      currentTimestamp = new Instant(message.getCreationTime());
+      // NOTE(review): the watermark follows each message's creation time, so it could move
+      // backwards if messages arrive out of order; confirm this is acceptable.
+      watermark = currentTimestamp;
+      current = message;
+      return true;
+    }
+
+    @Override
+    public void close() {
+      if (messenger != null) {
+        messenger.stop();
+      }
+    }
+
+  }
+
+  /**
+   * A {@link PTransform} to send messages using AMQP 1.0 protocol.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<Message>, PDone>
{
+
+    @Override
+    public PDone expand(PCollection<Message> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteFn extends DoFn<Message, Void> {
+
+      private final Write spec;
+
+      private transient Messenger messenger;
+
+      public WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        messenger = Messenger.Factory.create();
+        messenger.start();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) throws Exception {
+        Message message = processContext.element();
+        messenger.put(message);
+        messenger.send();
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        if (messenger != null) {
+          messenger.stop();
+        }
+      }
+
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
new file mode 100644
index 0000000..5a55260
--- /dev/null
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.amqp;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * A coder for AMQP message.
+ */
+public class AmqpMessageCoder extends CustomCoder<Message> {
+
+  private static final int[] MESSAGE_SIZES = new int[]{
+      8 * 1024,
+      64 * 1024,
+      1 * 1024 * 1024,
+      64 * 1024 * 1024
+  };
+
+  static AmqpMessageCoder of() {
+    return new AmqpMessageCoder();
+  }
+
+  /**
+   * Encodes the message as a var-int length followed by the AMQP-encoded bytes.
+   *
+   * <p>The message is encoded into buffers of increasing size (see {@code MESSAGE_SIZES}) until
+   * one is large enough.
+   *
+   * @param value the message to encode
+   * @param outStream the stream receiving the length prefix and the encoded bytes
+   * @throws CoderException if the message does not fit in the largest supported buffer
+   * @throws IOException if writing to {@code outStream} fails
+   */
+  @Override
+  public void encode(Message value, OutputStream outStream) throws CoderException, IOException {
+    for (int maxMessageSize : MESSAGE_SIZES) {
+      try {
+        encode(value, outStream, maxMessageSize);
+        return;
+      } catch (IOException e) {
+        // A stream failure is not a sizing problem: retrying with a bigger buffer could write
+        // the payload twice, so propagate it instead of swallowing it.
+        throw e;
+      } catch (RuntimeException ignored) {
+        // The message did not fit in a buffer of this size; try the next, larger one.
+      }
+    }
+    throw new CoderException("Message is larger than the max size supported by the coder");
+  }
+
+  /**
+   * Encodes the message into a scratch buffer of {@code messageSize} bytes, then writes the
+   * encoded length as a var-int followed by the encoded bytes to {@code outStream}.
+   */
+  private void encode(Message value, OutputStream outStream, int messageSize) throws
+      IOException, BufferOverflowException {
+    byte[] data = new byte[messageSize];
+    int bytesWritten = value.encode(data, 0, data.length);
+    // Nothing has been written to the stream before this point.
+    VarInt.encode(bytesWritten, outStream);
+    outStream.write(data, 0, bytesWritten);
+  }
+
+  /**
+   * Reads a var-int length, then that many bytes, and decodes them into a new {@link Message}.
+   */
+  @Override
+  public Message decode(InputStream inStream) throws CoderException, IOException {
+    Message message = Message.Factory.create();
+    int bytesToRead = VarInt.decodeInt(inStream);
+    byte[] encodedMessage = new byte[bytesToRead];
+    ByteStreams.readFully(inStream, encodedMessage);
+    message.decode(encodedMessage, 0, encodedMessage.length);
+    return message;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
new file mode 100644
index 0000000..bc3445c
--- /dev/null
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.amqp;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link AmqpIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class AmqpMessageCoderProviderRegistrar implements CoderProviderRegistrar {
+
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(Message.class),
+            AmqpMessageCoder.of()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
new file mode 100644
index 0000000..091f234
--- /dev/null
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing using AMQP 1.0 protocol.
+ */
+package org.apache.beam.sdk.io.amqp;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
new file mode 100644
index 0000000..c8fe4e8
--- /dev/null
+++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests on {@link AmqpIO}.
+ */
+@RunWith(JUnit4.class)
+public class AmqpIOTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
+
+  private int port;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Picks a free local port before each test by briefly opening a {@link ServerSocket} on
+   * port 0 and recording the port it was given.
+   */
+  @Before
+  public void findFreeNetworkPort() throws Exception {
+    LOG.info("Finding free network port");
+    // try-with-resources closes the probe socket even if reading the port fails.
+    try (ServerSocket socket = new ServerSocket(0)) {
+      port = socket.getLocalPort();
+    }
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    PCollection<Message> output = pipeline.apply(AmqpIO.read()
+        .withMaxNumRecords(100)
+        .withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
+    PAssert.thatSingleton(output.apply(Count.<Message>globally())).isEqualTo(100L);
+
+    Thread sender = new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(500);
+          Messenger sender = Messenger.Factory.create();
+          sender.start();
+          for (int i = 0; i < 100; i++) {
+            Message message = Message.Factory.create();
+            message.setAddress("amqp://localhost:" + port);
+            message.setBody(new AmqpValue("Test " + i));
+            sender.put(message);
+            sender.send();
+          }
+          sender.stop();
+        } catch (Exception e) {
+          LOG.error("Sender error", e);
+        }
+      }
+    };
+    try {
+      sender.start();
+      pipeline.run();
+    } finally {
+      sender.join();
+    }
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    final List<String> received = new ArrayList<>();
+    Thread receiver = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Messenger messenger = Messenger.Factory.create();
+          messenger.start();
+          messenger.subscribe("amqp://~localhost:" + port);
+          while (received.size() < 100) {
+            messenger.recv();
+            while (messenger.incoming() > 0) {
+              Message message = messenger.get();
+              LOG.info("Received: " + message.getBody().toString());
+              received.add(message.getBody().toString());
+            }
+          }
+          messenger.stop();
+        } catch (Exception e) {
+          LOG.error("Receiver error", e);
+        }
+      }
+    };
+    LOG.info("Starting AMQP receiver");
+    receiver.start();
+
+    List<Message> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      Message message = Message.Factory.create();
+      message.setBody(new AmqpValue("Test " + i));
+      message.setAddress("amqp://localhost:" + port);
+      message.setSubject("test");
+      data.add(message);
+    }
+    pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
+    LOG.info("Starting pipeline");
+    try {
+      pipeline.run();
+    } finally {
+      LOG.info("Join receiver thread");
+      receiver.join();
+    }
+
+    assertEquals(100, received.size());
+    for (int i = 0; i < 100; i++) {
+      assertTrue(received.contains("AmqpValue{Test " + i + "}"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
new file mode 100644
index 0000000..7a8efeb
--- /dev/null
+++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.amqp;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Joiner;
+
+import java.util.Collections;
+
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test on {@link AmqpMessageCoder}.
+ */
+@RunWith(JUnit4.class)
+public class AmqpMessageCoderTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeDecode() throws Exception {
+    Message message = Message.Factory.create();
+    message.setBody(new AmqpValue("body"));
+    message.setAddress("address");
+    message.setSubject("test");
+    AmqpMessageCoder coder = AmqpMessageCoder.of();
+
+    Message clone = CoderUtils.clone(coder, message);
+
+    assertEquals("AmqpValue{body}", clone.getBody().toString());
+    assertEquals("address", clone.getAddress());
+    assertEquals("test", clone.getSubject());
+  }
+
+  /**
+   * Encoding a message whose body holds 64 * 1024 * 1024 characters must fail with a
+   * {@link CoderException}.
+   */
+  @Test
+  public void encodeDecodeTooMuchLargerMessage() throws Exception {
+    thrown.expect(CoderException.class);
+    Message message = Message.Factory.create();
+    message.setAddress("address");
+    message.setSubject("subject");
+    String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " "));
+    message.setBody(new AmqpValue(body));
+
+    AmqpMessageCoder coder = AmqpMessageCoder.of();
+
+    // The encoded bytes are irrelevant here: the call itself is expected to throw.
+    CoderUtils.encodeToByteArray(coder, message);
+  }
+
+  /**
+   * A message whose body holds 32 * 1024 * 1024 characters must survive an encode/decode
+   * round trip with its body unchanged.
+   */
+  @Test
+  public void encodeDecodeLargeMessage() throws Exception {
+    Message message = Message.Factory.create();
+    message.setAddress("address");
+    message.setSubject("subject");
+    String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " "));
+    message.setBody(new AmqpValue(body));
+
+    AmqpMessageCoder coder = AmqpMessageCoder.of();
+
+    Message clone = CoderUtils.clone(coder, message);
+
+    // The comparison result used to be discarded, so the test could never fail.
+    assertEquals(message.getBody().toString(), clone.getBody().toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cbfcad82/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 13cd418..e5db41b 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -64,6 +64,7 @@
   </dependencyManagement>
 
   <modules>
+    <module>amqp</module>
     <module>cassandra</module>
     <module>common</module>
     <module>elasticsearch</module>


Mime
View raw message