beam-dev mailing list archives

From "Chawla,Sumit " <sumitkcha...@gmail.com>
Subject Re: Suggestion for Writing Sink Implementation
Date Fri, 29 Jul 2016 20:26:19 GMT
Hi JB

I was referring to CassandraWriteOperation.finalize()

Regards
Sumit Chawla
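
As far as the Sink API of this era goes, WriteOperation.finalize() is
invoked once, after all bundles have been written and each Writer's
close() result has been collected. A rough sketch of the shape of such
a sink, with hypothetical names (CassandraSink, CassandraWriteOperation,
CassandraWriter, MyRecord), not the actual code from JB's PR, and with
some required overrides elided:

  class CassandraSink extends Sink<MyRecord> {
    @Override
    public void validate(PipelineOptions options) {
      // Called at pipeline construction time to check configuration.
    }

    @Override
    public Sink.WriteOperation<MyRecord, Void> createWriteOperation(PipelineOptions options) {
      return new CassandraWriteOperation(this);
    }
  }

  class CassandraWriteOperation extends Sink.WriteOperation<MyRecord, Void> {
    // Constructor, getSink(), getWriterResultCoder() etc. elided.

    @Override
    public void initialize(PipelineOptions options) throws Exception {
      // Runs once, before any bundle is written (e.g. ensure the table exists).
    }

    @Override
    public Sink.Writer<MyRecord, Void> createWriter(PipelineOptions options) throws Exception {
      // Runs once per bundle; the Writer opens a session in open()
      // and releases it in close().
      return new CassandraWriter(this);
    }

    @Override
    public void finalize(Iterable<Void> writerResults, PipelineOptions options) throws Exception {
      // Runs once, after every bundle's Writer has closed and its result
      // has been collected: the cleanup/commit hook being discussed here.
    }
  }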


On Fri, Jul 29, 2016 at 12:46 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
wrote:

> Hi Sumit,
>
> Not sure I follow you.
>
> Which resource cleanup are you talking about:
> - the close() on the reader (source)?
> - the finishBundle() on the writer (sink)?
>
> Regards
> JB
>
>
> On 07/29/2016 09:35 PM, Chawla,Sumit wrote:
>
>> Hi Raghu
>>
>> My source is going to be unbounded (streaming) with writes to Cassandra.
>> My only concern with KafkaIO.write() is that the producer is closed after
>> every bundle, and every bundle may have to open a new connection to Kafka.
>> (Please correct me if I am wrong: I am assuming a bundle to be equivalent
>> to a window size/mini-batch.)
>>
>> In Jean's implementation I see a different style of resource cleanup. Can
>> someone please explain when that finalize method is called?
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <rangadi@google.com.invalid>
>> wrote:
>>
>>> It is the preferred pattern, I think. Is your source bounded or unbounded
>>> (i.e. streaming)? If it is the latter, your sink could even be simpler than
>>> JB's, e.g. KafkaIO.write(), where it just writes the messages to Kafka in
>>> processElement().
>>>
>>> The pros are pretty clear: runner independent, pure Beam, simpler code.
>>> Cons: no checkpoint/rollback; I don't know if the Flink-specific sink
>>> provides this either.
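
To make the pattern Raghu describes concrete, a minimal sketch for
Cassandra, assuming the annotation-based DoFn (with @Setup/@Teardown,
which were being introduced around this time) and the DataStax Java
driver. CassandraWriteFn, MyRecord and toStatement() are hypothetical
names. Opening the connection in @Setup rather than per bundle also
addresses Sumit's per-bundle reconnection concern above:

  import com.datastax.driver.core.Cluster;
  import com.datastax.driver.core.Session;
  import org.apache.beam.sdk.transforms.DoFn;

  class CassandraWriteFn extends DoFn<MyRecord, Void> {
    private transient Cluster cluster;
    private transient Session session;

    @Setup
    public void setup() {
      // Opened once per DoFn instance and reused across bundles.
      cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
      session = cluster.connect("my_keyspace");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Write each element as it arrives; there is no finalization step,
      // hence no checkpoint/rollback (the trade-off Raghu notes).
      session.execute(toStatement(c.element()));
    }

    @Teardown
    public void teardown() {
      // Release the connection when the runner discards this instance.
      if (session != null) { session.close(); }
      if (cluster != null) { cluster.close(); }
    }
  }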
>>>
>>> On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <sumitkchawla@gmail.com>
>>> wrote:
>>>
>>>> Any more comments on this pattern suggested by Jean?
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
>>>>
>>>> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <klk@google.com.invalid>
>>>> wrote:
>>>>
>>>>> What I said earlier is not quite accurate, though my advice is the same.
>>>>> Here are the corrections:
>>>>>
>>>>>  - The Write transform actually has a too-general name, and Write.of(Sink)
>>>>> only really works for finite data. It re-windows into the global window
>>>>> and replaces any triggers.
>>>>>  - So the special case in the Flink runner actually just _enables_ a
>>>>> (fake) Sink to work.
>>>>>
>>>>> We should probably rename Write to some more specific name that indicates
>>>>> the particular strategy, and make it easier for a user to decide whether
>>>>> that pattern is what they want. And the transform as-is should probably
>>>>> reject unbounded inputs.
>>>>>
>>>>> So you should still proceed with implementation via ParDo and your own
>>>>> logic. If you want some logic similar to Write (but with different
>>>>> windowing and triggering) then it is a pretty simple composite to derive
>>>>> something from.
>>>>>
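
Kenn's "pretty simple composite" could look roughly like the sketch
below: a PTransform wrapping the writing ParDo. CassandraWrite,
CassandraWriteFn and MyRecord are hypothetical names (CassandraWriteFn
is sketched above), and in the SDK of this era the override point was
apply(), later renamed expand(). Unlike Write.of(Sink), this leaves the
input's windowing and triggering untouched:

  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PDone;

  public class CassandraWrite extends PTransform<PCollection<MyRecord>, PDone> {
    @Override
    public PDone apply(PCollection<MyRecord> input) {
      // Delegate the actual writing to a DoFn such as the
      // CassandraWriteFn sketched earlier in this thread.
      input.apply(ParDo.of(new CassandraWriteFn()));
      return PDone.in(input.getPipeline());
    }
  }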
>>>>> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkchawla@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Jean
>>>>>>
>>>>>> This is an interesting pattern. I see that it's implemented as a
>>>>>> PTransform, with constructs (WriteOperation/Writer) pretty similar to
>>>>>> the Sink<T> interface. Would love to hear more pros/cons of this
>>>>>> pattern :). It definitely gives more control over connection
>>>>>> initialization and cleanup.
>>>>>>
>>>>>> Regards
>>>>>> Sumit Chawla
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Sumit,
>>>>>>>
>>>>>>> I created a PR containing Cassandra IO with a sink:
>>>>>>>
>>>>>>> https://github.com/apache/incubator-beam/pull/592
>>>>>>>
>>>>>>> Maybe it can help you.
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>>
>>>>>>> On 07/28/2016 09:00 PM, Chawla,Sumit wrote:
>>>>>>>
>>>>>>>> Hi Kenneth
>>>>>>>>
>>>>>>>> Thanks for looking into it. I am currently trying to implement Sinks
>>>>>>>> for writing data into Cassandra/Titan DB. My immediate goal is to run
>>>>>>>> it on the Flink Runner.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sumit Chawla
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles
>>>>>>>> <klk@google.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Sumit,
>>>>>>>>>
>>>>>>>>> I see what has happened here, from that snippet you pasted from the
>>>>>>>>> Flink runner's code [1]. Thanks for looking into it!
>>>>>>>>>
>>>>>>>>> The Flink runner today appears to reject Write.Bound transforms in
>>>>>>>>> streaming mode if the sink is not an instance of UnboundedFlinkSink.
>>>>>>>>> The intent of that code, I believe, was to special-case
>>>>>>>>> UnboundedFlinkSink to make it easy to use an existing Flink sink, not
>>>>>>>>> to disable all other Write transforms. What do you think, Max?
>>>>>>>>>
>>>>>>>>> Until we fix this issue, you should use ParDo transforms to do the
>>>>>>>>> writing. If you can share a little about your sink, we may be able to
>>>>>>>>> suggest patterns for implementing it. Like Eugene said, the
>>>>>>>>> Write.of(Sink) transform is just a specialized pattern of ParDo's,
>>>>>>>>> not a Beam primitive.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>>>>>>>
>>>>>>>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov
>>>>>>>>> <kirpichov@google.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Sumit. Looks like your question is, indeed, specific to the
>>>>>>>>>> Flink runner, and I'll then defer to somebody familiar with it.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkchawla@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks a lot Eugene.
>>>>>>>>>>>
>>>>>>>>>>> > My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>>>>> > mandates that my implementation must also implement SinkFunction<>.
>>>>>>>>>>> > In that case, none of the Sink<> methods get called anyway.
>>>>>>>>>>>
>>>>>>>>>>> I am using the FlinkRunner. The Sink implementation that I was
>>>>>>>>>>> writing by extending the Sink<> class had to implement the
>>>>>>>>>>> Flink-specific SinkFunction for the correct translation:
>>>>>>>>>>>
>>>>>>>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>>>>>>>     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>>>>>>>>>>>
>>>>>>>>>>>   @Override
>>>>>>>>>>>   public void translateNode(Write.Bound<T> transform,
>>>>>>>>>>>       FlinkStreamingTranslationContext context) {
>>>>>>>>>>>     String name = transform.getName();
>>>>>>>>>>>     PValue input = context.getInput(transform);
>>>>>>>>>>>
>>>>>>>>>>>     Sink<T> sink = transform.getSink();
>>>>>>>>>>>     if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>>>>>>>       throw new UnsupportedOperationException(
>>>>>>>>>>>           "At the time, only unbounded Flink sinks are supported.");
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     DataStream<WindowedValue<T>> inputDataSet =
>>>>>>>>>>>         context.getInputDataStream(input);
>>>>>>>>>>>
>>>>>>>>>>>     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>>>>>>>>>>>       @Override
>>>>>>>>>>>       public void flatMap(WindowedValue<T> value, Collector<Object> out)
>>>>>>>>>>>           throws Exception {
>>>>>>>>>>>         out.collect(value.getValue());
>>>>>>>>>>>       }
>>>>>>>>>>>     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> Sumit Chawla
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov
>>>>>>>>>>> <kirpichov@google.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sumit,
>>>>>>>>>>>>
>>>>>>>>>>>> All reusable parts of a pipeline, including connectors to storage
>>>>>>>>>>>> systems, should be packaged as PTransform's.
>>>>>>>>>>>>
>>>>>>>>>>>> Sink is an advanced API that you can use under the hood to
>>>>>>>>>>>> implement the transform, if this particular connector benefits
>>>>>>>>>>>> from this API - but you don't have to, and many connectors indeed
>>>>>>>>>>>> don't need it, and are simpler to implement just as wrappers
>>>>>>>>>>>> around a couple of ParDo's writing the data.
>>>>>>>>>>>>
>>>>>>>>>>>> Even if the connector is implemented using a Sink, packaging the
>>>>>>>>>>>> connector as a PTransform is important because it's easier to
>>>>>>>>>>>> apply in a pipeline and because it's more future-proof (the author
>>>>>>>>>>>> of the connector may later change it to use something else rather
>>>>>>>>>>>> than Sink under the hood without breaking existing users).
>>>>>>>>>>>>
>>>>>>>>>>>> Sink is, currently, useful in the following case:
>>>>>>>>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>>>>>>>>>> unbounded Sink analogue)
>>>>>>>>>>>> - The location you're writing to is known at pipeline construction
>>>>>>>>>>>> time, and does not depend on the data itself (support for
>>>>>>>>>>>> "data-dependent" sinks is on the radar
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-92)
>>>>>>>>>>>> - The storage system you're writing to has a distinct
>>>>>>>>>>>> "initialization" and "finalization" step, allowing the write
>>>>>>>>>>>> operation to appear atomic (either all data is written or none).
>>>>>>>>>>>> This mostly applies to files (where writing is done by first
>>>>>>>>>>>> writing to a temporary directory, and then renaming all files to
>>>>>>>>>>>> their final location), but there can be other cases too.
>>>>>>>>>>>>
>>>>>>>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>>>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>>>>>>>>>
>>>>>>>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>>>>>>>>>> DatastoreIO, BigtableIO etc.)
>>>>>>>>>>>>
>>>>>>>>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
>>>>>>>>>>>> your last paragraph: the Beam programming model is intentionally
>>>>>>>>>>>> runner-agnostic, so that you can run exactly the same code on
>>>>>>>>>>>> different runners.
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest the best way to write a Sink in Beam. I see that
>>>>>>>>>>>>> there is a Sink<T> abstract class which is in experimental state.
>>>>>>>>>>>>> What is the expected outcome of this one? Do we have the API
>>>>>>>>>>>>> frozen, or could this still change? Most of the existing Sink
>>>>>>>>>>>>> implementations, like KafkaIO.Write, are not using this interface,
>>>>>>>>>>>>> and instead extend PTransform<PCollection<KV<K, V>>, PDone>. Would
>>>>>>>>>>>>> these be changed to extend Sink<>?
>>>>>>>>>>>>>
>>>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>>>>>>> mandates that my implementation must also implement
>>>>>>>>>>>>> SinkFunction<>. In that case, none of the Sink<> methods get
>>>>>>>>>>>>> called anyway.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> Sumit Chawla
>>>>>>>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
