Subject: Re: ElasticSearchSink does not work
From: Allan Feid <allanfeid@gmail.com>
To: user@flume.apache.org, shushuai zhu
Date: Wed, 12 Jun 2013 10:09:56 -0400

Hi Shushuai,

I've had a similar issue, and in my case it was because I was using the same
channel for multiple sinks. I believe what happens is that whichever sink
removes an event from the queue first gets to write it out, though I don't
know the specifics since I haven't had a chance to read through the codebase.
If you add a second channel for your Elasticsearch sink and make sure your
avro-source writes to both channels, you should see data going to both
locations.

Thanks,
Allan

On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu wrote:
> Hi,
>
> I am new to Flume. I am trying to send data using the Flume Client Perl API
> to a Flume Avro source, then through ElasticSearchSink to an ElasticSearch
> engine. I could make the file_roll sink work by sending the data to a file.
> However, I am encountering an issue with ElasticSearchSink.
> The data did not go through to the ElasticSearch engine:
>
>     use Flume::Client;
>
>     my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
>     my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
>     my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
>     print "$response\n";    # response will be 'OK' on success
>
> The returned $response is not defined (again, this worked when the
> file_roll sink was used).
>
> The ElasticSearch engine itself is working, since I could load data into it
> via LogStash's ElasticSearch output type.
>
> The Flume agent was installed by downloading this tarball from Cloudera:
>
> http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
>
> The flume.conf is as follows. I played around with "hostNames" using the
> full host name, the IP address, etc. I do not see any output message in
> flume.log. Could someone let me know what could potentially cause the
> issue?
>
> Thanks.
>
> Shushuai
>
> # Define a memory channel called ch1 on agent1
> agent1.channels = ch1
> agent1.channels.ch1.type = memory
>
> # Define an Avro source called avro-source1 on agent1 and tell it to bind
> # to 0.0.0.0:41414. Connect it to channel ch1.
> agent1.sources = avro-source1
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 41414
>
> # Define a local file sink that simply logs all events it receives
> # (this works well)
> #agent1.sinks = localout
> #agent1.sinks.localout.type = file_roll
> #agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
> #agent1.sinks.localout.sink.rollInterval = 0
> #agent1.sinks.localout.channel = ch1
>
> # Define an ElasticSearchSink sink (this does not work)
> agent1.sinks = k1
> agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent1.sinks.k1.hostNames = localhost:9300
> agent1.sinks.k1.indexName = flume
> agent1.sinks.k1.indexType = logs
> agent1.sinks.k1.clusterName = elasticsearch
> agent1.sinks.k1.batchSize = 2
> agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> agent1.sinks.k1.channel = ch1
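Allan's suggested fix, applied to the quoted configuration, might look like the
sketch below. This is illustrative only: the second channel name ch2 and the
re-enabled localout sink are assumptions, not part of the original post. With
multiple channels listed on one source, Flume's default (replicating) channel
selector copies each event to every listed channel, so each sink gets a full
copy of the stream.

```properties
# Sketch: one channel per sink, so sinks no longer compete for events
agent1.channels = ch1 ch2
agent1.channels.ch1.type = memory
agent1.channels.ch2.type = memory

# The default replicating selector copies each event to both channels
agent1.sources.avro-source1.channels = ch1 ch2

# Each sink drains its own channel
agent1.sinks = localout k1
agent1.sinks.localout.channel = ch1
agent1.sinks.k1.channel = ch2
```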
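The channel-sharing behavior described in Allan's reply (two sinks attached to
one channel each remove events from it, so each sink sees only part of the
stream) can be illustrated with a small Python sketch. This models only the
queue semantics, not Flume's actual implementation:

```python
from collections import deque

# Toy model of a Flume memory channel: a single queue of events.
channel = deque(f"event-{i}" for i in range(10))

# Two sinks attached to the SAME channel take turns draining it.
# Each event is removed exactly once, so each sink sees only a subset.
file_roll_sink, elasticsearch_sink = [], []
sinks = [file_roll_sink, elasticsearch_sink]
turn = 0
while channel:
    sinks[turn % 2].append(channel.popleft())
    turn += 1

print(len(file_roll_sink))        # 5 events went to one sink...
print(len(elasticsearch_sink))    # ...and the other 5 to the other
print(set(file_roll_sink) & set(elasticsearch_sink))  # no overlap: set()
```

With one channel per sink, as suggested in the reply, each sink would instead
drain its own full copy of the event stream.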