beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/4] beam git commit: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x
Date Fri, 15 Sep 2017 00:25:30 GMT
[BEAM-1637] Create Elasticsearch IO compatible with ES 5.x


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0d4fd190
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0d4fd190
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0d4fd190

Branch: refs/heads/master
Commit: 0d4fd19076722515f29c34144cc93aab3795801f
Parents: e7601aa
Author: Etienne Chauchot <echauchot@gmail.com>
Authored: Mon Jun 26 10:58:21 2017 +0200
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Thu Sep 14 17:01:27 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  22 ++
 .../elasticsearch-tests-2/pom.xml               |  60 ++++
 .../src/test/contrib/create_elk_container.sh    |  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 123 +++++++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 185 ++++++++++
 .../elasticsearch-tests-5/pom.xml               | 126 +++++++
 .../src/test/contrib/create_elk_container.sh    |  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 122 +++++++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 184 ++++++++++
 .../org/elasticsearch/bootstrap/JarHell.java    |  39 ++
 .../elasticsearch-tests-common/pom.xml          |  81 +++++
 .../elasticsearch/ElasticSearchIOTestUtils.java | 142 ++++++++
 .../elasticsearch/ElasticsearchIOITCommon.java  |  92 +++++
 .../ElasticsearchIOTestCommon.java              | 306 ++++++++++++++++
 sdks/java/io/elasticsearch-tests/pom.xml        | 144 ++++++++
 sdks/java/io/elasticsearch/pom.xml              | 234 +++++-------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 229 +++++++-----
 .../beam/sdk/io/elasticsearch/package-info.java |   1 -
 .../src/test/contrib/create_elk_container.sh    |  24 --
 .../elasticsearch/ElasticSearchIOTestUtils.java | 138 -------
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 155 --------
 .../io/elasticsearch/ElasticsearchIOTest.java   | 355 -------------------
 .../elasticsearch/ElasticsearchTestDataSet.java |  97 -----
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/javadoc/pom.xml                       |  10 +
 25 files changed, 1925 insertions(+), 993 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9644dd..a2d6aae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -462,6 +462,28 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+        <classifier>tests</classifier>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-2</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-5</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
new file mode 100644
index 0000000..a56ffa4
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-elasticsearch-tests-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-elasticsearch-tests-2</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 2.x</name>
+  <description>Tests of ElasticsearchIO on Elasticsearch 2.x</description>
+
+  <properties>
+    <elasticsearch.version>2.4.1</elasticsearch.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>${elasticsearch.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
new file mode 100755
index 0000000..48f6064
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and Kibana versions,
+#bind them to host ports, allow shell access to the container and mount the current directory on /home/$USER inside the container
+
+docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-2.4  sebp/elk:es240_l240_k460

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
new file mode 100644
index 0000000..93fdd9b
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * A test of {@link ElasticsearchIO} on an independent Elasticsearch v2.x instance.
+ *
+ * <p>This test requires a running instance of Elasticsearch, and the test dataset must exist in the
+ * database.
+ *
+ * <p>You can run this test by doing the following from the beam parent module directory:
+ *
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/elasticsearch-tests/elasticsearch-tests-2 -DintegrationTestPipelineOptions='[
+ *  "--elasticsearchServer=1.2.3.4",
+ *  "--elasticsearchHttpPort=9200"]'
+ * </pre>
+ */
+public class ElasticsearchIOIT {
+  private static RestClient restClient;
+  private static IOTestPipelineOptions options;
+  private static ConnectionConfiguration readConnectionConfiguration;
+  private static ConnectionConfiguration writeConnectionConfiguration;
+  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+    readConnectionConfiguration = ElasticsearchIOITCommon
+        .getConnectionConfiguration(options, ElasticsearchIOITCommon.ReadOrWrite.READ);
+    writeConnectionConfiguration = ElasticsearchIOITCommon
+        .getConnectionConfiguration(options, ElasticsearchIOITCommon.ReadOrWrite.WRITE);
+    restClient = readConnectionConfiguration.createClient();
+    elasticsearchIOTestCommon = new ElasticsearchIOTestCommon(readConnectionConfiguration,
+        restClient, true);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration, restClient);
+    restClient.close();
+  }
+
+  @Test
+  public void testSplitsVolume() throws Exception {
+    Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null, null,
+        null);
+    //desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
+    // as many bundles as ES shards and bundle size is shard size
+    long desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<String>> splits = initialSource
+        .split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    //this is the number of ES shards
+    // (By default, each index in Elasticsearch is allocated 5 primary shards)
+    long expectedNumSplits = 5;
+    assertEquals(expectedNumSplits, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals(expectedNumSplits, nonEmptySplits);
+  }
+
+  @Test
+  public void testReadVolume() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testRead();
+  }
+
+  @Test
+  public void testWriteVolume() throws Exception {
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = new ElasticsearchIOTestCommon(
+        writeConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+    elasticsearchIOTestCommonWrite.testWrite();
+  }
+
+  @Test
+  public void testSizesVolume() throws Exception {
+    elasticsearchIOTestCommon.testSizes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 0000000..28ffa02
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ElasticsearchIO} version 2.x. */
+
+@RunWith(JUnit4.class)
+public class ElasticsearchIOTest implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);
+
+  private static final String ES_IP = "127.0.0.1";
+
+  private static Node node;
+  private static RestClient restClient;
+  private static ConnectionConfiguration connectionConfiguration;
+  //cannot use inheritance because ES5 test already extends ESIntegTestCase.
+  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    ServerSocket serverSocket = new ServerSocket(0);
+    int esHttpPort = serverSocket.getLocalPort();
+    serverSocket.close();
+    LOG.info("Starting embedded Elasticsearch instance ({})", esHttpPort);
+    Settings.Builder settingsBuilder =
+        Settings.settingsBuilder()
+            .put("cluster.name", "beam")
+            .put("http.enabled", "true")
+            .put("node.data", "true")
+            .put("path.data", TEMPORARY_FOLDER.getRoot().getPath())
+            .put("path.home", TEMPORARY_FOLDER.getRoot().getPath())
+            .put("node.name", "beam")
+            .put("network.host", ES_IP)
+            .put("http.port", esHttpPort)
+            .put("index.store.stats_refresh_interval", 0)
+            // With some JDKs, embedded ES was too slow for bulk insertion and the queue of 50
+            // filled up. No problem with a real ES instance (cf. testWrite integration test).
+            .put("threadpool.bulk.queue_size", 100);
+    node = new Node(settingsBuilder.build());
+    LOG.info("Elasticsearch node created");
+    node.start();
+    connectionConfiguration = ConnectionConfiguration
+        .create(new String[] { "http://" + ES_IP + ":" + esHttpPort }, ES_INDEX, ES_TYPE);
+    restClient = connectionConfiguration.createClient();
+    elasticsearchIOTestCommon = new ElasticsearchIOTestCommon(connectionConfiguration, restClient,
+        false);
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException{
+    restClient.close();
+    node.close();
+  }
+
+  @Before
+  public void before() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(connectionConfiguration, restClient);
+  }
+
+  @Test
+  public void testSizes() throws Exception {
+    elasticsearchIOTestCommon.testSizes();
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testRead();
+  }
+
+  @Test
+  public void testReadWithQuery() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithQuery();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWrite();
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWriteWithErrors() throws Exception {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.testWriteWithErrors();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSizeBytes() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
+  }
+
+  @Test
+  public void testSplit() throws Exception {
+    ElasticSearchIOTestUtils
+        .insertTestDocuments(connectionConfiguration, NUM_DOCS_UTESTS, restClient);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Read read =
+        ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null, null,
+        null);
+    //desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
+    // as many bundles as ES shards and bundle size is shard size
+    int desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    //this is the number of ES shards
+    // (By default, each index in Elasticsearch is allocated 5 primary shards)
+    int expectedNumSplits = 5;
+    assertEquals(expectedNumSplits, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals("Wrong number of empty splits", expectedNumSplits, nonEmptySplits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/pom.xml b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/pom.xml
new file mode 100644
index 0000000..48d75ce
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-parent</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>beam-sdks-java-io-elasticsearch-tests-5</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 5.x</name>
+    <description>Tests of ElasticsearchIO on Elasticsearch 5.x</description>
+
+    <properties>
+        <elasticsearch.version>5.4.0</elasticsearch.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.elasticsearch.test</groupId>
+            <artifactId>framework</artifactId>
+            <version>${elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- to enable REST connections on embedded Elasticsearch -->
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.5.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <!--needed for ESIntegTestCase-->
+                <configuration>
+                    <argLine>-Dtests.security.manager=false</argLine>
+                </configuration>
+            </plugin>
+            <!-- Overridden enforcer plugin for JDK1.8 for running tests -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <enforceBytecodeVersion>
+                                    <maxJdkVersion>1.8</maxJdkVersion>
+                                    <excludes>
+                                        <!-- Supplied by the user JDK and compiled with matching
+                                          version. Is not shaded, so safe to ignore. -->
+                                        <exclude>jdk.tools:jdk.tools</exclude>
+                                    </excludes>
+                                </enforceBytecodeVersion>
+                                <requireJavaVersion>
+                                    <version>[1.8,)</version>
+                                </requireJavaVersion>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>extra-enforcer-rules</artifactId>
+                        <version>1.0-beta-6</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
new file mode 100755
index 0000000..48f6064
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and Kibana versions,
+#bind them to host ports, allow shell access to the container and mount the current directory on /home/$USER inside the container
+
+docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-2.4  sebp/elk:es240_l240_k460

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
new file mode 100644
index 0000000..7c33740
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * A test of {@link ElasticsearchIO} on an independent Elasticsearch v5.x instance.
+ *
+ * <p>This test requires a running instance of Elasticsearch, and the test dataset must exist in the
+ * database.
+ *
+ * <p>You can run this test by doing the following from the beam parent module directory:
+ *
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/elasticsearch-tests/elasticsearch-tests-5 \
+ *  -DintegrationTestPipelineOptions='[
+ *  "--elasticsearchServer=1.2.3.4", "--elasticsearchHttpPort=9200"]'
+ * </pre>
+ */
+public class ElasticsearchIOIT {
+  private static RestClient restClient;
+  private static IOTestPipelineOptions options;
+  private static ConnectionConfiguration readConnectionConfiguration;
+  private static ConnectionConfiguration writeConnectionConfiguration;
+  private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+    readConnectionConfiguration = ElasticsearchIOITCommon
+        .getConnectionConfiguration(options, ElasticsearchIOITCommon.ReadOrWrite.READ);
+    writeConnectionConfiguration = ElasticsearchIOITCommon
+        .getConnectionConfiguration(options, ElasticsearchIOITCommon.ReadOrWrite.WRITE);
+    restClient = readConnectionConfiguration.createClient();
+    elasticsearchIOTestCommon = new ElasticsearchIOTestCommon(readConnectionConfiguration,
+        restClient, true);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration, restClient);
+    restClient.close();
+  }
+
+  @Test
+  public void testSplitsVolume() throws Exception {
+    Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null, null,
+        null);
+    int desiredBundleSizeBytes = 10000;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    long indexSize = BoundedElasticsearchSource.estimateIndexSize(readConnectionConfiguration);
+    // expect ceil(indexSize / desiredBundleSizeBytes) splits, all of them non-empty
+    int expectedNumSources = (int) Math.ceil((float) indexSize / desiredBundleSizeBytes);
+    assertEquals(expectedNumSources, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals("Wrong number of non-empty splits", expectedNumSources, nonEmptySplits);
+  }
+
+  @Test
+  public void testReadVolume() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testRead();
+  }
+
+  @Test
+  public void testWriteVolume() throws Exception {
+    //cannot share elasticsearchIOTestCommon because tests run in parallel.
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = new ElasticsearchIOTestCommon(
+        writeConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+    elasticsearchIOTestCommonWrite.testWrite();
+  }
+
+  @Test
+  public void testSizesVolume() throws Exception {
+    elasticsearchIOTestCommon.testSizes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
new file mode 100644
index 0000000..362f0a8
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/*
+Cannot use @RunWith(JUnit4.class) with ESIntegTestCase
+Cannot have @BeforeClass @AfterClass with ESIntegTestCase
+*/
+
+/** Tests for {@link ElasticsearchIO} version 5. */
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable {
+
+  private ElasticsearchIOTestCommon elasticsearchIOTestCommon;
+  private ConnectionConfiguration connectionConfiguration;
+
+  private String[] fillAddresses(){
+    ArrayList<String> result = new ArrayList<>();
+    for (InetSocketAddress address : cluster().httpAddresses()){
+      result.add(String.format("http://%s:%d", address.getHostString(), address.getPort()));
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+
+  @Override
+  protected Settings nodeSettings(int nodeOrdinal) {
+    return Settings.builder().put(super.nodeSettings(nodeOrdinal))
+        .put("http.enabled", "true")
+        // had problems with some jdk, embedded ES was too slow for bulk insertion,
+        // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test)
+        .put("thread_pool.bulk.queue_size", 100)
+        .build();
+  }
+
+  @Override
+  public Settings indexSettings() {
+    return Settings.builder().put(super.indexSettings())
+        //useful to have updated sizes for getEstimatedSize
+        .put("index.store.stats_refresh_interval", 0)
+        .build();
+  }
+
+  @Override
+  protected Collection<Class<? extends Plugin>> nodePlugins() {
+    ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
+    plugins.add(Netty4Plugin.class);
+    return plugins;
+  }
+
+  @Before
+  public void setup(){
+    if (connectionConfiguration == null){
+      connectionConfiguration = ConnectionConfiguration.create(fillAddresses(), ES_INDEX, ES_TYPE);
+      elasticsearchIOTestCommon = new ElasticsearchIOTestCommon(connectionConfiguration,
+          getRestClient(), false);
+    }
+  }
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testSizes() throws Exception {
+    // need to create the index using the helper method (not create it at first insertion)
+    // for the indexSettings() to be run
+    createIndex(ES_INDEX);
+    elasticsearchIOTestCommon.testSizes();
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    // need to create the index using the helper method (not create it at first insertion)
+   // for the indexSettings() to be run
+   createIndex(ES_INDEX);
+   elasticsearchIOTestCommon.setPipeline(pipeline);
+   elasticsearchIOTestCommon.testRead();
+ }
+
+  @Test
+  public void testReadWithQuery() throws Exception {
+   // need to create the index using the helper method (not create it at first insertion)
+   // for the indexSettings() to be run
+   createIndex(ES_INDEX);
+   elasticsearchIOTestCommon.setPipeline(pipeline);
+   elasticsearchIOTestCommon.testReadWithQuery();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+   elasticsearchIOTestCommon.setPipeline(pipeline);
+   elasticsearchIOTestCommon.testWrite();
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWriteWithErrors() throws Exception {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.testWriteWithErrors();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSize() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSize();
+  }
+
+  @Test
+  public void testWriteWithMaxBatchSizeBytes() throws Exception {
+    elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes();
+  }
+
+  @Test
+  public void testSplit() throws Exception {
+    // need to create the index using the helper method (not create it at first insertion)
+    // for the indexSettings() to be run
+    createIndex(ES_INDEX);
+    ElasticSearchIOTestUtils
+        .insertTestDocuments(connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
+    BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null, null,
+        null);
+    int desiredBundleSizeBytes = 1000;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.split(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
+    long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
+    // expect ceil(indexSize / desiredBundleSizeBytes) splits, all of them non-empty
+    float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
+    int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
+    assertEquals(expectedNumSources, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals("Wrong number of non-empty splits", expectedNumSources, nonEmptySplits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/elasticsearch/bootstrap/JarHell.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/elasticsearch/bootstrap/JarHell.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/elasticsearch/bootstrap/JarHell.java
new file mode 100644
index 0000000..c359d1d
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/elasticsearch/bootstrap/JarHell.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.elasticsearch.bootstrap;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+/**
+ * We need a real Elasticsearch instance to properly test the IO
+ * (split, slice API, scroll API, ...). Starting at ES 5, to have Elasticsearch embedded,
+ * we are forced to use Elasticsearch test framework. But this framework checks for class duplicates
+ * in classpath and it cannot be deactivated. When the class duplication comes from a dependency,
+ * then it cannot be avoided. Elasticsearch community does not provide a way of deactivating
+ * the jar hell test, so skip it by making this hack. In this case duplicate class is
+ * class: org.apache.maven.surefire.report.SafeThrowable
+ * jar1: surefire-api-2.20.jar
+ * jar2: surefire-junit47-2.20.jar
+ */
+class JarHell {
+
+  /** Intentionally empty so that the jar hell check described above always passes. */
+  public static void checkJarHell() throws IOException, URISyntaxException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/pom.xml b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/pom.xml
new file mode 100644
index 0000000..4381eeb
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-parent</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: Common </name>
+    <description>Common test classes for ElasticsearchIO </description>
+
+    <properties>
+        <commons-io.version>1.3.2</commons-io.version>
+        <jna.version>4.1.0</jna.version>
+        <log4j.version>2.6.2</log4j.version>
+        <elasticsearch.client.rest.version>5.0.0</elasticsearch.client.rest.version>
+        <httpcomponents.core.version>4.4.5</httpcomponents.core.version>
+        <httpcomponents.httpasyncclient.version>4.1.2</httpcomponents.httpasyncclient.version>
+        <httpcomponents.httpclient.version>4.5.2</httpcomponents.httpclient.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>${httpcomponents.httpasyncclient.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+            <version>${httpcomponents.core.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcomponents.core.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents.httpclient.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
new file mode 100644
index 0000000..2f6ac3e
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+
+/** Test utilities to use with {@link ElasticsearchIO}. */
+class ElasticSearchIOTestUtils {
+
+  /** Enumeration that specifies whether to insert malformed documents. */
+  public enum InjectionMode {
+    INJECT_SOME_INVALID_DOCS,
+    DO_NOT_INJECT_INVALID_DOCS
+  }
+
+  /** Deletes the given index synchronously; a missing index is silently tolerated. */
+  static void deleteIndex(ConnectionConfiguration connectionConfiguration,
+      RestClient restClient) throws IOException {
+    try {
+      restClient.performRequest("DELETE", String.format("/%s", connectionConfiguration.getIndex()));
+    } catch (IOException e) {
+      // it is fine to ignore this exception as deleteIndex occurs in @Before, so when the first
+      // test is run, the index does not exist yet. An exception without a message is rethrown.
+      if (e.getMessage() == null || !e.getMessage().contains("index_not_found_exception")) {
+        throw e;
+      }
+    }
+  }
+
+  /** Inserts the given number of test documents into Elasticsearch. */
+  static void insertTestDocuments(ConnectionConfiguration connectionConfiguration,
+      long numDocs, RestClient restClient) throws IOException {
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    StringBuilder bulkRequest = new StringBuilder();
+    for (String document : data) {
+      bulkRequest.append(String.format("{ \"index\" : {} }%n%s%n", document));
+    }
+    String endPoint = String.format("/%s/%s/_bulk", connectionConfiguration.getIndex(),
+        connectionConfiguration.getType());
+    HttpEntity requestBody =
+        new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
+    Response response = restClient.performRequest("POST", endPoint,
+        Collections.singletonMap("refresh", "true"), requestBody);
+    JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+    boolean errors = searchResult.path("errors").asBoolean();
+    if (errors){
+      throw new IOException(String.format("Failed to insert test documents in index %s",
+          connectionConfiguration.getIndex()));
+    }
+  }
+
+  /**
+   * Forces a refresh of the given index to make recently inserted documents available for search.
+   *
+   * @return The number of docs in the index, or 0 if the index does not exist yet
+   */
+  static long refreshIndexAndGetCurrentNumDocs(
+      ConnectionConfiguration connectionConfiguration, RestClient restClient) throws IOException {
+    long result = 0;
+    try {
+      String endPoint = String.format("/%s/_refresh", connectionConfiguration.getIndex());
+      restClient.performRequest("POST", endPoint);
+
+      endPoint = String.format("/%s/%s/_search", connectionConfiguration.getIndex(),
+          connectionConfiguration.getType());
+      Response response = restClient.performRequest("GET", endPoint);
+      JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+      result = searchResult.path("hits").path("total").asLong();
+    } catch (IOException e) {
+      // it is fine to ignore the exception below because in testWriteWithBatchSize* sometimes,
+      // we call refresh before any doc has been written
+      // (when there are fewer docs processed than batchSize).
+      // In that case the index/type has not been created (created upon first doc insertion)
+      if (e.getMessage() == null || !e.getMessage().contains("index_not_found_exception")) {
+        throw e;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Generates a list of test documents for insertion.
+   *
+   * @param numDocs Number of docs to generate
+   * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents
+   * @return the list of json String representing the documents
+   */
+  static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {
+    String[] scientists = {
+      "Einstein",
+      "Darwin",
+      "Copernicus",
+      "Pasteur",
+      "Curie",
+      "Faraday",
+      "Newton",
+      "Bohr",
+      "Galilei",
+      "Maxwell"
+    };
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      int index = i % scientists.length;
+      // insert 2 malformed documents
+      if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) {
+        data.add(String.format("{\"scientist\";\"%s\", \"id\":%d}", scientists[index], i));
+      } else {
+        data.add(String.format("{\"scientist\":\"%s\", \"id\":%d}", scientists[index], i));
+      }
+    }
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
new file mode 100644
index 0000000..391062d
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_INDEX;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_ITESTS;
+
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * Manipulates test data used by the {@link ElasticsearchIO}
+ * integration tests.
+ *
+ * <p>This is independent from the tests so that for read tests it can be run separately after data
+ * store creation rather than every time (which can be more fragile.)
+ */
+public class ElasticsearchIOITCommon {
+
+  private static final String writeIndex = ES_INDEX + System.currentTimeMillis();
+
+  /**
+   * Use this to create the index for reading before IT read tests.
+   *
+   * <p>To invoke this class, you can use this command line from elasticsearch io module directory:
+   *
+   * <pre>
+   * mvn test-compile exec:java \
+   * -Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon \
+   *   -Dexec.args="--elasticsearchServer=1.2.3.4 \
+   *  --elasticsearchHttpPort=9200" \
+   *   -Dexec.classpathScope=test
+   *   </pre>
+   *
+   * @param args Please pass options from IOTestPipelineOptions used for connection to
+   *     Elasticsearch as shown above.
+   */
+  public static void main(String[] args) throws Exception {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
+    createAndPopulateReadIndex(options);
+  }
+
+  private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception {
+    // automatically creates the index and insert docs
+    ConnectionConfiguration connectionConfiguration =
+        getConnectionConfiguration(options, ReadOrWrite.READ);
+    try (RestClient restClient = connectionConfiguration.createClient()) {
+      ElasticSearchIOTestUtils
+          .insertTestDocuments(connectionConfiguration, NUM_DOCS_ITESTS, restClient);
+    }
+  }
+
+  static ConnectionConfiguration getConnectionConfiguration(IOTestPipelineOptions options,
+      ReadOrWrite rOw) {
+    ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.create(
+            new String[] {
+              "http://"
+                  + options.getElasticsearchServer()
+                  + ":"
+                  + options.getElasticsearchHttpPort()
+            },
+            (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex,
+            ES_TYPE);
+    return connectionConfiguration;
+  }
+
+  /** Enum that tells whether we use the index for reading or for writing. */
+  enum ReadOrWrite {
+    READ,
+    WRITE
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
new file mode 100644
index 0000000..3fb08bb
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
+import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.hamcrest.CustomMatcher;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Common test class for {@link ElasticsearchIO}. */
+class ElasticsearchIOTestCommon implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTestCommon.class);
+
+  static final String ES_INDEX = "beam";
+  static final String ES_TYPE = "test";
+  static final long NUM_DOCS_UTESTS = 400L;
+  static final long NUM_DOCS_ITESTS = 50000L;
+  private static final long AVERAGE_DOC_SIZE = 25L;
+
+
+  private static final int NUM_SCIENTISTS = 10;
+  private static final long BATCH_SIZE = 200L;
+  private static final long BATCH_SIZE_BYTES = 2048L;
+
+  private long numDocs;
+  private ConnectionConfiguration connectionConfiguration;
+  private RestClient restClient;
+  private boolean useAsITests;
+
+  private TestPipeline pipeline;
+  private ExpectedException expectedException;
+
+  /**
+   * Creates the common test logic.
+   *
+   * @param connectionConfiguration configuration of the Elasticsearch index used by the tests
+   * @param restClient REST client used by the tests to talk to Elasticsearch
+   * @param useAsITests {@code true} to work on {@code NUM_DOCS_ITESTS} documents, {@code false}
+   *     to work on {@code NUM_DOCS_UTESTS} documents
+   */
+  ElasticsearchIOTestCommon(ConnectionConfiguration connectionConfiguration, RestClient restClient,
+      boolean useAsITests) {
+    this.useAsITests = useAsITests;
+    if (useAsITests) {
+      this.numDocs = NUM_DOCS_ITESTS;
+    } else {
+      this.numDocs = NUM_DOCS_UTESTS;
+    }
+    this.restClient = restClient;
+    this.connectionConfiguration = connectionConfiguration;
+  }
+
+  // Setters for the lazy initialization of the JUnit test rules (they cannot be static).
+  void setPipeline(TestPipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  void setExpectedException(ExpectedException expectedException) {
+    this.expectedException = expectedException;
+  }
+
+  /**
+   * Checks that the size estimated by the source for the test index is greater than
+   * {@code AVERAGE_DOC_SIZE * numDocs}. Documents are inserted first unless running as ITests.
+   */
+  void testSizes() throws Exception {
+    if (!useAsITests) {
+      ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+    }
+    BoundedElasticsearchSource source =
+        new BoundedElasticsearchSource(
+            ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration),
+            null,
+            null,
+            null);
+    long sizeEstimate = source.getEstimatedSizeBytes(PipelineOptionsFactory.create());
+    LOG.info("Estimated size: {}", sizeEstimate);
+    // can't use equal assert as Elasticsearch indexes never have same size
+    // (due to internal Elasticsearch implementation), so only a lower bound is checked
+    long minimumExpectedSize = AVERAGE_DOC_SIZE * numDocs;
+    assertThat("Wrong estimated size", sizeEstimate, greaterThan(minimumExpectedSize));
+  }
+
+
+  /**
+   * Reads the test index through the pipeline and checks that the number of documents read
+   * equals {@code numDocs}. Documents are inserted first unless running as ITests.
+   */
+  void testRead() throws Exception {
+    if (!useAsITests) {
+      ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+    }
+
+    // scroll keepalive and batch size are set explicitly, useful just to test parameter passing
+    Read read =
+        ElasticsearchIO.read()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withScrollKeepalive("5m")
+            .withBatchSize(100L);
+    PCollection<String> documents = pipeline.apply(read);
+    PCollection<Long> documentCount = documents.apply("Count", Count.<String>globally());
+    PAssert.thatSingleton(documentCount).isEqualTo(numDocs);
+    pipeline.run();
+  }
+
+  /**
+   * Reads the test index with a match query on the scientist "Einstein" and checks that
+   * {@code numDocs / NUM_SCIENTISTS} documents are returned. Documents are inserted first unless
+   * running as ITests.
+   */
+  void testReadWithQuery() throws Exception {
+    if (!useAsITests) {
+      ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+    }
+
+    String matchEinsteinQuery =
+        "{\n"
+            + "  \"query\": {\n"
+            + "  \"match\" : {\n"
+            + "    \"scientist\" : {\n"
+            + "      \"query\" : \"Einstein\",\n"
+            + "      \"type\" : \"boolean\"\n"
+            + "    }\n"
+            + "  }\n"
+            + "  }\n"
+            + "}";
+
+    Read read =
+        ElasticsearchIO.read()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withQuery(matchEinsteinQuery);
+    PCollection<String> documents = pipeline.apply(read);
+    long expectedCount = numDocs / NUM_SCIENTISTS;
+    PAssert.thatSingleton(documents.apply("Count", Count.<String>globally()))
+        .isEqualTo(expectedCount);
+    pipeline.run();
+  }
+
+  /**
+   * Writes {@code numDocs} valid documents through the pipeline, then checks the total number of
+   * documents in the index and the number of documents matching the scientist "Einstein".
+   */
+  void testWrite() throws Exception {
+    List<String> documents =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration);
+    pipeline.apply(Create.of(documents)).apply(write);
+    pipeline.run();
+
+    long writtenDocs =
+        ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(
+            connectionConfiguration, restClient);
+    assertEquals(numDocs, writtenDocs);
+
+    String searchBody =
+        "{\n"
+        + "  \"query\" : {\"match\": {\n"
+        + "    \"scientist\": \"Einstein\"\n"
+        + "  }}\n"
+        + "}\n";
+    String searchEndPoint =
+        String.format(
+            "/%s/%s/_search",
+            connectionConfiguration.getIndex(),
+            connectionConfiguration.getType());
+    Response searchResponse =
+        restClient.performRequest(
+            "GET",
+            searchEndPoint,
+            Collections.<String, String>emptyMap(),
+            new NStringEntity(searchBody, ContentType.APPLICATION_JSON));
+    // the number of matching documents is read from hits.total of the search response
+    int einsteinCount = parseResponse(searchResponse).path("hits").path("total").asInt();
+    assertEquals(numDocs / NUM_SCIENTISTS, einsteinCount);
+  }
+
+  void testWriteWithErrors() throws Exception {
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    // the size of write bundles is decided by the runner and cannot be forced,
+    // so the Writer is tested as a DoFn outside of a runner.
+    DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write));
+
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+    expectedException.expect(isA(IOException.class));
+    expectedException.expectMessage(
+        new CustomMatcher<String>("RegExp matcher") {
+          @Override
+          public boolean matches(Object o) {
+            String message = (String) o;
+            // This regexp checks that 2 malformed documents are actually in error
+            // and that the message contains their IDs.
+            // It also ensures that root reason, root error type,
+            // caused by reason and caused by error type are present in the message.
+            // To avoid flakiness of the test if Elasticsearch error messages change,
+            // only the "failed to parse" root reason is matched literally,
+            // the other parts are matched using .+
+            return message.matches(
+                "(?is).*Error writing to Elasticsearch, some elements could not be inserted"
+                    + ".*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"
+                    + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
+          }
+        });
+    // inserts into Elasticsearch; this is expected to fail with the IOException set up above
+    fnTester.processBundle(input);
+  }
+
+  /**
+   * Checks that, with a max batch size of {@code BATCH_SIZE} documents, the number of documents
+   * in the index only grows when a multiple of {@code BATCH_SIZE} documents has been processed.
+   */
+  void testWriteWithMaxBatchSize() throws Exception {
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSize(BATCH_SIZE);
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    // write bundles size is the runner decision, we cannot force a bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    // The tester is closed by try-with-resources so that the bundle is finished and the DoFn
+    // is torn down even when an assertion fails.
+    try (DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write))) {
+      long numDocsProcessed = 0;
+      long numDocsInserted = 0;
+      for (String document : input) {
+        fnTester.processElement(document);
+        numDocsProcessed++;
+        // test every 100 docs to avoid overloading ES
+        if ((numDocsProcessed % 100) == 0) {
+          // force the index to refresh after inserting for the inserted docs
+          // to be searchable immediately
+          long currentNumDocs = ElasticSearchIOTestUtils
+              .refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+          if ((numDocsProcessed % BATCH_SIZE) == 0) {
+            /* bundle end */
+            assertEquals(
+                "we are at the end of a bundle, we should have inserted all processed documents",
+                numDocsProcessed,
+                currentNumDocs);
+            numDocsInserted = currentNumDocs;
+          } else {
+            /* not bundle end */
+            assertEquals(
+                "we are not at the end of a bundle, we should have inserted no more documents",
+                numDocsInserted,
+                currentNumDocs);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks that, with a max batch size of {@code BATCH_SIZE_BYTES} bytes, the number of documents
+   * in the index only grows once the processed size has passed another multiple of
+   * {@code BATCH_SIZE_BYTES}.
+   */
+  void testWriteWithMaxBatchSizeBytes() throws Exception {
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBatchSizeBytes(BATCH_SIZE_BYTES);
+    List<String> input =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    // write bundles size is the runner decision, we cannot force a bundle size,
+    // so we test the Writer as a DoFn outside of a runner.
+    // The tester is closed by try-with-resources so that the bundle is finished and the DoFn
+    // is torn down even when an assertion fails.
+    try (DoFnTester<String, Void> fnTester = DoFnTester.of(new Write.WriteFn(write))) {
+      long numDocsProcessed = 0;
+      long sizeProcessed = 0;
+      long numDocsInserted = 0;
+      long batchInserted = 0;
+      for (String document : input) {
+        fnTester.processElement(document);
+        numDocsProcessed++;
+        // use an explicit charset so that the size accounting does not depend on the
+        // platform default encoding
+        sizeProcessed += document.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+        // test every 40 docs to avoid overloading ES
+        if ((numDocsProcessed % 40) == 0) {
+          // force the index to refresh after inserting for the inserted docs
+          // to be searchable immediately
+          long currentNumDocs = ElasticSearchIOTestUtils
+              .refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+          if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) {
+            /* bundle end */
+            assertThat(
+                "we have passed a bundle size, we should have inserted some documents",
+                currentNumDocs,
+                greaterThan(numDocsInserted));
+            numDocsInserted = currentNumDocs;
+            batchInserted = (sizeProcessed / BATCH_SIZE_BYTES);
+          } else {
+            /* not bundle end */
+            assertEquals(
+                "we are not at the end of a bundle, we should have inserted no more documents",
+                numDocsInserted,
+                currentNumDocs);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch-tests/pom.xml b/sdks/java/io/elasticsearch-tests/pom.xml
new file mode 100644
index 0000000..4d947be
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-parent</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>beam-sdks-java-io-elasticsearch-tests-parent</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests</name>
+    <description>Tests for ElasticsearchIO.</description>
+    <packaging>pom</packaging>
+
+    <properties>
+        <commons-io.version>1.3.2</commons-io.version>
+        <jna.version>4.1.0</jna.version>
+        <log4j.version>2.6.2</log4j.version>
+        <elasticsearch.client.rest.version>5.0.0</elasticsearch.client.rest.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- This optional dependency is used by the test framework -->
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+            <version>${jna.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <!--do not use 2.7 for ES 5.0-->
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <!--do not use 2.7 for ES 5.0-->
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-jdk14</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons-io.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.hamcrest</groupId>
+                    <artifactId>hamcrest-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-common</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>rest</artifactId>
+            <version>${elasticsearch.client.rest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <modules>
+        <module>elasticsearch-tests-common</module>
+        <module>elasticsearch-tests-2</module>
+        <module>elasticsearch-tests-5</module>
+    </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml
index a021420..6624c46 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -15,145 +15,95 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
-  <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch</name>
-  <description>IO to read and write on Elasticsearch.</description>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.elasticsearch.client</groupId>
-      <artifactId>rest</artifactId>
-      <version>5.0.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpcore-nio</artifactId>
-      <version>4.4.5</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpcore</artifactId>
-      <version>4.4.5</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpasyncclient</artifactId>
-      <version>4.1.2</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpclient</artifactId>
-      <version>4.5.2</version>
-    </dependency>
-
-    <!-- compile dependencies -->
-    <dependency>
-      <groupId>com.google.auto.value</groupId>
-      <artifactId>auto-value</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- test -->
-    <dependency>
-      <groupId>org.elasticsearch</groupId>
-      <artifactId>elasticsearch</artifactId>
-      <version>2.4.1</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>1.3.2</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- This optional dependency is used by the test framework. Avoids a warning -->
-    <dependency>
-      <groupId>net.java.dev.jna</groupId>
-      <artifactId>jna</artifactId>
-      <version>4.1.0</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-common</artifactId>
-      <scope>test</scope>
-      <classifier>tests</classifier>
-    </dependency>
-
-  </dependencies>
-
-</project>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-parent</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch</name>
+    <description>IO to read and write on Elasticsearch.</description>
+
+    <properties>
+        <elasticsearch.client.rest.version>5.0.0</elasticsearch.client.rest.version>
+        <httpcomponents.core.version>4.4.5</httpcomponents.core.version>
+        <httpcomponents.httpasyncclient.version>4.1.2</httpcomponents.httpasyncclient.version>
+        <httpcomponents.httpclient.version>4.5.2</httpcomponents.httpclient.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>rest</artifactId>
+            <version>${elasticsearch.client.rest.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.value</groupId>
+            <artifactId>auto-value</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>${httpcomponents.httpasyncclient.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+            <version>${httpcomponents.core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcomponents.core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents.httpclient.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-common</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file


Mime
View raw message