streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/7] incubator-streams-examples git commit: upgrade to es 2.0 + docker
Date Tue, 11 Oct 2016 21:17:06 GMT
Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 8fe6860f7 -> 97fca1ac5


upgrade to es 2.0 + docker


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/f76a7e03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/f76a7e03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/f76a7e03

Branch: refs/heads/master
Commit: f76a7e035b76d3180234f41dd3eb7bcef830b5dc
Parents: 8fe6860
Author: Steve Blackmon @steveblackmon <sblackmon@apache.org>
Authored: Thu Oct 6 20:28:34 2016 -0500
Committer: Steve Blackmon @steveblackmon <sblackmon@apache.org>
Committed: Sun Oct 9 16:39:52 2016 -0500

----------------------------------------------------------------------
 local/elasticsearch-hdfs/pom.xml                | 111 ++++++++++++--
 .../elasticsearch/test/ElasticsearchHdfsIT.java | 150 ++++++++-----------
 .../example/elasticsearch/test/ExampleITs.java  |  17 +++
 .../elasticsearch/test/HdfsElasticsearchIT.java | 132 ++++++++++++++++
 .../src/test/resources/ElasticsearchHdfsIT.conf |  14 ++
 .../src/test/resources/HdfsElasticsearchIT.conf |  16 ++
 pom.xml                                         |   2 +-
 7 files changed, 342 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 52cd0fc..601cd2b 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -32,9 +32,9 @@
     <description>Copies documents between elasticsearch and file system using the hdfs
persist module.</description>
 
     <properties>
+        <elasticsearch.version>2.3.5</elasticsearch.version>
+        <lucene.version>5.5.0</lucene.version>
         <docker.repo>apachestreams</docker.repo>
-        <elasticsearch.version>1.1.0</elasticsearch.version>
-        <lucene.version>4.7.2</lucene.version>
         <hdfs.version>2.7.0</hdfs.version>
     </properties>
 
@@ -102,6 +102,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-elasticsearch</artifactId>
+            <version>0.4-incubating-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-persist-hdfs</artifactId>
             <version>0.4-incubating-SNAPSHOT</version>
             <exclusions>
@@ -259,7 +266,7 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-dependency-plugin</artifactId>
                 <configuration>
-                    <includes>**/*.json</includes>
+                    <includes>**/*.json,**/*.conf</includes>
                     <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
                     <includeGroupIds>org.apache.streams</includeGroupIds>
                     <includeTypes>test-jar</includeTypes>
@@ -277,16 +284,24 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.12.4</version>
-                <executions>
-                    <execution>
-                        <id>integration-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <configuration>
+                    <!-- Run integration test suite rather than individual tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                        <exclude>**/*IT.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-junit47</artifactId>
+                        <version>${failsafe.plugin.version}</version>
+                    </dependency>
+                </dependencies>
             </plugin>
             <plugin>
                 <groupId>io.fabric8</groupId>
@@ -295,4 +310,74 @@
         </plugins>
     </build>
 
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>${docker.plugin.version}</version>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>elasticsearch:2.3.5</name>
+                                    <alias>elasticsearch</alias>
+                                    <run>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            <port>${es.http.host}:${es.http.port}:9200</port>
+                                            <port>${es.tcp.host}:${es.tcp.port}:9300</port>
+                                        </ports>
+                                        <portPropertyFile>elasticsearch.properties</portPropertyFile>
+                                        <wait>
+                                            <log>elasticsearch startup</log>
+                                            <http>
+                                                <url>http://${es.http.host}:${es.http.port}</url>
+                                                <method>GET</method>
+                                                <status>200</status>
+                                            </http>
+                                            <time>20000</time>
+                                            <kill>1000</kill>
+                                            <shutdown>500</shutdown>
+                                            <!--<tcp>-->
+                                            <!--<host>${es.transport.host}</host>-->
+                                            <!--<ports>-->
+                                            <!--<port>${es.transport.port}</port>-->
+                                            <!--</ports>-->
+                                            <!--</tcp>-->
+                                        </wait>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
index ad6970c..8dfe244 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
@@ -21,130 +21,108 @@ package org.apache.streams.example.elasticsearch.test;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.hdfs.HdfsConfiguration;
-import org.apache.streams.hdfs.WebHdfsPersistWriter;
-import org.apache.streams.hdfs.HdfsWriterConfiguration;
-import org.apache.streams.hdfs.WebHdfsPersistReader;
-import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
 import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
 import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
 import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
 import java.io.File;
-import java.util.*;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertNotEquals;
+
 /**
  * Test copying documents between hdfs and elasticsearch
  */
