incubator-kafka-users mailing list archives

From Richard Park <richard.b.p...@gmail.com>
Subject Re: How to use the hadoop consumer in distributed mode?
Date Wed, 26 Oct 2011 20:33:45 GMT
Jay and I had a talk about this and we would like to release it as soon as
we can. There is some LinkedIn-specific code that needs to be abstracted
out first.

We also use Avro heavily, and so much of our code is written with that in
mind. It should be easy enough to abstract the Avro out, but we may release
that part of the code as is.

Anyway, we're evaluating what can be released and what needs to be
cleaned up, but we hope to get something out there soon.

On Wed, Oct 26, 2011 at 1:10 PM, Felix GV <felix@mate1inc.com> wrote:

> Hi,
>
> I wanted to give a little update on this topic.
>
> I was able to make hadoop-consumer work with a kafka cluster.
>
> What I did is:
>
>   1. I generated a .properties file for one of the kafka brokers I wanted
>   to connect to.
>   2. I ran the DataGenerator program by passing the .properties file as a
>   parameter.
>   3. I renamed the 1.dat offset file generated in HDFS (so that it's not
>   overwritten the next time I run the DataGenerator).
>   4. I changed the broker's address in the .properties file to the next
>   server I wanted to connect to.
>   5. I repeated steps 2 to 4 for every kafka server in the cluster.
>   6. I then ran SimpleKafkaETLJob, and it was able to spawn one map task per
>   broker and pull all the data from each.
>
> This is almost exactly what I was trying before, except that previously I had
> manually modified the .dat offset files instead of generating each one with
> the DataGenerator, and I think vim didn't play nice with the SEQ files or
> something like that... I don't know.
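>
> Incidentally, since the offset files are Hadoop SequenceFiles, hand-editing
> them in vim is probably what corrupted them. Something like the following
> sketch (illustrative only, using just the stock Hadoop API; the key/value
> classes are whatever the DataGenerator wrote) would be a safer way to inspect
> them than a text editor:
>
>     import java.io.IOException;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.io.SequenceFile;
>     import org.apache.hadoop.io.Writable;
>     import org.apache.hadoop.util.ReflectionUtils;
>
>     // Sketch: dump an offset file (a Hadoop SequenceFile) to stdout instead of
>     // opening it in a text editor, which corrupts the binary format.
>     public class DumpOffsetFile {
>         public static void main(String[] args) throws IOException {
>             Configuration conf = new Configuration();
>             Path path = new Path(args[0]); // e.g. the 1.dat offset file in HDFS
>             FileSystem fs = path.getFileSystem(conf);
>             SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
>             try {
>                 // Instantiate whatever key/value classes the file declares, so we
>                 // don't have to hard-code the kafka.etl class names here.
>                 Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
>                 Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
>                 while (reader.next(key, value)) {
>                     System.out.println(key + "\t" + value);
>                 }
>             } finally {
>                 reader.close();
>             }
>         }
>     }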
>
> Anyhow, what I'm doing now is a little convoluted but at least it works... I
> will create a script that does all this repetitive stuff for me. Ideally, I
> would also like to pull the brokers list from ZK, like you guys do.
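>
> Something along these lines is roughly the script I have in mind (a sketch
> only: it assumes a template .properties file with a @BROKER@ placeholder for
> the broker address, and that the DataGenerator drops its 1.dat offset file
> into the job's HDFS input directory; the broker list and paths are made up):
>
>     import java.io.IOException;
>     import java.nio.charset.StandardCharsets;
>     import java.nio.file.Files;
>     import java.nio.file.Paths;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     // Sketch: automate steps 2 to 4 above for every broker in the cluster.
>     // The broker list, template file, placeholder name and HDFS paths are all
>     // made-up placeholders for illustration.
>     public class GenerateOffsetsPerBroker {
>         public static void main(String[] args) throws IOException, InterruptedException {
>             String[] brokers = { "kafka-01:9092", "kafka-02:9092", "kafka-03:9092" };
>             String template = new String(
>                     Files.readAllBytes(Paths.get("template.properties")), StandardCharsets.UTF_8);
>
>             Configuration conf = new Configuration();
>             FileSystem fs = FileSystem.get(conf);
>             Path offsetsDir = new Path("/kafka-etl/input"); // wherever 1.dat ends up in HDFS
>
>             for (int i = 0; i < brokers.length; i++) {
>                 // Steps 1 and 4: write a .properties file pointing at this broker.
>                 String props = template.replace("@BROKER@", brokers[i]);
>                 java.nio.file.Path propsFile = Paths.get("broker-" + i + ".properties");
>                 Files.write(propsFile, props.getBytes(StandardCharsets.UTF_8));
>
>                 // Step 2: run the DataGenerator exactly as from the command line.
>                 Process p = new ProcessBuilder("./run-class.sh",
>                         "kafka.etl.impl.DataGenerator", propsFile.toString())
>                         .inheritIO().start();
>                 if (p.waitFor() != 0) {
>                     throw new IOException("DataGenerator failed for broker " + brokers[i]);
>                 }
>
>                 // Step 3: rename the generated 1.dat so the next run doesn't overwrite it.
>                 fs.rename(new Path(offsetsDir, "1.dat"), new Path(offsetsDir, i + ".dat"));
>             }
>         }
>     }
>
> After that, SimpleKafkaETLJob can be pointed at the directory holding all the
> renamed offset files, as in step 6.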
>
> The Kafka/Hadoop ETL tools you mentioned are no doubt more mature and
> complete than the stuff I will create, so it would be really nice if you
> could release it.
>
> I think releasing those tools would help drive the adoption of Kafka,
> because in the state it's in now, Kafka is not really plug and play. That
> is, it works (which is already better than a lot of open source projects
> out there ;) !) but it seems a rather important part is missing.
>
> --
> Felix
>
>
>
> On Tue, Oct 18, 2011 at 7:31 PM, Hisham Mardam-Bey <hisham@mate1inc.com> wrote:
>
> > Hi folks, I've been following this thread. Felix and I are working together
> > on this project; we really like Kafka and are moving it into production
> > very soon.
> >
> > Jay, a question: would you guys consider releasing the code in a "not so
> > clean" state and having the community (we would like to help) shore it up
> > so it becomes usable by the masses, or are there other issues (legal?) you
> > have to sort out first?
> >
> > Thanks!
> >
> > hisham.
> >
> > On Tue, Oct 18, 2011 at 6:28 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > > I would actually love for us to release the full ETL system we have for
> > > Kafka/Hadoop; it is just a matter of finding the time to get this code
> > > into that shape.
> > >
> > > The hadoop team that maintains that code is pretty busy right now, but I
> > > am hoping we can find a way.
> > >
> > > -Jay
> > >
> > > On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <felix.giguere@mate1inc.com> wrote:
> > >
> > >> Thanks for your replies guys :)
> > >>
> > >> @Jay: I thought about the Hadoop version mismatch too, because I've had
> > >> the same problem before. I'll double check again to make sure I have the
> > >> same versions of hadoop everywhere, as the Kafka distributed cluster I
> > >> was testing on is a new setup and I might have forgotten to put the
> > >> hadoop jars we use in it... I'm working part-time for now so I probably
> > >> won't touch this again until next week but I'll keep you guys posted
> > >> ASAP :)
> > >>
> > >> @Richard: Thanks a lot for your description. That clears up the
> > >> inaccuracies in my understanding. Is there any chance you guys might
> > >> release the code you use to query ZK and create appropriate offset files
> > >> for each broker/partition pair? The hadoop consumer provided in the
> > >> source works with the setup we get from the quickstart guide, but the
> > >> process you describe seems more appropriate for production use.
> > >>
> > >> Thanks again :)
> > >>
> > >> --
> > >> Felix
> > >>
> > >>
> > >>
> > >> On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.park@gmail.com> wrote:
> > >>
> > >> > Does the version in contrib contain the fixes for KAFKA-131? The
> > >> > offsets were incorrectly computed prior to this patch.
> > >> >
> > >> > At LinkedIn, this is what we do in a nutshell:
> > >> > 1. We connect to the zookeeper instance. With this we are able to
> > >> > discover the topics, the brokers and the partitions of a broker.
> > >> >
> > >> > 2. For a topic we want to pull, we create files that contain the offset
> > >> > for each broker and partition. Each individual file contains a unique
> > >> > broker/partition pair. This is essentially what the DataGenerator does,
> > >> > except we use values from zookeeper. We take the output of the previous
> > >> > run of the kafka job (the new offsets) and use them as the new offset
> > >> > files. If the old offset doesn't exist, we set a default starting
> > >> > offset.
> > >> >
> > >> > 3. We run the pull hadoop job. One mapper per broker/partition pulls
> > >> > using the simple consumer into hdfs (the KafkaETLRecordReader handles
> > >> > most of this). We query kafka for the latest offset. The mapper fetches
> > >> > from the kafka broker until the latest offset is reached.
> > >> >
> > >> > 4. We group the data by hourly partition with a reduce step.
> > >> >
> > >> > 5. The kafka hadoop job's mapper spits out new offsets for the next
> > >> > time we decide to pull the data. The pull occurs quite frequently, at
> > >> > regularly scheduled intervals.
> > >> >
> > >> > That's the gist of it. There are a few additional modifications we made
> > >> > to the kafka job, including the ability to handle unavailable nodes,
> > >> > avro schema resolution and auditing.
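> > >> >
> > >> > For steps 1 and 2, the zookeeper discovery boils down to something like
> > >> > this sketch (illustrative only: the znode paths and data formats below
> > >> > may differ depending on the kafka version, and the writing of the
> > >> > per-broker/partition offset files is left out):
> > >> >
> > >> >     import java.nio.charset.StandardCharsets;
> > >> >     import org.apache.zookeeper.ZooKeeper;
> > >> >
> > >> >     // Sketch: list brokers and partition counts for one topic from zookeeper.
> > >> >     // Assumed znode layout: /brokers/ids/<id> -> "creatorId:host:port" and
> > >> >     // /brokers/topics/<topic>/<id> -> number of partitions on that broker.
> > >> >     public class DiscoverBrokerPartitions {
> > >> >         public static void main(String[] args) throws Exception {
> > >> >             String topic = "my-topic";                              // made-up topic
> > >> >             ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, event -> { });
> > >> >             try {
> > >> >                 for (String id : zk.getChildren("/brokers/ids", false)) {
> > >> >                     String reg = new String(
> > >> >                             zk.getData("/brokers/ids/" + id, false, null), StandardCharsets.UTF_8);
> > >> >                     String partPath = "/brokers/topics/" + topic + "/" + id;
> > >> >                     int partitions = 1; // default if the broker has no data for the topic yet
> > >> >                     if (zk.exists(partPath, false) != null) {
> > >> >                         partitions = Integer.parseInt(new String(
> > >> >                                 zk.getData(partPath, false, null), StandardCharsets.UTF_8));
> > >> >                     }
> > >> >                     for (int p = 0; p < partitions; p++) {
> > >> >                         // One offset file per broker/partition pair would be written here,
> > >> >                         // seeded with the previous run's offset or a default starting offset.
> > >> >                         System.out.println("broker " + id + " (" + reg + "), partition " + p);
> > >> >                     }
> > >> >                 }
> > >> >             } finally {
> > >> >                 zk.close();
> > >> >             }
> > >> >         }
> > >> >     }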
> > >> >
> > >> > Thanks,
> > >> > -Richard
> > >> >
> > >> >
> > >> >
> > >> > On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >> >
> > >> > > Is it possible that this is due to a hadoop version mismatch?
> > >> > > Typically, if the client jar you pick up does not match the hadoop
> > >> > > version of your hadoop cluster, you get an EOFException.
> > >> > >
> > >> > > -Jay
> > >> > >
> > >> > > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <felix.giguere@mate1inc.com> wrote:
> > >> > >
> > >> > > > Hello everyone :) !
> > >> > > >
> > >> > > > I have trouble using the Kafka hadoop consumer included in
> > >> > > > contrib/hadoop-consumer and I'd like to know if/how it is used at
> > >> > > > LinkedIn or elsewhere. I would also like it if someone could
> > >> > > > confirm or correct the assumptions I make below.
> > >> > > >
> > >> > > > Here's what I have so far:
> > >> > > >
> > >> > > > It works when pulling from one Kafka broker, but not when pulling
> > >> > > > from many. There are two problems:
> > >> > > >
> > >> > > > The first problem concerns the offset files that the Map/Reduce
> > >> > > > job takes as its input. From what I understand, these offset files
> > >> > > > represent the offset to start reading from on each of the Kafka
> > >> > > > brokers.
> > >> > > >
> > >> > > > To generate those files the first time (and thus start from
> > >> > > > offset -1), we can go in contrib/hadoop-consumer/ and run:
> > >> > > >
> > >> > > > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> > >> > > >
> > >> > > > The problem is that this DataGenerator class can take only one
> > >> > > > Kafka broker in its parameters (the properties file) and thus
> > >> > > > generates only one offset file.
> > >> > > >
> > >> > > > The Map/Reduce job will then spawn one map task for each offset
> > >> > > > file it finds in its input directory, and each of these map tasks
> > >> > > > will connect to a different Kafka broker. Since the DataGenerator
> > >> > > > can only generate one offset file, the Map/Reduce job only spawns
> > >> > > > one map task, which queries only one Kafka broker.
> > >> > > >
> > >> > > > Unless my assumptions are wrong or someone else provides a nice
> > >> > > > alternative solution, I was planning to modify the DataGenerator
> > >> > > > class so that it can generate multiple offset files, but for now,
> > >> > > > as a manual work-around, I just duplicated the offset files and
> > >> > > > specified a different Kafka broker in each.
> > >> > > >
> > >> > > > Other than that, I am thinking perhaps a more robust solution
> > >> > > > would be to have ZK-based discovery of the available brokers.
> > >> > > > Again, I'm curious to find out how this is done at LinkedIn or
> > >> > > > elsewhere.
> > >> > > >
> > >> > > > The second problem is when I run the M/R job. If I run it with
> > >> > > > the multiple offset files I manually generated as its input, it
> > >> > > > does spawn three map tasks, as expected, but it then fails with
> > >> > > > the following error:
> > >> > > >
> > >> > > > java.io.IOException: java.io.EOFException
> > >> > > >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> > >> > > >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> > >> > > >        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> > >> > > >        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> > >> > > >        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> > >> > > >        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> > >> > > >        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> > >> > > >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> > >> > > >        at java.security.AccessController.doPrivileged(Native Method)
> > >> > > >        at javax.security.auth.Subject.doAs(Subject.java:396)
> > >> > > >        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> > >> > > >        at org.apache.hadoop.mapred.Child.main(Child.java:264)
> > >> > > > Caused by: java.io.EOFException
> > >> > > >        at java.io.DataInputStream.readFully(DataInputStream.java:180)
> > >> > > >        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> > >> > > >        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> > >> > > >        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> > >> > > >        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> > >> > > >        at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> > >> > > >        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> > >> > > >        ... 11 more
> > >> > > >
> > >> > > >
> > >> > > > It fails before writing anything whatsoever, and it fails
> > >> > > > repeatedly for each map task until the JobTracker reaches the
> > >> > > > maximum number of failures per task and marks the job as failed.
> > >> > > >
> > >> > > > I haven't figured this one out yet...
> > >> > > >
> > >> > > > Any help would be greatly appreciated :) !
> > >> > > >
> > >> > > > Thanks :) !!!!
> > >> > > >
> > >> > > > --
> > >> > > > Felix
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
> > --
> > Hisham Mardam Bey
> >
> > A: Because it messes up the order in which people normally read text.
> > Q: Why is top-posting such a bad thing?
> > A: Top-posting.
> > Q: What is the most annoying thing in e-mail?
> >
> > -=[ Codito Ergo Sum ]=-
> >
>
