asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [16/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests
Date Mon, 22 Feb 2016 22:35:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index 06f7e72..e39b507 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -30,7 +30,9 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.TweetGenerator;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
@@ -80,8 +82,11 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
             return true;
         }
 
-        public void start() {
-            executorService.execute(dataProvider);
+        public synchronized void start() {
+            if (!started) {
+                executorService.execute(dataProvider);
+                started = true;
+            }
         }
 
         @Override
@@ -93,7 +98,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
         public int read() throws IOException {
             if (!started) {
                 start();
-                started = true;
             }
             return in.read();
         }
@@ -106,6 +110,18 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
             }
             return in.read(b, off, len);
         }
+
+        @Override
+        public void configure(Map<String, String> configuration) {
+        }
+
+        @Override
+        public void setFeedLogManager(FeedLogManager logManager) {
+        }
+
+        @Override
+        public void setController(AbstractFeedDataFlowController controller) {
+        }
     }
 
     private static class DataProvider implements Runnable {
@@ -170,7 +186,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
                     break;
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adaptor " + e.getMessage());
+                        LOGGER.warning("Exception in adapter " + e.getMessage());
                     }
                 }
             }
@@ -181,4 +197,12 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider
         }
 
     }
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
index 3dea50c..df0ddc8 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java
@@ -63,8 +63,8 @@ class ExternalScalarFunction extends ExternalFunction implements IExternalScalar
         try {
             setArguments(tuple);
             evaluate(functionHelper);
-            functionHelper.reset();
             result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength());
+            functionHelper.reset();
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException(e);
@@ -73,6 +73,7 @@ class ExternalScalarFunction extends ExternalFunction implements IExternalScalar
 
     @Override
     public void evaluate(IFunctionHelper argumentProvider) throws Exception {
+        resultBuffer.reset();
         ((IExternalScalarFunction) externalFunction).evaluate(argumentProvider);
         /*
          * Make sure that if "setResult" is not called,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index 7ad6cfa..db85e2f 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -23,8 +23,14 @@ import java.util.Map;
 
 public class ExternalLibraryManager {
 
-    private static Map<String, ClassLoader> libraryClassLoaders = new HashMap<String, ClassLoader>();
+    private static final Map<String, ClassLoader> libraryClassLoaders = new HashMap<String, ClassLoader>();
 
+    /**
+     * Register the library class loader with the external library manager
+     * @param dataverseName
+     * @param libraryName
+     * @param classLoader
+     */
     public static void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) {
         String key = getKey(dataverseName, libraryName);
         synchronized (libraryClassLoaders) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index a929eec..7e28c35 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -23,11 +23,11 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
@@ -151,7 +151,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
         ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
         while (ingestionRuntime == null && waitCycleCount < 10) {
             try {
-                Thread.sleep(2000);
+                Thread.sleep(3000);
                 waitCycleCount++;
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 20c11b3..d41179f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -36,12 +36,14 @@ import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
 import org.apache.asterix.external.feed.runtime.SubscribableRuntime;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -91,6 +93,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
 
     private FeedRuntimeInputHandler inputSideHandler;
 
+    private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+
     public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
@@ -103,6 +107,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         this.connectionId = feedConnectionId;
         this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
+        ctx.setSharedObject(message);
     }
 
     @Override
@@ -126,7 +131,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
         this.fta = new FrameTupleAccessor(recordDesc);
         this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
+                policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
+                recordDesc, feedManager, nPartitions);
 
         DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
                 writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
@@ -157,6 +163,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
+            FeedUtils.processFeedMessage(buffer, message, fta);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index c596671..018aeaa 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,7 +33,7 @@ import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -41,7 +41,6 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -119,7 +118,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
             } else {
                 reviveOldFeedRuntime(runtimeId);
             }
-
             coreOperator.open();
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failed to open feed store operator", e);
@@ -167,7 +165,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
-            processFeedMessage(buffer);
+            FeedUtils.processFeedMessage(buffer, message, fta);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
@@ -175,18 +173,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         }
     }
 
-    private void processFeedMessage(ByteBuffer buffer) {
-        // read the message and reduce the number of tuples
-        fta.reset(buffer);
-        int tc = fta.getTupleCount() - 1;
-        int offset = fta.getTupleStartOffset(tc);
-        int len = fta.getTupleLength(tc);
-        message.clear();
-        message.put(buffer.array(), offset, len);
-        message.flip();
-        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
-    }
-
     @Override
     public void fail() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.WARNING)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 93aa18b..60c80f1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -265,7 +265,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                 break;
             }
             case AdmLexer.TOKEN_INT_LITERAL: {
-                // For an INT value without any suffix, we return it as INT64 type value since it is the default integer type.
+                // For an INT value without any suffix, we return it as INT64 type value since it is
+                // the default integer type.
                 parseAndCastNumeric(ATypeTag.INT64, objectType, out);
                 break;
             }
@@ -506,7 +507,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
             } else {
                 return null;
             }
