asterixdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterix-gerrit.ics.uci.edu>
Subject Change in asterixdb[master]: Introduced Local Filesystem Feed Adapter
Date Wed, 02 Sep 2015 09:26:59 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/376

Change subject: Introduced Local Filesystem Feed Adapter
......................................................................

Introduced Local Filesystem Feed Adapter

In this change, I have added a filesystem-based feed adapter. The adapter is aliased push_localfs.
It works in the following way:
When it starts, it first ingests the data already available in the directories passed in the adapter
arguments. Once it is done with the existing files, it places a watch on the directories passed.
It is push-based, so it only stops when a disconnect feed statement is issued. Faulty records are
dropped and the feed continues parsing with the next record.

Change-Id: I707756e3b4c9ffca4b55ec9817a08e5c16333010
---
A asterix-app/data/local-dir/data/even-more.txt
A asterix-app/data/local-dir/data/hello-world
A asterix-app/data/local-dir/data/ignored
A asterix-app/data/local-dir/data/more-records.txt
A asterix-app/data/local-dir/data/records.txt
A asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
A asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
M asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
A asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
A asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
M asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
A asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
M asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
26 files changed, 1,097 insertions(+), 115 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/76/376/1

diff --git a/asterix-app/data/local-dir/data/even-more.txt b/asterix-app/data/local-dir/data/even-more.txt
new file mode 100644
index 0000000..2f3c6c9
--- /dev/null
+++ b/asterix-app/data/local-dir/data/even-more.txt
@@ -0,0 +1,3 @@
+45|Steve|50
+46|John|23
+47|Samuel|22
diff --git a/asterix-app/data/local-dir/data/hello-world b/asterix-app/data/local-dir/data/hello-world
new file mode 100644
index 0000000..73e0e49
--- /dev/null
+++ b/asterix-app/data/local-dir/data/hello-world
@@ -0,0 +1,3 @@
+hellow world
+
+
diff --git a/asterix-app/data/local-dir/data/ignored b/asterix-app/data/local-dir/data/ignored
new file mode 100644
index 0000000..0a77cda
--- /dev/null
+++ b/asterix-app/data/local-dir/data/ignored
@@ -0,0 +1 @@
+This file is expected to be ignored.
\ No newline at end of file
diff --git a/asterix-app/data/local-dir/data/more-records.txt b/asterix-app/data/local-dir/data/more-records.txt
new file mode 100644
index 0000000..36fc93c
--- /dev/null
+++ b/asterix-app/data/local-dir/data/more-records.txt
@@ -0,0 +1,22 @@
+23|Steve|50
+24|John|23
+25|Samuel|22
+26|Mary|29
+27|William|75
+28|Sarah|16
+29|Noel|33
+30|Carlos|40
+31|Joseph|45
+32|David|22
+33|Nadine|10
+34|Steve|50
+35|John|23
+36|Samuel|22
+37|Mary|29
+38|William|75
+39|Sarah|16
+40|Noel|33
+41|Carlos|40
+42|Joseph|45
+43|David|22
+44|Nadine|10
diff --git a/asterix-app/data/local-dir/data/records.txt b/asterix-app/data/local-dir/data/records.txt
new file mode 100644
index 0000000..538d7bf
--- /dev/null
+++ b/asterix-app/data/local-dir/data/records.txt
@@ -0,0 +1,22 @@
+1|Steve|50
+2|John|23
+3|Samuel|22
+4|Mary|29
+5|William|75
+6|Sarah|16
+7|Noel|33
+8|Carlos|40
+9|Joseph|45
+10|David|22
+11|Nadine|10
+12|Steve|50
+13|John|23
+14|Samuel|22
+15|Mary|29
+16|William|75
+17|Sarah|16
+18|Noel|33
+19|Carlos|40
+20|Joseph|45
+21|David|22
+22|Nadine|10
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
new file mode 100644
index 0000000..229af78
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.1.ddl.aql
@@ -0,0 +1,22 @@
+/*
+* Description  : Create an external dataset 
+                 that contains records stored in a local directory.
+                 Query the dataset and make sure all the records in
+                 files which match the expression are returned
+* Expected Res : Success
+* Date         : 2015/08/31
+*/
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type EmployeeType as closed {
+ id: int64,
+ name: string,
+ age: int64
+};
+
+create external dataset EmployeeDataset(EmployeeType)
+using localfs
+(("path"="nc1://data/local-dir/data"),("format"="delimited-text"),("delimiter"="|"),("expression"=".*\\.txt"));
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
new file mode 100644
index 0000000..58f5830
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.2.update.aql
@@ -0,0 +1,8 @@
+/*
+* Description  : Create an external dataset 
+                 that contains records stored in a local directory.
+                 Query the dataset and make sure all the records in
+                 files which match the expression are returned
+* Expected Res : Success
+* Date         : 2015/08/31
+*/
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
new file mode 100644
index 0000000..a89f8d3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/external/query1/query1.3.query.aql
@@ -0,0 +1,14 @@
+/*
+* Description  : Create an external dataset 
+                 that contains records stored in a local directory.
+                 Query the dataset and make sure all the records in
+                 files which match the expression are returned
+* Expected Res : Success
+* Date         : 2015/08/31
+*/
+
+use dataverse test;
+
+for $x in dataset EmployeeDataset
+order by $x.id asc
+return $x;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm b/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
new file mode 100644
index 0000000..9a9d74c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/external/query1/query1.1.adm
@@ -0,0 +1,48 @@
+[ { "id": 1, "name": "Steve", "age": 50 }
+, { "id": 2, "name": "John", "age": 23 }
+, { "id": 3, "name": "Samuel", "age": 22 }
+, { "id": 4, "name": "Mary", "age": 29 }
+, { "id": 5, "name": "William", "age": 75 }
+, { "id": 6, "name": "Sarah", "age": 16 }
+, { "id": 7, "name": "Noel", "age": 33 }
+, { "id": 8, "name": "Carlos", "age": 40 }
+, { "id": 9, "name": "Joseph", "age": 45 }
+, { "id": 10, "name": "David", "age": 22 }
+, { "id": 11, "name": "Nadine", "age": 10 }
+, { "id": 12, "name": "Steve", "age": 50 }
+, { "id": 13, "name": "John", "age": 23 }
+, { "id": 14, "name": "Samuel", "age": 22 }
+, { "id": 15, "name": "Mary", "age": 29 }
+, { "id": 16, "name": "William", "age": 75 }
+, { "id": 17, "name": "Sarah", "age": 16 }
+, { "id": 18, "name": "Noel", "age": 33 }
+, { "id": 19, "name": "Carlos", "age": 40 }
+, { "id": 20, "name": "Joseph", "age": 45 }
+, { "id": 21, "name": "David", "age": 22 }
+, { "id": 22, "name": "Nadine", "age": 10 }
+, { "id": 23, "name": "Steve", "age": 50 }
+, { "id": 24, "name": "John", "age": 23 }
+, { "id": 25, "name": "Samuel", "age": 22 }
+, { "id": 26, "name": "Mary", "age": 29 }
+, { "id": 27, "name": "William", "age": 75 }
+, { "id": 28, "name": "Sarah", "age": 16 }
+, { "id": 29, "name": "Noel", "age": 33 }
+, { "id": 30, "name": "Carlos", "age": 40 }
+, { "id": 31, "name": "Joseph", "age": 45 }
+, { "id": 32, "name": "David", "age": 22 }
+, { "id": 33, "name": "Nadine", "age": 10 }
+, { "id": 34, "name": "Steve", "age": 50 }
+, { "id": 35, "name": "John", "age": 23 }
+, { "id": 36, "name": "Samuel", "age": 22 }
+, { "id": 37, "name": "Mary", "age": 29 }
+, { "id": 38, "name": "William", "age": 75 }
+, { "id": 39, "name": "Sarah", "age": 16 }
+, { "id": 40, "name": "Noel", "age": 33 }
+, { "id": 41, "name": "Carlos", "age": 40 }
+, { "id": 42, "name": "Joseph", "age": 45 }
+, { "id": 43, "name": "David", "age": 22 }
+, { "id": 44, "name": "Nadine", "age": 10 }
+, { "id": 45, "name": "Steve", "age": 50 }
+, { "id": 46, "name": "John", "age": 23 }
+, { "id": 47, "name": "Samuel", "age": 22 }
+ ]
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index feeeefd..5b79a95 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -22,6 +22,13 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="external">
+        <test-case FilePath="external">
+            <compilation-unit name="query1">
+                <output-dir compare="Text">query1</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
     <test-group name="flwor">
         <test-case FilePath="flwor">
             <compilation-unit name="at00">
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
index bb94ce7..0df8f44 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
@@ -40,5 +40,7 @@
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
 
     public void close() throws HyracksDataException;
