Subject: Re: Need help in nifi-flume processor
From: Parul Agrawal
To: Bryan Bende
Cc: users@nifi.apache.org
Date: Mon, 26 Oct 2015 13:11:04 +0530

Hi,

Thank you very much for all the support. I have written a custom
processor that splits one JSON document into multiple JSON documents.
Now I would like to route each flowfile based on its content. I tried
using RouteOnContent, but it did not work.

Can you please help me route a flowfile based on the content/data it
contains?

Thanks and Regards,
Parul
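For reference, RouteOnContent routes on user-added properties: each
property name becomes a new outbound relationship, and its value is a
regular expression applied to the flowfile content. A minimal sketch
(the added property name and regex below are only illustrative, not
taken from this thread):

    Match Requirement     content must contain match
    Character Set         UTF-8
    Content Buffer Size   1 MB
    tcp (user-added)      "proto"\s*:\s*"tcp"

Flowfiles matching no user-added expression go to the built-in
unmatched relationship, so "did not work" is often just a regex that
does not line up with the exact bytes of the content.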
On Tue, Oct 13, 2015 at 6:54 PM, Bryan Bende wrote:

> Parul,
>
> You can use SplitJson to take a large JSON document and split an array
> element into individual documents. I took the JSON you attached and
> created a flow like GetFile -> SplitJson -> SplitJson -> PutFile.
>
> In the first SplitJson the path I used was $.packet.proto, and in the
> second I used $.field. This seemed to mostly work, except some of the
> splits going into PutFile still have another level of "field" which
> needs to be split again, so you would possibly need some conditional
> logic to split certain documents again.
>
> Alternatively, you could write a custom processor that restructures
> your JSON.
>
> -Bryan
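The two JsonPath expressions above assume a nesting roughly like the
following. This sample only sketches the shape, with field names
invented; it is not the attached new.json itself:

    {
      "packet": {
        "proto": [
          {
            "field": [
              { "name": "ip.src", "show": "10.0.0.1" },
              { "name": "tcp", "field": [ { "name": "tcp.port", "show": "80" } ] }
            ]
          }
        ]
      }
    }

The first SplitJson emits one flowfile per element of the
$.packet.proto array, the second one per element of $.field, and any
element that still carries its own nested "field" array is the case
that would need the extra conditional split Bryan mentions.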
>
> On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal wrote:
>
>> Hi,
>>
>> I tried with the above JSON element, but I am getting the below
>> mentioned error:
>>
>> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
>> o.a.n.p.standard.ConvertJSONToSQL
>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
>> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
>> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
>> 73)): expected a valid value (number, String, array, object, 'true',
>> 'false' or 'null')
>>
>> Also, I have a huge JSON object attached (new.json). Can you guide me
>> on how to use the ConvertJSONToSQL processor?
>> Should I use any other processor before ConvertJSONToSQL so that
>> new.json can be converted into a flat document of key/value pairs, or
>> an array of flat documents?
>>
>> Any help/guidance would be really useful.
>>
>> Thanks and Regards,
>> Parul
>>
>> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende wrote:
>>
>>> I think ConvertJSONToSQL expects a flat document of key/value pairs,
>>> or an array of flat documents. So I think your JSON would be:
>>>
>>> [
>>>     {"firstname":"John", "lastname":"Doe"},
>>>     {"firstname":"Anna", "lastname":"Smith"}
>>> ]
>>>
>>> The table name will come from the Table Name property.
>>>
>>> Let us know if this doesn't work.
>>>
>>> -Bryan
>>>
>>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal wrote:
>>>
>>>> Hi,
>>>>
>>>> Thank you very much for all the support.
>>>> I was able to convert the XML format to JSON using a custom flume
>>>> source.
>>>>
>>>> Now I need the ConvertJSONToSQL processor to insert data into SQL.
>>>> I am trying to get hands-on with this processor and will update you.
>>>> Meanwhile, if there is any example you could share of using this
>>>> processor on sample JSON data, that would be great.
>>>>
>>>> ===============
>>>>
>>>> 1) I tried using the ConvertJSONToSQL processor with the below
>>>> sample JSON file:
>>>>
>>>> "details":[
>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>> ]
>>>>
>>>> 2) I created the table details in PostgreSQL:
>>>>
>>>>     select * from details;
>>>>     firstname | lastname
>>>>     ----------+---------
>>>>     (0 rows)
>>>>
>>>> 3) The ConvertJSONToSQL processor properties are as below:
>>>>
>>>>     Property                    Value
>>>>     JDBC Connection Pool        DBCPConnectionPool
>>>>     Statement Type              INSERT
>>>>     Table Name                  details
>>>>     Catalog Name                No value set
>>>>     Translate Field Names       false
>>>>     Unmatched Field Behavior    Ignore Unmatched Fields
>>>>     Update Keys                 No value set
>>>>
>>>> But I am getting the below mentioned error from the ConvertJSONToSQL
>>>> processor:
>>>>
>>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>>> SQL INSERT statement due to
>>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>>> the JSON map to the columns defined by the details table; routing to
>>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>>> fields in the JSON map to the columns defined by the details table
>>>>
>>>> Thanks and Regards,
>>>> Parul
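The error above is consistent with the extra "details" wrapper: the
processor sees a single top-level field named details, which matches
none of the table's columns. A pairing that lines up with Bryan's
earlier description would be (a sketch only; the column types are
assumed):

    CREATE TABLE details (firstname VARCHAR(100), lastname VARCHAR(100));

    [
        {"firstname":"John", "lastname":"Doe"},
        {"firstname":"Anna", "lastname":"Smith"}
    ]

Note that with Translate Field Names set to false, the JSON keys must
match the column names exactly.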
>>>>
>>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria wrote:
>>>>
>>>>> I've done something like this by wrapping the command in a shell
>>>>> script:
>>>>>
>>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>>
>>>>> My use case was slightly different, but I'm pretty sure you can
>>>>> adapt the same idea.
>>>>>
>>>>> -Joey
>>>>>
>>>>> On Oct 10, 2015, at 03:52, Parul Agrawal wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I actually need to get the data from a pipe, so the actual command
>>>>> I need is: mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet
>>>>> Is it possible to use ExecuteProcess for multiple commands?
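Applying Joey's shell-script suggestion to that compound command, one
option is to hand the whole pipeline to a shell from ExecuteProcess.
An untested sketch, with the paths and interface name taken from the
thread:

    Command             /bin/bash
    Command Arguments   -c "mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet"

Since ExecuteProcess splits Command Arguments on a delimiter rather
than doing shell-style quoting, the more robust route is usually to
put those commands in an executable wrapper script and point Command
at the script, as in Joey's post.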
>>>>>
>>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I added a custom flume source, and when the flume source sends
>>>>>> data to the flume sink, the below mentioned error is thrown at the
>>>>>> flume sink:
>>>>>>
>>>>>> Administratively Yielded for 1 sec due to processing failure
>>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>>> is OPEN - you must either commit or rollback first
>>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>> java.lang.IllegalStateException: close() called when transaction is
>>>>>> OPEN - you must either commit or rollback first
>>>>>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
>>>>>>         at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>         at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
>>>>>>         at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
>>>>>>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
>>>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>> section=7], offset=180436,
>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>> section=7], offset=180436,
>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>> this session (StandardProcessSession[id=218318])
>>>>>>
>>>>>> Any idea what could be wrong here?
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende wrote:
>>>>>>
>>>>>>> Hi Parul,
>>>>>>>
>>>>>>> I think it would be good to keep the conversation going on the
>>>>>>> users list, since there are more people who can offer help there,
>>>>>>> and it also helps everyone learn new solutions.
>>>>>>>
>>>>>>> The quick answer, though, is that NiFi has an ExecuteProcess
>>>>>>> processor which could execute "tshark -i eth0 -T pdml".
>>>>>>>
>>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>>> place where you need a custom processor. For simple cases you can
>>>>>>> use an EvaluateXPath processor to extract values from the XML, and
>>>>>>> then a ReplaceText processor to build a new JSON document from
>>>>>>> those extracted values.
>>>>>>>
>>>>>>> -Bryan
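A rough illustration of that EvaluateXPath -> ReplaceText pattern,
with the XPaths and attribute names invented for a PDML-like document
rather than taken from the thread:

    EvaluateXPath
      Destination          flowfile-attribute
      src.ip (user-added)  //field[@name='ip.src']/@show
      dst.ip (user-added)  //field[@name='ip.dst']/@show

    ReplaceText
      Regular Expression   (?s:^.*$)
      Replacement Value    {"src":"${src.ip}","dst":"${dst.ip}"}

Each user-added EvaluateXPath property stores its result in a flowfile
attribute, and ReplaceText then replaces the entire content (the
default regex matches everything) with a JSON document built from
those attributes via the expression language.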
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> A little more to add: I need to keep reading the flowfile until
>>>>>>>> the END_TAG is received, i.e. we may need to concatenate the
>>>>>>>> flowfile data until the END_TAG, then convert it to JSON and call
>>>>>>>> the PutFile processor.
>>>>>>>>
>>>>>>>> Thanks and Regards,
>>>>>>>> Parul
>>>>>>>>
>>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>>> Basically I need a processor that converts an XML file to JSON.
>>>>>>>>>
>>>>>>>>> Currently I have a flume source of type "exec", and the command
>>>>>>>>> used is "tshark -i eth0 -T pdml".
>>>>>>>>>
>>>>>>>>> Here the flume source keeps sending data to the flume sink. This
>>>>>>>>> flowfile would be in PDML format.
>>>>>>>>>
>>>>>>>>> Now I need a processor which would do the following:
>>>>>>>>>
>>>>>>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>>>>>>> and END TAG (</packet>)
>>>>>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>>>>>> 3) Place the JSON file in a local directory using the PutFile
>>>>>>>>> processor.
>>>>>>>>>
>>>>>>>>> I am not sure if I have been able to explain the processor
>>>>>>>>> requirement; it would be really great if you could help me with
>>>>>>>>> this.
>>>>>>>>>
>>>>>>>>> Thanks and Regards,
>>>>>>>>> Parul
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria wrote:
>>>>>>>>>
>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>> investing in converting your custom Flume components to NiFi
>>>>>>>>>> processors. We can help you get started if you need any guidance
>>>>>>>>>> going that route.
>>>>>>>>>>
>>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>>>> It's really useful if you have a complex Flume flow and want to
>>>>>>>>>> migrate only parts of it over to NiFi at a time. I would port
>>>>>>>>>> any custom sources and sinks to NiFi once you know it will meet
>>>>>>>>>> your needs well. NiFi has a lot of documentation on writing
>>>>>>>>>> processors, and the concepts map pretty well if you're already
>>>>>>>>>> familiar with Flume's execution model.
>>>>>>>>>>
>>>>>>>>>> -Joey
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi Parul,
>>>>>>>>>> >
>>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>>>> but due to the way the Flume processors load the classes for the
>>>>>>>>>> sources and sinks, the jar you deploy to the lib directory also
>>>>>>>>>> needs to include the other dependencies your source/sink needs
>>>>>>>>>> (or they each need to individually be in lib/ directly).
>>>>>>>>>> >
>>>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>>> >
>>>>>>>>>> > It will contain the custom source and the following
>>>>>>>>>> dependencies all in one jar:
>>>>>>>>>> >
>>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>>> >    \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>>> >       \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>>> >
>>>>>>>>>> > I copied that to the NiFi lib, restarted, and created an
>>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>>> >
>>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>>> > Agent Name = a1
>>>>>>>>>> > Source Name = r1
>>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>>> >
>>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log.
>>>>>>>>>> >
>>>>>>>>>> > Keep in mind that this can become risky, because any classes
>>>>>>>>>> found in the lib directory are accessible to all NARs in NiFi
>>>>>>>>>> and are found before classes within a NAR, because the parent is
>>>>>>>>>> checked first during class loading. This example isn't too risky
>>>>>>>>>> because we are only bringing in flume jars and one guava jar,
>>>>>>>>>> but, for example, if another NAR uses a different version of
>>>>>>>>>> guava this is going to cause a problem.
>>>>>>>>>> >
>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>> investing in converting your custom Flume components to NiFi
>>>>>>>>>> processors. We can help you get started if you need any guidance
>>>>>>>>>> going that route.
>>>>>>>>>> >
>>>>>>>>>> > -Bryan
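The shading in Bryan's sample project is the standard
maven-shade-plugin bound to the package phase; a minimal configuration
along those lines would be (a sketch, not copied from the repository):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
        </execution>
      </executions>
    </plugin>

With no filters configured, this folds the compile-scope dependency
tree shown above into the single jar dropped into nifi/lib, which is
why the NoClassDefFoundError in the next message points at a Flume
interface that was presumably never bundled into the plain jar.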
>>>>>>>>>> >
>>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> Hello Bryan,
>>>>>>>>>> >>
>>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>>> >>
>>>>>>>>>> >> Is it possible to have a customized flume source and sink in
>>>>>>>>>> NiFi? I have my own customized source and sink. I followed the
>>>>>>>>>> below steps to add my own customized source, but it did not work.
>>>>>>>>>> >>
>>>>>>>>>> >> 1) Created a Maven project and added the customized source
>>>>>>>>>> (flume.jar was created after this step).
>>>>>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>>>>>> >> 3) Added a flume source processor with the below configuration:
>>>>>>>>>> >>
>>>>>>>>>> >> Property       Value
>>>>>>>>>> >> Source Type    com.flume.source.Source
>>>>>>>>>> >> Agent Name     a1
>>>>>>>>>> >> Source Name    k1
>>>>>>>>>> >>
>>>>>>>>>> >> But I am getting the below error in the Flume Source
>>>>>>>>>> processor:
>>>>>>>>>> >> "Failed to run validation due to
>>>>>>>>>> java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>>>>>> >>
>>>>>>>>>> >> Can you please help me in this regard? Did I miss any step or
>>>>>>>>>> configuration?
>>>>>>>>>> >>
>>>>>>>>>> >> Thanks and Regards,
>>>>>>>>>> >> Parul
>>>>>>>>>> >>
>>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Hello,
>>>>>>>>>> >>>
>>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>>>> sinks within NiFi. They don't communicate with an external Flume
>>>>>>>>>> process.
>>>>>>>>>> >>>
>>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>>> configured to run the netcat source, connected to an
>>>>>>>>>> ExecuteFlumeSink configured with the logger.
>>>>>>>>>> >>>
>>>>>>>>>> >>> -Bryan
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Hi,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I was trying to run the NiFi Flume processor with the below
>>>>>>>>>> mentioned details but could not bring it up.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I already started flume with the sample configuration file:
>>>>>>>>>> >>>> =============================================
>>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Describe the sink
>>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>>> >>>> =============================================
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Command used to start flume:
>>>>>>>>>> >>>> $ bin/flume-ng agent --conf conf --conf-file example.conf
>>>>>>>>>> --name a1 -Dflume.root.logger=INFO,console
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> In the NiFi browser the ExecuteFlumeSink was configured as
>>>>>>>>>> follows:
>>>>>>>>>> >>>> Property      Value
>>>>>>>>>> >>>> Sink Type     logger
>>>>>>>>>> >>>> Agent Name    a1
>>>>>>>>>> >>>> Sink Name     k1
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> An event is sent to flume using:
>>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>>> >>>> OK
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> But I could not get any data in the NiFi flume processor.
>>>>>>>>>> Request your help with this. Do I need to change the
>>>>>>>>>> example.conf file of flume so that the NiFi Flume sink gets the
>>>>>>>>>> data?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>>> >>>> Parul
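Concretely, the all-inside-NiFi version of that test that Bryan
describes replaces the external agent and its memory channel with two
connected processors. An untested sketch, carrying the netcat settings
over into the Flume Configuration property:

    ExecuteFlumeSource
      Source Type          netcat
      Agent Name           a1
      Source Name          r1
      Flume Configuration  a1.sources = r1
                           a1.sources.r1.bind = localhost
                           a1.sources.r1.port = 44444

    ExecuteFlumeSink
      Sink Type            logger
      Agent Name           a1
      Sink Name            k1

    $ telnet localhost 44444

The NiFi connection between the two processors plays the role that the
c1 memory channel played in example.conf.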
>>>>>>>>>> >>>
>>>>>>>>>> >>> --
>>>>>>>>>> >>> Sent from Gmail Mobile