nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From olegz <>
Subject [GitHub] nifi pull request: NIFI-1028 initial commit of NiFi In Depth docum...
Date Mon, 18 Apr 2016 18:02:45 GMT
Github user olegz commented on a diff in the pull request:
    --- Diff: nifi-docs/src/main/asciidoc/nifi-in-depth.adoc ---
    @@ -0,0 +1,209 @@
    +// Licensed to the Apache Software Foundation (ASF) under one or more
    +// contributor license agreements.  See the NOTICE file distributed with
    +// this work for additional information regarding copyright ownership.
    +// The ASF licenses this file to You under the Apache License, Version 2.0
    +// (the "License"); you may not use this file except in compliance with
    +// the License.  You may obtain a copy of the License at
    +// Unless required by applicable law or agreed to in writing, software
    +// distributed under the License is distributed on an "AS IS" BASIS,
    +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +// See the License for the specific language governing permissions and
    +// limitations under the License.
    +Apache NiFi In Depth
    +Apache NiFi Team <>
    +This advanced level document is aimed at providing an in-depth look at the implementation
and design decisions of NiFi. It assumes the reader has read enough of the other documentation
to know the basics of NiFi.
    +FlowFiles are at the heart of NiFi and its flow-based design. A FlowFile is just a collection
of attributes and a pointer to content, which is associated with one or more provenance events.
The attributes are key/value pairs that act as the metadata for the FlowFile, such as the
FlowFile filename. The content is the actual data or the payload of the file. Provenance is
a record of what’s happened to the FlowFile. Each one of these parts has its own repository
(repo) for storage.
    +One key aspect of the repositories is immutability. The content in the Content Repository
and data within the FlowFile Repository are immutable. When a change occurs to the attributes
of a FlowFile new copies of the attributes are created in memory and then persisted on disk.
When content is being changed for a given FlowFile its original content is read, streamed
through the transform, and written to a new stream. Then the FlowFile's content pointer is
updated to the new location on disk. As a result, the default approach for FlowFile content
storage can be said to be an immutable versioned content store.  The benefits of which are
many including substantial reduction in storage space required for the typical complex graphs
of processing, natural replay capability, takes advantage of OS caching, reduces random read/write
performance hits, and is easy to reason over. The previous revisions are kept according to
the archiving properties set in file and outlined in
  the NiFi System Administrator’s Guide.
    +== Repositories
    +There are three repositories that are utilized by NiFi. Each exists within the OS/Host's
file system and provides specific functionality. In order to fully understand FlowFiles and
how they are used by the underlying system it's important to know about these repositories.
All three repositories are directories on local storage that NiFi uses to persist data.
    +- The FlowFile Repository contains metadata for all the current FlowFiles in the flow.
    +- The Content Repository holds the content for current and past FlowFiles.
    +- The Provenance Repository holds the history of FlowFiles.
    +image::NiFiArchitecture.png["NiFi Architecture Diagram"]
    +=== FlowFile Repository
    +FlowFiles that are actively being processed by the system is held in a hash map in the
JVM memory (more about that in "Deeper View: FlowFiles in Memory and on Disk"). This makes
it very efficient to process them, but requires a secondary mechanism to provide durability
of data across process restarts due to any number of reasons. Reasons such as power loss,
kernel panics, system upgrades, and maintenance cycles. The FlowFile Repository is a "Write-Ahead
Log" (or data record) of the metadata of each of the FlowFiles that currently exist in the
system. This FlowFile metadata includes all the attributes associated with the FlowFile, a
pointer to the actual content of the FlowFile (which exists in the Content Repo) and the state
of the FlowFile, such as which Connection/Queue the FlowFile belongs in. This Write-Ahead
Log provides NiFi the resiliency it needs to handle restarts and unexpected system failures.
    +The FlowFile Repository acts as NiFi's Write-Ahead Log, so as the FlowFiles are flowing
