nifi-users mailing list archives

From Matt Burgess <mattyb...@gmail.com>
Subject Re: Upsert
Date Tue, 22 Aug 2017 21:47:45 GMT
If you expect a given row to be updated more often than it is inserted, you can do the
logic backwards: try the update first and fall back to an insert. The first time the row is
seen it will fail, but all later updates should go right through :)
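
To illustrate the update-first ordering outside NiFi, here is a minimal sketch. It uses Python's built-in sqlite3 as a stand-in for Postgres, and the table name `inventory`, its two columns, and the helper `upsert_update_first` are assumptions made up for the example. It is not concurrency-safe: two writers could both see zero updated rows and both attempt the insert.

```python
import sqlite3


def upsert_update_first(conn, rfid, cabinet):
    """Try the UPDATE first; INSERT only when no existing row matched."""
    cur = conn.execute(
        "UPDATE inventory SET cabinetname = ? WHERE rfidnumber = ?",
        (cabinet, rfid),
    )
    if cur.rowcount == 0:
        # No row with this key yet, so this is the first time it is seen.
        conn.execute(
            "INSERT INTO inventory (rfidnumber, cabinetname) VALUES (?, ?)",
            (rfid, cabinet),
        )
        return "insert"
    return "update"


# Hypothetical two-column table, in memory, just for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory (rfidnumber TEXT PRIMARY KEY, cabinetname TEXT)"
)
```

Calling the helper twice with the same rfidnumber takes the insert path once and the update path afterwards, which is the cheap path when updates dominate.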


> On Aug 22, 2017, at 3:56 PM, Austin Duncan <aduncan@pyaanalytics.com> wrote:
> 
> I think it's working? I haven't had a file fail yet to check, so I'm just letting it run,
> but it should work. I'm just updating any files that fail and routing them back to the
> PutDatabaseRecord processor. In my head it seems like that should work.
> 
>> On Tue, Aug 22, 2017 at 3:12 PM, Matt Burgess <mattyb149@gmail.com> wrote:
>> It's not ideal, that's for sure :) However "upsert" is far from a standard operation
>> (no matter what the DB docs tell you lol). One thing that might be a viable improvement is
>> to implement upsert as a "try insert, then try update" as the workaround does. Depending on
>> the actual reason for flowfile failure, this might not be any better, and then the improvement
>> would be to choose a dialect and implement upsert where possible, falling back to either failure
>> if unsupported, or the try-insert-then-update behavior. It adds lots of complexity and still
>> might not solve all cases for ACID DBs.
>> 
>> Regards,
>> Matt
>> 
>>> On Aug 22, 2017, at 2:06 PM, Austin Duncan <aduncan@pyaanalytics.com> wrote:
>>> 
>>> That's what we had originally but we felt it was kind of hacky. 
>>> 
>>>> On Tue, Aug 22, 2017 at 2:05 PM, Matt Burgess <mattyb149@gmail.com> wrote:
>>>> If the order of operations doesn't matter then you could route flow files
>>>> that failed because the record existed back to an UpdateAttribute that sets the statement.type
>>>> to update and try them again. Not sure what additional logic is involved (I'm not at my computer
>>>> ATM) but that's where PutDatabaseRecord offers an advantage.
>>>> 
>>>> Regards,
>>>> Matt
>>>> 
>>>> 
>>>>> On Aug 22, 2017, at 1:48 PM, Austin Duncan <aduncan@pyaanalytics.com> wrote:
>>>>> 
>>>>> All of the records have the same schema, though, I think. It's all the
>>>>> same kind of data in the same format. We were originally using a PutSQL processor and were
>>>>> having trouble with it. The current setup actually runs pretty well; I'm just trying to figure
>>>>> out how to do an insert and then, if the data already exists in the table (matched on
>>>>> rfidnumber), do an update instead.
>>>>> 
>>>>>> On Tue, Aug 22, 2017 at 12:05 PM, Matt Burgess <mattyb149@gmail.com> wrote:
>>>>>> Your use case will be more complex, because you need to form SQL
>>>>>> from your JSON. You can try ConvertJSONToSQL and then ReplaceText to add the ON CONFLICT to
>>>>>> the end.
>>>>>> 
>>>>>> Also if you are using SplitJson then PutDatabaseRecord won't be as
>>>>>> efficient as it could be; it is meant to work on a number of records in a single flow file
>>>>>> with the same schema. In your case you might be better off with PutSQL as it won't require
>>>>>> schemas or statement types. The Split -> convert to SQL -> PutSQL pattern was the original
>>>>>> way to do things, but for multiple records with the same schema, PutDatabaseRecord was added.
>>>>>> 
>>>>>> Regards,
>>>>>> Matt
>>>>>> 
>>>>>>> On Aug 22, 2017, at 11:43 AM, Austin Duncan <aduncan@pyaanalytics.com> wrote:
>>>>>>> 
>>>>>>> The example you listed sounds like a solution; I am just having
>>>>>>> trouble fully understanding it. I want to write my own SQL so that I can do the
>>>>>>> "insert into on conflict". I am just having trouble understanding what
>>>>>>> I have to do so that the query and the data will be understood by the processor. Do I need
>>>>>>> different schemas? One like I sent and one for the query?
>>>>>>> 
>>>>>>>> On Tue, Aug 22, 2017 at 11:38 AM, Matt Burgess <mattyb149@apache.org> wrote:
>>>>>>>> If your incoming data is already in fields (JSON, e.g.) and not a SQL
>>>>>>>> statement, then the statement.type should be "insert" rather than
>>>>>>>> "sql". The "sql" type is for passing in explicit SQL statements to
>>>>>>>> execute rather than taking a record and generating the appropriate SQL
>>>>>>>> statement from the fields and the statement.type.
>>>>>>>> 
>>>>>>>> If you are trying to generate your own SQL and execute that, then
>>>>>>>> you'd want to try something like the solution I outlined before, where
>>>>>>>> you put the SQL in a field such as "statement", set statement.type to
>>>>>>>> "sql" and Field Containing SQL to "statement".
>>>>>>>> 
>>>>>>>> Does that make sense? Or am I misunderstanding what you are trying to do?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Matt
>>>>>>>> 
>>>>>>>> On Tue, Aug 22, 2017 at 11:32 AM, Austin Duncan
>>>>>>>> <aduncan@pyaanalytics.com> wrote:
>>>>>>>> > Matt,
>>>>>>>> >
>>>>>>>> > I am using JsonPathReader and using the 'Schema Text' Property with a schema
>>>>>>>> > defined in there. I could never figure out how to use any of the other
>>>>>>>> > Access Strategies. It's an inventory system so we are extracting the JSON
>>>>>>>> > and splitting it into a JSON file for each row. Then using this schema:
>>>>>>>> > {
>>>>>>>> >  "name": "insertSql",
>>>>>>>> >  "type": "record",
>>>>>>>> >  "fields": [
>>>>>>>> >   {
>>>>>>>> >    "name": "RfidNumber",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "CabinetName",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "ItemNumber",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "LotNumber",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "PurchaseOrderNumber",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "PurchaseOrderPrice",
>>>>>>>> >    "type": "float"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "SupplierId",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "SupplierName",
>>>>>>>> >    "type": "string"
>>>>>>>> >   },
>>>>>>>> >   {
>>>>>>>> >    "name": "updatedate",
>>>>>>>> >    "type": "string"
>>>>>>>> >   }
>>>>>>>> >  ]
>>>>>>>> > }
>>>>>>>> > I inserted the data into the table. I am not 100% clear on the uses of
>>>>>>>> > schemas, so I am not quite sure what you mean by using a schema to define
>>>>>>>> > the query.
>>>>>>>> >
>>>>>>>> > On Tue, Aug 22, 2017 at 11:23 AM, Matt Burgess <mattyb149@apache.org> wrote:
>>>>>>>> >>
>>>>>>>> >> Austin,
>>>>>>>> >>
>>>>>>>> >> What are you using for a record reader and schema for
>>>>>>>> >> PutDatabaseRecord?  In order to execute SQL using PutDatabaseRecord,
>>>>>>>> >> you have to specify a "Field Containing SQL", and the incoming
>>>>>>>> >> record(s) must have a field with that name. The value of that field
>>>>>>>> >> (for each record) will be executed.
>>>>>>>> >>
>>>>>>>> >> What I've done in the past is to put the whole SQL statement in a JSON
>>>>>>>> >> doc: {"query": "INSERT INTO table (column1 ,column2, column3, column4)
>>>>>>>> >> VALUES() ON CONFLICT (rfidnumber) DO UPDATE"} then I set Field
>>>>>>>> >> Containing SQL to "query", and use a JsonPathReader specifying a
>>>>>>>> >> "query" field with a path of $.query, or just a JsonTreeReader, either
>>>>>>>> >> reader using a schema with a single string field called "query".
>>>>>>>> >>
>>>>>>>> >> IMO a nice improvement would be to add a SQLReader that could split
>>>>>>>> >> on newlines or semicolons or whatever, and each "record" would contain
>>>>>>>> >> a SQL statement with the specified field name. That would save the
>>>>>>>> >> trouble of having to temporarily convert your SQL statement(s) into a
>>>>>>>> >> format that an existing Reader can recognize.
>>>>>>>> >>
>>>>>>>> >> Regards,
>>>>>>>> >> Matt
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Tue, Aug 22, 2017 at 11:12 AM, Austin Duncan
>>>>>>>> >> <aduncan@pyaanalytics.com> wrote:
>>>>>>>> >> > In my flow I am pulling data from a JSON file, splitting the JSON and then
>>>>>>>> >> > inserting that into a Postgres table using the PutDatabaseRecord
>>>>>>>> >> > processor.
>>>>>>>> >> > I have been using the insert statement option and it has been working
>>>>>>>> >> > fine,
>>>>>>>> >> > but now I am trying to figure out how to do an INSERT INTO table ON
>>>>>>>> >> > CONFLICT
>>>>>>>> >> > UPDATE statement. I have the statement.type attribute set to SQL and am
>>>>>>>> >> > trying to do the query:
>>>>>>>> >> >
>>>>>>>> >> > INSERT INTO table (column1 ,column2, column3, column4)
>>>>>>>> >> >
>>>>>>>> >> > VALUES()
>>>>>>>> >> >
>>>>>>>> >> > ON CONFLICT (rfidnumber) DO UPDATE;
>>>>>>>> >> >
>>>>>>>> >> > I am getting the error 'Record schema does not contain field
>>>>>>>> >> > containing SQL'. So two th   Any help would be appreciated.
>>>>>>>> >> >
>>>>>>>> >> > Thanks,
>>>>>>>> >> >
>>>>>>>> >> > Austin
>>>>>>>> >
>>>>>>>> >
>>>>>>> 
>>>>> 
>>> 
> 
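
For the "Field Containing SQL" approach quoted above, the per-record JSON document could be produced along these lines. This is only a sketch under assumptions: the table name `inventory` and the helpers `sql_literal`, `upsert_statement` and `wrap_for_field_containing_sql` are invented for illustration, the column names are lowercased versions of the schema fields, and the generated statement follows PostgreSQL's INSERT ... ON CONFLICT ... DO UPDATE SET syntax with EXCLUDED. Values are inlined as literals because the statement travels as plain text; where bound parameters are available they are the safer choice.

```python
import json


def sql_literal(value):
    """Render a value as a SQL literal; strings are single-quoted with quotes doubled."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return repr(value)
    return "'" + str(value).replace("'", "''") + "'"


def upsert_statement(table, record, key):
    """Build one PostgreSQL upsert statement for a record keyed on `key`."""
    columns = list(record)
    values = ", ".join(sql_literal(record[c]) for c in columns)
    # Every non-key column is overwritten with the value from the attempted insert.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({values}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )


def wrap_for_field_containing_sql(statement, field="query"):
    """Wrap the statement in a one-field JSON doc, matching Field Containing SQL = field."""
    return json.dumps({field: statement})


record = {"rfidnumber": "A1", "cabinetname": "O'Neil", "purchaseorderprice": 9.5}
statement = upsert_statement("inventory", record, "rfidnumber")
document = wrap_for_field_containing_sql(statement)
```

The resulting `document` is a JSON object with a single string field named "query", which is the shape a reader with a one-field schema would expect.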
