Mailing-List: contact users-help@camel.apache.org; run by ezmlm
Reply-To: users@camel.apache.org
Date: Sun, 17 Jul 2016 11:24:40 -0700 (MST)
From: souciance
To: users@camel.apache.org
In-Reply-To: <71FE8BF7-37BF-4F9A-93CE-8DAF5330F5BB@narvar.com>
References: <71FE8BF7-37BF-4F9A-93CE-8DAF5330F5BB@narvar.com>
Subject: Re: Route setup to upload file to s3 and put message on kafka queue
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="----=_Part_359212_452502784.1468779880855"
archived-at: Sun, 17 Jul 2016 18:24:54 -0000

------=_Part_359212_452502784.1468779880855
Content-Type: text/plain; charset=UTF-8

I haven't used the CamelBatchComplete flag, but that sounds reasonable. The
only thing I would test is what happens if there are no files to download.
Should you continue polling or stop immediately?

On Sun, Jul 17, 2016 at 7:52 PM, Sherwin Pinto [via Camel] <
ml-node+s465427n5785195h8@n5.nabble.com> wrote:

> Hi!
>
> Thank you for your input! I will try out your suggestions. They seem to
> be exactly what I was looking for.
>
> Regarding the batch processing, I am using the FTP Idempotent Consumer
> with a Quartz scheduler. When the FTP consumer polls and downloads the
> files, I was hoping that the CamelBatchComplete flag would be set after the
> last file is downloaded and that would be the signal to send a message to
> Kafka (in my processor I could maintain a list of the files' meta info). I
> haven't looked into aggregators yet; maybe I could aggregate messages in a
> list and then send the message to Kafka?
>
> Thanks Again
>
> Regards
>
> Sherwin
>
>
> On Jul 17, 2016, at 10:43 AM, souciance <[hidden email]> wrote:
> >
> > I have not worked with S3 or Kafka so cannot help you much there, but I
> > would say the general setup of your route looks ok.
> >
> > The only thing is, if you are worried about the metadata being sent to
> > Kafka if something goes wrong with the S3 part, then you need to put the
> > S3 endpoint in a doTry() doCatch() statement, catch possible exceptions,
> > and then send the exchange to an error handler.
> >
> > Alternatively, add a choice().when() and only when the
> > predicate is true do you send the metadata to Kafka.
> >
> > If you are going to do batch processing, how do you know when the batch
> > starts and when the batch finishes?
> >
> > On Sun, Jul 17, 2016 at 7:18 PM, Sherwin Pinto [via Camel] <[hidden email]> wrote:
> >
> >> Hi All,
> >>
> >> Can someone help me with this?
> >>
> >> Thanks
> >>
> >> Sherwin
> >>
> >>> On Jul 14, 2016, at 1:18 PM, Sherwin Pinto <[hidden email]> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> My use case is as follows.
> >>>
> >>> Use case 1:
> >>> 1. Poll the FTP server for a file.
> >>> 2. Upload the file to S3.
> >>> 3. Put a message on Kafka with the file meta info, i.e. filename, S3
> >>> bucket etc. This information is then used by a separate Java process to
> >>> download the file and process it.
> >>>
> >>> I have a working prototype but want to make sure I am doing this
> >>> correctly. Here is the skeleton of the route config.
> >>>
> >>> from(config.getCamelFTPConsumerString()).routeId(config.getRouteId())
> >>>     .idempotentConsumer(ExpressionBuilder.headerExpression(Exchange.FILE_NAME),
> >>>         JpaMessageIdRepository.jpaMessageIdRepository(postgresEntityManager,
> >>>             "FLAT_FILE_PROCESSOR"))
> >>>     .eager(true).process(myProcessor)
> >>>     .to(s3Config.getCamelS3Configuration())
> >>>     .process(kafkaProducerMessageProcessor)
> >>>     .to(kafkaConfig.getCamelKafkaConfig())
> >>>     .log("EventProc - Uploaded file ${file:name} to S3");
> >>>
> >>>
> >>> I am using a pipeline: the file is first uploaded to S3, then
> >>> kafkaProducerMessageProcessor modifies the message and replaces the output
> >>> with a JSON string that represents the Kafka message, which is then
> >>> routed to Kafka.
> >>>
> >>> public void process(Exchange exchange) throws Exception {
> >>>     String s3fileName = exchange.getIn().getHeader(Exchange.FILE_NAME_PRODUCED).toString();
> >>>
> >>>     exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, null);
> >>>
> >>>     QueueMessage queueMsg = S3ProducerKafkaMessage.populate(type,
> >>>         retailerMoniker, s3fileName, bucket, provider, fileFormat, carrierMoniker);
> >>>
> >>>     String queueMsgStr = mapper.writeValueAsString(queueMsg);
> >>>
> >>>     Message out = exchange.getOut();
> >>>
> >>>     out.setBody(queueMsgStr, String.class);
> >>>
> >>>     System.out.println(queueMsgStr);
> >>> }
> >>>
> >>> Is there a better way to do this? My main concern is that if there is an
> >>> error while writing to Kafka, the file is still uploaded to S3. Is
> >>> there a better setup where maybe I can do this in a transaction?
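
On the transaction question above: as far as I know, an S3 upload cannot be enlisted in one transaction together with a Kafka send, so one option is to compensate instead, i.e. undo the upload when the send fails. The following is only a plain-Java sketch of that idea, not Camel, AWS or Kafka API. ObjectStore, MessageQueue, InMemoryStore and UploadThenNotify are hypothetical names invented for illustration. In a route, the try/catch would presumably correspond to the doTry()/doCatch() block suggested earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;

class UploadThenNotifyDemo {
    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        // A queue that always fails, to exercise the compensation path.
        MessageQueue brokenQueue = message -> { throw new Exception("queue unavailable"); };
        boolean published = new UploadThenNotify(store, brokenQueue).handle("orders.csv");
        System.out.println("published=" + published
                + ", stillStored=" + store.contains("orders.csv"));
    }
}

// Hypothetical stand-in for the S3 endpoint (assumption, not a real API).
interface ObjectStore {
    void upload(String key);
    void delete(String key);
}

// Hypothetical stand-in for the Kafka endpoint (assumption, not a real API).
interface MessageQueue {
    void send(String message) throws Exception;
}

// Simple in-memory store so the sketch can run without any external service.
class InMemoryStore implements ObjectStore {
    private final List<String> keys = new ArrayList<>();

    public void upload(String key) { keys.add(key); }
    public void delete(String key) { keys.remove(key); }
    boolean contains(String key) { return keys.contains(key); }
}

class UploadThenNotify {
    private final ObjectStore store;
    private final MessageQueue queue;

    UploadThenNotify(ObjectStore store, MessageQueue queue) {
        this.store = store;
        this.queue = queue;
    }

    // Returns true if both steps succeeded. If the send fails, the upload
    // is removed again so that no unannounced file is left in the store.
    boolean handle(String fileName) {
        store.upload(fileName);
        try {
            queue.send("{\"file\":\"" + fileName + "\"}");
            return true;
        } catch (Exception e) {
            store.delete(fileName);
            return false;
        }
    }
}
```

Whether deleting the object is the right compensation depends on the use case; leaving the file in place and retrying the send later would be an equally reasonable choice.
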
> >>>
> >>> Use Case 2:
> >>>
> >>> Instead of putting a single message on Kafka for each file uploaded to
> >>> S3, I may also want to batch the meta info into a single message and put
> >>> that on Kafka, so that the files in the current FTP poll are processed
> >>> as a batch (maybe storing all meta info in a list and then sending the
> >>> message to Kafka on CamelBatchComplete).
> >
> >
> > --
> > View this message in context:
> > http://camel.465427.n5.nabble.com/Route-setup-to-upload-file-to-s3-and-put-message-on-kafka-queue-tp5785115p5785194.html
> > Sent from the Camel - Users mailing list archive at Nabble.com
>




--
View this message in context: http://camel.465427.n5.nabble.com/Route-setup-to-upload-file-to-s3-and-put-message-on-kafka-queue-tp5785115p5785196.html
Sent from the Camel - Users mailing list archive at Nabble.com.
------=_Part_359212_452502784.1468779880855--
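
The batching idea from Use Case 2 (keep a list of file meta info and send one message once the batch-complete flag is seen) can be sketched in plain Java as below. This is only an illustration under assumptions: BatchCollector and its onFile method are hypothetical names, not Camel API, and the batchComplete argument stands in for the CamelBatchComplete exchange property that a processor would read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class BatchCollectorDemo {
    public static void main(String[] args) {
        BatchCollector collector = new BatchCollector();
        // First file of the poll: nothing is emitted yet.
        System.out.println(collector.onFile("a.csv", false));
        // Last file of the poll: one combined message is emitted.
        System.out.println(collector.onFile("b.csv", true));
    }
}

// Hypothetical helper (assumption): collects file names until the last
// file of a poll arrives, then produces a single JSON array message.
class BatchCollector {
    private final List<String> pending = new ArrayList<>();

    // fileName:      meta info for one uploaded file (kept to a name here)
    // batchComplete: true only for the last file of the current poll
    Optional<String> onFile(String fileName, boolean batchComplete) {
        pending.add(fileName);
        if (!batchComplete) {
            return Optional.empty();
        }
        // Naive JSON building for brevity; file names are not escaped.
        String message = "[\"" + String.join("\",\"", pending) + "\"]";
        pending.clear(); // start fresh for the next poll
        return Optional.of(message);
    }
}
```

Two things would be worth testing with a sketch like this. A poll that finds no files never calls onFile, so nothing is sent for it, which relates to the question about empty polls. Also, if the last file of a poll is dropped as a duplicate by the idempotent consumer before it reaches the processor, the flag would presumably never be seen and the list would not be flushed until a later poll.
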