kafka-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-26 - Add Copycat, a connector framework for data import/export
Date Mon, 06 Jul 2015 18:40:21 GMT
Hi Ewen,

I read through the KIP page and here are some comments on the design
section:

1. "... and Copycat does not require that all partitions be enumerated".
This is not very clear to me; do you mean Copycat allows non-enumerable
stream partitions?

2. "... translates the data to Copycat's format, decides the destination
topic (and possibly partition) in Kafka." Just to confirm: this seems to
indicate two destination scenarios that Copycat connectors should be able
to support:

a. Specific destination topics per task (e.g. as illustrated in the diagram,
task 1 to topics A and B, task 2 to topics B and C).
b. Specific destination topic-partitions per task (as implied by "possibly
partition", e.g. task 1 to topicA-partition1 and topicB-partition1, task 2
to topicA-partition2 and topicB-partition2).

I understand connector developers need to implement the dynamic mapping
coordination from the source streams to tasks, but does the mapping from
tasks to destination topic-partitions (for sink connectors I assume these
would be stream-partitions) also need to be implemented dynamically since
the destination stream could also change?
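
To make the distinction in point 2 concrete: scenarios a) and b) differ only in whether a task pins the partition or leaves it to the producer. The following is a minimal Java sketch; the `Destination` type and the topic names are hypothetical illustrations, not part of the KIP's API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical type (not from the KIP): a destination topic plus an optional partition.
record Destination(String topic, Optional<Integer> partition) {
    // Scenario a): only the topic is fixed; the producer's partitioner picks the partition.
    static Destination topicOnly(String topic) {
        return new Destination(topic, Optional.empty());
    }

    // Scenario b): the task pins the record to one specific topic-partition.
    static Destination topicPartition(String topic, int partition) {
        return new Destination(topic, Optional.of(partition));
    }
}

final class DestinationExamples {
    // Task 1 under scenario a): topics A and B, any partition.
    static final List<Destination> TASK1_TOPICS =
            List.of(Destination.topicOnly("topicA"), Destination.topicOnly("topicB"));

    // Task 1 under scenario b): topicA-partition1 and topicB-partition1.
    static final List<Destination> TASK1_PARTITIONS =
            List.of(Destination.topicPartition("topicA", 1),
                    Destination.topicPartition("topicB", 1));
}
```
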

3. "Delivery Guarantees": depending on how we define the guarantees, they may
depend not only on the output system but also on the input system. For
example, duplicates may be generated by the input systems as well. Do we
also need to consider these scenarios?

4. "Integration with Process Management": for "Resource constrained
connectors", I am not sure how it differs in deployment from
"Copycat-as-a-service". I feel there are generally three different types:

  1) run-as-a-service: on a shared cluster equipped with some resource
manager, a Copycat framework is always running and users submit their
connector jobs via REST.
  2) standalone: on a single machine, start a Copycat instance with the
configured master and number of worker processes via some command-line tool.
  3) embedded library: the Copycat code runs wherever the embedding
application is running.

5. Some terminology suggestions. How about the following descriptions of
Copycat (no technical difference except for the CLI APIs, just some naming
changes):

a. Copycat developers need to implement the "*connector*" module, which
includes the "*master*" and "*worker*" logic:

  1) "master" is responsible for coordinating the assignment from the
resource stream partitions to the workers (and possibly also the assignment
from the workers to the destination stream partitions?) *dynamically*, and
  2) "worker" is responsible for polling from the assigned resource stream
partitions and pushing to the assigned destination stream partitions.

b. Copycat framework includes:

  1) The interface for the connector workers polling-from-resource and
pushing-to-destination function calls,
  2) The interface for resource management integration: it leverages the
underlying resource managers like YARN / Mesos to get a list of allocated
"*containers*".
  3) A "*connector manager*" responsible for coordinating the assignment
from the connector master / worker processes to the allocated containers
*dynamically*.

c. Copycat users need to specify the *connector configurations* through
config files or ZK / other storage systems, including the number of tasks,
starting offsets, etc., and start the *connector job* with its
configurations (each job has its own configs) via one of the three modes
mentioned above:

  1) submit the job via REST to a Copycat service running on a shared
cluster with a resource manager,
  2) start the job in standalone mode on a single machine, with all the
master / workers running on that single machine, or
  3) start a Copycat instance in embedded mode first and then add
connectors; all the added connectors (i.e. their master / workers) run on
the single machine where the embedding app code is running.

d. As for the CLI APIs, we will only need one for the standalone mode since
the run-as-a-service mode will always have some resource manager to
allocate the containers.

Guozhang


On Mon, Jun 29, 2015 at 9:50 AM, Ewen Cheslack-Postava <ewen@confluent.io>
wrote:

> Seems like discussion has mostly quieted down on this. Any more questions,
> comments, or discussion? If nobody brings up any other issues, I'll start a
> vote thread in a day or two.
>
> -Ewen
>
> On Thu, Jun 25, 2015 at 3:36 PM, Jay Kreps <jay@confluent.io> wrote:
>
> > We were talking on the call about a logo... so here I present "The Original
> > Copycat":
> > http://shirtoid.com/67790/the-original-copycat/
> >
> > -Jay
> >
> > On Tue, Jun 23, 2015 at 6:28 PM, Gwen Shapira <gshapira@cloudera.com>
> > wrote:
> >
> > > One more reason to have CopyCat as a separate project is to sidestep
> > > the entire "Why CopyCat and not X" discussion :)
> > >
> > > On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshapira@cloudera.com>
> > > wrote:
> > > > Re: Flume vs. CopyCat
> > > >
> > > > I would love to have an automagically-parallelizing, schema-aware
> > > > version of Flume with great reliability guarantees. Flume has good
> > > > core architecture and I'm sure that if the Flume community is
> > > > interested, it can be extended in that direction.
> > > >
> > > > However, the Apache way is not to stop new innovation just because
> > > > some systems already exist. We develop the best systems we can, and
> > > > users choose the ones they prefer - that's how ecosystems thrive.
> > > > If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
> > > > Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
> > > >
> > > > Gwen
> > > >
> > > >
> > > >
> > > > On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> > > > <ewen@confluent.io> wrote:
> > > >> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <roshan@hortonworks.com>
> > > >> wrote:
> > > >>
> > > >>> Thanks Jay and Ewen for the response.
> > > >>>
> > > >>>
> > > >>> >@Jay
> > > >>> >
> > > >>> > 3. This has a built in notion of parallelism throughout.
> > > >>>
> > > >>>
> > > >>>
> > > >>> It was not obvious what it will look like or how it will differ from
> > > >>> existing systems, since all of the existing ones do parallelize data
> > > >>> movement.
> > > >>>
> > > >>
> > > >> I'm guessing some confusion here might also be because we want both
> > > >> parallelization and distribution.
> > > >>
> > > >> Roughly speaking, I think of Copycat making the consumer group abstraction
> > > >> available for any import task, and the idea is to make this automatic and
> > > >> transparent to the user. This isn't interesting for systems that literally
> > > >> only have a single input stream, but Copycat source connectors have a
> > > >> built-in notion of parallel input streams. The connector's job is to inform
> > > >> the Copycat framework of what input streams there are, and Copycat handles
> > > >> running tasks, balancing the streams across them, handling failures by
> > > >> rebalancing as necessary, providing offset commit and storage so tasks can
> > > >> resume from the last known-good state, etc.
> > > >>
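
As a rough illustration of "balancing the streams across them", here is a minimal Java sketch that assigns input streams to tasks round-robin. The class and method names are hypothetical, and round-robin is just one possible policy; the KIP does not say how the framework balances streams.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: spread input streams over tasks round-robin, roughly like a
// consumer group spreads partitions over its members.
final class StreamBalancer {
    // Returns one list of stream IDs per task; stream i goes to task i % numTasks.
    static List<List<String>> assign(List<String> streams, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        List<List<String>> assignment = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < streams.size(); i++) {
            assignment.get(i % numTasks).add(streams.get(i));
        }
        return assignment;
    }
}
```

With streams t1..t5 and two tasks, task 0 gets t1, t3, t5 and task 1 gets t2, t4. If one task fails, recomputing `assign(streams, 1)` moves all five streams to the survivor, which would then resume each stream from its committed offset without the connector being involved.
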
> > > >> On the sink side, the input is the Kafka consumer group, which obviously
> > > >> already has this parallelism built in. Depending on the output, this may
> > > >> manifest in different ways. For HDFS, the effect is just that your output
> > > >> files are partitioned (one per topic-partition).
> > > >>
> > > >> As for other systems, can you be more specific? Some of them obviously do
> > > >> (e.g. Camus), but others require you to handle this manually. I don't want
> > > >> to pick on Flume specifically, but as an example, it requires either
> > > >> configuring multiple (or multiplexed) flows in a single agent or managing
> > > >> multiple agents independently. This isn't really the same as what I've
> > > >> described above, where you hand Copycat one config and it automatically
> > > >> spreads the work across multiple, fault-tolerant tasks. But Flume is also
> > > >> targeting a much different general problem, trying to build potentially
> > > >> large, multi-stage data flows with all sorts of transformations,
> > > >> filtering, etc.
> > > >>
> > > >>
> > > >>>
> > > >>>
> > > >>> @Ewen,
> > > >>>
> > > >>> >Import: Flume is just one of many similar systems designed around log
> > > >>> >collection. See notes below, but one major point is that they generally
> > > >>> >don't provide any sort of guaranteed delivery semantics.
> > > >>>
> > > >>>
> > > >>> I think most of them do provide guarantees of some sort (e.g. Flume &
> > > >>> FluentD).
> > > >>>
> > > >>
> > > >> This part of the discussion gets a little bit tricky, not least because it
> > > >> seems people can't agree on exactly what these terms mean.
> > > >>
> > > >> First, some systems that you didn't mention. Logstash definitely doesn't
> > > >> have any guarantees as it uses a simple 20-event in-memory buffer between
> > > >> stages. As far as I can tell, Heka doesn't provide these semantics either,
> > > >> although I have not investigated it as deeply.
> > > >>
> > > >> fluentd has an article discussing the options for it
> > > >> (http://docs.fluentd.org/articles/high-availability), but I actually think
> > > >> the article on writing plugins is more informative
> > > >> (http://docs.fluentd.org/articles/plugin-development). The most important
> > > >> point is that input plugins have no way to track or discover downstream
> > > >> delivery (i.e. they cannot get acks, nor is there any sort of offset
> > > >> tracked that they can look up to discover where to restart upon failure,
> > > >> nor is it guaranteed that after router.emit() returns the data will have
> > > >> already been delivered downstream). So if I have a replicated input data
> > > >> store, e.g. a replicated database, and I am just reading off its
> > > >> changelog, does fluentd actually guarantee something like at-least-once
> > > >> delivery to the sink? In fact, fluentd's own documentation (the high
> > > >> availability doc) describes data loss scenarios that aren't inherent to
> > > >> every system (e.g., if their log aggregator dies, which not every system is
> > > >> susceptible to, vs. if an event is generated on a single host and that host
> > > >> dies before reporting it anywhere, then of course the data is permanently
> > > >> lost).
> > > >>
> > > >> Flume actually does have a (somewhat confusingly named) transaction concept
> > > >> to help control this. The reliability actually depends on what type of
> > > >> channel implementation you use. Gwen and Jeff from Cloudera integrated
> > > >> Kafka and Flume, including a Kafka channel (see
> > > >> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
> > > >> This does allow for better control over delivery semantics, and I think if
> > > >> you use something like Kafka for every channel in your pipeline, you can
> > > >> get something like what Copycat can provide. I'd argue Flume's approach has
> > > >> some other drawbacks, though. In order to work correctly, every source and
> > > >> sink has to handle the transaction semantics, which adds complexity
> > > >> (although they do offer great skeleton examples in their docs!).
> > > >>
> > > >> Copycat tries to avoid that complexity for connector developers by changing
> > > >> the framework to use streams, offsets, and commits, and pushing the
> > > >> complexities of dealing with any sort of errors/failures into the
> > > >> framework. Ideally, connector developers only need to a) check for offsets
> > > >> at startup and rewind to the last known committed offset and b) load events
> > > >> from the source system (with stream IDs and offsets) and pass them to the
> > > >> framework.
> > > >>
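
The two connector responsibilities a) and b) can be sketched in Java as follows. This is a minimal sketch under stated assumptions: `StreamRecord`, `OffsetStore` and `ListSourceTask` are hypothetical names, an in-memory list stands in for the source system, and a map stands in for the framework's offset storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record a source task hands to the framework: stream ID, offset, payload.
record StreamRecord(String stream, long offset, String value) {}

// Hypothetical in-memory stand-in for the framework's offset storage.
final class OffsetStore {
    private final Map<String, Long> committed = new HashMap<>();

    // Last committed offset for a stream, or -1 if nothing has been committed yet.
    long committedOffset(String stream) {
        return committed.getOrDefault(stream, -1L);
    }

    void commit(String stream, long offset) {
        committed.put(stream, offset);
    }
}

// Sketch of a source task reading one stream; a List plays the source system.
final class ListSourceTask {
    private final String stream;
    private final List<String> source;
    private long position; // offset of the next event to read

    ListSourceTask(String stream, List<String> source, OffsetStore offsets) {
        this.stream = stream;
        this.source = source;
        // a) at startup, rewind to just after the last committed offset
        this.position = offsets.committedOffset(stream) + 1;
    }

    // b) load up to maxRecords events, tagging each with its stream ID and offset
    List<StreamRecord> poll(int maxRecords) {
        List<StreamRecord> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < source.size()) {
            batch.add(new StreamRecord(stream, position, source.get((int) position)));
            position++;
        }
        return batch;
    }
}
```

In this sketch the task never decides when to commit; the framework would call `commit` (for example, once Kafka has acknowledged the records), and a restarted task picks up at offset 2 if offset 1 was the last one committed.
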
> > > >>
> > > >>>
> > > >>> >YARN: My point isn't that YARN is bad, it's that tying to any particular
> > > >>> >cluster manager severely limits the applicability of the tool. The goal
> > > >>> >is to make Copycat agnostic to the cluster manager so it can run under
> > > >>> >Mesos, YARN, etc.
> > > >>>
> > > >>> OK, got it. Sounds like there is a plan to do some work here to ensure
> > > >>> it works out of the box with more than one scheduler (as @Jay listed
> > > >>> out). In that case, IMO it would be better to rephrase the KIP to state
> > > >>> that it will support more than one scheduler.
> > > >>>
> > > >>>
> > > >> Tried to add some wording to clarify that.
> > > >>
> > > >>
> > > >>>
> > > >>> >Exactly once: You accomplish this in any system by managing offsets in
> > > >>> >the destination system atomically with the data or through some kind of
> > > >>> >deduplication. Jiangjie actually just gave a great talk about this issue
> > > >>> >at a recent Kafka meetup, perhaps he can share some slides about it.
> > > >>> >When you see all the details involved, you'll see why I think it might
> > > >>> >be nice to have the framework help you manage the complexities of
> > > >>> >achieving different delivery semantics ;)
> > > >>>
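
One way to store offsets in the destination together with the data, as the exactly-once note describes, is for the sink to remember the last offset it applied per partition and to skip anything at or below it on redelivery. A minimal Java sketch follows; `DedupingSink` is hypothetical, it assumes records within a partition arrive in offset order (as they do within a Kafka partition), and its in-memory collections stand in for a destination that would have to update data and offsets in one transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink that keeps the last applied offset per partition alongside the
// data, so records redelivered after a failure are recognized and skipped.
final class DedupingSink {
    private final Map<String, Long> lastApplied = new HashMap<>();
    private final List<String> rows = new ArrayList<>();

    // Returns true if the record was written, false if it was a duplicate.
    boolean write(String partition, long offset, String value) {
        long last = lastApplied.getOrDefault(partition, -1L);
        if (offset <= last) {
            return false; // already applied before the failure/redelivery
        }
        // In a real destination, these two updates must commit as one atomic transaction.
        rows.add(value);
        lastApplied.put(partition, offset);
        return true;
    }

    List<String> rows() {
        return List.copyOf(rows);
    }
}
```
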
> > > >>>
> > > >>> Deduplication as a post-processing step is a common recommendation
> > > >>> today, but that is a workaround/fix for the inability of the delivery
> > > >>> systems to provide exactly-once. IMO such post-processing should not be
> > > >>> considered part of the "exactly-once" guarantee of Copycat.
> > > >>>
> > > >>>
> > > >>> It would be good to know how this guarantee will be possible when
> > > >>> delivering to HDFS. It would be great if someone could share those
> > > >>> slides if it is discussed there.
> > > >>>
> > > >>>
> > > >> For HDFS, the gist of the solution is to write to temporary files and then
> > > >> rename them atomically to their final destination, including offset
> > > >> information (e.g., it can just be in the filename). Readers only see files
> > > >> that have been "committed". If there is a failure, any existing temp files
> > > >> get cleaned up and reading is reset to the last committed offset. There are
> > > >> some tricky details if you have zombie processes and depending on how you
> > > >> organize the data across files, but this isn't really the point of this
> > > >> KIP. If you're interested in HDFS specifically, I'd suggest looking at
> > > >> Camus's implementation.
> > > >>
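
A minimal Java sketch of the temp-file-then-rename commit described above. It uses the local filesystem through java.nio (`Files.move` with `ATOMIC_MOVE`) as a stand-in for HDFS rename, and the file naming scheme `<topicPartition>.<firstOffset>-<lastOffset>` (one series of files per topic-partition) is an assumption for illustration, not Camus's actual format. It does not handle the zombie-process case.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of a "write to a temp file, then atomically rename" commit protocol.
// Local java.nio files stand in for HDFS; the naming scheme is an assumption.
final class RenameCommitWriter {
    private final Path dir;
    private final String topicPartition; // e.g. "topicA-0"

    RenameCommitWriter(Path dir, String topicPartition) {
        this.dir = dir;
        this.topicPartition = topicPartition;
    }

    // Writes the records for offsets first..first+lines.size()-1, then commits them.
    void writeAndCommit(long first, List<String> lines) throws IOException {
        if (lines.isEmpty()) return;
        String base = topicPartition + "." + first + "-" + (first + lines.size() - 1);
        Path tmp = dir.resolve(base + ".tmp");
        Files.write(tmp, lines);
        // Readers skip *.tmp files, so the data only becomes visible after this rename.
        Files.move(tmp, dir.resolve(base), StandardCopyOption.ATOMIC_MOVE);
    }

    // After a failure: delete leftover temp files and return the next offset to read.
    long recover() throws IOException {
        List<Path> entries;
        try (Stream<Path> files = Files.list(dir)) {
            entries = files.collect(Collectors.toList());
        }
        String prefix = topicPartition + ".";
        long lastCommitted = -1;
        for (Path p : entries) {
            String name = p.getFileName().toString();
            if (!name.startsWith(prefix)) continue;
            if (name.endsWith(".tmp")) {
                Files.delete(p); // uncommitted output from a failed attempt
                continue;
            }
            String range = name.substring(prefix.length()); // "<first>-<last>"
            long last = Long.parseLong(range.substring(range.indexOf('-') + 1));
            lastCommitted = Math.max(lastCommitted, last);
        }
        return lastCommitted + 1;
    }
}
```

Because the offsets live in the committed file names, the data and its offset become visible in a single rename, which is what lets recovery rely on the directory listing alone.
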
> > > >>
> > > >>>
> > > >>>
> > > >>>
> > > >>> Was looking for clarification on this:
> > > >>> - Export side - is this like a map reduce kind of job or something else?
> > > >>>   If delivering to HDFS, would this be running on the Hadoop cluster or
> > > >>>   outside?
> > > >>>
> > > >>> - Import side - how does this look? Is it a bunch of Flume-like
> > > >>>   processes? Maybe just some kind of a broker that translates the
> > > >>>   incoming protocol into the outgoing Kafka producer API protocol? If
> > > >>>   delivering to HDFS, will this run on the cluster or outside?
> > > >>>
> > > >>
> > > >> No mapreduce; in fact, no other frameworks are required unless the
> > > >> connector needs one for some reason. Both source and sink look
> > > >> structurally the same. Probably the most common scenario is to run a set
> > > >> of workers that provide the Copycat service. You submit connector jobs to
> > > >> run on these workers. A coordinator handles distributing the work across
> > > >> worker nodes. Coordinators determine how to divide the tasks and generate
> > > >> configs for them, then the framework handles distributing that work. Each
> > > >> individual task handles some subset of the job. For source tasks, that
> > > >> subset is a set of input streams (in the JDBC example in the KIP, each
> > > >> table would have a corresponding stream). For sink tasks, the subset is
> > > >> determined automatically by the framework via the underlying consumer
> > > >> group as a subset of topic-partitions (since the input is from Kafka).
> > > >> Connectors are kept simple, just processing streams of records (either
> > > >> generating them by reading from the source system or recording them into
> > > >> the sink system). Source tasks also include information about offsets,
> > > >> and sink tasks either need to manage offsets themselves or implement
> > > >> flush() functionality. Given these primitives, the framework can then
> > > >> handle other complexities like different delivery semantics without any
> > > >> additional support from the connectors.
> > > >>
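
For sink tasks that implement flush() rather than managing offsets themselves, the framework can commit consumer offsets only after a successful flush, which yields at-least-once delivery. A minimal Java sketch; `FlushableSink`, `SinkCommitter` and `BufferingSink` are hypothetical names, not the KIP's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink contract: put() may buffer; flush() must make every record
// passed to put() so far durable in the destination system.
interface FlushableSink {
    void put(String topicPartition, long offset, String value);
    void flush();
}

// Framework side: consumer offsets are committed only after a successful flush.
final class SinkCommitter {
    private final FlushableSink sink;
    private final Map<String, Long> pending = new HashMap<>();   // delivered, not yet flushed
    private final Map<String, Long> committed = new HashMap<>(); // safe restart points

    SinkCommitter(FlushableSink sink) {
        this.sink = sink;
    }

    void deliver(String topicPartition, long offset, String value) {
        sink.put(topicPartition, offset, value);
        // Following Kafka's convention, store the *next* offset to consume.
        pending.put(topicPartition, offset + 1);
    }

    void commit() {
        sink.flush(); // if this throws, the committed offsets are left unchanged
        committed.putAll(pending);
        pending.clear();
    }

    Map<String, Long> committedOffsets() {
        return Map.copyOf(committed);
    }
}

// Toy sink: buffers in memory and "persists" on flush.
final class BufferingSink implements FlushableSink {
    private final List<String> buffer = new ArrayList<>();
    final List<String> durable = new ArrayList<>();

    @Override
    public void put(String topicPartition, long offset, String value) {
        buffer.add(value);
    }

    @Override
    public void flush() {
        durable.addAll(buffer);
        buffer.clear();
    }
}
```

If the process crashes after flush() but before the offsets are committed, the same records are delivered again on restart, which is why this pattern gives at-least-once rather than exactly-once delivery.
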
> > > >> The motivation for the additional modes of execution (agent, embedded) was
> > > >> to support a couple of other common use cases. Agent mode is completely
> > > >> standalone, which provides for a much simpler implementation and handles
> > > >> use cases where there isn't an easy way to avoid running the job across
> > > >> many machines (e.g., if you have to load logs directly from log files).
> > > >> Embedded mode is actually a simple variant of the distributed mode, but
> > > >> lets you set up and run the entire cluster alongside the rest of your
> > > >> distributed app. This is useful if you want to get up and running with an
> > > >> application where you need to, for example, import data from another
> > > >> service into Kafka, then consume and process that data. You can set up the
> > > >> worker and submit a job directly from your code, reducing the operational
> > > >> complexity. It's probably not the right long-term solution as your usage
> > > >> expands, but it can significantly ease adoption.
> > > >>
> > > >>
> > > >>>
> > > >>>
> > > >>> I still think adding one or two specific end-to-end use cases in the
> > > >>> KIP, showing how Copycat will pan out for them for import/export, will
> > > >>> really clarify things.
> > > >>>
> > > >>
> > > >> There were a couple of examples already in the KIP -- JDBC, HDFS, log
> > > >> import, and now I've also added mirror maker. Were you looking for
> > > >> something more specific? I could also explain a full source -> Kafka ->
> > > >> sink pipeline, but I don't know that there's much to add there beyond the
> > > >> fact that we would like schemas to carry across the entire pipeline.
> > > >> Otherwise it's just chaining connectors. Besides, I think most of the
> > > >> interesting use cases actually have additional processing steps in
> > > >> between, i.e. using stream processing frameworks or custom consumers +
> > > >> producers.
> > > >>
> > > >> --
> > > >> Thanks,
> > > >> Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang
