From: Thomas Groh
To: beam-dev <dev@beam.incubator.apache.org>
Date: Wed, 31 Aug 2016 19:09:07 -0700
Subject: Re: KafkaIO Windowing Fn
In 0.2.0-incubating and beyond we've replaced the DirectPipelineRunner with
the DirectRunner (formerly the InProcessPipelineRunner), which is capable of
handling unbounded Pipelines. Is it possible for you to upgrade?
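(Illustrative sketch, not code from this thread: the same option wiring under
0.2.0-incubating. It assumes the runner lives at
org.apache.beam.runners.direct.DirectRunner, and the class name
UpgradeToDirectRunner is made up for the example.)

    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UpgradeToDirectRunner {
      public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        // The DirectRunner (formerly InProcessPipelineRunner) can evaluate
        // unbounded sources such as KafkaIO, so withMaxNumRecords(...) is no
        // longer needed just to make the pipeline runnable locally.
        pipelineOptions.setRunner(DirectRunner.class); // was DirectPipelineRunner.class
        Pipeline pipeline = Pipeline.create(pipelineOptions);
        // ... apply KafkaIO.read() and the rest of the transforms here ...
        pipeline.run();
      }
    }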
On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit wrote:

> @Aljoscha, my assumption here is that at least one trigger should fire:
> either the 100 elements, or the 30 seconds since the first element
> (whichever happens first).
>
> @Thomas - here is the error I get; I am using 0.1.0-incubating:
>
> java.lang.IllegalStateException: no evaluator registered for
> Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     ...
>
> Regards
> Sumit Chawla

On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek wrote:

> Hi,
> could the reason for the second part of the trigger never firing be that
> there are never at least 100 elements per key? The trigger would only
> fire if it saw 100 elements, and with only 540 elements that seems
> unlikely if you have more than 6 keys.
>
> Cheers,
> Aljoscha

On Wed, 31 Aug 2016 at 17:47 Thomas Groh wrote:

> KafkaIO is implemented using the UnboundedRead API, which is supported by
> the DirectRunner. You should be able to run without the
> withMaxNumRecords; if you can't, I'd be very interested to see the stack
> trace that you get when you try to run the Pipeline.

On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit wrote:

> Yes. I added it only for the DirectRunner, as it cannot translate
> Read(UnboundedSourceOfKafka).
>
> Regards
> Sumit Chawla

On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek wrote:

> Ah ok, this might be a stupid question, but did you remove this line when
> running it with Flink:
>
>     .withMaxNumRecords(500)
>
> Cheers,
> Aljoscha

On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit wrote:

> Hi Aljoscha
>
> The code is not different when running on Flink; I have only removed the
> business-specific transformations.
>
> Regards
> Sumit Chawla

On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek wrote:

> Hi,
> could you maybe also post the complete code that you're using with the
> FlinkRunner? I could have a look at it.
>
> Cheers,
> Aljoscha

On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit wrote:

> Hi Thomas
>
> Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> Following is the snippet I am working on, and I will post more details
> once I get it working (as of now I am unable to read messages from Kafka
> using the DirectRunner):
>
>     PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
>     pipelineOptions.setRunner(DirectPipelineRunner.class);
>     Pipeline pipeline = Pipeline.create(pipelineOptions);
>     pipeline.apply(KafkaIO.read()
>             .withMaxNumRecords(500)
>             .withTopics(ImmutableList.of("mytopic"))
>             .withBootstrapServers("localhost:9092")
>             .updateConsumerProperties(ImmutableMap.of(
>                     ConsumerConfig.GROUP_ID_CONFIG, "test1",
>                     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
>                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))
>         // Convert each Kafka record into a KV of String key and value.
>         .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>                 KV<byte[], byte[]> record = c.element().getKV();
>                 c.output(KV.of(new String(record.getKey()),
>                         new String(record.getValue())));
>             }
>         }))
>         .apply("WindowByMinute", Window.<KV<String, String>>into(
>                     FixedWindows.of(Duration.standardSeconds(10)))
>                 .withAllowedLateness(Duration.standardSeconds(1))
>                 .triggering(
>                     Repeatedly.forever(
>                         AfterFirst.of(
>                             AfterProcessingTime.pastFirstElementInPane()
>                                 .plusDelayOf(Duration.standardSeconds(30)),
>                             AfterPane.elementCountAtLeast(100))))
>                 .discardingFiredPanes())
>         .apply("GroupByTenant", GroupByKey.<String, String>create())
>         // Count and print the number of values grouped under each key.
>         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>                 KV<String, Iterable<String>> element = c.element();
>                 Iterator<String> iterator = element.getValue().iterator();
>                 int count = 0;
>                 while (iterator.hasNext()) {
>                     iterator.next();
>                     count++;
>                 }
>                 System.out.println(String.format("Key %s Value Count %d",
>                         element.getKey(), count));
>             }
>         }));
>     pipeline.run();
>
> Regards
> Sumit Chawla

On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh wrote:

> If you use the DirectRunner, do you observe the same behavior?
On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit wrote:

> Hi Thomas
>
> I am using the FlinkRunner. Yes, the second part of the trigger never
> fires for me.
>
> Regards
> Sumit Chawla

On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh wrote:

> Hey Sumit;
>
> What runner are you using? I can set up a test with the same trigger
> reading from an unbounded input using the DirectRunner, and I get the
> expected output panes.
>
> Just to clarify, the second half of the trigger ('when the first element
> has been there for at least 30+ seconds') simply never fires?

On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit wrote:

> Hi Thomas
>
> That did not work.
>
> I tried the following instead:
>
>     .triggering(
>             Repeatedly.forever(
>                 AfterFirst.of(
>                     AfterProcessingTime.pastFirstElementInPane()
>                         .plusDelayOf(Duration.standardSeconds(30)),
>                     AfterPane.elementCountAtLeast(100))))
>     .discardingFiredPanes()
>
> What I am trying to do here is make sure that follow-up operations
> receive batches of records:
>
> 1. Fire when a pane has 100+ elements.
>
> 2. Or fire when the first element has been in the pane for at least 30
> seconds.
>
> However, point 2 does not seem to work. E.g. I have 540 records in Kafka.
> The first 500 records are available immediately, but the remaining 40
> don't pass through. I was expecting the second trigger to help here.
>
> Regards
> Sumit Chawla

On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh wrote:

> You can adjust the trigger in the windowing transform if your sink can
> handle being written to multiple times for the same window. For example,
> if the sink appends to the output when it receives new data in a window,
> you could add something like
>
>     Window.into(...)
>         .withAllowedLateness(...)
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(5)))
>             .withLateFirings(AfterPane.elementCountAtLeast(1)))
>         .discardingFiredPanes();
>
> This will cause elements to be output some amount of time after they are
> first received from Kafka, even if Kafka does not have any new elements.
> Elements will only be output by the GroupByKey once.
>
> We should still have a JIRA to improve the KafkaIO watermark tracking in
> the absence of new records.
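(Illustrative sketch, not code from this thread: the trigger Thomas suggests,
dropped into the window transform from Sumit's snippet above, i.e. 10-second
fixed windows over KV<String, String> with 1 second of allowed lateness; the
5-second early-firing delay is the example value from Thomas's message.)

    .apply("WindowByMinute", Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardSeconds(10)))
            .withAllowedLateness(Duration.standardSeconds(1))
            // One on-time firing when the watermark passes the end of the
            // window, speculative firings every 5 seconds of processing time
            // before that, and one firing per element that arrives late.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(5)))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())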
On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit wrote:

> Thanks Raghu.
>
> I don't have much control over changing KafkaIO properties. I added the
> KafkaIO code for completing the example. Are there any changes that can
> be done to the Windowing to achieve the same behavior?
>
> Regards
> Sumit Chawla

On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi wrote:

> The default implementation returns the processing timestamp of the last
> record (in effect; more accurately, it returns the same as
> getTimestamp(), which might be overridden by the user).
>
> As a workaround, yes, you can provide your own watermarkFn that
> essentially returns Now() or Now()-1sec (usage in the javadoc:
> incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138).
>
> I think the default watermark should be smarter: it should advance to the
> current time if there aren't any records to read from Kafka. Could you
> file a JIRA?
>
> thanks,
> Raghu.
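(Illustrative sketch, not code from this thread: the workaround Raghu
describes, assuming the withWatermarkFn hook referenced by the javadoc link
above and reusing the byte[] key/value types from Sumit's snippet.)

    pipeline.apply(KafkaIO.read()
        .withBootstrapServers("localhost:9092")
        .withTopics(ImmutableList.of("mytopic"))
        // Report (roughly) wall-clock time as the watermark, independent of
        // the records themselves, so that windows can close even while the
        // topic is idle.
        .withWatermarkFn(new SerializableFunction<KV<byte[], byte[]>, Instant>() {
            @Override
            public Instant apply(KV<byte[], byte[]> record) {
                return Instant.now().minus(Duration.standardSeconds(1));
            }
        }));

With the watermark pinned near wall-clock time, windows can close and the
AfterWatermark trigger can fire even when no new messages arrive.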
On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit wrote:

> Hi All
>
> I am trying to do some simple batch processing on KafkaIO records. My
> Beam pipeline looks like the following:
>
>     pipeline.apply(KafkaIO.read()
>             .withTopics(ImmutableList.of("mytopic"))
>             .withBootstrapServers("localhost:9200")
>         .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // Emits a KV<String, JSONObject>
>         .apply("WindowBy10Sec", Window.<KV<String, JSONObject>>into(
>                 FixedWindows.of(Duration.standardSeconds(10)))
>             .withAllowedLateness(Duration.standardSeconds(1)))
>         .apply("GroupByKey", GroupByKey.create())
>         .apply("Sink", ParDo.of(new MySink())
>
> My Kafka source already has 1000+ messages, and new messages arrive every
> few minutes.
>
> When I start my pipeline, I can see that it reads all the 1000+ messages
> from Kafka. However, the Window does not fire until a new message arrives
> in Kafka, and the Sink does not receive any messages until that point. Do
> I need to override the WaterMarkFn here? Since I am not providing any
> timeStampFn, I am assuming that timestamps will be assigned as and when a
> message arrives, i.e. ingestion time. What is the default WaterMarkFn
> implementation? Is the Window not supposed to be fired based on ingestion
> time?
> Regards
> Sumit Chawla