+    
+    public void forceFlush() throws HyracksDataException;
 
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
new file mode 100644
index 0000000..4bcfbdf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LocalFileSystemFeedAdapterFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
+import org.apache.asterix.external.dataset.adapter.LocalFileSystemFeedAdapter;
+import org.apache.asterix.external.util.INodeResolver;
+import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Factory for {@link LocalFileSystemFeedAdapter}, the feed adapter named {@code push_localfs}.
+ * <p>
+ * It is configured with a comma-separated list of {@code node://path} entries and an optional regular expression
+ * that is handed to the adapter to filter the files found under those paths.
+ */
+public class LocalFileSystemFeedAdapterFactory extends StreamBasedAdapterFactory implements IFeedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final String NAME = "push_localfs";
+    private static final String NODE_PATH_SEPARATOR = "://";
+    private IAType sourceDatatype;
+    private FileSplit[] fileSplits;
+    private ARecordType outputType;
+    private String expression; // Expression to filter files with when the user provides a directory
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return configurePartitionConstraint();
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        return new LocalFileSystemFeedAdapter(configuration, fileSplits, parserFactory, sourceDatatype, ctx,
+                expression);
+    }
+
+    /**
+     * Reads the paths and the optional file filter expression from the configuration, then prepares the file
+     * splits and the parser format.
+     *
+     * @throws AsterixException
+     *             if the path argument is missing or one of its entries is malformed
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.configuration = configuration;
+        this.outputType = outputType;
+        // Assign the field rather than a shadowing local, otherwise createAdapter() passes null to the adapter
+        this.sourceDatatype = outputType;
+        this.expression = configuration.get(AsterixTupleParserFactory.KEY_EXPRESSION);
+        String paths = configuration.get(AsterixTupleParserFactory.KEY_PATH);
+        if (paths == null) {
+            throw new AsterixException(
+                    "Missing adapter argument: " + AsterixTupleParserFactory.KEY_PATH);
+        }
+        configureFileSplits(paths.split(","));
+        configureFormat(sourceDatatype);
+    }
+
+    /**
+     * Converts every {@code node://path} entry into a {@link FileSplit}. Does nothing when the splits are set.
+     */
+    private void configureFileSplits(String[] splits) throws AsterixException {
+        if (fileSplits != null) {
+            return;
+        }
+        FileSplit[] parsedSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; i++) {
+            String trimmedValue = splits[i].trim();
+            int firstColon = trimmedValue.indexOf(':');
+            int separator = trimmedValue.indexOf(NODE_PATH_SEPARATOR);
+            int pathStart = separator + NODE_PATH_SEPARATOR.length();
+            // Reject entries with no separator, no node name or no path; an empty path used to surface as an
+            // ArrayIndexOutOfBoundsException instead of the usage message
+            if (firstColon <= 0 || separator < 0 || pathStart >= trimmedValue.length()) {
+                throw new AsterixException(
+                        "Invalid path: " + splits[i] + "\nUsage- path=\"Host://Absolute File Path\"");
+            }
+            String nodeName = trimmedValue.substring(0, firstColon);
+            String nodeLocalPath = trimmedValue.substring(pathStart);
+            parsedSplits[i] = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+        }
+        // Publish only once every entry has been parsed, so a failure does not leave a half-filled array behind
+        fileSplits = parsedSplits;
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        // isRecordTrackingEnabled() returns false, so this factory provides no tracker
+        return null;
+    }
+
+    /** Builds an absolute partition constraint from the resolved location of every file split. */
+    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[fileSplits.length];
+        for (int i = 0; i < fileSplits.length; i++) {
+            locs[i] = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
+    /** Returns the node resolver, creating it through NCFileSystemAdapterFactory on first use. */
+    protected INodeResolver getNodeResolver() {
+        if (nodeResolver == null) {
+            nodeResolver = NCFileSystemAdapterFactory.initializeNodeResolver();
+        }
+        return nodeResolver;
+    }
+
+    @Override
+    public InputDataFormat getInputDataFormat() {
+        return InputDataFormat.UNKNOWN;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 31bd7ab..7d6d43a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -15,7 +15,6 @@
 package org.apache.asterix.external.adapter.factory;
 
 import java.io.File;
-import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 
@@ -25,7 +24,6 @@
 import org.apache.asterix.external.util.DNSResolverFactory;
 import org.apache.asterix.external.util.INodeResolver;
 import org.apache.asterix.external.util.INodeResolverFactory;
-import org.apache.asterix.metadata.entities.ExternalFile;
 import org.apache.asterix.metadata.external.IAdapterFactory;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
@@ -33,7 +31,6 @@
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -53,19 +50,18 @@
     private IAType sourceDatatype;
     private FileSplit[] fileSplits;
     private ARecordType outputType;
-
+    // Expression to filter files with when the user provides a directory
+    private String expression;
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
-        return fsAdapter;
+        return new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx, expression);
     }
 
     @Override
     public String getName() {
         return NC_FILE_SYSTEM_ADAPTER_NAME;
     }
