nifi-users mailing list archives

From Matt Burgess <mattyb...@apache.org>
Subject Re: Replace flowfile contents from InvokeScriptedProcessor?
Date Tue, 02 May 2017 17:26:26 GMT
If you want to use attributes inside the callback, I recommend building a
dictionary from flowfile.getAttributes() and passing that into the
PyStreamCallback constructor:

class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        # attrs is a plain Python dict copy of the flowfile's attributes
        self.attrs = attrs


# ... later on ...
flowfile = session.write(flowfile, PyStreamCallback(dict(flowfile.getAttributes())))

Or something like that. I thought I'd seen an example somewhere but I
can't find it.
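
For what it's worth, here is a minimal sketch of that idea that runs outside NiFi. The StreamCallback base class and the attrs dictionary are stand-ins (assumptions for illustration only); in a real script StreamCallback comes from org.apache.nifi.processor.io and the streams are Java streams. The "s3.key" attribute name is borrowed from Jim's example and its value is made up.

```python
import io
import json


class StreamCallback(object):
    """Stand-in for NiFi's StreamCallback interface (assumption for this sketch)."""

    def process(self, inputStream, outputStream):
        raise NotImplementedError


class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        # Everything the callback needs arrives through the constructor.
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        # Ignore the old content and write a JSON document built from one attribute.
        payload = json.dumps({"thisThing": self.attrs.get("s3.key")})
        outputStream.write(bytearray(payload.encode("utf-8")))


# Hypothetical attribute map, standing in for dict(flowfile.getAttributes()).
attrs = {"s3.key": "bucket/file.txt"}

# In-memory streams stand in for the streams NiFi would pass to process().
out = io.BytesIO()
PyStreamCallback(attrs).process(io.BytesIO(b"old content"), out)
result = out.getvalue().decode("utf-8")
```

The callback never reaches into the processor object; it only reads what it was given at construction time.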

Regards,
Matt

On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <alopresto@apache.org> wrote:

> I am not a Python expert, but if you set self.result["x"] in one class,
> can you reference it in a separate class? What is the exception you are
> getting?
>
> Andy LoPresto
> alopresto@apache.org
> alopresto.apache@gmail.com
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On May 2, 2017, at 1:01 PM, James McMahon <jsmcmahon3@gmail.com> wrote:
>
> Thanks for your reply Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my
> python file, like so:
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream) :
>         outputStream.write(unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor) :
>     def __init__(self) :
>         self.result = {}
>         self.__rel_success = Relationship......etc etc
> .
> .
> .
>     def onTrigger(self, context, sessionFactory):
> .
> .
> .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails:      flowfile = session.write(flowfile, PyStreamCallback())
>         # this fails too:  flowfile = session.write(self, flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes() ?
>
> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <mattyb149@apache.org>
> wrote:
>
>> Jim, you still can/should use something like a PyStreamCallback().
>> ExecuteScript is basically an onTrigger() body, so you can use the same
>> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
>> instance of your PyStreamCallback into something like:
>>
>> flowfile = session.write(flowfile, PyStreamCallback())
>>
>> at some point before you transfer the flow file. If you need
>> variables/data from outside the PyStreamCallback() inside, you can pass
>> them into the constructor or (more dangerously) mess with the scope of the
>> variable(s).
>>
>> Regards,
>> Matt
>>
>>
>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <jsmcmahon3@gmail.com>
>> wrote:
>>
>>> This example, found at
>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>>> shows how to set up InvokeScriptedProcessor for Python (Jython) to add
>>> a simple attribute to a flowfile.
>>>
>>> How would I integrate into this structure a PyStreamCallback that lets
>>> me replace the *contents* of the flowfile? Typically the PyStreamCallback
>>> used for this is its own class, but here it looks like I would need to
>>> make it a method of the UpdateAttributes class. I've been unable to get
>>> that to work.
>>>
>>> Thank you in advance for any help.
>>>
>>> #! /usr/bin/python
>>> #
>>> # Licensed to the Apache Software Foundation (ASF) under one
>>> # or more contributor license agreements. See the NOTICE file
>>> # distributed with this work for additional information
>>> # regarding copyright ownership. The ASF licenses this file
>>> # to you under the Apache License, Version 2.0 (the
>>> # "License"); you may not use this file except in compliance
>>> # with the License. You may obtain a copy of the License at
>>> #
>>> # http://www.apache.org/licenses/LICENSE-2.0
>>> #
>>> # Unless required by applicable law or agreed to in writing,
>>> # software distributed under the License is distributed on an
>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> # KIND, either express or implied. See the License for the
>>> # specific language governing permissions and limitations
>>> # under the License.
>>> #
>>> import sys
>>> import traceback
>>> from org.apache.nifi.processor import Processor
>>> from org.apache.nifi.processor import Relationship
>>> from org.apache.nifi.components import PropertyDescriptor
>>> from org.apache.nifi.processor.util import StandardValidators
>>>
>>> class UpdateAttributes(Processor) :
>>>     __rel_success = Relationship.Builder().description("Success").name("success").build()
>>>
>>>     def __init__(self) :
>>>         pass
>>>
>>>     def initialize(self, context) :
>>>         pass
>>>
>>>     def getRelationships(self) :
>>>         return set([self.__rel_success])
>>>
>>>     def validate(self, context) :
>>>         pass
>>>
>>>     def getPropertyDescriptors(self) :
>>>         descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>         return [descriptor]
>>>
>>>     def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>         pass
>>>
>>>     def onTrigger(self, context, sessionFactory) :
>>>         session = sessionFactory.createSession()
>>>         try :
>>>             # ensure there is work to do
>>>             flowfile = session.get()
>>>             if flowfile is None :
>>>                 return
>>>             # extract some attribute values
>>>             fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>             fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>             # set an attribute
>>>             flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
>>>             flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
>>>             # transfer
>>>             session.transfer(flowfile, self.__rel_success)
>>>             session.commit()
>>>         except :
>>>             print sys.exc_info()[0]
>>>             print "Exception in TestReader:"
>>>             print '-' * 60
>>>             traceback.print_exc(file=sys.stdout)
>>>             print '-' * 60
>>>             # True, not true: Jython has no lowercase boolean literal
>>>             session.rollback(True)
>>>             raise
>>>
>>> processor = UpdateAttributes()
>>>
>>
>>
>
>
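
Putting the thread's pieces together: the callback stays a separate class, the processor hands it the data it needs, and the instance goes to session.write() before the transfer. In Jim's version the callback read self.result, but inside process() self is the callback instance, which has no result attribute, so that lookup is the most likely failure. The sketch below uses hypothetical FakeFlowFile and FakeSession stand-ins so it can run outside NiFi; they imitate only the few session calls used here (get, write, transfer), and handle() stands for the body of onTrigger() once the session has been created.

```python
import io


class StreamCallback(object):
    """Stand-in for NiFi's StreamCallback interface (assumption for this sketch)."""

    def process(self, inputStream, outputStream):
        raise NotImplementedError


class FakeFlowFile(object):
    """Hypothetical flowfile that holds only attributes."""

    def __init__(self, attributes):
        self._attributes = dict(attributes)

    def getAttribute(self, name):
        return self._attributes.get(name)


class FakeSession(object):
    """Hypothetical imitation of the session calls used below."""

    def __init__(self, flowfile, content):
        self._flowfile = flowfile
        self.content = content
        self.transferred_to = None

    def get(self):
        return self._flowfile

    def write(self, flowfile, callback):
        # Feed the old content in and keep whatever the callback writes out.
        out = io.BytesIO()
        callback.process(io.BytesIO(self.content), out)
        self.content = out.getvalue()
        return flowfile

    def transfer(self, flowfile, relationship):
        self.transferred_to = relationship


class PyStreamCallback(StreamCallback):
    def __init__(self, key):
        # Data is handed in by the processor instead of read from its self.
        self.key = key

    def process(self, inputStream, outputStream):
        outputStream.write(bytearray(("key=" + self.key).encode("utf-8")))


class UpdateContent(object):
    REL_SUCCESS = "success"

    def handle(self, session):
        # Stands for the body of onTrigger() after createSession().
        flowfile = session.get()
        if flowfile is None:
            return
        key = flowfile.getAttribute("s3.key")
        flowfile = session.write(flowfile, PyStreamCallback(key))
        session.transfer(flowfile, self.REL_SUCCESS)


# Made-up attribute value and content, for illustration only.
session = FakeSession(FakeFlowFile({"s3.key": "bucket/file.txt"}), b"old content")
UpdateContent().handle(session)
```

Passing the value through the constructor keeps the callback independent of the processor class, which is the safer of the two options Matt mentions.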
