beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aromane...@apache.org
Subject [beam] 01/01: Revert "[BEAM-7916] Add ElasticsearchIO query parameter to take a ValueProvider (#9285)"
Date Fri, 09 Aug 2019 09:29:26 GMT
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch revert-9285-BEAM-7916_Query_ValueProvider
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9701f8e9006a8ce67c92c77377cb63556a97e197
Author: Alexey Romanenko <33895511+aromanenko-dev@users.noreply.github.com>
AuthorDate: Fri Aug 9 11:28:38 2019 +0200

    Revert "[BEAM-7916] Add ElasticsearchIO query parameter to take a ValueProvider (#9285)"
    
    This reverts commit c545b8aae7cf2519c7776279132650a68445510f.
---
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 10 ++-------
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 13 ++---------
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 13 ++---------
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 25 ++++++----------------
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 21 +++---------------
 5 files changed, 15 insertions(+), 67 deletions(-)

diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index f234716..19bda80 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -131,15 +131,9 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d809cfd..05686cd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
     createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    // need to create the index using the helper method (not create it at first insertion)
-    // for the indexSettings() to be run
-    createIndex(getEsIndex());
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 84696e5..6638b7d 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -122,21 +122,12 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
   }
 
   @Test
-  public void testReadWithQueryString() throws Exception {
+  public void testReadWithQuery() throws Exception {
     // need to create the index using the helper method (not create it at first insertion)
     // for the indexSettings() to be run
     createIndex(getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryString();
-  }
-
-  @Test
-  public void testReadWithQueryValueProvider() throws Exception {
-    // need to create the index using the helper method (not create it at first insertion)
-    // for the indexSettings() to be run
-    createIndex(getEsIndex());
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testReadWithQueryValueProvider();
+    elasticsearchIOTestCommon.testReadWithQuery();
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index a39f2c7..112e0e2 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -45,13 +45,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BiFunction;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
 import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -198,17 +196,7 @@ class ElasticsearchIOTestCommon implements Serializable {
     pipeline.run();
   }
 
-  void testReadWithQueryString() throws Exception {
-    testReadWithQueryInternal(Read::withQuery);
-  }
-
-  void testReadWithQueryValueProvider() throws Exception {
-    testReadWithQueryInternal(
-        (read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query)));
-  }
-
-  private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer)
-      throws IOException {
+  void testReadWithQuery() throws Exception {
     if (!useAsITests) {
       ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
     }
@@ -224,12 +212,11 @@ class ElasticsearchIOTestCommon implements Serializable {
             + "  }\n"
             + "}";
 
-    Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
-
-    read = queryConfigurer.apply(read, query);
-
-    PCollection<String> output = pipeline.apply(read);
-
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withQuery(query));
     PAssert.thatSingleton(output.apply("Count", Count.globally()))
         .isEqualTo(numDocs / NUM_SCIENTISTS);
     pipeline.run();
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 41b2eb9..ec688fb 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -455,7 +454,7 @@ public class ElasticsearchIO {
     abstract ConnectionConfiguration getConnectionConfiguration();
 
     @Nullable
-    abstract ValueProvider<String> getQuery();
+    abstract String getQuery();
 
     abstract boolean isWithMetadata();
 
@@ -469,7 +468,7 @@ public class ElasticsearchIO {
     abstract static class Builder {
       abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);
 
-      abstract Builder setQuery(ValueProvider<String> query);
+      abstract Builder setQuery(String query);
 
       abstract Builder setWithMetadata(boolean withMetadata);
 
@@ -503,20 +502,6 @@ public class ElasticsearchIO {
     public Read withQuery(String query) {
       checkArgument(query != null, "query can not be null");
       checkArgument(!query.isEmpty(), "query can not be empty");
-      return withQuery(ValueProvider.StaticValueProvider.of(query));
-    }
-
-    /**
-     * Provide a {@link ValueProvider} that provides the query used while reading from
-     * Elasticsearch. This is useful for cases when the query must be dynamic.
-     *
-     * @param query the query. See <a
-     *     href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query
-     *     DSL</a>
-     * @return a {@link PTransform} reading data from Elasticsearch.
-     */
-    public Read withQuery(ValueProvider<String> query) {
-      checkArgument(query != null, "query can not be null");
       return builder().setQuery(query).build();
     }
 
@@ -741,7 +726,7 @@ public class ElasticsearchIO {
     public boolean start() throws IOException {
       restClient = source.spec.getConnectionConfiguration().createClient();
 
-      String query = source.spec.getQuery() != null ? source.spec.getQuery().get() : null;
+      String query = source.spec.getQuery();
       if (query == null) {
         query = "{\"query\": { \"match_all\": {} }}";
       }


Mime
View raw message