nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Austin Duncan <adun...@pyaanalytics.com>
Subject Re: Stateful
Date Tue, 06 Feb 2018 19:21:48 GMT
could it be because the txn_time is a timestamp without time zone data type?

On Tue, Feb 6, 2018 at 2:01 PM, Austin Duncan <aduncan@pyaanalytics.com>
wrote:

> After creating the view and querying it with txn_time as the max-value
> column it just pulls all of the rows in the table every time it runs a task
> which makes me think i did something wrong. here is what my view and
> processor look like.
>
>
>
> On Tue, Feb 6, 2018 at 11:26 AM, Austin Duncan <aduncan@pyaanalytics.com>
> wrote:
>
>> I was able to create a view. but I am wondering how the best way to
>> update the view would be. The tables will have data loaded to them
>> peridoically so i wont be able to continually query the same view..
>>
>> On Tue, Feb 6, 2018 at 10:26 AM, Matt Burgess <mattyb149@apache.org>
>> wrote:
>>
>>> Austin,
>>>
>>> Can you create a (non-)materialized view from that query? If so then
>>> QueryDatabaseTable could work. If not, then try QueryRecord after
>>> ExecuteSQL (after adding s.txn_time, I didn't see it in the query), I
>>> think you can add a "max_txn_time" field to the schema and do
>>> something like SELECT *, MAX(txn_time) AS max_txn_time from FLOWFILE.
>>> You may also need a JSONRecordSetWriter so you can get the value from
>>> the max_txn_time field using EvaluateJSONPath. Then you can use
>>> UpdateAttribute to store the max value, and if you want to remove the
>>> max_txn_time field (and/or convert back to Avro) you can use
>>> ConvertRecord downstream. From UpdateAttribute you can send the flow
>>> file back to ExecuteSQL, assuming the query uses ExpressionLanguage
>>> with the max_txn_time attribute to get the new records. You may also
>>> need some logic to send the same flow file back to ExecuteSQL if no
>>> rows were returned in the resultset (a RouteOnAttribute using
>>> executesql.row.count should work), which should cause a retry until
>>> records are returned.
>>>
>>> BTW, there is a PR under review [1] to support arbitrary queries in
>>> QueryDatabaseTable, I'm not sure if that would solve your use case but
>>> you may see that option in NiFi 1.6.0.
>>>
>>> Regards,
>>> Matt
>>>
>>> [1] https://github.com/apache/nifi/pull/2162
>>>
>>>
>>> On Tue, Feb 6, 2018 at 9:54 AM, Austin Duncan <aduncan@pyaanalytics.com>
>>> wrote:
>>> > select
>>> > a.sendingfacility_namespaceid as "PracticeId",
>>> > Initcap(a.sendingfacility_universalid) as "PracticeName",
>>> > s.aip_personel_resource_id as "PhysicianId",
>>> > Initcap(s.aip_personel_resource_lname) as "PhysicianLastName",
>>> > Initcap(s.aip_personel_resource_fname) as "PhysicianFirstName",
>>> > coalesce(a.alternate_patient_id, s.alternate_patient_id) as
>>> "PatientId",
>>> > initcap(coalesce(a.patient_fname, s.patient_fname)) as "FirstName",
>>> > initcap(coalesce(a.patient_lname, s.patient_lname)) as "LastName",
>>> > a.patient_dob::date as "DateOfBirth",
>>> > a.patient_sex as "Gender",
>>> > a.patient_street as "Address1",
>>> > a.patient_other_designation as "Address2",
>>> > initcap(a.patient_city) as "City",
>>> > a.patient_state as "State",
>>> > a.patient_zip as "Zip",
>>> > coalesce(a.patient_ssn, s.patient_ssn) as "SSN",
>>> > a.pri_ins_company_name as "Payer1",
>>> > a.pri_ins_policy_nbr as "PolicyNumber1",
>>> > a.sec_ins_company_name as "Payer2",
>>> > a.sec_ins_policy_nbr as "PolicyNumber2",
>>> > s.schedule_id as "AppointmentId",
>>> > to_timestamp(s.appt_startdate, 'YYYYMMDDHH24MISS') as
>>> "AppointmentTime",
>>> > s.event_reason as "AppointmentDescription",
>>> > initcap(s.appt_type) as "AppointmentType",
>>> > s.filler_status as "AppointmentStatus",
>>> > s.event_reason_id as "EncounterID"
>>> >
>>> > from
>>> > hl7.adt a, hl7.siu s
>>> > where
>>> > a.alternate_patient_id = s.alternate_patient_id
>>> > and
>>> > length(trim(s.alternate_patient_id)) > 1
>>> >
>>> > i am also matching on 3 particular doctors but dont want to send that.
>>> The
>>> > column that i am trying to match on is s.txn_time. I only want
>>> txn_times
>>> > after the highest txn_time from the previous query.
>>> >
>>> > On Tue, Feb 6, 2018 at 9:50 AM, James Wing <jvwing@gmail.com> wrote:
>>> >>
>>> >> Understood.  Can you share an outline of the query you are trying to
>>> use?
>>> >> Which values would you change based on the maintained state?  What
>>> would be
>>> >> the source of the updated state?
>>> >>
>>> >> On Tue, Feb 6, 2018 at 6:33 AM, Austin Duncan <
>>> aduncan@pyaanalytics.com>
>>> >> wrote:
>>> >>>
>>> >>> I am not sure if thats what i need. My query is pretty robust and
i
>>> am
>>> >>> not sure if it will be able to be implemented in that
>>> >>>
>>> >>> On Tue, Feb 6, 2018 at 9:30 AM, James Wing <jvwing@gmail.com>
wrote:
>>> >>>>
>>> >>>> Austin,
>>> >>>>
>>> >>>> Have you tried QueryDatabaseTable?  For some databases and table
>>> schema,
>>> >>>> it provides a shortcut to querying for the recently changed
>>> records, as long
>>> >>>> as you have a "maximum value column" to use.
>>> >>>>
>>> >>>>
>>> >>>> https://nifi.apache.org/docs/nifi-docs/components/org.apache
>>> .nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.sta
>>> ndard.QueryDatabaseTable/index.html
>>> >>>>
>>> >>>> Thanks,
>>> >>>>
>>> >>>> James
>>> >>>>
>>> >>>> On Tue, Feb 6, 2018 at 6:12 AM, Austin Duncan <
>>> aduncan@pyaanalytics.com>
>>> >>>> wrote:
>>> >>>>>
>>> >>>>> All,
>>> >>>>> I am trying to do a flow that queries a postgres database
every
>>> hour. I
>>> >>>>> am trying to use the stateful settings in the updateAttribute
>>> processor so
>>> >>>>> that I only pull new files that have been uploaded within
that
>>> hour. I am
>>> >>>>> having trouble figuring out how to implement it. Can anyone
help
>>> me out?
>>> >>>>> Thanks,
>>> >>>>> Austin
>>> >>>>>
>>> >>>>> --
>>> >>>>> Austin Duncan
>>> >>>>> Developer
>>> >>>>> PYA Analytics
>>> >>>>> 2220 Sutherland Avenue
>>> >>>>> Knoxville, TN 37919
>>> >>>>> 423-260-4172
>>> >>>>
>>> >>>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Austin Duncan
>>> >>> Developer
>>> >>> PYA Analytics
>>> >>> 2220 Sutherland Avenue
>>> >>> Knoxville, TN 37919
>>> >>> 423-260-4172
>>> >>
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > Austin Duncan
>>> > Developer
>>> > PYA Analytics
>>> > 2220 Sutherland Avenue
>>> > Knoxville, TN 37919
>>> > 423-260-4172
>>>
>>
>>
>>
>> --
>> ‚ÄčAustin Duncan
>> *Developer*
>> PYA Analytics
>> 2220 Sutherland Avenue
>> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
>> Knoxville, TN 37919
>> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
>> 423-260-4172
>>
>> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
>> <%28865%29%20684-2828>
>>
>
>
>
> --
> ‚ÄčAustin Duncan
> *Developer*
> PYA Analytics
> 2220 Sutherland Avenue
> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
> Knoxville, TN 37919
> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
> 423-260-4172
>
> <https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
> <%28865%29%20684-2828>
>



-- 
‚ÄčAustin Duncan
*Developer*
PYA Analytics
2220 Sutherland Avenue
<https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
Knoxville, TN 37919
<https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
423-260-4172
<https://maps.google.com/?q=2220+Sutherland+AvenueKnoxville,+TN+37919+865&entry=gmail&source=g>
<%28865%29%20684-2828>

Mime
View raw message