apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sas...@apache.org
Subject [02/26] incubator-apex-site git commit: from 7a8804b25247fa64430f6e030a81bc7389b7daf4
Date Fri, 18 Mar 2016 22:13:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/block_reader/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/block_reader/index.html b/content/docs/malhar/operators/block_reader/index.html
new file mode 100644
index 0000000..71d6a4b
--- /dev/null
+++ b/content/docs/malhar/operators/block_reader/index.html
@@ -0,0 +1,400 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Block Reader - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Block Reader";
+    var mkdocs_page_input_path = "operators/block_reader.md";
+    var mkdocs_page_url = "/operators/block_reader/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Block Reader</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#block-reader">Block Reader</a></li>
+                
+                    <li><a class="toctree-l4" href="#why-is-it-needed">Why is it needed?</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-diagram">Class Diagram</a></li>
+                
+                    <li><a class="toctree-l4" href="#abstractblockreader">AbstractBlockReader</a></li>
+                
+                    <li><a class="toctree-l4" href="#example-application">Example Application</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#abstractfsreadaheadlinereader">AbstractFSReadAheadLineReader</a></li>
+                
+                    <li><a class="toctree-l4" href="#readaheadlinereadercontext">ReadAheadLineReaderContext</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#abstractfslinereader">AbstractFSLineReader</a></li>
+                
+                    <li><a class="toctree-l4" href="#linereadercontext">LineReaderContext</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#fsslicereader">FSSliceReader</a></li>
+                
+                    <li><a class="toctree-l4" href="#fixedbytesreadercontext">FixedBytesReaderContext</a></li>
+                
+                    <li><a class="toctree-l4" href="#configuration_1">Configuration</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#partitioner-and-statslistener">Partitioner and StatsListener</a></li>
+                
+                    <li><a class="toctree-l4" href="#processstats">processStats </a></li>
+                
+                    <li><a class="toctree-l4" href="#definepartitions">definePartitions</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Block Reader</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="block-reader">Block Reader</h1>
+<p>This is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block. </p>
+<h2 id="why-is-it-needed">Why is it needed?</h2>
+<p>A Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different sources of the same type (for files see <a href="https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java">AbstractFileInputOperator</a>) but Block Reader partitions can read blocks of the same source in parallel and parse them for records ensuring that no record is duplicated or missed.</p>
+<h2 id="class-diagram">Class Diagram</h2>
+<p><img alt="BlockReader class diagram" src="../images/blockreader/classdiagram.png" /></p>
+<h2 id="abstractblockreader">AbstractBlockReader</h2>
+<p>This is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below describes the processing of a block metadata.</p>
+<p><img alt="BlockReader flow diagram" src="../images/blockreader/flowdiagram.png" /></p>
+<h3 id="ports">Ports</h3>
+<ul>
+<li>
+<p>blocksMetadataInput: input port on which block metadata are received.</p>
+</li>
+<li>
+<p>blocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.</p>
+</li>
+<li>
+<p>messages: output port on which tuples of type <code>com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord</code> are emitted. This class encapsulates a <code>record</code> and the <code>blockId</code> of the corresponding block.</p>
+</li>
+</ul>
+<h3 id="readercontext">readerContext</h3>
+<p>This is one of the most important fields in the block reader. It is of type <code>com.datatorrent.lib.io.block.ReaderContext</code> and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include bytes for the record delimiter which may not be a part of the actual record.</p>
+<p>Once the reader creates an input stream for the block (or uses the previously opened stream if the current block is the successor of the previous block) it initializes the reader context by invoking <code>readerContext.initialize(stream, blockMetadata, consecutiveBlock);</code>. The initialize method is where any implementation of <code>ReaderContext</code> can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.</p>
+<p>Once the initialization is done, <code>readerContext.next()</code> is called repeatedly until it returns <code>null</code>. It is left to the <code>ReaderContext</code> implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- <code>LineReaderContext</code> and <code>ReadAheadLineReaderContext</code>). In other cases when there isn't a possibility of split record (example- <code>FixedBytesReaderContext</code>), it returns <code>null</code> immediately when the block boundary is reached. The return type of <code>readerContext.next()</code> is of type <code>com.datatorrent.lib.io.block.ReaderContext.Entity</code> which is just a wrapper for a <code>byte[]</code> that represents the record and total bytes used in fetching the record.</p>
+<h3 id="abstract-methods">Abstract methods</h3>
+<ul>
+<li>
+<p><code>STREAM setupStream(B block)</code>: creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.</p>
+</li>
+<li>
+<p><code>R convertToRecord(byte[] bytes)</code><a name="convertToRecord"></a>: this converts the array of bytes into the actual instance of record type.</p>
+</li>
+</ul>
+<h3 id="auto-scalability">Auto-scalability</h3>
+<p>Block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the <code>blocksMetadataInput</code> port queue of all partitions) it can create more partitions or reduce them. Details are discussed in the last section which covers the <a href="#partitioner-and-statslistener">partitioner and stats-listener</a>.</p>
+<h3 id="configuration">Configuration</h3>
+<ol>
+<li><a name="maxReaders"></a><strong>maxReaders</strong>: when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.</li>
+<li><a name="minReaders"></a><strong>minReaders</strong>: when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.</li>
+<li><a name="collectStats"></a><strong>collectStats</strong>: this enables or disables auto-scaling. When it is set to <code>true</code> the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.</li>
+<li><strong>intervalMillis</strong>: when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.</li>
+</ol>
+<h2 id="abstractfsblockreader"><a name="AbstractFSBlockReader"></a> AbstractFSBlockReader</h2>
+<p>This abstract implementation deals with files. Different types of file systems that are implementations of <code>org.apache.hadoop.fs.FileSystem</code> are supported. The user can override the <code>getFSInstance()</code> method to create an instance of a specific <code>FileSystem</code>. By default, the filesystem instance is created from the filesystem URI that comes from the default Hadoop configuration.</p>
+<pre><code class="java">protected FileSystem getFSInstance() throws IOException
+{
+  return FileSystem.newInstance(configuration);
+}
+</code></pre>
+
+<p>It uses this filesystem instance to setup a stream of type <code>org.apache.hadoop.fs.FSDataInputStream</code> to read the block.</p>
+<pre><code class="java">@Override
+protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+{
+  return fs.open(new Path(block.getFilePath()));
+}
+</code></pre>
+
+<p>All the ports and configurations are derived from the super class. It doesn't provide an implementation of <a href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> method which is delegated to concrete sub-classes.</p>
+<h3 id="example-application">Example Application</h3>
+<p>This simple dag demonstrates how any concrete implementation of <code>AbstractFSBlockReader</code> can be plugged into an application. </p>
+<p><img alt="Application with FSBlockReader" src="../images/blockreader/fsreaderexample.png" /></p>
+<p>In the above application, file splitter creates block metadata for files which are sent to block reader. Partitions of the block reader parse the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore block reader is parallel partitioned with the 2 downstream operators - filter/converter and record output operator. The code which implements this dag is below.</p>
+<pre><code class="java">public class ExampleApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput input = dag.addOperator(&quot;File-splitter&quot;, new FileSplitterInput());
+    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.
+    LineReader blockReader = dag.addOperator(&quot;Block-reader&quot;, new LineReader());
+    Filter filter = dag.addOperator(&quot;Filter&quot;, new Filter());
+    RecordOutputOperator recordOutputOperator = dag.addOperator(&quot;Record-writer&quot;, new RecordOutputOperator());
+
+    dag.addStream(&quot;file-block metadata&quot;, input.blocksMetadataOutput, blockReader.blocksMetadataInput);
+    dag.addStream(&quot;records&quot;, blockReader.messages, filter.input);
+    dag.addStream(&quot;filtered-records&quot;, filter.output, recordOutputOperator.input);
+  }
+
+  /**
+   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.
+   */
+  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader&lt;String&gt;
+  {
+
+    @Override
+    protected String convertToRecord(byte[] bytes)
+    {
+      return new String(bytes);
+    }
+  }
+
+  /**
+   * Considers any line starting with a '.' as invalid. Emits the valid records.
+   */
+  public static class Filter extends BaseOperator
+  {
+    public final transient DefaultOutputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt; output = new DefaultOutputPort&lt;&gt;();
+    public final transient DefaultInputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt; input = new DefaultInputPort&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt;()
+    {
+      @Override
+      public void process(AbstractBlockReader.ReaderRecord&lt;String&gt; stringRecord)
+      {
+        //filter records and transform
+        //if the string starts with a '.' ignore the string.
+        if (!StringUtils.startsWith(stringRecord.getRecord(), &quot;.&quot;)) {
+          output.emit(stringRecord);
+        }
+      }
+    };
+  }
+
+  /**
+   * Persists the valid records to corresponding block files.
+   */
+  public static class RecordOutputOperator extends AbstractFileOutputOperator&lt;AbstractBlockReader.ReaderRecord&lt;String&gt;&gt;
+  {
+    @Override
+    protected String getFileName(AbstractBlockReader.ReaderRecord&lt;String&gt; tuple)
+    {
+      return Long.toHexString(tuple.getBlockId());
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord&lt;String&gt; tuple)
+    {
+      return tuple.getRecord().getBytes();
+    }
+  }
+}
+</code></pre>
+
+<p>Configuration to parallel partition block reader with its downstream operators.</p>
+<pre><code class="xml">  &lt;property&gt;
+    &lt;name&gt;dt.operator.Filter.port.input.attr.PARTITION_PARALLEL&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+  &lt;property&gt;
+    &lt;name&gt;dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL&lt;/name&gt;
+    &lt;value&gt;true&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<h2 id="abstractfsreadaheadlinereader">AbstractFSReadAheadLineReader</h2>
+<p>This extension of <a href="#AbstractFSBlockReader"><code>AbstractFSBlockReader</code></a> parses lines from a block and binds the <code>readerContext</code> field to an instance of <code>ReaderContext.ReadAheadLineReaderContext</code>.</p>
+<p>It is abstract because it doesn't provide an implementation of <a href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> since the user may want to convert the bytes that make a line into some other type. </p>
+<h3 id="readaheadlinereadercontext">ReadAheadLineReaderContext</h3>
+<p>In order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.</p>
+<p>This is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.</p>
+<h2 id="abstractfslinereader">AbstractFSLineReader</h2>
+<p>Similar to <code>AbstractFSReadAheadLineReader</code>, this also parses lines from a block. However, it binds the <code>readerContext</code> field to an instance of <code>ReaderContext.LineReaderContext</code>.</p>
+<h3 id="linereadercontext">LineReaderContext</h3>
+<p>This handles the line split differently from <code>ReadAheadLineReaderContext</code>. It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsistency in reading the next block.</p>
+<p>When the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.</p>
+<p>If the validation of completeness fails for a line then <a href="#convertToRecord"><code>convertToRecord(byte[] bytes)</code></a> should return null.</p>
+<h2 id="fsslicereader">FSSliceReader</h2>
+<p>A concrete extension of <a href="#AbstractFSBlockReader"><code>AbstractFSBlockReader</code></a> that reads fixed-size <code>byte[]</code> from a block and emits the byte array wrapped in <code>com.datatorrent.netlet.util.Slice</code>.</p>
+<p>This operator binds the <code>readerContext</code> to an instance of <code>ReaderContext.FixedBytesReaderContext</code>.</p>
+<h3 id="fixedbytesreadercontext">FixedBytesReaderContext</h3>
+<p>This implementation of <code>ReaderContext</code> never reads beyond a block boundary, which can result in the last <code>byte[]</code> of a block being shorter than the rest of the records.</p>
+<h3 id="configuration_1">Configuration</h3>
+<p><strong>readerContext.length</strong>: length of each record. By default, this is initialized to the default hdfs block size.</p>
+<h2 id="partitioner-and-statslistener">Partitioner and StatsListener</h2>
+<p>The logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - <code>PARTITIONER</code>) as well as a StatsListener. This is because the 
+<code>AbstractBlockReader</code> implements both the <code>com.datatorrent.api.Partitioner</code> and <code>com.datatorrent.api.StatsListener</code> interfaces and provides an implementation of <code>definePartitions(...)</code> and <code>processStats(...)</code> which make it auto-scalable.</p>
+<h3 id="processstats">processStats <a name="processStats"></a></h3>
+<p>The application master invokes <code>Response processStats(BatchedOperatorStats stats)</code> method on the logical instance with the stats (<code>tuplesProcessedPSMA</code>, <code>tuplesEmittedPSMA</code>, <code>latencyMA</code>, etc.) of each partition. The data which this operator is interested in is the <code>queueSize</code> of the input port <code>blocksMetadataInput</code>.</p>
+<p>Usually the <code>queueSize</code> of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with <code>@DataQueueSize</code>. In this case <code>AbstractBlockReader</code> itself is the <code>StatsListener</code> which is why it is annotated with <code>@DataQueueSize</code>.</p>
+<p>The logical instance caches the queue size per partition and at regular intervals (configured by <code>intervalMillis</code>) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic.</p>
+<p><img alt="Processing of total-backlog" src="../images/blockreader/totalBacklogProcessing.png" /></p>
+<p>The goal of this logic is to create as many partitions within bounds (see <a href="#maxReaders"><code>maxReaders</code></a> and <a href="#minReaders"><code>minReaders</code></a> above) to quickly reduce this backlog or if the backlog is small then remove any idle partitions.</p>
+<h3 id="definepartitions">definePartitions</h3>
+<p>Based on the <code>repartitionRequired</code> field of the <code>Response</code> object which is returned by <em><a href="#processStats">processStats</a></em> method, the application master invokes </p>
+<pre><code class="java">Collection&lt;Partition&lt;AbstractBlockReader&lt;...&gt;&gt;&gt; definePartitions(Collection&lt;Partition&lt;AbstractBlockReader&lt;...&gt;&gt;&gt; partitions, PartitioningContext context)
+</code></pre>
+
+<p>on the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then an equivalent number of partitions is removed; otherwise new partitions are created. </p>
+<p>Please note auto-scaling can be disabled by setting <a href="#collectStats"><code>collectStats</code></a> to <code>false</code>. If the use-case requires only static partitioning, then that can be achieved by setting <a href="https://github.com/chandnisingh/incubator-apex-core/blob/master/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java"><code>StatelessPartitioner</code></a> as the operator attribute- <code>PARTITIONER</code> on the block reader.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+        <a href="../file_output/" class="btn btn-neutral float-right" title="File Output">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../file_splitter/" class="btn btn-neutral" title="File Splitter"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../file_splitter/" style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../file_output/" style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/file_output/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/file_output/index.html b/content/docs/malhar/operators/file_output/index.html
new file mode 100644
index 0000000..dea0703
--- /dev/null
+++ b/content/docs/malhar/operators/file_output/index.html
@@ -0,0 +1,331 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>File Output - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "File Output";
+    var mkdocs_page_input_path = "operators/file_output.md";
+    var mkdocs_page_url = "/operators/file_output/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">File Output</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#abstractfileoutputoperator">AbstractFileOutputOperator</a></li>
+                
+                    <li><a class="toctree-l4" href="#persisting-data-to-files">Persisting data to files</a></li>
+                
+                    <li><a class="toctree-l4" href="#automatic-rotation">Automatic rotation</a></li>
+                
+                    <li><a class="toctree-l4" href="#fault-tolerance">Fault-tolerance</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>File Output</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="abstractfileoutputoperator">AbstractFileOutputOperator</h1>
+<p>The abstract file output operator in Apache Apex Malhar library &mdash; <a href="https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java"><code>AbstractFileOutputOperator</code></a> writes streaming data to files. The main features of this operator are:</p>
+<ol>
+<li>Persisting data to files.</li>
+<li>Automatic rotation of files based on:<br />
+  a. maximum length of a file.<br />
+  b. time-based rotation where time is specified using a count of application windows.</li>
+<li>Fault-tolerance.</li>
+<li>Compression and encryption of data before it is persisted.</li>
+</ol>
+<p>In this tutorial we will cover the details of the basic structure and implementation of all the above features in <code>AbstractFileOutputOperator</code>. Configuration items related to each feature are discussed as they are introduced in the section of that feature.</p>
+<h2 id="persisting-data-to-files">Persisting data to files</h2>
+<p>The principal function of this operator is to persist tuples to files efficiently. These files are created under a specific directory on the file system. The relevant configuration item is:</p>
+<p><strong>filePath</strong>: path specifying the directory where files are written.</p>
+<p>Different types of file system that are implementations of <code>org.apache.hadoop.fs.FileSystem</code> are supported. The file system instance which is used for creating streams is constructed from the <code>filePath</code> URI.</p>
+<pre><code class="java">FileSystem.newInstance(new Path(filePath).toUri(), new Configuration())
+</code></pre>
+
+<p>Tuples may belong to different files therefore expensive IO operations like creating multiple output streams, flushing of data to disk, and closing streams are handled carefully.</p>
+<h3 id="ports">Ports</h3>
+<ul>
+<li><code>input</code>: the input port on which tuples to be persisted are received.</li>
+</ul>
+<h3 id="streamscache"><code>streamsCache</code></h3>
+<p>This transient state caches output streams per file in memory. The file to which the data is appended may change with incoming tuples. It will be highly inefficient to keep re-opening streams for a file just because tuples for that file are interleaved with tuples for another file. Therefore, the operator maintains a cache of limited size with open output streams.</p>
+<p><code>streamsCache</code> is of type <code>com.google.common.cache.LoadingCache</code>. A <code>LoadingCache</code> has an attached <code>CacheLoader</code> which is responsible for loading the value of a key when the key is not present in the cache. Details are explained in <a href="https://github.com/google/guava/wiki/CachesExplained">CachesExplained</a>.</p>
+<p>The operator constructs this cache in <code>setup(...)</code>. It is built with the following configuration items:</p>
+<ul>
+<li><strong>maxOpenFiles</strong>: maximum size of the cache. The cache evicts entries that haven't been used recently when the cache size is approaching this limit. <em>Default</em>: 100</li>
+<li><strong>expireStreamAfterAcessMillis</strong>: expires streams after the specified duration has passed since the stream was last accessed. <em>Default</em>: value of attribute- <code>OperatorContext.SPIN_MILLIS</code>.</li>
+</ul>
+<p>An important point to note here is that the guava cache does not perform cleanup and evict values asynchronously, that is, instantly after a value expires. Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.</p>
+<h4 id="cacheloader">CacheLoader</h4>
+<p><code>streamsCache</code> is created with a <code>CacheLoader</code> that opens an <code>FSDataOutputStream</code> for a file which is not in the cache. The output stream is opened in either <code>append</code> or <code>create</code> mode and the basic logic to determine this is explained by the simple diagram below.</p>
+<p><img alt="Opening an output stream" src="../images/fileoutput/diagram1.png" /></p>
+<p>This process gets complicated when fault-tolerance (writing to temporary files) and rotation are added.</p>
+<p>The following are a few configuration items used for opening the streams:</p>
+<ul>
+<li><strong>replication</strong>: specifies the replication factor of the output files. <em>Default</em>: <code>fs.getDefaultReplication(new Path(filePath))</code></li>
+<li><strong>filePermission</strong>: specifies the permission of the output files. The permission is an octal number similar to that used by the Unix chmod command. <em>Default</em>: 0777</li>
+</ul>
+<h4 id="removallistener">RemovalListener</h4>
+<p>A <code>Guava</code> cache also allows specification of removal listener which can perform some operation when an entry is removed from the cache. Since <code>streamsCache</code> is of limited size and also has time-based expiry enabled, it is imperative that when a stream is evicted from the cache it is closed properly. Therefore, we attach a removal listener to <code>streamsCache</code> which closes the stream when it is evicted.</p>
+<h3 id="setupoperatorcontext-context"><code>setup(OperatorContext context)</code></h3>
+<p>During setup the following main tasks are performed:</p>
+<ol>
+<li>FileSystem instance is created.</li>
+<li>The cache of streams is created.</li>
+<li>Files are recovered (see Fault-tolerance section).</li>
+<li>Stray part files are cleaned (see Automatic rotation section).</li>
+</ol>
+<h3 id="processtupleinput-tuple"><a name="processTuple"></a><code>processTuple(INPUT tuple)</code></h3>
+<p>The code snippet below highlights the basic steps of processing a tuple.</p>
+<pre><code class="java">protected void processTuple(INPUT tuple)
+{  
+  //which file to write to is derived from the tuple.
+  String fileName = getFileName(tuple);  
+
+  //streamsCache is queried for the output stream. If the stream is already opened then it is returned immediately otherwise the cache loader creates one.
+  FilterOutputStream fsOutput = streamsCache.get(fileName).getFilterStream();
+
+  byte[] tupleBytes = getBytesForTuple(tuple);
+
+  fsOutput.write(tupleBytes);
+}
+</code></pre>
+
+<h3 id="endwindow"><a name="endWindow"></a>endWindow()</h3>
+<p>It should be noted that while processing a tuple we do not flush the stream after every write. Since flushing is expensive it is done periodically for all the open streams in the operator's <code>endWindow()</code>.</p>
+<pre><code class="java">Map&lt;String, FSFilterStreamContext&gt; openStreams = streamsCache.asMap();
+for (FSFilterStreamContext streamContext: openStreams.values()) {
+  ...
+  //this flushes the stream
+  streamContext.finalizeContext();
+  ...
+}
+</code></pre>
+
+<p><code>FSFilterStreamContext</code> will be explained with compression and encryption.</p>
+<h3 id="teardown"><a name="teardown"></a>teardown()</h3>
+<p>When any operator in a DAG fails then the application master invokes <code>teardown()</code> for that operator and its downstream operators. In <code>AbstractFileOutputOperator</code> we have a bunch of open streams in the cache and the operator (acting as HDFS client) holds leases for all the corresponding files. It is important to release these leases for clean re-deployment. Therefore, we try to close all the open streams in <code>teardown()</code>.</p>
+<h2 id="automatic-rotation">Automatic rotation</h2>
+<p>In a streaming application where data is being continuously processed, when this output operator is used, data will be continuously written to an output file. The users may want to be able to take the data from time to time to use it, copy it out of Hadoop or do some other processing. Having all the data in a single file makes it difficult as the user needs to keep track of how much data has been read from the file each time so that the same data is not read again. Also users may already have processes and scripts in place that work with full files and not partial data from a file.</p>
+<p>To help solve these problems the operator supports creating many smaller files instead of writing to just one big file. Data is written to a file and when some condition is met the file is finalized and data is written to a new file. This is called file rotation. The user can determine when the file gets rotated. Each of these files is called a part file as it contains a portion of the data.</p>
+<h3 id="part-filename">Part filename</h3>
+<p>The filename for a part file is formed by using the original file name and the part number. The part number starts from 0 and is incremented each time a new part file is created. The default filename has the format, assuming origfile represents the original filename and partnum represents the part number,</p>
+<p><code>origfile.partnum</code></p>
+<p>This naming scheme can be changed by the user. It can be done so by overriding the following method</p>
+<pre><code class="java">protected String getPartFileName(String fileName, int part)
+</code></pre>
+
+<p>This method is passed the original filename and part number as arguments and should return the part filename.</p>
+<h3 id="mechanisms">Mechanisms</h3>
+<p>The user has a couple of ways to specify when a file gets rotated. First is based on size and second on time. In the first case the files are limited by size and in the second they are rotated by time.</p>
+<h4 id="size-based">Size Based</h4>
+<p>With size based rotation the user specifies a size limit. Once the size of the current file reaches this limit the file is rotated. The size limit can be specified by setting the following property</p>
+<p><code>maxLength</code></p>
+<p>Like any other property this can be set in Java application code or in the property file.</p>
+<h4 id="time-based">Time Based</h4>
+<p>In time based rotation the user specifies a time interval. This interval is specified as a number of application windows. The files are rotated periodically once the specified number of application windows have elapsed. Since the interval is application window based it is not always exactly constant time. The interval can be specified using the following property</p>
+<p><code>rotationWindows</code></p>
+<h3 id="setupoperatorcontext-context_1"><code>setup(OperatorContext context)</code></h3>
+<p>When an operator is being started there may be stray part files and they need to be cleaned up. One common scenario, when these could be present, is in the case of failure, where a node running the operator failed and a previous instance of the operator was killed. This cleanup and other initial processing for the part files happens in the operator setup. The following diagram describes this process</p>
+<p><img alt="Rotation setup" src="../images/fileoutput/FileRotation.png" /></p>
+<h2 id="fault-tolerance">Fault-tolerance</h2>
+<p>There are two issues that should be addressed in order to make the operator fault-tolerant:</p>
+<ol>
+<li>
+<p>The operator flushes data to the filesystem every application window. This implies that after a failure when the operator is re-deployed and tuples of a window are replayed, then duplicate data will be saved to the files. This is handled by recording how much the operator has written to each file every window in a state that is checkpointed and truncating files back to the recovery checkpoint after re-deployment.</p>
+</li>
+<li>
+<p>While writing to HDFS, if the operator gets killed and didn't have the opportunity to close a file, then later when it is redeployed it will attempt to truncate/restore that file. Restoring a file may fail because the lease that the previous process (operator instance before failure) had acquired from namenode to write to a file may still linger and therefore there can be exceptions in acquiring the lease again by the new process (operator instance after failure). This is handled by always writing data to temporary files and renaming these files to actual files when a file is finalized (closed) for writing, that is, we are sure that no more data will be written to it. The relevant configuration item is:  </p>
+<ul><li><strong>alwaysWriteToTmp</strong>: enables/disables writing to a temporary file. <em>Default</em>: true.</li></ul>
+</li>
+</ol>
+<p>Most of the complexity in the code comes from making this operator fault-tolerant.</p>
+<h3 id="checkpointed-states-needed-for-fault-tolerance">Checkpointed states needed for fault-tolerance</h3>
+<ul>
+<li>
+<p><code>endOffsets</code>: contains the size of each file as it is being updated by the operator. It helps the operator to restore a file during recovery in operator <code>setup(...)</code> and is also used while loading a stream to find out if the operator has seen a file before.</p>
+</li>
+<li>
+<p><code>fileNameToTmpName</code>: contains the name of the temporary file per actual file. It is needed because the name of a temporary file is random. They are named based on the timestamp when the stream is created. During recovery the operator needs to know the temp file which it was writing to and if it needs restoration then it creates a new temp file and updates this mapping.</p>
+</li>
+<li>
+<p><code>finalizedFiles</code>: contains set of files which were requested to be finalized per window id.</p>
+</li>
+<li>
+<p><code>finalizedPart</code>: contains the latest <code>part</code> of each file which was requested to be finalized.</p>
+</li>
+</ul>
+<p>The uses of <code>finalizedFiles</code> and <code>finalizedPart</code> are explained in detail under the <a href="#requestFinalize"><code>requestFinalize(...)</code></a> method.</p>
+<h3 id="recovering-files">Recovering files</h3>
+<p>When the operator is re-deployed, it checks in its <code>setup(...)</code> method if the state of a file which it has seen before the failure is consistent with the file's state on the file system, that is, the size of the file on the file system should match the size in the <code>endOffsets</code>. When it doesn't the operator truncates the file.</p>
+<p>For example, let's say the operator wrote 100 bytes to test1.txt by the end of window 10. It wrote another 20 bytes by the end of window 12 but failed in window 13. When the operator gets re-deployed it is restored with window 10 (recovery checkpoint) state. In the previous run, by the end of window 10, the size of file on the filesystem was 100 bytes but now it is 120 bytes. Tuples for windows 11 and 12 are going to be replayed. Therefore, in order to avoid writing duplicates to test1.txt, the operator truncates the file to 100 bytes (size at the end of window 10) discarding the last 20 bytes.</p>
+<h3 id="requestfinalizestring-filename"><a name="requestFinalize"></a><code>requestFinalize(String fileName)</code></h3>
+<p>When the operator is always writing to temporary files (in order to avoid HDFS Lease exceptions), then it is necessary to rename the temporary files to the actual files once it has been determined that the files are closed. This is referred to as <em>finalization</em> of files and the method allows the user code to specify when a file is ready for finalization.</p>
+<p>In this method, the requested file (or in the case of rotation &mdash; all the file parts including the latest open part which have not yet been requested for finalization) are registered for finalization. Registration is basically adding the file names to <code>finalizedFiles</code> state and updating <code>finalizedPart</code>.</p>
+<p>The process of <em>finalization</em> of all the files which were requested till the window <em>w</em> is deferred till window <em>w</em> is committed. This is because until a window is committed it can be replayed after a failure which means that a file can be open for writing even after it was requested for finalization.</p>
+<p>When rotation is enabled, part files as and when they get completed are requested for finalization. However, when rotation is not enabled, user code needs to invoke this method because this abstract operator cannot know when a file is closed.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+      
+        <a href="../block_reader/" class="btn btn-neutral" title="Block Reader"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../block_reader/" style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/file_splitter/index.html
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/file_splitter/index.html b/content/docs/malhar/operators/file_splitter/index.html
new file mode 100644
index 0000000..824b554
--- /dev/null
+++ b/content/docs/malhar/operators/file_splitter/index.html
@@ -0,0 +1,352 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>File Splitter - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "File Splitter";
+    var mkdocs_page_input_path = "operators/file_splitter.md";
+    var mkdocs_page_url = "/operators/file_splitter/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">File Splitter</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#file-splitter">File Splitter</a></li>
+                
+                    <li><a class="toctree-l4" href="#why-is-it-needed">Why is it needed?</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-diagram">Class Diagram</a></li>
+                
+                    <li><a class="toctree-l4" href="#abstractfilesplitter">AbstractFileSplitter</a></li>
+                
+                    <li><a class="toctree-l4" href="#filesplitterbase">FileSplitterBase</a></li>
+                
+                    <li><a class="toctree-l4" href="#filesplitterinput">FileSplitterInput</a></li>
+                
+                    <li><a class="toctree-l4" href="#handling-of-split-records">Handling of split records</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>File Splitter</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="file-splitter">File Splitter</h1>
+<p>This is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits. </p>
+<h2 id="why-is-it-needed">Why is it needed?</h2>
+<p>It is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.
+In these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. 
+The downstream partitions can read/parse the block without the need of interacting with other partitions.</p>
+<h2 id="class-diagram">Class Diagram</h2>
+<p><img alt="FileSplitter class hierarchy" src="../images/filesplitter/classdiagram.png" /></p>
+<h2 id="abstractfilesplitter">AbstractFileSplitter</h2>
+<p>The abstract implementation defines the logic of processing <code>FileInfo</code>. This comprises the following tasks -  </p>
+<ul>
+<li>
+<p>building <code>FileMetadata</code> per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.</p>
+</li>
+<li>
+<p>creating <code>BlockMetadataIterator</code> from <code>FileMetadata</code>. The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.</p>
+</li>
+<li>
+<p>retrieving <code>BlockMetadata.FileBlockMetadata</code> from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window is controlled by the <code>blocksThreshold</code> setting, which by default is 1.  </p>
+</li>
+</ul>
+<p>The main utility method that performs all the above tasks is the <a href="#process_method"><code>process()</code></a> method. Concrete implementations can invoke this method whenever they have data to process.</p>
+<h3 id="ports">Ports</h3>
+<p>Declares only output ports on which file metadata and block metadata are emitted.</p>
+<ul>
+<li>filesMetadataOutput: metadata for each file is emitted on this port. </li>
+<li>blocksMetadataOutput: metadata for each block is emitted on this port. </li>
+</ul>
+<h3 id="process-method"><a name="process_method"></a><code>process()</code> method</h3>
+<p>When process() is invoked, any pending blocks from the current file are emitted on the 'blocksMetadataOutput' port. If the threshold for blocks per window is still not met then a new input file is processed - corresponding metadata is emitted on 'filesMetadataOutput' and more of its blocks are emitted. This operation is repeated until the <code>blocksThreshold</code> is reached or there are no more new files.</p>
+<pre><code class="java">  protected void process()
+  {
+    if (blockMetadataIterator != null &amp;&amp; blockCount &lt; blocksThreshold) {
+      emitBlockMetadata();
+    }
+
+    FileInfo fileInfo;
+    while (blockCount &lt; blocksThreshold &amp;&amp; (fileInfo = getFileInfo()) != null) {
+      if (!processFileInfo(fileInfo)) {
+        break;
+      }
+    }
+  }
+</code></pre>
+
+<h3 id="abstract-methods">Abstract methods</h3>
+<ul>
+<li>
+<p><code>FileInfo getFileInfo()</code>: called from within the <code>process()</code> and provides the next file to process.</p>
+</li>
+<li>
+<p><code>long getDefaultBlockSize()</code>: provides the block size which is used when user hasn't configured the size.</p>
+</li>
+<li>
+<p><code>FileStatus getFileStatus(Path path)</code>: provides the <code>org.apache.hadoop.fs.FileStatus</code> instance for a path.   </p>
+</li>
+</ul>
+<h3 id="configuration">Configuration</h3>
+<ol>
+<li><strong>blockSize</strong>: size of a block.</li>
+<li><strong>blocksThreshold</strong><a name="blocksThreshold"></a>: threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.</li>
+</ol>
+<h2 id="filesplitterbase">FileSplitterBase</h2>
+<p>Simple operator that receives tuples of type <code>FileInfo</code> on its <code>input</code> port. <code>FileInfo</code> contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.</p>
+<h3 id="example-application">Example application</h3>
+<p>This is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.
+<img alt="Application with FileSplitterBase" src="../images/filesplitter/baseexample.png" /></p>
+<p>The upstream operator emits tuples of type <code>FileInfo</code> on its output port which is connected to splitter input port. The downstream receives tuples of type <code>BlockMetadata.FileBlockMetadata</code> from the splitter's block metadata output port.</p>
+<pre><code class="java">public class ApplicationWithBaseSplitter implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    JMSInput input = dag.addOperator(&quot;Input&quot;, new JMSInput());
+    FileSplitterBase splitter = dag.addOperator(&quot;Splitter&quot;, new FileSplitterBase());
+    FSSliceReader blockReader = dag.addOperator(&quot;BlockReader&quot;, new FSSliceReader());
+    ...
+    dag.addStream(&quot;file-info&quot;, input.output, splitter.input);
+    dag.addStream(&quot;block-metadata&quot;, splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
+    ...
+  }
+
+  public static class JMSInput extends AbstractJMSInputOperator&lt;AbstractFileSplitter.FileInfo&gt;
+  {
+
+    public final transient DefaultOutputPort&lt;AbstractFileSplitter.FileInfo&gt; output = new DefaultOutputPort&lt;&gt;();
+
+    @Override
+    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException
+    {
+      //assuming the message is a text message containing the absolute path of the file.
+      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());
+    }
+
+    @Override
+    protected void emit(AbstractFileSplitter.FileInfo payload)
+    {
+      output.emit(payload);
+    }
+  }
+}
+</code></pre>
+
+<h3 id="ports_1">Ports</h3>
+<p>Declares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.</p>
+<ul>
+<li>input: non optional port on which tuples of type <code>FileInfo</code> are received.</li>
+</ul>
+<h3 id="configuration_1">Configuration</h3>
+<ol>
+<li><strong>file</strong>: path of the file from which the filesystem is inferred. FileSplitter creates an instance of <code>org.apache.hadoop.fs.FileSystem</code> which is why this path is needed.  </li>
+</ol>
+<pre><code>FileSystem.newInstance(new Path(file).toUri(), new Configuration());
+</code></pre>
+
+<p>The fs instance is then used to fetch the default block size and <code>org.apache.hadoop.fs.FileStatus</code> for each file path.</p>
+<h2 id="filesplitterinput">FileSplitterInput</h2>
+<p>This is an input operator that discovers files itself. The scanning of the directories for new files is asynchronous which is handled by <code>TimeBasedDirectoryScanner</code>. The function of TimeBasedDirectoryScanner is to periodically scan specified directories and find files which were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.</p>
+<p><img alt="Interaction between operator and scanner" src="../images/filesplitter/sequence.png" /></p>
+<h3 id="example-application_1">Example application</h3>
+<p>This is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.</p>
+<p><img alt="Application with FileSplitterInput" src="../images/filesplitter/inputexample.png" /></p>
+<p>Splitter is the input operator here that sends block metadata to the downstream BlockReader.</p>
+<pre><code class="java">  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput input = dag.addOperator(&quot;Input&quot;, new FileSplitterInput());
+    FSSliceReader reader = dag.addOperator(&quot;Block Reader&quot;, new FSSliceReader());
+    ...
+    dag.addStream(&quot;block-metadata&quot;, input.blocksMetadataOutput, reader.blocksMetadataInput);
+    ...
+  }
+
+</code></pre>
+
+<h3 id="ports_2">Ports</h3>
+<p>Since it is an input operator there are no input ports and output ports are inherited from AbstractFileSplitter.</p>
+<h3 id="configuration_2">Configuration</h3>
+<ol>
+<li><strong>scanner</strong>: the component that scans directories asynchronously. It is of type <code>com.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner</code>. The basic implementation of TimeBasedDirectoryScanner can be customized by users.  </li>
+</ol>
+<p>a. <strong>files</strong>: comma separated list of directories to scan.  </p>
+<p>b. <strong>recursive</strong>: flag that controls whether the directories should be scanned recursively.  </p>
+<p>c. <strong>scanIntervalMillis</strong>: interval specified in milliseconds after which another scan iteration is triggered.  </p>
+<p>d. <strong>filePatternRegularExp</strong>: regular expression for accepted file names.  </p>
+<p>e. <strong>trigger</strong>: a flag that triggers a scan iteration instantly. If the scanner thread is idling then it will initiate a scan immediately; otherwise, if a scan is in progress, then the new iteration will be triggered immediately after the completion of the current one.</p>
+<ol start="2"><li><strong>idempotentStorageManager</strong>: by default FileSplitterInput is idempotent. 
+Idempotency ensures that the operator will process the same set of files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, let's say the operator completed window 10 and failed somewhere in window 11. If the operator gets restored at window 10 then it will process the same file/block again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Therefore, if one doesn't care about idempotency then they can set this property to be an instance of <code>com.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager</code>.</li></ol>
+<h2 id="handling-of-split-records">Handling of split records</h2>
+<p>Splitting of files to create tasks for downstream operator needs to be a simple operation that doesn't consume a lot of resources and is fast. This is why the file splitter doesn't open files to read. The downside of that is if the file contains records then a record may be split across adjacent blocks. Handling of this is left to the downstream operator.</p>
+<p>We have created Block readers in Apex-malhar library that handle line splits efficiently. The 2 line readers- <code>AbstractFSLineReader</code> and <code>AbstractFSReadAheadLineReader</code> can be found here <a href="https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java">AbstractFSBlockReader</a>.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+        <a href="../block_reader/" class="btn btn-neutral float-right" title="Block Reader">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../kafkaInputOperator/" class="btn btn-neutral" title="Kafka Input"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../kafkaInputOperator/" style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../block_reader/" style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/blockreader/classdiagram.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/blockreader/classdiagram.png b/content/docs/malhar/operators/images/blockreader/classdiagram.png
new file mode 100644
index 0000000..8fbd6fc
Binary files /dev/null and b/content/docs/malhar/operators/images/blockreader/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/blockreader/flowdiagram.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/blockreader/flowdiagram.png b/content/docs/malhar/operators/images/blockreader/flowdiagram.png
new file mode 100644
index 0000000..1b2897d
Binary files /dev/null and b/content/docs/malhar/operators/images/blockreader/flowdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/blockreader/fsreaderexample.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/blockreader/fsreaderexample.png b/content/docs/malhar/operators/images/blockreader/fsreaderexample.png
new file mode 100644
index 0000000..571b60a
Binary files /dev/null and b/content/docs/malhar/operators/images/blockreader/fsreaderexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/blockreader/totalBacklogProcessing.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/blockreader/totalBacklogProcessing.png b/content/docs/malhar/operators/images/blockreader/totalBacklogProcessing.png
new file mode 100644
index 0000000..2ed481f
Binary files /dev/null and b/content/docs/malhar/operators/images/blockreader/totalBacklogProcessing.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/fileoutput/FileRotation.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/fileoutput/FileRotation.png b/content/docs/malhar/operators/images/fileoutput/FileRotation.png
new file mode 100644
index 0000000..624c96e
Binary files /dev/null and b/content/docs/malhar/operators/images/fileoutput/FileRotation.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/fileoutput/diagram1.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/fileoutput/diagram1.png b/content/docs/malhar/operators/images/fileoutput/diagram1.png
new file mode 100644
index 0000000..0a260de
Binary files /dev/null and b/content/docs/malhar/operators/images/fileoutput/diagram1.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/filesplitter/baseexample.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/filesplitter/baseexample.png b/content/docs/malhar/operators/images/filesplitter/baseexample.png
new file mode 100644
index 0000000..6af2b44
Binary files /dev/null and b/content/docs/malhar/operators/images/filesplitter/baseexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/filesplitter/classdiagram.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/filesplitter/classdiagram.png b/content/docs/malhar/operators/images/filesplitter/classdiagram.png
new file mode 100644
index 0000000..6490368
Binary files /dev/null and b/content/docs/malhar/operators/images/filesplitter/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/filesplitter/inputexample.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/filesplitter/inputexample.png b/content/docs/malhar/operators/images/filesplitter/inputexample.png
new file mode 100644
index 0000000..65e199f
Binary files /dev/null and b/content/docs/malhar/operators/images/filesplitter/inputexample.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/filesplitter/sequence.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/filesplitter/sequence.png b/content/docs/malhar/operators/images/filesplitter/sequence.png
new file mode 100644
index 0000000..85cf702
Binary files /dev/null and b/content/docs/malhar/operators/images/filesplitter/sequence.png differ

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e411d993/content/docs/malhar/operators/images/kafkainput/image00.png
----------------------------------------------------------------------
diff --git a/content/docs/malhar/operators/images/kafkainput/image00.png b/content/docs/malhar/operators/images/kafkainput/image00.png
new file mode 100644
index 0000000..0fa00e8
Binary files /dev/null and b/content/docs/malhar/operators/images/kafkainput/image00.png differ


Mime
View raw message