flink-user mailing list archives

From rafal green <rafal.gree...@gmail.com>
Subject Re: Local Cluster have problem with connect to elasticsearch
Date Thu, 12 May 2016 20:35:15 GMT
This is my working jar, which I took from
*.m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT*

2016-05-12 22:26 GMT+02:00 rafal green <rafal.green17@gmail.com>:

> Hi Gordon,
>
> Thanks for the advice. It works perfectly, but only for the Elasticsearch connector.
>
> This pom configuration works with Elasticsearch 2.2.1.
>
> <artifactItem>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
>    <version>1.1-SNAPSHOT</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/apache/flink/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>org.elasticsearch</groupId>
>    <artifactId>elasticsearch</artifactId>
>    <version>2.2.1</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/elasticsearch/**</includes>
> </artifactItem>
>
>
>
> Why 2.2.1? Because if you check
> "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"
> you will see this line:
> "<elasticsearch.version>2.2.1</elasticsearch.version>"
>
>
> But Gordon, your idea does not work with the twitter connector. I tried adding
> this to the pom, and it does not work:
>
> <artifactItem>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>    <version>1.1-SNAPSHOT</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/apache/flink/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>com.twitter</groupId>
>    <artifactId>hbc-core</artifactId>
>    <version>2.2.0</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>com/twitter/**</includes>
> </artifactItem>
>
>
>
> or this:
>
> <artifactItem>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>    <version>1.1-SNAPSHOT</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/apache/flink/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>com.twitter</groupId>
>    <artifactId>hbc-core</artifactId>
>    <version>2.2.0</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>com/twitter/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>org.apache.httpcomponents</groupId>
>    <artifactId>httpclient</artifactId>
>    <version>4.2.5</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/apache/httpcomponents/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>com.twitter</groupId>
>    <artifactId>joauth</artifactId>
>    <version>6.0.2</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>com/twitter/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>org.apache.httpcomponents</groupId>
>    <artifactId>httpcore</artifactId>
>    <version>4.2.4</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>org/apache/httpcomponents/**</includes>
> </artifactItem>
> <artifactItem>
>    <groupId>com.google.guava</groupId>
>    <artifactId>guava</artifactId>
>    <version>14.0.1</version>
>    <type>jar</type>
>    <overWrite>false</overWrite>
>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>    <includes>com/google/guava/**</includes>
> </artifactItem>
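
One thing worth checking in the second attempt: the <includes> patterns are matched against paths inside each jar, not against Maven coordinates. The httpclient and httpcore jars keep their classes under org/apache/http/, and Guava keeps its classes under com/google/common/, so the patterns org/apache/httpcomponents/** and com/google/guava/** would select nothing, and those classes would not be unpacked into the job jar. A sketch of the three entries with adjusted patterns follows. The versions are copied from the attempt above, and whether this alone resolves the error reported below has not been verified.

```xml
<!-- Sketch only: same artifacts as above, with include patterns that
     follow the package layout inside each jar. -->
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.5</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- httpclient classes live in the org.apache.http package tree -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpcore</artifactId>
   <version>4.2.4</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- httpcore shares the org.apache.http package tree -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- Guava classes live in the com.google.common package tree -->
   <includes>com/google/common/**</includes>
</artifactItem>
```

A NoSuchMethodError of this kind usually means that a different version of a class was loaded than the one the caller was compiled against, so it may also help to check which httpclient version ends up on the cluster classpath.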
>
>
>
> And when I run the job, I see this error:
>
> 2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins - [node-1] modules [], plugins [], sites []
> 2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
> 2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
> 2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Initializing Twitter Streaming API connection
> 2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
> 2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Twitter Streaming API connection established successfully
> 2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Uncaught exception
> java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
> 	at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
> 	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
> 2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Shutting down httpclient connection manager
>
>
>
>
> ... and finally: if I add "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" to
> flink/build-target/lib/, it works. No idea why :P
>
>
> 2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@gmail.com>:
>
>> Hi Rafal,
>>
>> From your description, it seems like Flink is complaining because it cannot
>> access the Elasticsearch API related dependencies as well. You'd also have
>> to include the following into your Maven build, under <artifactItems>:
>>
>> <artifactItem>
>>     <groupId>org.elasticsearch</groupId>
>>     <artifactId>elasticsearch</artifactId>
>>     <version>2.3.2</version>
>>     <type>jar</type>
>>     <overWrite>false</overWrite>
>>     <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>     <includes>org/elasticsearch/**</includes>
>> </artifactItem>
>>
>> Now your built jar should correctly include all required dependencies (the
>> connector & Elasticsearch API).
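
For context, the artifact items above sit inside a maven-dependency-plugin "unpack" execution. The following is a minimal sketch of the surrounding configuration, modeled on the Flink documentation page referenced in this message. The plugin version and the execution id are assumptions, so adjust them to your build.

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <!-- Assumed version; use whichever your build already pins. -->
      <version>2.9</version>
      <executions>
        <execution>
          <!-- Assumed id; any unique name works. -->
          <id>unpack</id>
          <!-- Runs just before packaging, so the unpacked classes end up in the jar. -->
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <artifactItem>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>2.3.2</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/elasticsearch/**</includes>
              </artifactItem>
              <!-- Further artifactItem entries, such as the connector itself, go here. -->
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

Because the classes are unpacked into ${project.build.directory}/classes, the regular jar packaging step picks them up without any extra assembly configuration.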
>>
>> As explained in "Linking with modules not contained in the binary
>> distribution"
>> <https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>,
>> it will be enough to package dependencies along with your code for Flink
>> to access all required dependencies, and you wouldn't need to copy the jar
>> to the lib folder. I would recommend cleaning up the lib folder of the
>> previous jars you copied, and following this approach in the future, just in
>> case they mess up the classloader.
>>
>> As for your first attempt, where Flink cannot find any Elasticsearch nodes
>> when executed in the IDE, I suspect the reason is that the
>> elasticsearch2 connector by default uses version 2.2.1, lower than your
>> cluster version 2.3.2. I have previously seen Elasticsearch strangely
>> complain about not finding any nodes when the client version is lower
>> than the deployment's. Can you try compiling the elasticsearch2 connector
>> with the option -Delasticsearch.version=2.3.2, and use the newly built
>> connector jar, following the same method mentioned above?
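
The -Delasticsearch.version option overrides a Maven property defined in the connector's pom.xml. As a sketch, the same effect could be had by editing that property before building the connector. The <properties> wrapper shown here is an assumption about where the line sits in that pom.

```xml
<properties>
  <!-- The connector pom defaults to 2.2.1; set this to match the cluster version. -->
  <elasticsearch.version>2.3.2</elasticsearch.version>
</properties>
```

The elasticsearch artifactItem packaged with the job should then use the same version, so that client and connector agree.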
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6838.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>
>
