flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rafal green <rafal.gree...@gmail.com>
Subject Re: Local Cluster have problem with connect to elasticsearch
Date Thu, 12 May 2016 20:39:43 GMT
Sorry not jar from elasticsearch-connector but from twitter-connector
 *".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT"
- *it's work fine

2016-05-12 22:35 GMT+02:00 rafal green <rafal.green17@gmail.com>:

> This is my working jar that i download it form
> *.m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT*
>
> 2016-05-12 22:26 GMT+02:00 rafal green <rafal.green17@gmail.com>:
>
>> Hi Gordon,
>>
>> Thanks for advice - it's work perfect but only in elasticsearch case.
>>
>> This pom version works for elasticsearch 2.2.1.
>>
>> <artifactItem>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
>>    <version>1.1-SNAPSHOT</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/apache/flink/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>org.elasticsearch</groupId>
>>    <artifactId>elasticsearch</artifactId>
>>    <version>2.2.1</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/elasticsearch/**</includes>
>> </artifactItem>
>>
>>
>>
>> Why 2.2.1 ? Beacuse if you check the *"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"
>> * you will see this line*
>> "<elasticsearch.version>2.2.1</elasticsearch.version>"*
>>
>>
>> But Gordon your idea* not working with twitter-connector*. and I try add
>>  this: (to pom) and it's not working
>>
>> <artifactItem>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>    <version>1.1-SNAPSHOT</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/apache/flink/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>com.twitter</groupId>
>>    <artifactId>hbc-core</artifactId>
>>    <version>2.2.0</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>com/twitter/**</includes>
>> </artifactItem>
>>
>>
>>
>> or that
>>
>> <artifactItem>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>    <version>1.1-SNAPSHOT</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/apache/flink/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>com.twitter</groupId>
>>    <artifactId>hbc-core</artifactId>
>>    <version>2.2.0</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>com/twitter/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>org.apache.httpcomponents</groupId>
>>    <artifactId>httpclient</artifactId>
>>    <version>4.2.5</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/apache/httpcomponents/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>com.twitter</groupId>
>>    <artifactId>joauth</artifactId>
>>    <version>6.0.2</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>com/twitter/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>org.apache.httpcomponents</groupId>
>>    <artifactId>httpcore</artifactId>
>>    <version>4.2.4</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>org/apache/httpcomponents/**</includes>
>> </artifactItem>
>> <artifactItem>
>>    <groupId>com.google.guava</groupId>
>>    <artifactId>guava</artifactId>
>>    <version>14.0.1</version>
>>    <type>jar</type>
>>    <overWrite>false</overWrite>
>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>    <includes>com/google/guava/**</includes>
>> </artifactItem>
>>
>>
>>
>> And if I run job I see this error:
>>
>> 2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins                         
           - [node-1] modules [], plugins [], sites []
>> 2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache           
           - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
>> 2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
 - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
>> 2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource
  - Initializing Twitter Streaming API connection
>> 2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient            
           - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
>> 2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource
  - Twitter Streaming API connection established successfully
>> 2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase             
           - flink-twitter-source Uncaught exception
>> java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
>> 	at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
>> 	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>> 2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase             
           - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
>> 2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase             
           - flink-twitter-source Shutting down httpclient connection manager
>>
>>
>>
>>
>>  ... and finaly  "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" -  if I
>> add jar to this location: flink/build-target/lib/   - it's working. No idea
>> why :P
>>
>>
>> 2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@gmail.com>:
>>
>>> Hi Rafal,
>>>
>>> From your description, it seems like Flink is complaining because it
>>> cannot
>>> access the Elasticsearch API related dependencies as well. You'd also
>>> have
>>> to include the following into your Maven build, under <artifactItems>:
>>>
>>> <artifactItem>
>>>     <groupId>org.elasticsearch</groupId>
>>>     <artifactId>elasticsearch</artifactId>
>>>     <version>2.3.2</version>
>>>     <type>jar</type>
>>>     <overWrite>false</overWrite>
>>>     <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>     <includes>org/elasticsearch/**</includes>
>>> </artifactItem>
>>>
>>> Now your built jar should correctly include all required dependencies
>>> (the
>>> connector & Elasticsearch API).
>>>
>>> As explained in  Linking with modules not contained in the binary
>>> distribution
>>> <
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
>>> >
>>> , it will be enough to package dependencies along with your code for
>>> Flink
>>> to access all required dependencies, and you wouldn't need to copy the
>>> jar
>>> to the lib folder. I would recommend to clean up the lib folder of the
>>> previous jars you copied, and follow this approach in the future, just in
>>> case they mess up the classloader.
>>>
>>> As with your first attempt that Flink cannot find any Elasticsearch nodes
>>> when executed in the IDE, I'm suspecting the reason is that the
>>> elasticsearch2 connector by default uses version 2.2.1, lower than your
>>> cluster version 2.3.2. I had previous experience when Elasticsearch
>>> strangely complains not finding any nodes when using lower client
>>> versions
>>> than the deployment. Can you try compiling the elasticsearch2 connector
>>> with
>>> the option -Delasticsearch.version=2.3.2, and use the newly build
>>> connector
>>> jar, following the same method mentioned above?
>>>
>>> Hope this helps!
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6838.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>>
>

Mime
View raw message