nifi-users mailing list archives

From Bryan Bende <bbe...@gmail.com>
Subject Re: Bulk inserting into HBase with NiFi
Date Thu, 08 Jun 2017 20:52:33 GMT
Mike,

Just out of curiosity, what would the original data for your example
look like that produced that JSON?

Is it a CSV with two lines, like:

ABC, XYZ
DEF, LMN

and then ExecuteScript is turning that into the JSON array?


As far as reading the JSON goes, I created a simple flow of GenerateFlowFile
-> ConvertRecord -> LogAttribute, where ConvertRecord uses the
JsonPathReader with $.value:

https://gist.github.com/bbende/3789a6907a9af09aa7c32413040e7e2b

LogAttribute ends up logging:

[ {
  "value" : "XYZ"
}, {
  "value" : "LMN"
} ]

Which seems correct given that it's reading in the JSON with a schema
that only has the field "value" in it.

Let me know if that is not what you are looking for.
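
To make the behavior above concrete, here is a minimal sketch in plain Python (not NiFi code) of what reading a JSON array against a schema amounts to: each array element becomes one record, and only the fields named in the schema survive. The function name read_records and the "missing field becomes None" rule are assumptions made for illustration; they are not taken from the JsonPathReader implementation.

```python
import json


def read_records(json_text, field_names):
    """Hypothetical helper: project each JSON object onto the schema's fields."""
    data = json.loads(json_text)
    # A top-level array yields one record per element; a lone object yields one record.
    elements = data if isinstance(data, list) else [data]
    # Fields absent from the schema are dropped; fields absent from the data become None.
    return [{name: element.get(name) for name in field_names} for element in elements]


flowfile_content = '[{"key": "ABC", "value": "XYZ"}, {"key": "DEF", "value": "LMN"}]'

# Schema with only "value": the "key" field is dropped from every record.
value_only = read_records(flowfile_content, ["value"])

# Schema with both fields: "key" stays available, e.g. for use as a row key.
key_and_value = read_records(flowfile_content, ["key", "value"])

print(json.dumps(value_only, indent=2))
```

Under this model, a schema that lists only "value" never exposes "key" to downstream code, so a strategy that relies on "key" would presumably need that field in the schema as well.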



On Thu, Jun 8, 2017 at 4:13 PM, Mike Thomsen <mikerthomsen@gmail.com> wrote:
> Bryan,
>
> I have the processor somewhat operational now, but I'm running into a
> problem with the record readers. What I've done is basically this:
>
> Ex. JSON:
>
> [
>    {
>        "key": "ABC", "value": "XYZ"
>    },
>    {
>        "key": "DEF", "value": "LMN"
>    }
> ]
>
> Avro schema:
>
> {
>   "type": "record",
>   "name": "GenomeRecord",
>   "fields": [{
>     "name": "value",
>     "type": "string"
>   }]
> }
>
> 1. ExecuteScript iterates over a line and builds a JSON array as mentioned
> above.
> 2. PutHBaseRecord is wired to use a JsonPathReader that uses an
> AvroSchemaRegistry.
>   - I put a lot of logging in and can verify it is identifying the schema
> based on the attribute on the flowfile and looking at the appropriate field
> while looping over the Record to turn it into a serializable form for a Put.
>   - All I get are nulls.
> 3. My JsonPath has been variously $.value and $[*].value. It just does not
> seem to want to parse that JSON.
>
> The strategy I was going for is to use the "key" attribute in each JSON
> object to set the row key for the Put.
>
> Any ideas would be great.
>
> Thanks,
>
> Mike
>
> On Wed, Jun 7, 2017 at 4:40 PM, Bryan Bende <bbende@gmail.com> wrote:
>>
>> Mike,
>>
>> Glad to hear that the record API looks promising for what you are trying
>> to do!
>>
>> Here are a couple of thoughts, and please correct me if I am not
>> understanding your flow correctly...
>>
>> We should be able to make a generic PutHBaseRecord processor that uses
>> any record reader to read the incoming flow file and then converts
>> each record directly into a PutFlowFile (more on this in a minute).
>>
>> Once we have PutHBaseRecord, then there may be no need for you to
>> convert your data from CSV to JSON (unless there is another reason I
>> am missing) because you can send your CSV data directly into
>> PutHBaseRecord configured with a CSVRecordReader.
>>
>> If you are doing other processing/enrichment while going from CSV to
>> JSON, then you may be able to achieve some of the same things with
>> processors like UpdateRecord, PartitionRecord, and LookupRecord.
>> Essentially keeping the initial CSV intact and treating it like
>> records through the entire flow.
>>
>> Now back to PutHBaseRecord and the question of how to go from a Record
>> to a PutFlowFile...
>>
>> We basically need to know the rowId, column family, and then a list of
>> column-qualifier/value pairs. I haven't fully thought this through yet,
>> but...
>>
>> For the row id, we could have a similar strategy as PutHBaseJson,
>> where the value comes from a "Row Id" property in the processor or
>> from a "Row Id Record Path" which would evaluate the record path
>> against the record and use that value for the row id.
>>
>> For column family, we could probably do the same as above, where it
>> could be from a property or a record path.
>>
>> For the list of column-qualifier/value pairs, we can loop over all
>> fields in the record (skipping the row id and family if using record
>> fields) and then convert each one into a PutColumn. The bulk of the
>> work here is going to be taking the value of a field and turning it
>> into an appropriate byte[], so you'll likely want to use the type of
>> the field to cast into an appropriate Java type and then figure out
>> how to represent that as bytes.
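
The mapping described above (row id from one field, every other field becoming a column-qualifier/value pair, values turned into bytes according to their type) can be sketched in plain Python as follows. This is only an illustration under stated assumptions: record_to_put and to_bytes are hypothetical names, and the byte layouts chosen here (UTF-8 for strings, 8-byte big-endian for integers and floats, a single byte for booleans) are one possible convention, not necessarily what the eventual processor or the HBase client would use.

```python
import struct


def to_bytes(value):
    """Hypothetical encoder: pick a byte representation based on the value's type."""
    if isinstance(value, bool):  # must come before int, since bool is a subclass of int
        return b"\xff" if value else b"\x00"
    if isinstance(value, int):
        return struct.pack(">q", value)  # 8-byte big-endian signed integer
    if isinstance(value, float):
        return struct.pack(">d", value)  # 8-byte big-endian IEEE 754 double
    if isinstance(value, str):
        return value.encode("utf-8")
    raise TypeError(f"no byte encoding defined for {type(value).__name__}")


def record_to_put(record, row_id_field, column_family):
    """Return (row_id_bytes, [(family, qualifier, value_bytes), ...]) for one record."""
    row_id = to_bytes(record[row_id_field])
    family = column_family.encode("utf-8")
    columns = [
        (family, name.encode("utf-8"), to_bytes(value))
        for name, value in record.items()
        # Skip the field used as the row id, and skip null values.
        if name != row_id_field and value is not None
    ]
    return row_id, columns


row_id, columns = record_to_put({"key": "ABC", "value": "XYZ", "count": 7}, "key", "cf")
```

Keeping the type dispatch in one small function means the rest of the conversion does not care which reader produced the record.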
>>
>> I know this was a lot of information, but I hope this helps, and let
>> me know if anything is not making sense.
>>
>> Thanks,
>>
>> Bryan
>>
>>
>> On Wed, Jun 7, 2017 at 3:56 PM, Mike Thomsen <mikerthomsen@gmail.com>
>> wrote:
>> > Yeah, it's really getting hammered by the small files. I took a look at
>> > the new record APIs and that looked really promising. In fact, I'm
>> > taking a shot at creating a variant of PutHBaseJSON that uses the record
>> > API. Looks fairly straightforward so far. My strategy is roughly like
>> > this:
>> >
>> > GetFile -> SplitText -> ExecuteScript -> RouteOnAttribute ->
>> > PutHBaseJSONRecord
>> >
>> > ExecuteScript generates a larger flowfile that contains a structure like
>> > this now:
>> >
>> > [
>> >   { "key": "XYZ", "value": "ABC" }
>> > ]
>> >
>> >
>> > My intention is to have a JsonPathReader take that bigger flowfile, which
>> > is a JSON array, and iterate over it as a bunch of records to turn into
>> > Puts with the new HBase processor. I'm borrowing some code for wiring in
>> > the reader from the QueryRecord processor.
>> >
>> > So my only question now is, what is the best way to serialize the Record
>> > objects to JSON? The PutHBaseJson processor already has a Jackson setup
>> > internally. Any suggestions on doing this in a way that doesn't tie me at
>> > the hip to a particular reader implementation?
>> >
>> > Thanks,
>> >
>> > Mike
>> >
>> >
>> > On Wed, Jun 7, 2017 at 6:12 PM, Bryan Bende <bbende@gmail.com> wrote:
>> >>
>> >> Mike,
>> >>
>> >> Just following up on this...
>> >>
>> >> I created this JIRA to track the idea of record-based HBase processors:
>> >> https://issues.apache.org/jira/browse/NIFI-4034
>> >>
>> >> Also wanted to mention that with the existing processors, the main way
>> >> to scale up would be to increase the concurrent tasks on PutHBaseJson
>> >> and also to increase the Batch Size property which defaults to 25. The
>> >> Batch Size controls the maximum number of flow files that a concurrent
>> >> task will attempt to pull from the queue and send to HBase in one put
>> >> operation.
>> >>
>> >> Even with those tweaks your flow may still be getting hammered with
>> >> lots of small flow files, but thought I would mention to see if it
>> >> helps at all.
>> >>
>> >> -Bryan
>> >>
>> >>
>> >> On Tue, Jun 6, 2017 at 7:40 PM, Bryan Bende <bbende@gmail.com> wrote:
>> >> > Mike,
>> >> >
>> >> > With the record-oriented processors that have come out recently, a
>> >> > good solution would be to implement a PutHBaseRecord processor that
>> >> > would have a Record Reader configured. This way the processor could
>> >> > read in a large CSV without having to convert to individual JSON
>> >> > documents.
>> >> >
>> >> > One thing to consider is how many records/puts to send in a single
>> >> > call to HBase. Assuming multi-GB CSV files, you'll want to send
>> >> > portions at a time to avoid having the whole content in memory (some
>> >> > kind of record batch size property), but then you also have to deal
>> >> > with what happens when things fail halfway through. If the puts are
>> >> > idempotent then it may be fine to route the whole flow file to failure
>> >> > and try again even if some data was already inserted.
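
The batching idea above can be sketched in plain Python as follows: stream the CSV, hand a fixed number of rows at a time to a put function, and rely on idempotent puts so that retrying the whole file after a failure is harmless. The names record_batches, load, and fake_put are hypothetical, and a dict stands in for the HBase table; none of this is NiFi or HBase client code.

```python
import csv
import io
from itertools import islice


def record_batches(csv_stream, batch_size):
    """Yield lists of at most batch_size rows without loading the whole file."""
    reader = csv.reader(csv_stream, skipinitialspace=True)
    while True:
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


def load(csv_stream, send_batch, batch_size=1000):
    """Send every batch; an exception from send_batch aborts the whole load."""
    sent = 0
    for batch in record_batches(csv_stream, batch_size):
        send_batch(batch)  # on failure the caller would retry the entire file
        sent += len(batch)
    return sent


# A dict stands in for the table: writing the same key/value twice changes nothing,
# which is what makes a full retry safe even after a partial insert.
table = {}


def fake_put(rows):
    for key, value in rows:
        table[key] = value


data = "ABC, XYZ\nDEF, LMN\nGHI, RST\n"
first_run = load(io.StringIO(data), fake_put, batch_size=2)
retry_run = load(io.StringIO(data), fake_put, batch_size=2)  # simulated retry
```

Only one batch is held in memory at a time, so the batch size bounds memory use regardless of how large the CSV is.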
>> >> >
>> >> > Feel free to create a JIRA for HBase record processors, or I can do it
>> >> > later.
>> >> >
>> >> > Hope that helps.
>> >> >
>> >> > -Bryan
>> >> >
>> >> >
>> >> > On Tue, Jun 6, 2017 at 7:21 PM Mike Thomsen <mikerthomsen@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> We have a very large body of CSV files (well over 1TB) that need to be
>> >> >> imported into HBase. For a single 20GB segment, we are looking at
>> >> >> having to push easily 100M flowfiles into HBase, and most of the JSON
>> >> >> files generated are rather small (like 20-250 bytes).
>> >> >>
>> >> >> It's going very slowly, and I assume that is because we're taxing the
>> >> >> disk very heavily because of the content and provenance repositories
>> >> >> coming into play. So I'm wondering if anyone has a suggestion on a good
>> >> >> NiFiesque way of solving this. Right now, I'm considering two options:
>> >> >>
>> >> >> 1. Looking for a way to inject the HBase controller service into an
>> >> >> ExecuteScript processor so I can handle the data in large chunks
>> >> >> (splitting text and generating a List<Put> inside the processor myself
>> >> >> and doing one huge Put)
>> >> >>
>> >> >> 2. Creating a library that lets me generate HFiles from within an
>> >> >> ExecuteScript processor.
>> >> >>
>> >> >> What I really need is something fast within NiFi that would let me
>> >> >> generate huge blocks of updates for HBase and push them out. Any
>> >> >> ideas?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Mike
>> >> >
>> >
>> >
>
>