through the system each change is logged in the FlowFile Repository before it happens as a
transactional unit of work. This allows the system to know exactly what step the node is on
when processing a piece of data. If the node goes down while processing the data, it can easily
resume from where it left off upon restart (more in-depth in "Effect of System Failure on
Transactions"). The format of the FlowFiles in the log is a series of deltas (or changes)
that happened along the way. NiFi recovers a FlowFile by restoring a “snapshot” of the
FlowFile (created when the Repository is check-pointed) and then replaying each of these deltas.
    +A snapshot is automatically taken periodically by the system, which creates a new snapshot
for each FlowFile. The system computes a new base checkpoint by serializing each FlowFile
in the hash map and writing it to disk with the filename ".partial". As the checkpointing
proceeds, the new FlowFile baselines are written to the ".partial" file. Once the checkpointing
is done the old "snapshot" file is deleted and the ".partial" file is renamed "snapshot".
    +The period between system checkpoints is configurable in the file (documented
in the NiFi System Administrator's Guide). The default is a two-minute interval.
    +===== Effect of System Failure on Transactions
    +NiFi protects against hardware and system failures by keeping a record of what was happening
on each node at that time in their respective FlowFile Repo. As mentioned above, the FlowFile
Repo is NiFi's Write-Ahead Log. When the node comes back online, it works to restore its state
by first checking for the "snapshot" and ".partial" files. The node either accepts the "snapshot"
and deletes the ".partial" (if it exits), or renames the ".partial" file to "snapshot" if
the "snapshot" file doesn't exist.
    +If the Node was in the middle of writing content when it went down, nothing is corrupted,
thanks to the Copy On Write (mentioned below) and Immutability (mentioned above) paradigms.
Since FlowFile transactions never modify the original content (pointed to by the content pointer),
the original is safe. When NiFi goes down, the write claim for the change is orphaned and
then cleaned up by the background garbage collection. This provides a “rollback” to the
last known stable state.
    +The Node then restores its state from the FlowFile. For a more in-depth, step-by-step
explanation of the process, see this link:
    +This setup, in terms of transactional units of work, allows NiFi to be very resilient
in the face of adversity, ensuring that even if NiFi is suddenly killed, it can pick back
up without any loss of data.
    +===== Deeper View: FlowFiles in Memory and on Disk
    +The term "FlowFile" is a bit of a misnomer. This would lead one to believe that each
FlowFile corresponds to a file on disk, but that is not true. There are two main locations
that the FlowFile attributes exist, the Write-Ahead Log that is explained above and a hash
map in working memory. This hash map has a reference to all of the FlowFiles actively being
used in the Flow. The object referenced by this map is the same one that is used by processors
and held in connections queues. Since the FlowFile object is held in memory, all which has
to be done for the Processor to get the FlowFile is to ask the ProcessSession to grab it from
the queue.
    +When a change occurs to the FlowFile, the delta is written out to the Write-Ahead Log
and the object in memory is modified accordingly. This allows the system to quickly work with
FlowFiles while also keeping track of what has happened and what will happen when the session
is committed. This provides a very robust and durable system.
    +There is also the notion of "swapping" FlowFiles. This occurs when the number of FlowFiles
in a connection queue exceeds the value set in the "nifi.queue.swap.threshold" property. The
FlowFiles with the lowest priority in the connection queue are serialized and written to disk
in a "swap file" in batches of 10,000. These FlowFiles are then removed from the hash map
mentioned above and the connection queue is in charge of determining when to swap the files
back into memory. When the FlowFiles are swapped out, the FlowFile repo is notified and it
keeps a list of the swap files. When the system is checkpointed the snapshot includes a section
for swapped out files. When swap files are swapped back in, the FlowFiles are added back into
the hash map. This swapping technique, much like the swapping performed by most Operating
Systems, allows NiFi to provide very fast access to FlowFiles that are actively being processed
while still allowing many millions of FlowFiles to exist in the Fl
 ow without depleting the system’s memory.
    +=== Content Repository
    +The Content Repository is simply a place in local storage where the content of all FlowFiles
exists and it is typically the largest of the three Repositories. As mentioned in the introductory
section, this repository utilizes the immutability and copy-on-write paradigms to maximize
speed and thread-safety. The core design decision influencing the Content Repo is to hold
the FlowFile's content on disk and only read it into JVM memory when it's needed. This allows
NiFi to handle tiny and massive sized objects without requiring producer and consumer processors
to hold the full objects in memory. As a result, actions like splitting, aggregating, and
transforming very large objects are quite easy to do without harming memory.
    +In the same way the JVM Heap has a garbage collection process to reclaim unreachable
objects when space is needed, there exists a dedicated thread in NiFi to analyze the Content
repo for un-used content (more info in the " Deeper View: Deletion After Checkpointing" section).
After a FlowFile's content is identified as no longer in use it will either be deleted or
archived. If archiving is enabled in then the FlowFile’s content will exist
in the Content Repo either until it is aged off (deleted after a certain amount of time) or
deleted due to the Content Repo taking up too much space.  The conditions for archiving and/or
deleting are configured in the file ("nifi.content.repository.archive.max.retention.period",
"nifi.content.repository.archive.max.usage.percentage") and outlined in the Admin guide. Refer
to the "Data Egress" section for more information on the deletion of content.
    +===== Deeper View: Content Claim
    +In general, when talking about a FlowFile, the reference to its content can simply be
referred to as a "pointer" to the content. Though, the underlying implementation of the FlowFile
Content reference has multiple layers of complexity. The Content Repository is made up of
a collection of files on disk. These files are binned into Containers and Sections. A Section
is a subdirectory of a Container. A Container can be thought of as a “root directory”
for the Content Repository. The Content Repository, though, can be made up of many Containers.
This is done so that NiFi can take advantage of multiple physical partitions in parallel.”
NiFi is then capable of reading from, and writing to, all of these disks in parallel, in order
to achieve data rates of hundreds of Megabytes or even Gigabytes per second of disk throughput
on a single node. "Resource Claims" are Java objects that point to specific files on disk
(this is done by keeping track of the file ID, the section the file 
 is in, and the container the section is a part of).
    +To keep track of the FlowFile's contents, the FlowFile has a "Content Claim" object.
This Content Claim has a reference to the Resource Claim that contains the content, the offset
of the content within the file, and the length of the content. To access the content, the
Content Repository drills down using to the specific file on disk using the Resource Claim's
properties and then seeks to the offset specified by the Resource Claim before streaming content
from the file.
    +This layer of abstraction (Resource Claim) was done so that there is not a file on disk
for the content of every FlowFile. The concept of immutability is key to this being possible.
Since the content is never changed once it is written ("copy on write" is used to make changes),
there is no fragmentation of memory or moving data if the content of a FlowFile changes. By
utilizing a single file on disk to hold the content of many FlowFiles, NiFi is able to provide
far better throughput, often approaching the maximum data rates provided by the disks.
    +=== Provenance Repository
    +The Provenance Repository is where the history of each FlowFile is stored. This history
is used to provide the Data Lineage (also known as the Chain of Custody) of each piece of
data. Each time that an event occurs for a FlowFile (FlowFile is created, forked, cloned,
modified, etc.) a new provenance event is created. This provenance event is a snapshot of
the FlowFile as it looked and fit in the flow that existed at that point in time. When a provenance
event is created, it copies all the FlowFile's attributes and the pointer to the FlowFile's
content and aggregates that with the FlowFile's state (such as its relationship with other
provenance events) to one location in the Provenance Repo. This snapshot will not change,
with the exception of the data being expired. The Provenance Repository holds all of these
provenance events for a period of time after completion, as specified in the
    +Because all of the FlowFile attributes and the a pointer to the content are kept in the
Provenance Repository, a Dataflow Manager is able to not only see the lineage, or processing
history, of that piece of data, but is also able to later view the data itself and even replay
the data from any point in the flow. A common use-case for this is when a particular down-stream
system claims to have not received the data. The data lineage can show exactly when the data
was delivered to the downstream system, what the data looked like, the filename, and the URL
that the data was sent to – or can confirm that the data was indeed never sent. In either
case, the Send event can be replayed with the click of a button (or by accessing the appropriate
HTTP API endpoint) in order to resend the data only to that particular downstream system.
Alternatively, if the data was not handled properly (perhaps some data manipulation should
have occurred first), the flow can be fixed and then the data ca
 n be replayed into the new flow, in order to process the data properly.
    +Keep in mind, though, that since Provenance is not copying the content in the Content
Repo, and just copying the FlowFile's pointer to the content, the content could be deleted
before the provenance event that references it is deleted. This would mean that the user would
no longer able to see the content or replay the FlowFile later on. However, users are still
able to view the FlowFile’s lineage and understand what happened to the data. For instance,
even though the data itself will not be accessible, the user is still able to see the unique
identifier of the data, its filename (if applicable), when it was received, where it was received
from, how it was manipulated, where it was sent, and so on. Additionally, since the FlowFile’s
attributes are made available, a Dataflow Manager is able to understand why the data was processed
in the way that it was, providing a crucial tool for understanding and debugging the dataflow.
    +Note: Since provenance events are snapshots of the FlowFile, as it exists in the current
flow, changes to the flow may impact the ability to replay provenance events later on. For
example, if a Connection is deleted from the flow, the data cannot be replayed from that point
in the flow, since there is now nowhere to enqueue the data for processing.
    +For a look at the design decisions behind the Provenance Repository check out this link:
    +===== Deeper View: Provenance Log Files
    +Each provenance event has two maps, one for the attributes before the event and one for
the updated attribute values. In general, provenance events don't store the updated values
of the attributes as they existed when the event was emitted but instead, the attribute values
when the session is committed. The events are cached and saved until the session is committed
and once the session is committed the events are emitted with the attributes associated with
the FlowFile when the session is committed. The exception to this rule is the "SEND" event,
in which case the event contains the attributes as they existed when the event was emitted.
This is done because if the attributes themselves were also sent, it is important to have
an accurate account of exactly what information was sent.
    +As NiFi is running, there is a rolling group of 16 provenance log files. As provenance
events are emitted they are written to one of the 16 files (there are multiple files to increase
throughput). The log files are periodically rolled over (the default timeframe is every 30
seconds). This means the newly created provenance events start writing to a new group of 16
log files and the original ones are processed for long term storage. First the rolled over
logs are merged into one file. Then the file is optionally compressed (determined by the "nifi.provenance.repository.compress.on.rollover"
property). Lastly the events are indexed using Lucene and made available for querying. This
batched approach for indexing means provenance events aren't available immediately for querying
but in return this dramatically increases performance because committing a transaction and
indexing are very expensive tasks.
    +A separate thread handles the deletion of provenance logs. The two conditions admins
can set to control the deletion of provenance logs is the max amount of disk space it can
take up and the max retention duration for the logs. The thread sorts the repo by the last
modified date and deletes the oldest file when one of the conditions is exceeded.
    +The Provenance Repo is a Lucene index that is broken into multiple shards. This is done
for multiple reasons. Firstly, Lucene uses a 32-bit integer for the document identifier so
the maximum number of documents supported by Lucene without sharding is limited. Second, if
we know the time range for each shard, it makes it easy to search with multiple threads. Also,
this sharding also allows for more efficient deletion. NiFi waits until all events in a shard
are scheduled for deletion before deleting the entire shard from disk. This makes it so we
do not have to update the Lucene index when we delete.
    +=== General Repository Notes
    +===== Multiple Physical Storage Points
    +For the Provenance and Content repos, there is the option to stripe the information across
multiple physical partitions. An admin would do this if they wanted to federate reads and
writes across multiple disks. The repo (Content or Provenance) is still one logical store
but writes will be striped across multiple volumes/partitions automatically by the system.
The directories are specified in the file.
    +===== Best Practice
    +It is considered a best practice to analyze the contents of a FlowFile as few times as
possible and instead extract key information from the contents into the attributes of the
FlowFile; then read/write information from the FlowFile attributes. One example of this is
the ExtractText processor, which extracts text from the FlowFile Content and puts it as an
attribute so other processors can make use of it. This provides far better performance than
continually processing the entire content of the FlowFile, as the attributes are kept in-memory
and updating the FlowFile repository is much faster than updating the Content repository,
given the amount of data stored in each.
    +== Life of a FlowFile
    +To better understand how the repos interact with one another, the underlying functionality
of NiFi, and the life of a FlowFile; this next section will include examples of a FlowFile
at different points in a real flow. The flow is a template called "WebCrawler.xml" and is
available here:
    +At a high level, this template reaches out to a seed URL configured in the GetHTTP processor
then analyzes the response using the RouteText processor to find instances of a keyword (in
this case "nifi"), and potential URLs to hit. Then InvokeHTTP executes a HTTP Get request
using the URLs found in the original seed web page. The response is routed based on the status
code attribute and only 200-202 status codes are routed back to the original RouteText processor
for analysis.
    +The flow also detects duplicate URLs and prevents processing them again, emails the user
when keywords are found, logs all successful HTTP requests, and bundles up the successful
requests to be compressed and archived on disk.
    +Note: To use this flow you need to configure a couple options. First a DistributedMapCacheServer
controller service must be added with default properties. At the time of writing there was
no way to explicitly add the controller service to the template and since no processors reference
the service it is not included. Also to get emails, the PutEmail processor must be configured
with your email credentials. Finally to use HTTPS the StandardSSLContextService must be configured
with proper key and trust stores. Remember that the truststore must be configured with the
proper Certificate Authorities in order to work for websites. The command below is an example
of using the "keytool" command to add the default Java 1.8.0_60 CAs to a truststore called
    +keytool -importkeystore -srckeystore /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/security/cacerts
 -destkeystore myTrustStore
    +===== WebCrawler Template:
    +Note that it is not uncommon for bulletins with messages such as "Connection timed out"
to appear on the InvokeHttp processor due to the random nature of web crawling.
    +image::WebCrawler.png["Web Crawler Flow"]
    +=== Data Ingress
    +A FlowFile is created in the system when a producer processor invokes "ProcessSession.create()"
followed by an appropriate call to the ProvenanceReporter. The "ProcessSession.create()" call
creates an empty FlowFile with a few core attributes (filename, path and uuid for the standard
process session) but without any content or lineage to parents (the create method is overloaded
to allow parameters for parent FlowFiles). The producer processor then adds the content and
attributes to the FlowFile.
    +ProvenanceReporter is used to emit the Provenance Events for the FlowFile. If the file
is created by NiFi from data not received by an external entity then a "CREATE" event should
be emitted. If instead the data was created from data received from an external source then
a "RECEIVE" event should be emitted. The Provenance Events are made using "ProvenanceReporter.create()"
and "ProvenanceReporter.receive()" respectively.
    +In our WebCrawler flow, the GetHTTP processor creates the initial FlowFile using "ProcessSession.create()"
and records the receipt of data using "ProvenanceReporter.receive()". This method call also
provides the URL from which the data was received, how long it took the transfer the data,
and any FlowFile attributes that were added to the FlowFile. HTTP Headers, for instance, can
be added as FlowFile attributes.
    +image::DataIngress.png["Data Ingress"]
    +=== Pass by Reference
    +An important aspect of flow-based programming is the idea of resource-constrained relationships
between the black boxes. In NiFi these are queues and processors respectively.
    --- End diff --
    For the readers who are familiar with EIP and given that Flow-based programming has many
similarities/origin with EIP I would also mention Claim Check pattern (
and essentially state that NiFi implements an implicit claim-check while handling the content
of the FlowFile.

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at or file a JIRA ticket
with INFRA.

View raw message