-            //            return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null;
+            // return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null;
         } else { // union
             List<IAType> unionList = ((AUnionType) aObjectType).getUnionList();
             for (IAType t : unionList) {
@@ -531,7 +532,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
 
         BitSet nulls = null;
         if (recType != null) {
-            //TODO: use BitSet Pool
+            // TODO: use BitSet Pool
             nulls = new BitSet(recType.getFieldNames().length);
             recBuilder.reset(recType);
         } else {
@@ -569,7 +570,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                                 admLexer.getLastTokenImage().length() - 1);
                         fieldId = recBuilder.getFieldId(fldName);
                         if (fieldId < 0 && !recType.isOpen()) {
-                            throw new ParseException("This record is closed, you can not add extra fields !!");
+                            throw new ParseException(
+                                    "This record is closed, you can not add extra fields! new field name: " + fldName);
                         } else if (fieldId < 0 && recType.isOpen()) {
                             aStringFieldName.setValue(admLexer.getLastTokenImage().substring(1,
                                     admLexer.getLastTokenImage().length() - 1));
@@ -895,7 +897,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
             throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
         }
 
-        // If two type tags are not the same, either we try to promote or demote source type to the target type
+        // If two type tags are not the same, either we try to promote or demote source type to the
+        // target type
         if (targetTypeTag != typeTag) {
             if (ATypeHierarchy.canPromote(typeTag, targetTypeTag)) {
                 // can promote typeTag to targetTypeTag
@@ -907,7 +910,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                 promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
                         castBuffer.getLength() - 1, out);
             } else if (ATypeHierarchy.canDemote(typeTag, targetTypeTag)) {
-                //can demote source type to the target type
+                // can demote source type to the target type
                 ITypeConvertComputer demoteComputer = ATypeHierarchy.getTypeDemoteComputer(typeTag, targetTypeTag);
                 if (demoteComputer == null) {
                     throw new ParseException("Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
@@ -942,7 +945,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                         if (targetTypeTag != typeTag) {
                             ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag,
                                     targetTypeTag);
-                            // the availability if the promote computer should be consistent with the availability of a target type
+                            // the availability if the promote computer should be consistent with
+                            // the availability of a target type
                             assert promoteComputer != null;
                             // do the promotion; note that the type tag field should be skipped
                             promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index c5b39df..efbc6bf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -57,6 +57,7 @@ public class AdapterFactoryProvider {
         // Compatability
         adapterFactories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER, GenericAdapterFactory.class);
         return adapterFactories;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index dfe7aed..6b4b6ba 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -28,7 +28,6 @@ import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
-import org.apache.asterix.external.api.IRecordFlowController;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.api.IStreamDataParser;
@@ -39,10 +38,13 @@ import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
+import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class DataflowControllerProvider {
 
@@ -57,52 +59,67 @@ public class DataflowControllerProvider {
      * else
      * |-a. Set stream parser
      * 5. start(writer)
+     * @param feedLogFileSplits
+     * @param isFeed
      */
 
+    // TODO: Instead, use a factory just like data source and data parser.
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
-            Map<String, String> configuration, boolean indexingOp) throws Exception {
+            Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
+                    throws Exception {
+        FeedLogManager feedLogManager = null;
+        if (isFeed) {
+            feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
+        }
         switch (dataSourceFactory.getDataSourceType()) {
             case RECORDS:
-                IRecordFlowController recordDataFlowController = null;
-                if (indexingOp) {
-                    recordDataFlowController = new IndexingDataFlowController();
-                } else if (ExternalDataUtils.isFeed(configuration)) {
-                    recordDataFlowController = new FeedRecordDataFlowController();
-                } else {
-                    recordDataFlowController = new RecordDataFlowController();
-                }
-                recordDataFlowController.configure(configuration, ctx);
-                recordDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+                IDataFlowController recordDataFlowController = null;
                 IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
                 IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
+                recordReader.configure(configuration);
                 IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                 IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
                 dataParser.configure(configuration, recordType);
-                recordDataFlowController.setRecordReader(recordReader);
-                recordDataFlowController.setRecordParser(dataParser);
+                if (indexingOp) {
+                    recordDataFlowController = new IndexingDataFlowController(dataParser, recordReader);
+                } else if (isFeed) {
+                    recordDataFlowController = new FeedRecordDataFlowController(feedLogManager, dataParser,
+                            recordReader);
+                } else {
+                    recordDataFlowController = new RecordDataFlowController(dataParser, recordReader);
+                }
+                recordDataFlowController.configure(configuration, ctx);
+                recordDataFlowController
+                        .setTupleForwarder(DataflowUtils.getTupleForwarder(configuration, feedLogManager));
                 return recordDataFlowController;
             case STREAM:
                 IStreamFlowController streamDataFlowController = null;
-                if (ExternalDataUtils.isFeed(configuration)) {
-                    streamDataFlowController = new FeedStreamDataFlowController();
+                if (isFeed) {
+                    streamDataFlowController = new FeedStreamDataFlowController(feedLogManager);
                 } else {
                     streamDataFlowController = new StreamDataFlowController();
                 }
                 streamDataFlowController.configure(configuration, ctx);
-                streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+                streamDataFlowController
+                        .setTupleForwarder(DataflowUtils.getTupleForwarder(configuration, feedLogManager));
                 IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
+                streamProviderFactory.configure(configuration);
                 IInputStreamProvider streamProvider = streamProviderFactory.createInputStreamProvider(ctx, partition);
+                streamProvider.setFeedLogManager(feedLogManager);
+                streamProvider.configure(configuration);
                 IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
                 streamParserFactory.configure(configuration);
                 IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
                 streamParser.configure(configuration, recordType);
-                streamParser.setInputStream(streamProvider.getInputStream());
+                AInputStream inputStream = streamProvider.getInputStream();
+                streamParser.setInputStream(inputStream);
                 streamDataFlowController.setStreamParser(streamParser);
                 return streamDataFlowController;
             default:
                 throw new AsterixException("Unknown data source type: " + dataSourceFactory.getDataSourceType());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 745c653..0d65f72 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -26,11 +26,13 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamProviderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 
@@ -65,6 +67,9 @@ public class DatasourceFactoryProvider {
                 case ExternalDataConstants.STREAM_SOCKET:
                     streamFactory = new SocketInputStreamProviderFactory();
                     break;
+                case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
+                    streamFactory = new TwitterFirehoseStreamProviderFactory();
+                    break;
                 default:
                     throw new AsterixException("unknown input stream factory");
             }
@@ -101,6 +106,11 @@ public class DatasourceFactoryProvider {
                 case ExternalDataConstants.READER_COUCHBASE:
                     readerFactory = new CouchbaseReaderFactory();
                     break;
+                case ExternalDataConstants.READER_LINE_SEPARATED:
+                    readerFactory = new EmptyLineSeparatedRecordReaderFactory()
+                            .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
+                                    ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));
+                    break;
                 default:
                     throw new AsterixException("unknown record reader factory: " + reader);
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index e604d42..cab8a69 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -43,12 +43,13 @@ public class DataflowUtils {
         }
     }
 
-    public static ITupleForwarder getTupleForwarder(Map<String, String> configuration) throws AsterixException {
+    public static ITupleForwarder getTupleForwarder(Map<String, String> configuration, FeedLogManager feedLogManager)
+            throws AsterixException {
         ITupleForwarder policy = null;
         ITupleForwarder.TupleForwardPolicy policyType = null;
         String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
         if (ExternalDataUtils.isFeed(configuration)) {
-            //TODO pass this value in the configuration and avoid this check for feeds
+            // TODO pass this value in the configuration and avoid this check for feeds
             policyType = TupleForwardPolicy.FEED;
         } else if (propValue == null) {
             policyType = TupleForwardPolicy.FRAME_FULL;
@@ -57,7 +58,7 @@ public class DataflowUtils {
         }
         switch (policyType) {
             case FEED:
-                policy = new FeedTupleForwarder();
+                policy = new FeedTupleForwarder(feedLogManager);
                 break;
             case FRAME_FULL:
                 policy = new FrameFullTupleForwarder();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index 7bfe698..035c1c3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -48,7 +48,7 @@ public class ExternalDataCompatibilityUtils {
         }
     }
 
-    //TODO:Add remaining aliases
+    // TODO:Add remaining aliases
     public static void addCompatabilityParameters(String adapterName, ARecordType itemType,
             Map<String, String> configuration) throws AsterixException {
         // HDFS
@@ -71,22 +71,29 @@ public class ExternalDataCompatibilityUtils {
         if (adapterName.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)
                 || adapterName.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME)
                 || adapterName.contains(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER)) {
-            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-                throw new AsterixException("Unspecified format parameter for local file system adapter");
+            if (configuration.get(ExternalDataConstants.KEY_READER) == null) {
+                if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+                    // If reader is specified, we will use the selected reader. If format is
+                    // specified, we will assign a suitable reader for the format.
+                    // TODO: better error message
+                    throw new AsterixException(
+                            "Unspecified (\"reader\" or \"format\") parameter for local filesystem adapter");
+                }
+                configuration.put(ExternalDataConstants.KEY_READER,
+                        configuration.get(ExternalDataConstants.KEY_FORMAT));
+                configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER);
             }
-            configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT));
-            configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER);
         }
 
         // Socket
-        if (adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)) {
+        if (adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
+                || adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER)) {
             if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
                 throw new AsterixException("Unspecified format parameter for socket adapter");
             }
             configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT));
             configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.STREAM_SOCKET);
         }
-
         // Twitter (Pull)
         if (adapterName.equals(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER)) {
             configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_TWITTER_PULL);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 9bac07c..4b2826c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -118,6 +118,7 @@ public class ExternalDataConstants {
     public static final String READER_DELIMITED = "delimited-text";
     public static final String READER_TWITTER_PUSH = "twitter-push";
     public static final String READER_TWITTER_PULL = "twitter-pull";
+    public static final String READER_LINE_SEPARATED = "line-separated";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
@@ -204,4 +205,5 @@ public class ExternalDataConstants {
      * Expected parameter values
      */
     public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
+    public static final String LARGE_RECORD_ERROR_MESSAGE = "Record is too large";
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7c03e4d..c36b629 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -93,7 +93,8 @@ public class ExternalDataUtils {
     }
 
     public static boolean isExternal(String aString) {
-        return (aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) && aString.trim().length() > 1);
+        return (aString != null && aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR)
+                && aString.trim().length() > 1);
     }
 
     public static ClassLoader getClassLoader(String dataverse, String library) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 72b438d..4737727 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -37,7 +37,8 @@ public class FeedLogManager {
         START,      // partition start
         END,        // partition end
         COMMIT,     // a record commit within a partition
-        SNAPSHOT    // an identifier that partitions with identifiers before this one should be ignored
+        SNAPSHOT    // an identifier that partitions with identifiers before this one should be
+                    // ignored
     }
 
     public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
@@ -52,10 +53,15 @@ public class FeedLogManager {
     private BufferedWriter progressLogger;
     private BufferedWriter errorLogger;
     private BufferedWriter recordLogger;
+    private StringBuilder stringBuilder = new StringBuilder();
 
-    public FeedLogManager(File file) {
+    public FeedLogManager(File file) throws IOException {
         this.dir = file.toPath();
         this.completed = new TreeSet<String>();
+        if (!exists()) {
+            create();
+        }
+        open();
     }
 
     public void endPartition() throws IOException {
@@ -124,22 +130,31 @@ public class FeedLogManager {
     }
 
     public void logProgress(String log) throws IOException {
-        progressLogger.write(log);
-        progressLogger.newLine();
+        stringBuilder.setLength(0);
+        stringBuilder.append(log);
+        stringBuilder.append(ExternalDataConstants.LF);
+        progressLogger.write(stringBuilder.toString());
+        progressLogger.flush();
     }
 
     public void logError(String error, Throwable th) throws IOException {
-        errorLogger.append(error);
-        errorLogger.newLine();
-        errorLogger.append(th.toString());
-        errorLogger.newLine();
+        stringBuilder.setLength(0);
+        stringBuilder.append(error);
+        stringBuilder.append(ExternalDataConstants.LF);
+        stringBuilder.append(th.toString());
+        stringBuilder.append(ExternalDataConstants.LF);
+        errorLogger.write(stringBuilder.toString());
+        errorLogger.flush();
     }
 
-    public void logRecord(String record, Exception e) throws IOException {
-        recordLogger.append(record);
-        recordLogger.newLine();
-        recordLogger.append(e.toString());
-        recordLogger.newLine();
+    public void logRecord(String record, String errorMessage) throws IOException {
+        stringBuilder.setLength(0);
+        stringBuilder.append(record);
+        stringBuilder.append(ExternalDataConstants.LF);
+        stringBuilder.append(errorMessage);
+        stringBuilder.append(ExternalDataConstants.LF);
+        recordLogger.write(stringBuilder.toString());
+        recordLogger.flush();
     }
 
     public static String getSplitId(String log) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 224ee31..c128545 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.external.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -29,8 +31,12 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class FeedUtils {
@@ -38,13 +44,27 @@ public class FeedUtils {
         return dataverseName + File.separator + feedName;
     }
 
+    public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, String nodeName, int partition) {
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeName)[0];
+        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
+                nodePartition.getPartitionId());
+        // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
+        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
+                + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
+        return new FileSplit[] { StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f) };
+    }
+
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws Exception {
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String[] locations = null;
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
+        } else {
+            locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         }
-        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         int i = 0;
@@ -66,4 +86,22 @@ public class FeedUtils {
         return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
     }
 
+    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
+            FileSplit[] feedLogFileSplits) throws IOException {
+        return new FeedLogManager(
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
+                        feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile());
+    }
+
+    public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
+        // read the message and reduce the number of tuples
+        fta.reset(input);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        message.clear();
+        message.put(input.array(), offset, len);
+        message.flip();
+        IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 631eef4..2e3b8ec 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -47,23 +47,25 @@ public class FileSystemWatcher {
     private final LinkedList<File> files = new LinkedList<File>();
     private Iterator<File> it;
     private final String expression;
-    private final FeedLogManager logManager;
+    private FeedLogManager logManager;
     private final Path path;
     private final boolean isFeed;
     private boolean done;
     private File current;
     private AbstractFeedDataFlowController controller;
 
-    public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
-            throws IOException {
+    public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) throws IOException {
         this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null;
         this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
-        this.logManager = logManager;
         this.expression = expression;
         this.path = inputResource;
         this.isFeed = isFeed;
     }
 
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        this.logManager = feedLogManager;
+    }
+
     public void init() throws IOException {
         LinkedList<Path> dirs = null;
         dirs = new LinkedList<Path>();
@@ -91,13 +93,6 @@ public class FileSystemWatcher {
         if (logManager == null) {
             return;
         }
-        if (logManager.exists()) {
-            logManager.open();
-        } else {
-            logManager.create();
-            logManager.open();
-            return;
-        }
         /*
          * Done processing the progress log file. We now have:
          * the files that were completed.
@@ -210,6 +205,9 @@ public class FileSystemWatcher {
             return false;
         }
         files.clear();
+        if (keys.isEmpty()) {
+            return false;
+        }
         // Read new Events (Polling first to add all available files)
         WatchKey key;
         key = watcher.poll();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java
new file mode 100644
index 0000000..6722a83
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.classad;
+
+public class AMutableCharArrayString implements Comparable<AMutableCharArrayString>, CharSequence {
+    private char[] value;
+    private int length;
+    private int increment = 64;
+
+    public AMutableCharArrayString(String str) {
+        this.value = str.toCharArray();
+        this.length = value.length;
+    }
+
+    public void decrementLength() {
+        length--;
+    }
+
+    public AMutableCharArrayString() {
+        length = 0;
+        value = new char[increment];
+    }
+
+    @Override
+    public char charAt(int i) {
+        return value[i];
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(value, 0, length);
+    }
+
+    public AMutableCharArrayString(char[] value, int length) {
+        this.value = value;
+        this.length = length;
+    }
+
+    public AMutableCharArrayString(AMutableCharArrayString aMutableCharArrayString) {
+        this.value = new char[aMutableCharArrayString.length];
+        setValue(aMutableCharArrayString);
+    }
+
+    public AMutableCharArrayString(int iniitialSize) {
+        this.value = new char[iniitialSize];
+        this.length = 0;
+    }
+
+    private void expand() {
+        char[] tmpValue = new char[length + increment];
+        System.arraycopy(value, 0, tmpValue, 0, length);
+        value = tmpValue;
+    }
+
+    private void copyAndExpand(int newSize) {
+        char[] tmpValue = new char[newSize];
+        System.arraycopy(value, 0, tmpValue, 0, length);
+        value = tmpValue;
+    }
+
+    public void appendChar(char aChar) {
+        if (length == value.length) {
+            expand();
+        }
+        value[length] = aChar;
+        length++;
+    }
+
+    public void erase(int position) {
+        if (position != length - 1) {
+            System.arraycopy(value, position + 1, value, position, length - (position + 1));
+        }
+        length--;
+    }
+
+    public void setLength(int l) {
+        this.length = l;
+    }
+
+    public void setValue(AMutableCharArrayString otherString) {
+        if (otherString.length > value.length) {
+            // need to reallocate
+            value = new char[otherString.length];
+        }
+        System.arraycopy(otherString.value, 0, value, 0, otherString.length);
+        this.length = otherString.length;
+    }
+
+    public void setValue(String format) {
+        reset();
+        appendString(format);
+    }
+
+    public void reset() {
+        this.length = 0;
+    }
+
+    public void setValue(AMutableCharArrayString otherString, int length) {
+        if (length > value.length) {
+            // need to reallocate
+            value = new char[length];
+        }
+        System.arraycopy(otherString.value, 0, value, 0, length);
+        this.length = length;
+    }
+
+    public void setValue(String otherString, int length) {
+        if (length > value.length) {
+            // need to reallocate
+            value = new char[length];
+        }
+        otherString.getChars(0, otherString.length(), value, 0);
+        this.length = length;
+    }
+
+    public void copyValue(char[] value, int length) {
+        if (length > this.value.length) {
+            // need to reallocate
+            this.value = new char[length];
+        }
+        System.arraycopy(value, 0, this.value, 0, length);
+        this.length = length;
+    }
+
+    public void setString(char[] value, int length) {
+        this.value = value;
+        this.length = length;
+    }
+
+    public void setChar(int i, char ch) {
+        value[i] = ch;
+    }
+
+    public void incrementLength() {
+        if (value.length == length) {
+            expand();
+        }
+        length++;
+    }
+
+    public boolean isEqualsIgnoreCaseLower(char[] compareTo) {
+        if (length == compareTo.length) {
+            for (int i = 0; i < length; i++) {
+                if (compareTo[i] != Character.toLowerCase(value[i])) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public void appendString(String aString) {
+        if (value.length - length < aString.length()) {
+            copyAndExpand(value.length + aString.length());
+        }
+        aString.getChars(0, aString.length(), value, length);
+        length += aString.length();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof AMutableCharArrayString) {
+            AMutableCharArrayString s = (AMutableCharArrayString) o;
+            if (length == s.length) {
+                for (int i = 0; i < length; i++) {
+                    if (value[i] != s.value[i]) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean equalsIgnoreCase(Object o) {
+        if (o instanceof AMutableCharArrayString) {
+            AMutableCharArrayString s = (AMutableCharArrayString) o;
+            if (length == s.length) {
+                for (int i = 0; i < length; i++) {
+                    if (Character.toLowerCase(value[i]) != Character.toLowerCase(s.value[i])) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean equalsString(String aString) {
+        if (length == aString.length()) {
+            for (int i = 0; i < length; i++) {
+                if (value[i] != aString.charAt(i)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public void erase(int position, int length) {
+        if (length + position >= this.length) {
+            this.length -= length;
+        } else {
+            System.arraycopy(value, position + length, value, position, this.length - (position + length));
+            this.length -= length;
+        }
+    }
+
+    public String substr(int i, int len) {
+        return String.copyValueOf(value, i, len);
+    }
+
+    public int firstNonDigitChar() {
+        for (int i = 0; i < length; i++) {
+            if (!Character.isDigit(value[i])) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public int fistNonDoubleDigitChar() {
+        boolean inFraction = false;
+        boolean prevCharIsPoint = false;
+        for (int i = 0; i < length; i++) {
+            if (!Character.isDigit(value[i])) {
+                if (inFraction) {
+                    if (prevCharIsPoint) {
+                        return i - 1;
+                    } else {
+                        return i;
+                    }
+                } else {
+                    if (value[i] == '.') {
+                        inFraction = true;
+                        prevCharIsPoint = true;
+                    }
+                }
+            } else {
+                prevCharIsPoint = false;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public int compareTo(AMutableCharArrayString o) {
+        return toString().compareTo(o.toString());
+    }
+
+    public int compareTo(String o) {
+        return toString().compareTo(o);
+    }
+
+    public int compareToIgnoreCase(AMutableCharArrayString o) {
+        return toString().compareToIgnoreCase(o.toString());
+    }
+
+    public void appendString(AMutableCharArrayString aString) {
+        if (value.length - length < aString.length()) {
+            copyAndExpand(value.length + aString.length());
+        }
+        System.arraycopy(aString.value, 0, value, length, aString.length);
+        length += aString.length();
+    }
+
+    @Override
+    public int length() {
+        return length;
+    }
+
+    public int size() {
+        return length;
+    }
+
+    @Override
+    public CharSequence subSequence(int start, int end) {
+        return substr(start, end - start);
+    }
+
+    public int firstIndexOf(char delim) {
+        return firstIndexOf(delim, 0);
+    }
+
+    public int firstIndexOf(char delim, int startIndex) {
+        int position = startIndex;
+        while (position < length) {
+            if (value[position] == delim) {
+                return position;
+            }
+            position++;
+        }
+        return -1;
+    }
+
+    public String substr(int lastIndex) {
+        return String.copyValueOf(value, lastIndex, length - lastIndex);
+    }
+
+    public void prependChar(char c) {
+        if (value.length == length) {
+            copyAndExpand(value.length * 2);
+        }
+        System.arraycopy(value, 0, value, 1, length);
+        value[0] = c;
+        length += 1;
+    }
+
+    public void insert(int i, String aString) {
+        if (value.length - length < aString.length()) {
+            copyAndExpand(value.length + aString.length());
+        }
+        System.arraycopy(value, i, value, i + aString.length(), aString.length());
+        aString.getChars(0, aString.length(), value, i);
+        length += aString.length();
+    }
+
+    public char[] getValue() {
+        return value;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public int getIncrement() {
+        return increment;
+    }
+
+    public void setValue(char[] value) {
+        this.value = value;
+    }
+
+    public void setIncrement(int increment) {
+        this.increment = increment;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java
new file mode 100644
index 0000000..5c2a2f9
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.classad;
+
+import org.apache.asterix.external.classad.Value.NumberFactor;
+
+public class AMutableNumberFactor {
+    private NumberFactor factor;
+
+    public AMutableNumberFactor() {
+        factor = NumberFactor.NO_FACTOR;
+    }
+
+    public NumberFactor getFactor() {
+        return factor;
+    }
+
+    public void setFactor(NumberFactor factor) {
+        this.factor = factor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
new file mode 100644
index 0000000..04fe6ec
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.classad;
+
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AttributeReference extends ExprTree {
+
+    private ExprTree expr;
+    private boolean absolute;
+    private AMutableCharArrayString attributeStr;
+    private ClassAd current = new ClassAd(false, false);
+    private ExprList adList = new ExprList();
+    private Value val = new Value();
+    private MutableBoolean rVal = new MutableBoolean(false);
+    private AttributeReference tempAttrRef;
+    private EvalState tstate = new EvalState();
+
+    public ExprTree getExpr() {
+        return expr;
+    }
+
+    public void setExpr(ExprTree expr) {
+        this.expr = expr == null ? null : expr.self();
+    }
+
+    public AttributeReference() {
+        expr = null;
+        attributeStr = null;
+        absolute = false;
+    }
+
+    /// Copy Constructor
+    public AttributeReference(AttributeReference ref) throws HyracksDataException {
+        copyFrom(ref);
+    }
+
+    /// Assignment operator
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof AttributeReference) {
+            AttributeReference ref = (AttributeReference) o;
+            return sameAs(ref);
+        }
+        return false;
+    }
+
+    /// node type
+    @Override
+    public NodeKind getKind() {
+        return NodeKind.ATTRREF_NODE;
+    }
+
+    public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName) {
+        return createAttributeReference(expr, attrName, false);
+    }
+
+    /**
+     * Return a copy of this attribute reference.
+     *
+     * @throws HyracksDataException
+     */
+    @Override
+    public ExprTree copy() throws HyracksDataException {
+        AttributeReference newTree = new AttributeReference();
+        newTree.copyFrom(this);
+        return newTree;
+    }
+
+    /**
+     * Copy from the given reference into this reference.
+     *
+     * @param ref
+     *            The reference to copy from.
+     * @return true if the copy succeeded, false otherwise.
+     * @throws HyracksDataException
+     */
+    public boolean copyFrom(AttributeReference ref) throws HyracksDataException {
+        if (attributeStr == null) {
+            attributeStr = new AMutableCharArrayString(ref.attributeStr);
+        } else {
+            attributeStr.setValue(ref.attributeStr);
+        }
+        if (ref.expr != null) {
+            expr = ref.expr.copy();
+        }
+        super.copyFrom(ref);
+        this.absolute = ref.absolute;
+        return true;
+    }
+
+    /**
+     * Is this attribute reference the same as another?
+     *
+     * @param tree
+     *            The reference to compare with
+     * @return true if they are the same, false otherwise.
+     */
+    @Override
+    public boolean sameAs(ExprTree tree) {
+        boolean is_same;
+        ExprTree pSelfTree = tree.self();
+        if (this == pSelfTree) {
+            is_same = true;
+        } else if (pSelfTree.getKind() != NodeKind.ATTRREF_NODE) {
+            is_same = false;
+        } else {
+            AttributeReference other_ref = (AttributeReference) pSelfTree;
+            if (absolute != other_ref.absolute || !attributeStr.equals(other_ref.attributeStr)) {
+                is_same = false;
+            } else if ((expr == null && other_ref.expr == null) || (expr.equals(other_ref.expr))
+                    || (expr != null && other_ref.expr != null && ((AttributeReference) expr).sameAs(other_ref.expr))) {
+                // Will this check result in infinite recursion? How do I stop it?
+                is_same = true;
+            } else {
+                is_same = false;
+            }
+        }
+        return is_same;
+    }
+
+    // a private ctor for use in significant expr identification
+    private AttributeReference(ExprTree tree, AMutableCharArrayString attrname, boolean absolut) {
+        attributeStr = attrname;
+        expr = tree == null ? null : tree.self();
+        absolute = absolut;
+    }
+
+    @Override
+    public void privateSetParentScope(ClassAd parent) {
+        if (expr != null) {
+            expr.setParentScope(parent);
+        }
+    }
+
+    public void getComponents(ExprTreeHolder tree, AMutableCharArrayString attr, MutableBoolean abs)
+            throws HyracksDataException {
+        tree.copyFrom(expr);
+        attr.setValue(attributeStr);
+        abs.setValue(absolute);
+    }
+
+    public EvalResult findExpr(EvalState state, ExprTreeHolder tree, ExprTreeHolder sig, boolean wantSig)
+            throws HyracksDataException {
+        // establish starting point for search
+        if (expr == null) {
+            // "attr" and ".attr"
+            current = absolute ? state.getRootAd() : state.getCurAd();
+            if (absolute && (current == null)) { // NAC - circularity so no root
+                return EvalResult.EVAL_FAIL; // NAC
+            } // NAC
+        } else {
+            // "expr.attr"
+            rVal.setValue(wantSig ? expr.publicEvaluate(state, val, sig) : expr.publicEvaluate(state, val));
+            if (!rVal.booleanValue()) {
+                return (EvalResult.EVAL_FAIL);
+            }
+
+            if (val.isUndefinedValue()) {
+                return (EvalResult.EVAL_UNDEF);
+            } else if (val.isErrorValue()) {
+                return (EvalResult.EVAL_ERROR);
+            }
+
+            if (!val.isClassAdValue(current) && !val.isListValue(adList)) {
+                return (EvalResult.EVAL_ERROR);
+            }
+        }
+
+        if (val.isListValue()) {
+            ExprList eList = new ExprList();
+            //
+            // iterate through exprList and apply attribute reference
+            // to each exprTree
+            for (ExprTree currExpr : adList.getExprList()) {
+                if (currExpr == null) {
+                    return (EvalResult.EVAL_FAIL);
+                } else {
+                    if (tempAttrRef == null) {
+                        tempAttrRef = new AttributeReference();
+                    } else {
+                        tempAttrRef.reset();
+                    }
+                    createAttributeReference(currExpr.copy(), attributeStr, false, tempAttrRef);
+                    val.clear();
+                    // Create new EvalState, within this scope, because
+                    // attrRef is only temporary, so we do not want to
+                    // cache the evaluated result in the outer state object.
+                    tstate.reset();
+                    tstate.setScopes(state.getCurAd());
+                    rVal.setValue(wantSig ? tempAttrRef.publicEvaluate(tstate, val, sig)
+                            : tempAttrRef.publicEvaluate(tstate, val));
+                    if (!rVal.booleanValue()) {
+                        return (EvalResult.EVAL_FAIL);
+                    }
+
+                    ClassAd evaledAd = new ClassAd();
+                    ExprList evaledList = new ExprList();
+                    if (val.isClassAdValue(evaledAd)) {
+                        eList.add(evaledAd);
+                        continue;
+                    } else if (val.isListValue(evaledList)) {
+                        eList.add(evaledList.copy());
+                        continue;
+                    } else {
+                        eList.add(Literal.createLiteral(val));
+                    }
+                }
+            }
+            tree.setInnerTree(ExprList.createExprList(eList));
+            ClassAd newRoot = new ClassAd();
+            tree.setParentScope(newRoot);
+            return EvalResult.EVAL_OK;
+        }
+        // lookup with scope; this may side-affect state
+
+        /* ClassAd::alternateScope is intended for transitioning Condor from
+         * old to new ClassAds. It allows unscoped attribute references
+         * in expressions that can't be found in the local scope to be
+         * looked for in an alternate scope. In Condor, the alternate
+         * scope is the Target ad in matchmaking.
+         * Expect alternateScope to be removed from a future release.
+         */
+        if (current == null) {
+            return EvalResult.EVAL_UNDEF;
+        }
+        int rc = current.lookupInScope(attributeStr.toString(), tree, state);
+        if (expr == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal() && current.getAlternateScope() != null) {
+            rc = current.getAlternateScope().lookupInScope(attributeStr.toString(), tree, state);
+        }
+        return EvalResult.values()[rc];
+    }
+
+    @Override
+    public boolean publicEvaluate(EvalState state, Value val) throws HyracksDataException {
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ExprTreeHolder dummy = new ExprTreeHolder();
+        ClassAd curAd = new ClassAd(state.getCurAd());
+        boolean rval;
+        // find the expression and the evalstate
+        switch (findExpr(state, tree, dummy, false)) {
+            case EVAL_FAIL:
+                return false;
+            case EVAL_ERROR:
+                val.setErrorValue();
+                state.setCurAd(curAd);
+                return true;
+            case EVAL_UNDEF:
+                val.setUndefinedValue();
+                state.setCurAd(curAd);
+                return true;
+            case EVAL_OK: {
+                if (state.getDepthRemaining() <= 0) {
+                    val.setErrorValue();
+                    state.setCurAd(curAd);
+                    return false;
+                }
+                state.decrementDepth();
+                rval = tree.publicEvaluate(state, val);
+                state.incrementDepth();
+                state.getCurAd().setValue(curAd);
+                return rval;
+            }
+            default:
+                throw new HyracksDataException("ClassAd:  Should not reach here");
+        }
+    }
+
+    @Override
+    public boolean privateEvaluate(EvalState state, Value val, ExprTreeHolder sig) throws HyracksDataException {
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ExprTreeHolder exprSig = new ExprTreeHolder();
+        ClassAd curAd = new ClassAd(state.getCurAd());
+        MutableBoolean rval = new MutableBoolean(true);
+        switch (findExpr(state, tree, exprSig, true)) {
+            case EVAL_FAIL:
+                rval.setValue(false);
+                break;
+            case EVAL_ERROR:
+                val.setErrorValue();
+                break;
+            case EVAL_UNDEF:
+                val.setUndefinedValue();
+                break;
+            case EVAL_OK: {
+                if (state.getDepthRemaining() <= 0) {
+                    val.setErrorValue();
+                    state.getCurAd().setValue(curAd);
+                    return false;
+                }
+                state.decrementDepth();
+                rval.setValue(tree.publicEvaluate(state, val));
+                state.incrementDepth();
+                break;
+            }
+            default:
+                throw new HyracksDataException("ClassAd:  Should not reach here");
+        }
+        sig.setInnerTree((new AttributeReference(exprSig, attributeStr, absolute)));
+        state.getCurAd().setValue(curAd);
+        return rval.booleanValue();
+    }
+
+    @Override
+    public boolean privateFlatten(EvalState state, Value val, ExprTreeHolder ntree, AMutableInt32 op)
+            throws HyracksDataException {
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ExprTreeHolder dummy = new ExprTreeHolder();
+        ClassAd curAd;
+        boolean rval;
+        ntree.setInnerTree(null); // Just to be safe...  wenger 2003-12-11.
+        // find the expression and the evalstate
+        curAd = state.getCurAd();
+        switch (findExpr(state, tree, dummy, false)) {
+            case EVAL_FAIL:
+                return false;
+            case EVAL_ERROR:
+                val.setErrorValue();
+                state.getCurAd().setValue(curAd);
+                return true;
+            case EVAL_UNDEF:
+                if (expr != null && state.isFlattenAndInline()) {
+                    ExprTreeHolder expr_ntree = new ExprTreeHolder();
+                    Value expr_val = new Value();
+                    if (state.getDepthRemaining() <= 0) {
+                        val.setErrorValue();
+                        state.getCurAd().setValue(curAd);
+                        return false;
+                    }
+                    state.decrementDepth();
+                    rval = expr.publicFlatten(state, expr_val, expr_ntree);
+                    state.incrementDepth();
+                    if (rval && expr_ntree.getInnerTree() != null) {
+                        ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr));
+                        if (ntree.getInnerTree() != null) {
+                            state.getCurAd().setValue(curAd);
+                            return true;
+                        }
+                    }
+                }
+                ntree.setInnerTree(copy());
+                state.getCurAd().setValue(curAd);
+                return true;
+            case EVAL_OK: {
+                // Don't flatten or inline a classad that's referred to
+                // by an attribute.
+                if (tree.getKind() == NodeKind.CLASSAD_NODE) {
+                    ntree.setInnerTree(copy());
+                    val.setUndefinedValue();
+                    return true;
+                }
+
+                if (state.getDepthRemaining() <= 0) {
+                    val.setErrorValue();
+                    state.getCurAd().setValue(curAd);
+                    return false;
+                }
+                state.decrementDepth();
+
+                rval = tree.publicFlatten(state, val, ntree);
+                state.incrementDepth();
+
+                // don't inline if it didn't flatten to a value, and clear cache
+                // do inline if FlattenAndInline was called
+                if (ntree.getInnerTree() != null) {
+                    if (state.isFlattenAndInline()) { // NAC
+                        return true; // NAC
+                    } // NAC
+                    ntree.setInnerTree(copy());
+                    val.setUndefinedValue();
+                }
+
+                state.getCurAd().setValue(curAd);
+                return rval;
+            }
+            default:
+                throw new HyracksDataException("ClassAd:  Should not reach here");
+        }
+    }
+
+    /**
+     * Factory method to create attribute reference nodes.
+     *
+     * @param expr
+     *            The expression part of the reference (i.e., in
+     *            case of expr.attr). This parameter is NULL if the reference
+     *            is absolute (i.e., .attr) or simple (i.e., attr).
+     * @param attrName
+     *            The name of the reference. This string is
+     *            duplicated internally.
+     * @param absolute
+     *            True if the reference is an absolute reference
+     *            (i.e., in case of .attr). This parameter cannot be true if
+     *            expr is not NULL, default value is false;
+     */
+    public static AttributeReference createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr,
+            boolean absolut) {
+        return (new AttributeReference(tree, attrStr, absolut));
+    }
+
+    public void setValue(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut) {
+        this.absolute = absolut;
+        this.attributeStr = attrStr;
+        this.expr = tree == null ? null : tree.self();
+    }
+
+    public static void createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut,
+            AttributeReference ref) {
+        ref.setValue(tree, attrStr, absolut);
+    }
+
+    @Override
+    public boolean privateEvaluate(EvalState state, Value val) throws HyracksDataException {
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ExprTreeHolder dummy = new ExprTreeHolder();
+        ClassAd curAd;
+        boolean rval;
+
+        // find the expression and the evalstate
+        curAd = state.getCurAd();
+        switch (findExpr(state, tree, dummy, false)) {
+            case EVAL_FAIL:
+                return false;
+            case EVAL_ERROR:
+                val.setErrorValue();
+                state.getCurAd().setValue(curAd);
+                return true;
+            case EVAL_UNDEF:
+                val.setUndefinedValue();
+                state.getCurAd().setValue(curAd);
+                return true;
+
+            case EVAL_OK: {
+                if (state.getDepthRemaining() <= 0) {
+                    val.setErrorValue();
+                    state.getCurAd().setValue(curAd);
+                    return false;
+                }
+                state.decrementDepth();
+                rval = tree.publicEvaluate(state, val);
+                state.incrementDepth();
+                state.getCurAd().setValue(curAd);
+                return rval;
+            }
+            default:
+                throw new HyracksDataException("ClassAd:  Should not reach here");
+        }
+    }
+
+    @Override
+    public void reset() {
+        if (expr != null) {
+            expr.reset();
+        }
+    }
+}


Mime
View raw message