-
 
     @Override
     public SupportedOperation getSupportedOperations() {
@@ -76,11 +72,11 @@
     public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
         this.outputType = outputType;
+        this.expression = (String) configuration.get(AsterixTupleParserFactory.KEY_EXPRESSION);
         String[] splits = ((String) configuration.get(AsterixTupleParserFactory.KEY_PATH)).split(",");
         IAType sourceDatatype = (IAType) outputType;
         configureFileSplits(splits);
         configureFormat(sourceDatatype);
-
     }
 
     @Override
@@ -126,9 +122,11 @@
         return nodeResolver;
     }
 
-    private static INodeResolver initializeNodeResolver() {
+    // The method below should be moved to a util class since it is static and can potentially be used by multiple other classes
+    public static INodeResolver initializeNodeResolver() {
         INodeResolver nodeResolver = null;
-        String configuredNodeResolverFactory = System.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
+        String configuredNodeResolverFactory = System
+                .getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
         if (configuredNodeResolverFactory != null) {
             try {
                 nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
@@ -146,19 +144,14 @@
         }
         return nodeResolver;
     }
-    
+
     @Override
     public ARecordType getAdapterOutputType() {
         return outputType;
     }
-    
+
     @Override
     public InputDataFormat getInputDataFormat() {
         return InputDataFormat.UNKNOWN;
     }
-
-    public void setFiles(List<ExternalFile> files) throws AlgebricksException {
-        throw new AlgebricksException("can't set files for this Adapter");
-    }
-
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
index 6abb0f0..ed82630 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
@@ -49,9 +49,9 @@
 
     private FrameTupleAppender appender;
     private IFrame frame;
-    private long tupleCount = 0;
-    private final IHyracksTaskContext ctx;
-    private int frameTupleCount = 0;
+    protected long tupleCount = 0;
+    protected final IHyracksTaskContext ctx;
+    protected int frameTupleCount = 0;
 
     protected FeedPolicyEnforcer policyEnforcer;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
index c922f69..519180c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedClient.java
@@ -121,6 +121,7 @@
         recordBuilder.write(dataOutput, true);
     }
 
+    @SuppressWarnings("unchecked")
     private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
         switch (obj.getType().getTypeTag()) {
             case RECORD: {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
new file mode 100644
index 0000000..3aa4da1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FileSystemWatcher.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchEvent.Kind;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An {@link InputStream} over the files of a directory tree that, once the existing files have been read, waits
+ * for new files to be created in that tree and reads them too.
+ * <p>
+ * The stream ends when it is closed, when the reading thread is interrupted while waiting, or when none of the
+ * watched directories is valid anymore. A line break is inserted between two files when the first one does not
+ * end with one. This class performs no synchronization of its own.
+ */
+public class FileSystemWatcher extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
+
+    private final WatchService watcher;
+    // Directory registered under each watch key
+    private final HashMap<WatchKey, Path> keys;
+    // Files waiting to be read and the read position in that list
+    private final LinkedList<File> files = new LinkedList<File>();
+    private Iterator<File> it;
+    // Filter applied to the full path of every file; null accepts every file
+    private final Pattern pattern;
+    // Last byte handed to the caller; tells whether a line break must be inserted before the next file
+    private byte lastByte = '\n';
+
+    // Initialize the inputStream to a dummy inputStream that returns EndOfStream
+    private InputStream in = new InputStream() {
+        @Override
+        public int read() throws IOException {
+            return -1;
+        }
+    };
+
+    /**
+     * Collects the matching files that already exist under {@code inputResource} and registers every directory of
+     * that tree with a new watch service.
+     *
+     * @param inputResource
+     *            directory tree, or single file, to read from
+     * @param expression
+     *            regular expression that the path of a file must match for the file to be read, or {@code null} to
+     *            read every file
+     * @throws IOException
+     *             if the watch service cannot be created or the tree cannot be scanned
+     */
+    public FileSystemWatcher(Path inputResource, String expression) throws IOException {
+        // Compiled once here rather than for every file that is examined
+        pattern = expression == null ? null : Pattern.compile(expression);
+        watcher = FileSystems.getDefault().newWatchService();
+        keys = new HashMap<WatchKey, Path>();
+        try {
+            registerAll(inputResource);
+        } catch (IOException | RuntimeException e) {
+            // Do not leak the watch service when the initial scan fails
+            watcher.close();
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the file being read and the watch service. Per the {@link WatchService#close()} contract, a thread
+     * blocked waiting for events receives a {@link ClosedWatchServiceException}, which ends the stream.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            closeCurrent();
+        } finally {
+            watcher.close();
+        }
+    }
+
+    /**
+     * Closes the file being read, if any, and leaves {@code in} set to {@code null}.
+     */
+    private void closeCurrent() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, waiting for a new file if none is pending. Leaves
+     * {@code in} set to {@code null} when the end of the stream is reached.
+     */
+    private void advance() throws IOException {
+        closeCurrent();
+        if (it.hasNext()) {
+            in = new FileInputStream(it.next());
+            return;
+        }
+        // Empty the list of files to read
+        files.clear();
+        it = files.iterator();
+        try {
+            // Read new Events (Pulling first to add all available files)
+            WatchKey key = watcher.poll();
+            while (key != null) {
+                handleEvents(key);
+                if (reachedEndOfStream(key)) {
+                    return;
+                }
+                key = watcher.poll();
+            }
+            // No file was found, wait for the filesystem to push events
+            while (files.isEmpty()) {
+                if (keys.isEmpty()) {
+                    // Nothing is watched (e.g. the input resource is a single file), so no event can ever arrive
+                    return;
+                }
+                key = watcher.take();
+                handleEvents(key);
+                if (reachedEndOfStream(key)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller; in is already null, so the stream ends here
+            Thread.currentThread().interrupt();
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("Watcher interrupted", e);
+            }
+            return;
+        } catch (ClosedWatchServiceException e) {
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("Watcher Service Exception", e);
+            }
+            return;
+        }
+        // files were found, re-create the iterator and move it one step
+        it = files.iterator();
+        in = new FileInputStream(it.next());
+    }
+
+    /**
+     * Processes the pending events of a watch key: newly created directories are registered (together with their
+     * sub-directories) and newly created files that pass the filter are queued for reading.
+     */
+    private void handleEvents(WatchKey key) {
+        // get dir associated with the key
+        Path dir = keys.get(key);
+        if (dir == null) {
+            // This should never happen
+            if (LOGGER.isEnabledFor(Level.WARN)) {
+                LOGGER.warn("WatchKey not recognized!!");
+            }
+            return;
+        }
+
+        for (WatchEvent<?> event : key.pollEvents()) {
+            Kind<?> kind = event.kind();
+            // TODO: Do something about overflow events
+            // An overflow event means that some events were dropped
+            if (kind == StandardWatchEventKinds.OVERFLOW) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Overflow event. Some events might have been missed");
+                }
+                continue;
+            }
+            // Only creations are of interest; modifications and deletions are ignored
+            if (kind != StandardWatchEventKinds.ENTRY_CREATE) {
+                continue;
+            }
+
+            // Context for directory entry event is the file name of entry
+            WatchEvent<Path> ev = cast(event);
+            Path child = dir.resolve(ev.context());
+            try {
+                if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
+                    // a directory was created: register it and its sub-directories
+                    registerAll(child);
+                } else if (matchesExpression(child.toString())) {
+                    // it is a file accepted by the filter (or there is no filter): add it to the files list
+                    files.add(child.toFile());
+                }
+            } catch (IOException e) {
+                if (LOGGER.isEnabledFor(Level.WARN)) {
+                    LOGGER.warn("Failed to register " + child, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Resets the key so that it can receive further events and forgets it if its directory is gone.
+     *
+     * @return {@code true} when no watched directory is left, in which case {@code in} is set to {@code null}
+     */
+    private boolean reachedEndOfStream(WatchKey key) {
+        // reset key and remove from set if directory no longer accessible
+        if (!key.reset()) {
+            keys.remove(key);
+            if (keys.isEmpty()) {
+                // No more directories, we close the stream: the root directory monitored doesn't exist anymore
+                in = null;
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether a file should be read: always when no expression was given, otherwise only if its whole path
+     * matches the expression.
+     */
+    private boolean matchesExpression(String path) {
+        return pattern == null || pattern.matcher(path).matches();
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (in == null) {
+            return 0;
+        }
+        return in.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Reads one byte, moving on to the next file (and waiting for one if needed) when the current file is
+     * exhausted. Returns -1 once the end of the stream has been reached.
+     */
+    @Override
+    public int read() throws IOException {
+        while (in != null) {
+            int result = in.read();
+            if (result != -1) {
+                lastByte = (byte) result;
+                return result;
+            }
+            advance();
+            if (in != null && lastByte != '\n' && lastByte != '\r') {
+                lastByte = '\n';
+                return '\n';
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Reads up to {@code len} bytes from the current file, moving on to the next file (and waiting for one if
+     * needed) when the current file is exhausted. Returns -1 once the end of the stream has been reached.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        while (in != null) {
+            int result = in.read(b, off, len);
+            if (result > 0) {
+                lastByte = b[off + result - 1];
+                return result;
+            }
+            if (result == 0) {
+                return 0;
+            }
+            advance();
+            // separate two files with a new line unless the previous data already ended with a line break
+            if (in != null && lastByte != '\n' && lastByte != '\r') {
+                b[off] = '\n';
+                lastByte = '\n';
+                return 1;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (in == null || n <= 0) {
+            return 0;
+        }
+        long result = in.skip(n);
+        if (result != 0) {
+            return result;
+        }
+        // Nothing was skipped: read one byte, which moves to the next file if the current one is exhausted
+        if (read() == -1) {
+            return 0;
+        }
+        return 1 + in.skip(n - 1);
+    }
+
+    /**
+     * Queues the matching files found under the given resource and registers the resource and all its
+     * sub-directories with the WatchService.
+     */
+    private void registerAll(final Path inputResource) throws IOException {
+        final LinkedList<Path> initialDirs = new LinkedList<Path>();
+        Files.walkFileTree(inputResource, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs) throws IOException {
+                initialDirs.add(path);
+                // get immediate children files; listFiles() returns null when the directory cannot be listed
+                File[] subFiles = path.toFile().listFiles();
+                if (subFiles == null) {
+                    LOGGER.warn("Unable to list directory: " + path);
+                    return FileVisitResult.CONTINUE;
+                }
+                for (File file : subFiles) {
+                    if (!file.isDirectory() && matchesExpression(file.getPath())) {
+                        files.add(file);
+                    }
+                }
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
+                // Files inside directories are queued by preVisitDirectory; this only handles the case where
+                // the resource itself is a file rather than a directory
+                if (path.equals(inputResource) && !Files.isDirectory(path) && matchesExpression(path.toString())) {
+                    files.add(path.toFile());
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+        it = files.iterator();
+        for (Path path : initialDirs) {
+            register(path);
+        }
+    }
+
+    /**
+     * Register the given directory with the WatchService
+     */
+    private void register(Path dir) throws IOException {
+        WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
+                StandardWatchEventKinds.ENTRY_MODIFY);
+        keys.put(key, dir);
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
+        return (WatchEvent<T>) event;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
new file mode 100644
index 0000000..15ae3f6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFileSystemFeedAdapter.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.asterix.common.parse.ITupleForwardPolicy;
+import org.apache.asterix.external.dataset.adapter.IFeedClient.InflowState;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import org.apache.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class LocalFileSystemFeedAdapter extends ClientBasedFeedAdapter {
+    // Push-mode feed adapter: each partition reads its own FileSplit through a LocalFilesystemFeedClient.
+    private static final long serialVersionUID = 1L;
+    private static final int DEFAULT_BATCH_SIZE = 100;
+    private final FileSplit[] fileSplits;
+    private final String expression;
+    protected final ITupleParserFactory parserFactory;
+    protected final IAType sourceDatatype;
+    private IFrameWriter writer;
+    private LocalFilesystemFeedClient client;
+
+    public LocalFileSystemFeedAdapter(Map<String, String> configuration, FileSplit[] fileSplits,
+            ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, String expression) {
+        super(configuration, ctx);
+        this.fileSplits = fileSplits;
+        this.expression = expression;
+        this.parserFactory = parserFactory;
+        this.sourceDatatype = sourceDatatype;
+    }
+
+    /**
+     * Discontinue the ingestion of data and end the feed.
+     * 
+     * @throws Exception
+     */
+    public void stop() throws Exception {
+        // Only clears the flag polled by start(); the original author flagged this as needing a proper fix
+        continueIngestion = false;
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    private void setup(int partition) throws Exception {
+        if (client == null) {
+            client = (LocalFilesystemFeedClient) getFeedClient(partition);
+        }
+    }
+
+    /**
+     * Pulls records through the feed client until it reports NO_MORE_DATA or continueIngestion is cleared.
+     * Exceptions raised while parsing propagate to the caller unchanged.
+     */
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        // Store the writer before setup(): getFeedClient() passes this field on to the client.
+        this.writer = writer;
+        setup(partition);
+        while (continueIngestion) {
+            InflowState inflowState = client.parseNext();
+            switch (inflowState) {
+                case DATA_AVAILABLE:
+                    frameTupleCount++;
+                    break;
+                case NO_MORE_DATA:
+                    tupleCount += frameTupleCount;
+                    frameTupleCount = 0;
+                    continueIngestion = false;
+                    break;
+                case DATA_NOT_AVAILABLE:
+                    // nothing to count; poll the client again
+                    break;
+            }
+        }
+    }
+
+    @Override
+    public IFeedClient getFeedClient(int partition) throws Exception {
+        FileSplit split = fileSplits[partition];
+        File inputResource = split.getLocalFile().getFile();
+        return new LocalFilesystemFeedClient(adapterOutputType, inputResource.toPath(), parserFactory, ctx, writer,
+                expression);
+    }
+
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        // Always forward with the counter/timer policy; supply the default batch size if none was configured.
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+        if (configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE) == null) {
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, String.valueOf(DEFAULT_BATCH_SIZE));
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return true; // NOTE(review): presumably tells the framework to carry on after an error - confirm
+    }
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
new file mode 100644
index 0000000..5216934
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LocalFilesystemFeedClient.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.operators.file.AbstractTupleParser;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class LocalFilesystemFeedClient extends FeedClient {
+    // Feed client that parses records from a FileSystemWatcher stream and forwards them through a tuple parser.
+    private FileSystemWatcher watcher;
+    private AbstractTupleParser tupleParser;
+
+    public LocalFilesystemFeedClient(ARecordType recordType, Path inputResource, ITupleParserFactory parserFactory,
+            IHyracksTaskContext ctx, IFrameWriter writer, String expression) throws IOException, AsterixException {
+        this.watcher = new FileSystemWatcher(inputResource, expression);
+        this.tupleParser = (AbstractTupleParser) parserFactory.createTupleParser(ctx);
+        this.tupleParser.setInputStream(this.watcher);
+        this.tupleParser.setWriter(writer);
+    }
+
+    public InflowState parseNext() throws Exception {
+        if (!tupleParser.parseNext()) {
+            // the parser produced nothing more: end it and report that the data is exhausted
+            tupleParser.end();
+            return InflowState.NO_MORE_DATA;
+        }
+        return InflowState.DATA_AVAILABLE;
+    }
+
+    public void flushRecords() throws Exception {
+        tupleParser.getPolicy().forceFlush();
+    }
+
+    @Override
+    public InflowState retrieveNextRecord() throws Exception {
+        throw new AsterixException(
+                "FileSystemFeedParser can't retrieve the record since it is tightly coupled with the Parser");
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 635fae7..4a6b435 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -16,12 +16,15 @@
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 
+import org.apache.asterix.external.indexing.input.NCLocalFilesystemDirecoryInputStream;
 import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -37,27 +40,73 @@
     private static final long serialVersionUID = 1L;
 
     private final FileSplit[] fileSplits;
+    private final String expression;
 
     public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
-            IHyracksTaskContext ctx) throws HyracksDataException {
+            IHyracksTaskContext ctx, String expression) throws HyracksDataException {
         super(parserFactory, atype, ctx);
         this.fileSplits = fileSplits;
+        this.expression = expression; // regex matched against file paths when a split is a directory; null accepts all
     }
 
     @Override
     public InputStream getInputStream(int partition) throws IOException {
         FileSplit split = fileSplits[partition];
-        File inputFile = split.getLocalFile().getFile();
+        File inputResource = split.getLocalFile().getFile();
+        LinkedList<File> subFiles = new LinkedList<File>();
         InputStream in;
         try {
-            in = new FileInputStream(inputFile);
+            // Check that the resource exists
+            if (Files.exists(inputResource.toPath())) {
+                // Check if the resource is a directory
+                if (Files.isDirectory(inputResource.toPath())) {
+                    // Resource is a directory: collect the matching files beneath it, recursing into subdirectories
+                    File[] files = inputResource.listFiles(); // NOTE(review): null on I/O error -> NPE, wrapped below
+                    for (File file : files) {
+                        if (file.isDirectory()) {
+                            addSubFiles(subFiles, file);
+                        } else {
+                            if (expression == null || Pattern.matches(expression, file.getPath())) {
+                                subFiles.add(file);
+                            }
+                        }
+                    }
+                    in = new NCLocalFilesystemDirecoryInputStream(subFiles);
+                } else {
+                    // Resource is not a directory: stream it as-is (the expression is not applied here)
+                    in = new FileInputStream(inputResource);
+                }
+            } else {
+                throw new IOException("Resource doesn't exist");
+            }
             return in;
-        } catch (FileNotFoundException e) {
+        } catch (PatternSyntaxException e) {
+            throw new IOException("The regular expression provided is an invalid expression", e);
+        } catch (Exception e) { // also re-wraps the "Resource doesn't exist" IOException thrown above
             throw new IOException(e);
         }
     }
 
-   
+    /** Recursively collects into {@code files} the non-directory entries under {@code currentDir} that match. */
+    private void addSubFiles(LinkedList<File> files, File currentDir) throws IOException {
+        File[] subFiles = currentDir.listFiles();
+        if (subFiles == null) {
+            // listFiles() returns null when currentDir is not a directory or an I/O error occurs
+            throw new IOException("Unable to list the contents of " + currentDir);
+        }
+        try {
+            for (File file : subFiles) {
+                if (file.isDirectory()) {
+                    addSubFiles(files, file);
+                } else if (expression == null || Pattern.matches(expression, file.getPath())) {
+                    files.add(file);
+                }
+            }
+        } catch (PatternSyntaxException e) {
+            throw new IOException("The regular expression provided is an invalid expression", e);
+        }
+    }
+
     @Override
     public String getFilename(int partition) {
         final FileSplit fileSplit = fileSplits[partition];
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
new file mode 100644
index 0000000..8bf30b9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/NCLocalFilesystemDirecoryInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.indexing.input;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+public class NCLocalFilesystemDirecoryInputStream extends InputStream {
+
+    private InputStream in;
+    private Iterator<File> it;
+    private byte lastByte;
+
+    public NCLocalFilesystemDirecoryInputStream(LinkedList<File> files) throws IOException {
+        this.it = files.iterator();
+        advance();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, if any.
+     */
+    private void advance() throws IOException {
+        close();
+        if (it.hasNext()) {
+            File file = it.next();
+            in = new FileInputStream(file);
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (in == null) {
+            return 0;
+        }
+        return in.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (in == null) {
+            return -1;
+        }
+        int result = in.read();
+        if (result == -1) {
+            advance();
+            // fix here as well
+            if (in != null) {
+                if (lastByte != '\n' && lastByte != '\r') {
+                    return '\n';
+                }
+            }
+            return read();
+        }
+        lastByte = (byte) result;
+        return result;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (in == null) {
+            return -1;
+        }
+        int result = in.read(b, off, len);
+        if (result == 0) {
+            return 0;
+        } else if (result == -1) {
+            advance();
+            // return a new line if the previous file didn't end with a new line.
+            if (in != null) {
+                if (lastByte != '\n' && lastByte != '\r') {
+                    b[off] = '\n';
+                    return 1;
+                } else {
+                    return read(b, off, len);
+                }
+            }
+            return read(b, off, len);
+        }
+        lastByte = b[off + result - 1];
+        return result;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (in == null || n <= 0) {
+            return 0;
+        }
+        long result = in.skip(n);
+        if (result != 0) {
+            return result;
+        }
+        if (read() == -1) {
+            return 0;
+        }
+        return 1 + in.skip(n - 1);
+    }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 53c53fe..c5f5177 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -228,8 +228,8 @@
     public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
         String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
         String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
-        MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat,
-                IMetadataEntity.PENDING_NO_OP));
+        MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
+                new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
     }
 
     public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
@@ -266,8 +266,8 @@
         getBuiltinTypes(types);
         getMetadataTypes(types);
         for (int i = 0; i < types.size(); i++) {
-            MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, types.get(i).getTypeName(),
-                    types.get(i), false));
+            MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
+                    new Datatype(dataverseName, types.get(i).getTypeName(), types.get(i), false));
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished inserting initial datatypes.");
@@ -276,10 +276,11 @@
 
     public static void insertInitialIndexes(MetadataTransactionContext mdTxnCtx) throws Exception {
         for (int i = 0; i < secondaryIndexes.length; i++) {
-            MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
-                    secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
-                    secondaryIndexes[i].getPartitioningExpr(), secondaryIndexes[i].getPartitioningExprType(), false,
-                    false, IMetadataEntity.PENDING_NO_OP));
+            MetadataManager.INSTANCE.addIndex(mdTxnCtx,
+                    new Index(secondaryIndexes[i].getDataverseName(), secondaryIndexes[i].getIndexedDatasetName(),
+                            secondaryIndexes[i].getIndexName(), IndexType.BTREE,
+                            secondaryIndexes[i].getPartitioningExpr(), secondaryIndexes[i].getPartitioningExprType(),
+                            false, false, IMetadataEntity.PENDING_NO_OP));
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished inserting initial indexes.");
@@ -324,6 +325,7 @@
                 "org.apache.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
                 "org.apache.asterix.external.adapter.factory.RSSFeedAdapterFactory",
                 "org.apache.asterix.external.adapter.factory.CNNFeedAdapterFactory",
+                "org.apache.asterix.external.adapter.factory.LocalFileSystemFeedAdapterFactory",
                 "org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",
                 "org.apache.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
                 "org.apache.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
@@ -379,31 +381,25 @@
                 + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
                         runtimeContext.getMetaDataIODeviceId());
         FileReference file = new FileReference(new File(filePath));
-        List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getVirtualBufferCaches(index.getDatasetId()
-                .getId());
+        List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
+                .getVirtualBufferCaches(index.getDatasetId().getId());
         ITypeTraits[] typeTraits = index.getTypeTraits();
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
         LSMBTree lsmBtree = null;
         long resourceID = -1;
-        ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
-                .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
-                index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index
-                        .getDatasetId().getId()));
+        ILSMOperationTracker opTracker = index.isPrimaryIndex()
+                ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
+                : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
+                        index.getDatasetId().getId(),
+                        ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index.getDatasetId().getId()));
         final String path = file.getFile().getPath();
         if (create) {
-            lsmBtree = LSMBTreeUtils.createLSMTree(
-                    virtualBufferCaches,
-                    file,
-                    bufferCache,
-                    fileMapProvider,
-                    typeTraits,
-                    comparatorFactories,
-                    bloomFilterKeyFields,
-                    runtimeContext.getBloomFilterFalsePositiveRate(),
+            lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
+                    comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
                     runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
-                            GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
-                    runtimeContext.getLSMIOScheduler(),
+                            GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager),
+                    opTracker, runtimeContext.getLSMIOScheduler(),
                     LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                     null, null, null, null, true);
             lsmBtree.create();
@@ -422,19 +418,14 @@
             resourceID = resource.getResourceId();
             lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
             if (lsmBtree == null) {
-                lsmBtree = LSMBTreeUtils.createLSMTree(
-                        virtualBufferCaches,
-                        file,
-                        bufferCache,
-                        fileMapProvider,
-                        typeTraits,
-                        comparatorFactories,
-                        bloomFilterKeyFields,
+                lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
+                        typeTraits, comparatorFactories, bloomFilterKeyFields,
                         runtimeContext.getBloomFilterFalsePositiveRate(),
                         runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
-                                GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
-                        runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
-                                .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
+                                GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager),
+                        opTracker, runtimeContext.getLSMIOScheduler(),
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
+                        null, null, null, null, true);
                 indexLifecycleManager.register(resourceID, lsmBtree);
             }
         }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
index c16e0e1..3931703 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -41,6 +41,9 @@
     protected DataOutput dos = tb.getDataOutput();
     protected final ARecordType recType;
     protected final IHyracksTaskContext ctx;
+    protected IFrameWriter writer;
+    protected IDataParser parser;
+    protected ITupleForwardPolicy policy;
 
     public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
         this.recType = recType;
@@ -74,4 +77,32 @@
         }
     }
 
+    public void setInputStream(InputStream in) throws AsterixException, IOException {
+        parser = getDataParser(); // parseNext() reads records through this parser
+        parser.initialize(in, recType, true); // NOTE(review): meaning of the 'true' flag is not visible here - confirm
+    }
+
+    public void setWriter(IFrameWriter writer) throws HyracksDataException {
+        this.writer = writer;
+        policy = getTupleParserPolicy(); // parseNext() hands tuples to this policy and end() closes it
+        policy.initialize(ctx, writer);
+    }
+
+    public boolean parseNext() throws AsterixException, IOException {
+        tb.reset();
+        final boolean parsed = parser.parse(tb.getDataOutput());
+        if (parsed) { // only a successfully parsed record is closed off and handed to the policy
+            tb.addFieldEndOffset();
+            policy.addTuple(tb);
+        }
+        return parsed;
+    }
+
+    public void end() throws HyracksDataException {
+        policy.close(); // in this class the policy is only assigned by setWriter(), so that must run first
+    }
+
+    public ITupleForwardPolicy getPolicy() {
+        return policy; // in this class the field is only assigned by setWriter()
+    }
 }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
index 4374ab6..74c4769 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
@@ -65,6 +65,7 @@
     public static final String AT_LEAST_ONE_SEMANTICS = FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS;
     public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
     public static final String DEFAULT_DELIMITER = ",";
+    public static final String KEY_EXPRESSION = "expression"; // String like the sibling keys, usable with Map<String, String>
 
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
index af6d35e..cc9a08c 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/CounterTimerTupleForwardPolicy.java
@@ -36,7 +36,7 @@
     public static final String BATCH_INTERVAL = "batch-interval";
 
     private static final Logger LOGGER = Logger.getLogger(CounterTimerTupleForwardPolicy.class.getName());
-   
+
     private FrameTupleAppender appender;
     private IFrame frame;
     private IFrameWriter writer;
@@ -87,7 +87,8 @@
     }
 
     private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+        if (tuplesInFrame == batchSize
+                || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
             }
@@ -101,16 +102,7 @@
     }
 
     public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            if (activeTimer) {
-                synchronized (lock) {
-                    FrameUtils.flushFrame(frame.getBuffer(), writer);
-                }
-            } else {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        }
-
+        forceFlush();
         if (timer != null) {
             timer.cancel();
         }
@@ -129,11 +121,11 @@
         @Override
         public void run() {
             try {
-                if (tuplesInFrame > 0) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
-                    }
-                    synchronized (lock) {
+                synchronized (lock) {
+                    if (tuplesInFrame > 0) {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+                        }
                         FrameUtils.flushFrame(frame.getBuffer(), writer);
                         appender.reset(frame, true);
                         tuplesInFrame = 0;
@@ -151,4 +143,23 @@
         return TupleForwardPolicyType.COUNTER_TIMER_EXPIRED;
     }
 
+    @Override
+    public void forceFlush() throws HyracksDataException {
+        if (!activeTimer) {
+            flushBufferedTuples();
+        } else {
+            synchronized (lock) { // run() flushes under this same lock
+                flushBufferedTuples();
+            }
+        }
+    }
+
+    private void flushBufferedTuples() throws HyracksDataException { // emits and resets the frame when non-empty
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            tuplesInFrame = 0;
+            appender.reset(frame, true);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
index 7c43e26..079537d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/FrameFullTupleForwardPolicy.java
@@ -28,45 +28,57 @@
 
 public class FrameFullTupleForwardPolicy implements ITupleForwardPolicy {
 
-	private FrameTupleAppender appender;
-	private IFrame frame;
-	private IFrameWriter writer;
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private IFrameWriter writer;
 
-	public void configure(Map<String, String> configuration) {
-		// no-op
-	}
+    public void configure(Map<String, String> configuration) {
+        // no-op
+    }
 
-	public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
-			throws HyracksDataException {
-		this.appender = new FrameTupleAppender();
-		this.frame = new VSizeFrame(ctx);
-		this.writer = writer;
-		appender.reset(frame, true);
-	}
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.appender = new FrameTupleAppender();
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        appender.reset(frame, true);
+    }
 
-	public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-		boolean success = appender.append(tb.getFieldEndOffsets(),
-				tb.getByteArray(), 0, tb.getSize());
-		if (!success) {
-			FrameUtils.flushFrame(frame.getBuffer(), writer);
-			appender.reset(frame, true);
-			success = appender.append(tb.getFieldEndOffsets(),
-					tb.getByteArray(), 0, tb.getSize());
-			if (!success) {
-				throw new IllegalStateException();
-			}
-		}
-	}
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+            if (!success) {
+                throw new IllegalStateException();
+            }
+        }
+    }
 
-	public void close() throws HyracksDataException {
-		if (appender.getTupleCount() > 0) {
-			FrameUtils.flushFrame(frame.getBuffer(), writer);
-		}
+    public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+        }
 
-	}
+    }
 
-	@Override
-	public TupleForwardPolicyType getType() {
-		return TupleForwardPolicyType.FRAME_FULL;
-	}
+    @Override
+    public TupleForwardPolicyType getType() {
+        return TupleForwardPolicyType.FRAME_FULL;
+    }
+
+    /**
+     * Flushes the current frame to the writer and resets the appender.
+     * Does nothing when the frame holds no tuples, mirroring the guard in
+     * {@link #close()}, so an empty frame is never pushed to the writer.
+     *
+     * @throws HyracksDataException if flushing the frame fails
+     */
+    @Override
+    public void forceFlush() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
index 95e6b22..463e0cd 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/RateControlledTupleForwardPolicy.java
@@ -81,4 +81,18 @@
     public TupleForwardPolicyType getType() {
         return TupleForwardPolicyType.RATE_CONTROLLED;
     }
+
+    /**
+     * Flushes the current frame to the writer and resets the appender.
+     * Does nothing when the frame holds no tuples, so an empty frame is never pushed to the writer.
+     *
+     * @throws HyracksDataException if flushing the frame fails
+     */
+    @Override
+    public void forceFlush() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(frame.getBuffer(), writer);
+            appender.reset(frame, true);
+        }
+    }
 }
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/376
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I707756e3b4c9ffca4b55ec9817a08e5c16333010
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message