-@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST,
numNodes=1)
-public class ElasticsearchHdfsIT extends ElasticsearchIntegrationTest {
+public class ElasticsearchHdfsIT {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
 
     ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();
+    protected ElasticsearchHdfsConfiguration testConfiguration;
+    protected Client testClient;
+
+    private int count = 0;
 
     @Before
     public void prepareTest() throws Exception {
 
-        testConfiguration = new ElasticsearchConfiguration();
-        testConfiguration.setHosts(Lists.newArrayList("localhost"));
-        testConfiguration.setClusterName(cluster().getClusterName());
-
-        ElasticsearchWriterConfiguration setupWriterConfiguration = MAPPER.convertValue(testConfiguration,
ElasticsearchWriterConfiguration.class);
-        setupWriterConfiguration.setIndex("source");
-        setupWriterConfiguration.setType("activity");
-        setupWriterConfiguration.setBatchSize(5l);
-
-        ElasticsearchPersistWriter setupWriter = new ElasticsearchPersistWriter(setupWriterConfiguration);
-        setupWriter.prepare(null);
-
-        InputStream testActivityFolderStream = ElasticsearchHdfsIT.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = ElasticsearchHdfsIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
-            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
-            setupWriter.write( datum );
-            LOGGER.info("Wrote: " + activity.getVerb() );
-        }
-
-        setupWriter.cleanUp();
-
-        flushAndRefresh();
-
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+                .setTypes(testConfiguration.getSource().getTypes().get(0));
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+
+        assertNotEquals(count, 0);
     }
 
     @Test
-    public void testElasticsearchHdfs() throws Exception {
-
-        ElasticsearchHdfsConfiguration backupConfiguration = MAPPER.readValue(
-                ElasticsearchHdfsIT.class.getResourceAsStream("/testBackup.json"), ElasticsearchHdfsConfiguration.class);
-
-        backupConfiguration.getSource().setClusterName(cluster().getClusterName());
-
-        // backupConfiguration.getDestination().setClusterName(cluster().getClusterName());
-
-        assert(indexExists("source"));
-        long srcCount = client().count(client().prepareCount("source").request()).get().getCount();
-        assert srcCount > 0;
-
-        ElasticsearchHdfs backup = new ElasticsearchHdfs(backupConfiguration);
-
-        Thread backupThread = new Thread(backup);
-        backupThread.start();
-        backupThread.join();
-
-        HdfsElasticsearchConfiguration restoreConfiguration = MAPPER.readValue(
-                ElasticsearchHdfsIT.class.getResourceAsStream("/testRestore.json"), HdfsElasticsearchConfiguration.class);
-
-        restoreConfiguration.getDestination().setClusterName(cluster().getClusterName());
-
-        assert(!indexExists("destination"));
-
-        HdfsElasticsearch restore = new HdfsElasticsearch(restoreConfiguration);
-
-        Thread restoreThread = new Thread(restore);
-        restoreThread.start();
-
-        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
-
-        restoreThread.join();
-
-        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-
-        flushAndRefresh();
-
-        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+    public void ElasticsearchHdfsIT() throws Exception {
 
-        assert(indexExists("destination"));
+        ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
 
-        long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
+        backup.run();
 
-        assert srcCount == destCount;
+        // assert lines in file
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
new file mode 100644
index 0000000..ab882c8
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
@@ -0,0 +1,17 @@
+package org.apache.streams.example.elasticsearch.test;
+
+import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        ElasticsearchPersistWriterIT.class,
+        ElasticsearchHdfsIT.class,
+        HdfsElasticsearchIT.class,
+})
+
+public class ExampleITs {
+    // the class remains empty,
+    // used only as a holder for the above annotations
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
new file mode 100644
index 0000000..1a055f6
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
+import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
+import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying documents between hdfs and elasticsearch
+ */
+public class HdfsElasticsearchIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected HdfsElasticsearchConfiguration testConfiguration;
+    protected Client testClient;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        if(indicesExistsResponse.isExists()) {
+            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+            assertTrue(deleteIndexResponse.isAcknowledged());
+        };
+    }
+
+    @Test
+    public void ElasticsearchHdfsIT() throws Exception {
+
+        HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+
+        restore.run();
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getDestination().getIndex())
+                .setTypes(testConfiguration.getDestination().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        assertEquals(89, countResponse.getHits().getTotalHits());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
new file mode 100644
index 0000000..0505876
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
@@ -0,0 +1,14 @@
+source {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = "elasticsearch"
+  indexes += "elasticsearch_persist_writer_it"
+  types += "activity"
+}
+destination {
+  fields = ["ID","DOC"]
+  scheme = file
+  user = hadoop
+  path = "target/test-classes"
+  writerPath = "elasticsearch_hdfs_it"
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
new file mode 100644
index 0000000..35b17a0
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
@@ -0,0 +1,16 @@
+source {
+  fields = ["ID","DOC"]
+  scheme = file
+  user = hadoop
+  path = "target/test-classes"
+  readerPath = "elasticsearch_hdfs_it"
+}
+destination {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = "elasticsearch"
+  index = "hdfs_elasticsearch_it"
+  type = "activity"
+  refresh = true
+  forceUseConfig = true
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/f76a7e03/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e891ed..882aa13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
 	</parent>
 
     <artifactId>streams-examples</artifactId>
-  <version>0.4-incubating-SNAPSHOT</version>
+    <version>0.4-incubating-SNAPSHOT</version>
 
     <packaging>pom</packaging>
     <name>streams-examples</name>


Mime
View raw message