pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Add a Pulsar IO MongoDB (#3561)
Date Wed, 13 Feb 2019 01:56:25 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 06e911d  Add a Pulsar IO MongoDB (#3561)
06e911d is described below

commit 06e911d4054daabca84556b80976b7e6ffa4c815
Author: Bruno Bonnin <bbonnin@gmail.com>
AuthorDate: Wed Feb 13 02:56:20 2019 +0100

    Add a Pulsar IO MongoDB (#3561)
    
    ### Motivation
    
    Provides a built-in MongoDB connector that eases the storage of JSON-formatted messages in MongoDB. It's a sink connector.
    
    
    ### Modifications
    
    Add a new sub-module in the `pulsar-io` module.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    * deploy the connector with a configuration file containing the following fields:
    ```
    configs:
      mongoUri: mongodb://hostname:port
      database: pulsar
      collection: messages
    ```
    * start a mongodb instance
    * send messages in the topic declared when deploying the connector
    * check in MongoDB if the messages have been stored in the collection `messages`
---
 pulsar-io/mongo/pom.xml                            |  82 +++++++++
 .../org/apache/pulsar/io/mongodb/MongoConfig.java  | 109 ++++++++++++
 .../org/apache/pulsar/io/mongodb/MongoSink.java    | 182 +++++++++++++++++++
 .../org/apache/pulsar/io/mongodb/package-info.java |  19 ++
 .../resources/META-INF/services/pulsar-io.yaml     |   3 +
 .../apache/pulsar/io/mongodb/MongoConfigTest.java  |  87 +++++++++
 .../apache/pulsar/io/mongodb/MongoSinkTest.java    | 198 +++++++++++++++++++++
 .../org/apache/pulsar/io/mongodb/TestHelper.java   |  55 ++++++
 .../mongo/src/test/resources/mongoSinkConfig.yaml  |  26 +++
 pulsar-io/pom.xml                                  |   1 +
 10 files changed, 762 insertions(+)

diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
new file mode 100644
index 0000000..8968c6a
--- /dev/null
+++ b/pulsar-io/mongo/pom.xml
@@ -0,0 +1,82 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io</artifactId>
+      <version>2.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-mongo</artifactId>
+  <name>Pulsar IO :: MongoDB</name>
+
+  <properties>
+    <mongo-driver.version>3.8.2</mongo-driver.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.parent.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongodb-driver-async</artifactId>
+      <version>${mongo-driver.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${project.parent.groupId}</groupId>
+      <artifactId>buildtools</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
new file mode 100644
index 0000000..602c105
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration class for the MongoDB Sink Connector.
+ */
+@Data
+@Accessors(chain = true)
+public class MongoConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final int DEFAULT_BATCH_SIZE = 100;
+
+    public static final long DEFAULT_BATCH_TIME_MS = 1000;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The uri of mongodb that the connector connects to" +
+                " (see: https://docs.mongodb.com/manual/reference/connection-string/)"
+    )
+    private String mongoUri;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The name of the database to which the collection belongs to"
+    )
+    private String database;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The collection name that the connector writes messages to"
+    )
+    private String collection;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "" + DEFAULT_BATCH_SIZE,
+        help = "The batch size of write to the collection"
+    )
+    private int batchSize = DEFAULT_BATCH_SIZE;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "" + DEFAULT_BATCH_TIME_MS,
+        help = "The batch operation interval in milliseconds"
+    )
+    private long batchTimeMs = DEFAULT_BATCH_TIME_MS;
+
+    /** Reads the configuration from the YAML file located at {@code yamlFile}. */
+    public static MongoConfig load(String yamlFile) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), MongoConfig.class);
+    }
+
+    /** Builds the configuration from a key/value map by round-tripping it through JSON. */
+    public static MongoConfig load(Map<String, Object> map) throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), MongoConfig.class);
+    }
+
+    /**
+     * Checks that the required properties are present and that the batch settings are positive.
+     *
+     * @throws IllegalArgumentException if {@code mongoUri}, {@code database} or {@code collection}
+     *         is null or empty, or if {@code batchSize} or {@code batchTimeMs} is not greater than zero
+     */
+    public void validate() {
+        if (StringUtils.isEmpty(mongoUri) || StringUtils.isEmpty(database) || StringUtils.isEmpty(collection)) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
+        Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+    }
+}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
new file mode 100644
index 0000000..3eb6f6e
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.google.common.collect.Lists;
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.async.client.MongoClient;
+import com.mongodb.async.client.MongoClients;
+import com.mongodb.async.client.MongoCollection;
+import com.mongodb.async.client.MongoDatabase;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * The base class for MongoDB sinks.
+ * Users need to implement extractKeyValue function to use this sink.
+ * This class assumes that the input will be JSON documents.
+ */
+@Connector(
+    name = "mongo",
+    type = IOType.SINK,
+    help = "A sink connector that sends pulsar messages to mongodb",
+    configClass = MongoConfig.class
+)
+@Slf4j
+public class MongoSink implements Sink<byte[]> {
+
+    private MongoConfig mongoConfig;
+
+    private MongoClient mongoClient;
+
+    private MongoCollection<Document> collection;
+
+    private List<Record<byte[]>> incomingList;
+
+    private ScheduledExecutorService flushExecutor;
+
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception
{
+        log.info("Open MongoDB Sink");
+
+        mongoConfig = MongoConfig.load(config);
+        mongoConfig.validate();
+
+        mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
+        collection = db.getCollection(mongoConfig.getCollection());
+
+        incomingList = Lists.newArrayList();
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(() -> flush(),
+                mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(Record<byte[]> record) {
+        final String recordValue = new String(record.getValue(), Charset.forName("UTF-8"));
+
+        if (log.isDebugEnabled()) {
+            log.debug("Received record: " + recordValue);
+        }
+
+        int currentSize;
+
+        synchronized (this) {
+            incomingList.add(record);
+            currentSize = incomingList.size();
+        }
+
+        if (currentSize == mongoConfig.getBatchSize()) {
+            flushExecutor.submit(() -> flush());
+        }
+    }
+
+    private void flush() {
+        final List<Document> docsToInsert = new ArrayList<>();
+        final List<Record<byte[]>> recordsToInsert;
+
+        synchronized (this) {
+            if (incomingList.isEmpty()) {
+                return;
+            }
+
+            recordsToInsert = incomingList;
+            incomingList = Lists.newArrayList();
+        }
+
+        final Iterator<Record<byte[]>> iter = recordsToInsert.iterator();
+
+        while (iter.hasNext()) {
+            final Record<byte[]> record = iter.next();
+
+            try {
+                final byte[] docAsBytes = record.getValue();
+                final Document doc = Document.parse(new String(docAsBytes, Charset.forName("UTF-8")));
+                docsToInsert.add(doc);
+            }
+            catch (JsonParseException | BSONException e) {
+                log.error("Bad message", e);
+                record.fail();
+                iter.remove();
+            }
+        }
+
+        if (docsToInsert.size() > 0) {
+
+            collection.insertMany(docsToInsert, (result, t) -> {
+                final List<Integer> idxToAck = IntStream.range(0, docsToInsert.size()).boxed().collect(toList());
+                final List<Integer> idxToFail = Lists.newArrayList();
+
+                if (t != null) {
+                    log.error("MongoDB insertion error", t);
+
+                    if (t instanceof MongoBulkWriteException) {
+                        // With this exception, we are aware of the items that have not been
inserted.
+                        ((MongoBulkWriteException) t).getWriteErrors().forEach(err ->
{
+                            idxToFail.add(err.getIndex());
+                        });
+                        idxToAck.removeAll(idxToFail);
+                    } else {
+                        idxToFail.addAll(idxToAck);
+                        idxToAck.clear();
+                    }
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Nb ack={}, nb fail={}", idxToAck.size(), idxToFail.size());
+                }
+
+                idxToAck.forEach(idx -> recordsToInsert.get(idx).ack());
+                idxToFail.forEach(idx -> recordsToInsert.get(idx).fail());
+            });
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (flushExecutor != null) {
+            flushExecutor.shutdown();
+        }
+
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java
new file mode 100644
index 0000000..df14171
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
\ No newline at end of file
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..476baed
--- /dev/null
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,3 @@
+name: mongo
+description: Writes data into MongoDB
+sinkClass: org.apache.pulsar.io.mongodb.MongoSink
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
new file mode 100644
index 0000000..9f98ee4
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+public class MongoConfigTest {
+
+    /** Resolves a resource on the test classpath to a file. */
+    private static File getFile(String fileName) {
+        return new File(MongoConfigTest.class.getClassLoader().getResource(fileName).getFile());
+    }
+
+    /** Loads a fully populated map in which {@code key} has been overridden with {@code value}. */
+    private static MongoConfig loadWithOverride(String key, Object value) throws IOException {
+        final Map<String, Object> settings = TestHelper.createMap(true);
+        settings.put(key, value);
+        return MongoConfig.load(settings);
+    }
+
+    /** A complete map is loaded with the values it carries. */
+    @Test
+    public void testMap() throws IOException {
+        final MongoConfig config = MongoConfig.load(TestHelper.createMap(true));
+
+        assertEquals(config.getMongoUri(), TestHelper.URI);
+        assertEquals(config.getDatabase(), TestHelper.DB);
+        assertEquals(config.getCollection(), TestHelper.COLL);
+        assertEquals(config.getBatchSize(), TestHelper.BATCH_SIZE);
+    }
+
+    /** A map that lacks the collection name is rejected by validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required property not set.")
+    public void testBadMap() throws IOException {
+        MongoConfig.load(TestHelper.createMap(false)).validate();
+    }
+
+    /** A batch size of zero is rejected by validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchSize must be a positive integer.")
+    public void testBadBatchSize() throws IOException {
+        loadWithOverride("batchSize", 0).validate();
+    }
+
+    /** A batch interval of zero is rejected by validation. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "batchTimeMs must be a positive long.")
+    public void testBadBatchTime() throws IOException {
+        loadWithOverride("batchTimeMs", 0).validate();
+    }
+
+    /** The YAML test resource is loaded with the values it declares. */
+    @Test
+    public void testYaml() throws IOException {
+        final MongoConfig config = MongoConfig.load(getFile("mongoSinkConfig.yaml").getAbsolutePath());
+
+        assertEquals(config.getMongoUri(), TestHelper.URI);
+        assertEquals(config.getDatabase(), TestHelper.DB);
+        assertEquals(config.getCollection(), TestHelper.COLL);
+        assertEquals(config.getBatchSize(), TestHelper.BATCH_SIZE);
+        assertEquals(config.getBatchTimeMs(), TestHelper.BATCH_TIME);
+    }
+}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
new file mode 100644
index 0000000..c941708
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.async.SingleResultCallback;
+import com.mongodb.async.client.MongoClient;
+import com.mongodb.async.client.MongoClients;
+import com.mongodb.async.client.MongoCollection;
+import com.mongodb.async.client.MongoDatabase;
+import com.mongodb.bulk.BulkWriteError;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.bson.BsonDocument;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+@PrepareForTest(MongoClients.class)
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class MongoSinkTest {
+
+    @Mock
+    private Record<byte[]> mockRecord;
+
+    @Mock
+    private SinkContext mockSinkContext;
+
+    @Mock
+    private MongoClient mockMongoClient;
+
+    @Mock
+    private MongoDatabase mockMongoDb;
+
+    @Mock
+    private MongoCollection mockMongoColl;
+
+    private MongoSink sink;
+
+    private Map<String, Object> map;
+
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @BeforeMethod
+    public void setUp() {
+        sink = new MongoSink();
+        map = TestHelper.createMap(true);
+
+        mockRecord = mock(Record.class);
+        mockSinkContext = mock(SinkContext.class);
+        mockMongoClient = mock(MongoClient.class);
+        mockMongoDb = mock(MongoDatabase.class);
+        mockMongoColl = mock(MongoCollection.class);
+
+        PowerMockito.mockStatic(MongoClients.class);
+
+        when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
+        when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
+        when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
+    }
+
+    private void initContext(boolean throwBulkError) {
+        when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
+
+        doAnswer((invocation) -> {
+            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            MongoBulkWriteException exc = null;
+
+            if (throwBulkError) {
+                List<BulkWriteError > writeErrors = Arrays.asList(
+                        new BulkWriteError(0, "error", new BsonDocument(), 1));
+                exc = new MongoBulkWriteException(null, writeErrors, null, null);
+            }
+
+            cb.onResult(null, exc);
+            return null;
+        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+    }
+
+    private void initFailContext(String msg) {
+        when(mockRecord.getValue()).thenReturn(msg.getBytes());
+
+        doAnswer((invocation) -> {
+            SingleResultCallback cb = invocation.getArgumentAt(1, SingleResultCallback.class);
+            cb.onResult(null, new Exception("Oops"));
+            return null;
+        }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sink.close();
+        verify(mockMongoClient, times(1)).close();
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        sink.open(map, mockSinkContext);
+    }
+
+    @Test
+    public void testWriteNullMessage() throws Exception {
+        when(mockRecord.getValue()).thenReturn("".getBytes());
+
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+
+        Thread.sleep(1000);
+
+        verify(mockRecord, times(1)).fail();
+    }
+
+    @Test
+    public void testWriteGoodMessage() throws Exception {
+        initContext(false);
+
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+
+        Thread.sleep(1000);
+
+        verify(mockRecord, times(1)).ack();
+    }
+
+    @Test
+    public void testWriteMultipleMessages() throws Exception {
+        initContext(true);
+
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+        sink.write(mockRecord);
+        sink.write(mockRecord);
+
+        Thread.sleep(1000);
+
+        verify(mockRecord, times(2)).ack();
+        verify(mockRecord, times(1)).fail();
+    }
+
+    @Test
+    public void testWriteWithError() throws Exception {
+        initFailContext("{\"hello\":\"pulsar\"}");
+
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+
+        Thread.sleep(1000);
+
+        verify(mockRecord, times(1)).fail();
+    }
+
+    @Test
+    public void testWriteBadMessage() throws Exception {
+        initFailContext("Oops");
+
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+
+        Thread.sleep(1000);
+
+        verify(mockRecord, times(1)).fail();
+    }
+}
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
new file mode 100644
index 0000000..82a0744
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class TestHelper {
+
+    public static final String URI = "mongodb://localhost";
+    public static final String DB = "pulsar";
+    public static final String COLL = "messages";
+    public static final int BATCH_SIZE = 2;
+    public static final int BATCH_TIME = 500;
+
+    /** Not meant to be instantiated. */
+    private TestHelper() {
+    }
+
+    /**
+     * Creates the settings map handed to the code under test.
+     * @param full when {@code true} the map also holds the collection name and the batch settings;
+     *             when {@code false} it only holds the URI and the database name
+     * @return a new mutable map
+     */
+    public static Map<String, Object> createMap(boolean full) {
+        final Map<String, Object> settings = new HashMap<>();
+        settings.put("mongoUri", URI);
+        settings.put("database", DB);
+        if (!full) {
+            return settings;
+        }
+        settings.put("collection", COLL);
+        settings.put("batchSize", BATCH_SIZE);
+        settings.put("batchTimeMs", BATCH_TIME);
+        return settings;
+    }
+}
diff --git a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
new file mode 100644
index 0000000..f7a9ea2
--- /dev/null
+++ b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+   "mongoUri": "mongodb://localhost",
+   "database": "pulsar",
+   "collection": "messages",
+   "batchSize": 2,
+   "batchTimeMs": 500
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index fb88341..4b69d59 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -51,6 +51,7 @@
     <module>file</module>
     <module>netty</module>
     <module>hbase</module>
+    <module>mongo</module>
   </modules>
 
 </project>


Mime
View raw message