asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [06/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
index 2ccc91c..5202093 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class SumFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
index d2c9e1b..d81f01b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/SumFunction.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalScalarFunction;
-import org.apache.asterix.external.library.IFunctionHelper;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JObjects.JInt;
 
 public class SumFunction implements IExternalScalarFunction {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
index 0d738da..f74ed38 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.external.library.IExternalFunction;
-import org.apache.asterix.external.library.IFunctionFactory;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
 
 public class UpperCaseFactory implements IFunctionFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
index 56121b0..70bd3e1 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/UpperCaseFunction.java
@@ -23,6 +23,8 @@ import java.util.Random;
 import org.apache.asterix.external.library.java.JObjects.JInt;
 import org.apache.asterix.external.library.java.JObjects.JRecord;
 import org.apache.asterix.external.library.java.JObjects.JString;
+import org.apache.asterix.external.api.IExternalScalarFunction;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.external.library.java.JTypeTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index 39f8271..df0fb94 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -27,14 +27,13 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
 import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+public class TestTypedAdapter extends StreamBasedAdapter {
 
     private static final long serialVersionUID = 1L;
 
@@ -126,17 +125,13 @@ public class TestTypedAdapter extends StreamBasedAdapter implements IFeedAdapter
     }
 
     @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PUSH;
-    }
-
-    @Override
-    public void stop() throws Exception {
+    public boolean stop() throws Exception {
         generator.stop();
+        return true;
     }
 
     @Override
-    public boolean handleException(Exception e) {
+    public boolean handleException(Throwable e) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index c177a58..6b08f3a 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -18,26 +18,30 @@
  */
 package org.apache.asterix.external.library.adapter;
 
+import java.io.InputStream;
 import java.util.Map;
 
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.common.parse.ITupleForwarder;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public class TestTypedAdapterFactory implements IFeedAdapterFactory {
+public class TestTypedAdapterFactory implements IAdapterFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String NAME = "test_typed_adapter";
-
     private ARecordType outputType;
 
     public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
@@ -45,13 +49,8 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
     private Map<String, String> configuration;
 
     @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public String getName() {
-        return NAME;
+    public String getAlias() {
+        return "test_typed";
     }
 
     @Override
@@ -60,9 +59,47 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
     }
 
     @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
-                InputDataFormat.ADM);
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        ITupleParserFactory tupleParserFactory = new ITupleParserFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ITupleParser createTupleParser(final IHyracksCommonContext ctx) throws HyracksDataException {
+                ADMDataParser parser;
+                ITupleForwarder forwarder;
+                ArrayTupleBuilder tb;
+                try {
+                    parser = new ADMDataParser();
+                    forwarder = DataflowUtils.getTupleForwarder(configuration);
+                    forwarder.configure(configuration);
+                    tb = new ArrayTupleBuilder(1);
+                } catch (AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                return new ITupleParser() {
+
+                    @Override
+                    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+                        try {
+                            parser.configure(configuration, outputType);
+                            parser.setInputStream(in);
+                            forwarder.initialize(ctx, writer);
+                            while (true) {
+                                tb.reset();
+                                if (!parser.parse(tb.getDataOutput())) {
+                                    break;
+                                }
+                                tb.addFieldEndOffset();
+                                forwarder.addTuple(tb);
+                            }
+                            forwarder.close();
+                        } catch (Exception e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                };
+            }
+        };
         return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
     }
 
@@ -77,14 +114,4 @@ public class TestTypedAdapterFactory implements IFeedAdapterFactory {
         this.outputType = outputType;
     }
 
-    @Override
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    @Override
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java b/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
new file mode 100644
index 0000000..698e414
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/runtime/operator/file/ADMDataParserTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operator.file;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.asterix.om.base.AMutableInterval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.extensions.PA;
+
+public class ADMDataParserTest {
+
+    @Test
+    public void test() {
+        String[] dateIntervals = { "-9537-08-04, 9656-06-03", "-9537-04-04, 9656-06-04", "-9537-10-04, 9626-09-05" };
+        AMutableInterval[] parsedDateIntervals = new AMutableInterval[] {
+                new AMutableInterval(-4202630, 2807408, (byte) 17), new AMutableInterval(-4202752, 2807409, (byte) 17),
+                new AMutableInterval(-4202569, 2796544, (byte) 17), };
+
+        String[] timeIntervals = { "12:04:45.689Z, 12:41:59.002Z", "12:10:45.169Z, 15:37:48.736Z",
+                "04:16:42.321Z, 12:22:56.816Z" };
+        AMutableInterval[] parsedTimeIntervals = new AMutableInterval[] {
+                new AMutableInterval(43485689, 45719002, (byte) 18),
+                new AMutableInterval(43845169, 56268736, (byte) 18),
+                new AMutableInterval(15402321, 44576816, (byte) 18), };
+
+        String[] dateTimeIntervals = { "-2640-10-11T17:32:15.675Z, 4104-02-01T05:59:11.902Z",
+                "0534-12-08T08:20:31.487Z, 6778-02-16T22:40:21.653Z",
+                "2129-12-12T13:18:35.758Z, 8647-07-01T13:10:19.691Z" };
+        AMutableInterval[] parsedDateTimeIntervals = new AMutableInterval[] {
+                new AMutableInterval(-145452954464325L, 67345192751902L, (byte) 16),
+                new AMutableInterval(-45286270768513L, 151729886421653L, (byte) 16),
+                new AMutableInterval(5047449515758L, 210721439419691L, (byte) 16) };
+
+        Thread[] threads = new Thread[16];
+        AtomicInteger errorCount = new AtomicInteger(0);
+        for (int i = 0; i < threads.length; ++i) {
+            threads[i] = new Thread(new Runnable() {
+                ADMDataParser parser = new ADMDataParser();
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutput dos = new DataOutputStream(bos);
+
+                @Override
+                public void run() {
+                    try {
+                        int round = 0;
+                        while (round++ < 10000) {
+                            // Test parseDateInterval.
+                            for (int index = 0; index < dateIntervals.length; ++index) {
+                                PA.invokeMethod(parser, "parseDateInterval(java.lang.String, java.io.DataOutput)",
+                                        dateIntervals[index], dos);
+                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+                                Assert.assertTrue(aInterval.equals(parsedDateIntervals[index]));
+                            }
+
+                            // Tests parseTimeInterval.
+                            for (int index = 0; index < timeIntervals.length; ++index) {
+                                PA.invokeMethod(parser, "parseTimeInterval(java.lang.String, java.io.DataOutput)",
+                                        timeIntervals[index], dos);
+                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+                                Assert.assertTrue(aInterval.equals(parsedTimeIntervals[index]));
+                            }
+
+                            // Tests parseDateTimeInterval.
+                            for (int index = 0; index < dateTimeIntervals.length; ++index) {
+                                PA.invokeMethod(parser, "parseDateTimeInterval(java.lang.String, java.io.DataOutput)",
+                                        dateTimeIntervals[index], dos);
+                                AMutableInterval aInterval = (AMutableInterval) PA.getValue(parser, "aInterval");
+                                Assert.assertTrue(aInterval.equals(parsedDateTimeIntervals[index]));
+                            }
+                        }
+                    } catch (Exception e) {
+                        errorCount.incrementAndGet();
+                        e.printStackTrace();
+                    }
+                }
+            });
+            // Kicks off test threads.
+            threads[i].start();
+        }
+
+        // Joins all the threads.
+        try {
+            for (int i = 0; i < threads.length; ++i) {
+                threads[i].join();
+            }
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
+        }
+        // Asserts no failure.
+        Assert.assertTrue(errorCount.get() == 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
index 0613498..42827b4 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
@@ -20,7 +20,7 @@ import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.test.runtime.HDFSCluster;
@@ -43,8 +43,8 @@ public abstract class AbstractExecutionIT {
     protected static final Logger LOGGER = Logger.getLogger(AbstractExecutionIT.class.getName());
 
     protected static final String PATH_ACTUAL = "ittest" + File.separator;
-    protected static final String PATH_BASE = StringUtils.join(new String[] { "..", "asterix-app", "src", "test",
-            "resources", "runtimets" }, File.separator);
+    protected static final String PATH_BASE = StringUtils
+            .join(new String[] { "..", "asterix-app", "src", "test", "resources", "runtimets" }, File.separator);
 
     protected static final String HDFS_BASE = "../asterix-app/";
 
@@ -63,21 +63,21 @@ public abstract class AbstractExecutionIT {
 
         //This is nasty but there is no very nice way to set a system property on each NC that I can figure.
         //The main issue is that we need the NC resolver to be the IdentityResolver and not the DNSResolver.
-        FileUtils.copyFile(
-                new File(StringUtils.join(new String[] { "src", "test", "resources", "integrationts", "asterix-configuration.xml" }, File.separator)),
+        FileUtils
+                .copyFile(
+                        new File(StringUtils.join(new String[] { "src", "test", "resources", "integrationts",
+                                "asterix-configuration.xml" }, File.separator)),
                 new File(AsterixInstallerIntegrationUtil.getManagixHome() + "/conf/asterix-configuration.xml"));
 
         AsterixLifecycleIT.setUp();
 
-
         FileUtils.copyDirectoryStructure(
                 new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)),
                 new File(AsterixInstallerIntegrationUtil.getManagixHome() + "/clusters/local/working_dir/data"));
 
-
         // Set the node resolver to be the identity resolver that expects node names
         // to be node controller ids; a valid assumption in test environment.
-        System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
                 IdentitiyResolverFactory.class.getName());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
index 438bb05..1da01c3 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -43,16 +43,21 @@ public class AsterixExternalLibraryIT {
 
     @BeforeClass
     public static void setUp() throws Exception {
-        AsterixInstallerIntegrationUtil.init();
-        File asterixInstallerProjectDir = new File(System.getProperty("user.dir"));
-        String asterixExternalLibraryPath = asterixInstallerProjectDir.getParentFile().getAbsolutePath()
-                + File.separator + LIBRARY_PATH;
-        LOGGER.info("Installing library :" + LIBRARY_NAME + " located at " + asterixExternalLibraryPath
-                + " in dataverse " + LIBRARY_DATAVERSE);
-        AsterixInstallerIntegrationUtil.installLibrary(LIBRARY_NAME, LIBRARY_DATAVERSE, asterixExternalLibraryPath);
-        AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
-        TestCaseContext.Builder b = new TestCaseContext.Builder();
-        testCaseCollection = b.build(new File(PATH_BASE));
+        try {
+            AsterixInstallerIntegrationUtil.init();
+            File asterixInstallerProjectDir = new File(System.getProperty("user.dir"));
+            String asterixExternalLibraryPath = asterixInstallerProjectDir.getParentFile().getAbsolutePath()
+                    + File.separator + LIBRARY_PATH;
+            LOGGER.info("Installing library :" + LIBRARY_NAME + " located at " + asterixExternalLibraryPath
+                    + " in dataverse " + LIBRARY_DATAVERSE);
+            AsterixInstallerIntegrationUtil.installLibrary(LIBRARY_NAME, LIBRARY_DATAVERSE, asterixExternalLibraryPath);
+            AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
+            TestCaseContext.Builder b = new TestCaseContext.Builder();
+            testCaseCollection = b.build(new File(PATH_BASE));
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index c000d55..34a8733 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -93,6 +93,7 @@ public class AsterixInstallerIntegrationUtil {
         String command = "shutdown";
         cmdHandler.processCommand(command.split(" "));
 
+        //TODO: This must be fixed, an arbitrary wait for 2s is not a reliable way to make sure the process have completed successfully.
         Thread.sleep(2000);
 
         // start zookeeper

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
index 93e9f6d..cf69e1a 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ClusterExecutionIT.java
@@ -18,10 +18,12 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.logging.Level;
-import java.util.logging.Logger;
 
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.test.runtime.HDFSCluster;
+import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.lang3.StringUtils;
 import org.codehaus.plexus.util.FileUtils;
 import org.junit.AfterClass;
@@ -31,20 +33,16 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.testframework.context.TestCaseContext;
-
 /**
  * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
  */
 @RunWith(Parameterized.class)
-public class ClusterExecutionIT extends AbstractExecutionIT{
+public class ClusterExecutionIT extends AbstractExecutionIT {
 
     private static final String CLUSTER_CC_ADDRESS = "10.10.0.2";
     private static final int CLUSTER_CC_API_PORT = 19002;
 
-    private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS,CLUSTER_CC_API_PORT);
+    private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -60,13 +58,14 @@ public class ClusterExecutionIT extends AbstractExecutionIT{
         AsterixClusterLifeCycleIT.setUp();
 
         FileUtils.copyDirectoryStructure(
-                new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)), new File(
-                StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "managix-working", "data" },
+                new File(StringUtils.join(new String[] { "..", "asterix-app", "data" }, File.separator)),
+                new File(StringUtils.join(
+                        new String[] { "src", "test", "resources", "clusterts", "managix-working", "data" },
                         File.separator)));
 
         // Set the node resolver to be the identity resolver that expects node names
         // to be node controller ids; a valid assumption in test environment.
-        System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
                 IdentitiyResolverFactory.class.getName());
     }
 
@@ -100,6 +99,7 @@ public class ClusterExecutionIT extends AbstractExecutionIT{
         this.tcCtx = tcCtx;
     }
 
+    @Override
     @Test
     public void test() throws Exception {
         testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
index 17184c7..492f173 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixExecutionIT.java
@@ -14,25 +14,9 @@
  */
 package org.apache.asterix.installer.test;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.asterix.test.runtime.HDFSCluster;
-import org.codehaus.plexus.util.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
@@ -40,7 +24,6 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class ManagixExecutionIT extends AbstractExecutionIT {
 
-
     private TestCaseContext tcCtx;
 
     public ManagixExecutionIT(TestCaseContext tcCtx) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
index 2e66afd..b9c2072 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
@@ -17,19 +17,8 @@ package org.apache.asterix.installer.test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.test.aql.TestExecutor;
-import org.apache.asterix.test.runtime.HDFSCluster;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.lang3.StringUtils;
-import org.codehaus.plexus.util.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -38,7 +27,7 @@ import org.junit.runners.Parameterized.Parameters;
  * Runs the runtime test cases under 'asterix-app/src/test/resources/runtimets'.
  */
 @RunWith(Parameterized.class)
-public class ManagixSqlppExecutionIT extends ManagixExecutionIT{
+public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
 
     @Parameters
     public static Collection<Object[]> tests() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index a62abaa..71c762a 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -29,7 +29,7 @@ import org.apache.asterix.common.feeds.FeedConnectionRequest;
 import org.apache.asterix.common.feeds.FeedId;
 import org.apache.asterix.common.feeds.FeedPolicyAccessor;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.IParserFactory;
@@ -188,7 +188,7 @@ public class SubscribeFeedStatement implements Statement {
         try {
             switch (feed.getFeedType()) {
                 case PRIMARY:
-                    Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+                    Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
 
                     factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
                             mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
----------------------------------------------------------------------
diff --git a/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java b/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
index 3e21653..a113864 100644
--- a/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
+++ b/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
@@ -72,11 +72,34 @@ public class [LEXER_NAME] {
 // ================================================================================
 //  Public interface
 // ================================================================================
-    
+
     public [LEXER_NAME](java.io.Reader stream) throws IOException{
         reInit(stream);
     }
 
+    public [LEXER_NAME]() throws IOException{
+        reInit();
+    }
+
+    public void setBuffer(char[] buffer){
+        this.buffer = buffer;
+        tokenBegin = bufpos = 0;
+        containsEscapes = false;
+        line++;
+        tokenBegin = -1;
+    }
+
+    public void reInit(){
+        bufsize        = Integer.MAX_VALUE;
+        endOf_UNUSED_Buffer = bufsize;
+        endOf_USED_Buffer = bufsize;
+        line           = 0;
+        prevCharIsCR   = false;
+        prevCharIsLF   = false;
+        tokenBegin     = -1;
+        maxUnusedBufferSize = bufsize;
+    }
+
     public void reInit(java.io.Reader stream) throws IOException{
         done();
         inputStream    = stream;
@@ -239,5 +262,5 @@ public class [LEXER_NAME] {
       bufsize += maxUnusedBufferSize;
       endOf_UNUSED_Buffer = bufsize;
       tokenBegin = 0;
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 5317fc2..a73a236 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -39,9 +39,13 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
+import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
@@ -76,12 +80,14 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
@@ -316,19 +322,8 @@ public class MetadataBootstrap {
     }
 
     private static void insertInitialAdapters(MetadataTransactionContext mdTxnCtx) throws Exception {
-        String[] builtInAdapterClassNames = new String[] {
-                "org.apache.asterix.external.adapter.factory.PullBasedAzureTwitterAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.HDFSAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.HiveAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.RSSFeedAdapterFactory",
-                "org.apache.asterix.external.adapter.factory.CNNFeedAdapterFactory",
-                "org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",
-                "org.apache.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
-                "org.apache.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
-                "org.apache.asterix.tools.external.data.SocketClientAdapterFactory" };
+        String[] builtInAdapterClassNames = new String[] { GenericAdapterFactory.class.getName(),
+                GenericSocketFeedAdapterFactory.class.getName(), SocketClientAdapterFactory.class.getName() };
         DatasourceAdapter adapter;
         for (String adapterClassName : builtInAdapterClassNames) {
             adapter = getAdapter(adapterClassName);
@@ -349,11 +344,9 @@ public class MetadataBootstrap {
     }
 
     private static void insertInitialCompactionPolicies(MetadataTransactionContext mdTxnCtx) throws Exception {
-        String[] builtInCompactionPolicyClassNames = new String[] {
-                "org.apache.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory",
-                "org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory",
-                "org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory",
-                "org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory" };
+        String[] builtInCompactionPolicyClassNames = new String[] { ConstantMergePolicyFactory.class.getName(),
+                PrefixMergePolicyFactory.class.getName(), NoMergePolicyFactory.class.getName(),
+                CorrelatedPrefixMergePolicyFactory.class.getName() };
         CompactionPolicy compactionPolicy;
         for (String policyClassName : builtInCompactionPolicyClassNames) {
             compactionPolicy = getCompactionPolicyEntity(policyClassName);
@@ -362,7 +355,7 @@ public class MetadataBootstrap {
     }
 
     private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws Exception {
-        String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getName();
+        String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
         return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
                 adapterFactoryClassName, DatasourceAdapter.AdapterType.INTERNAL);
     }
@@ -378,8 +371,7 @@ public class MetadataBootstrap {
         ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
         int metadataDeviceId = metadataPartition.getIODeviceNum();
         String metadataPartitionPath = SplitsAndConstraintsUtil.prepareStoragePartitionPath(
-                AsterixClusterProperties.INSTANCE.getStorageDirectoryName(),
-                metadataPartition.getPartitionId());
+                AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId());
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 745f436..c9157df 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -52,17 +51,17 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactor
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.parse.IParseFileSplitsDecl;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
-import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
+import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
+import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
@@ -100,8 +99,6 @@ import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
-import org.apache.asterix.runtime.external.ExternalBTreeSearchOperatorDescriptor;
-import org.apache.asterix.runtime.external.ExternalRTreeSearchOperatorDescriptor;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
@@ -153,11 +150,8 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -202,8 +196,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
     private final AsterixStorageProperties storageProperties;
 
-    public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
-
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
     }
@@ -490,10 +482,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
             LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
             List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
-        if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
-                || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
-            throw new AlgebricksException(" External dataset adapter does not support read operation");
-        }
+
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
                 adapterFactory);
         AlgebricksPartitionConstraint constraint;
@@ -552,24 +541,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
             Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated,
             List<List<String>> primaryKeys) throws AlgebricksException {
-        IAdapterFactory adapterFactory;
-        DatasourceAdapter adapterEntity;
-        String adapterFactoryClassname;
         try {
-            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                    adapterName);
-            if (adapterEntity != null) {
-                adapterFactoryClassname = adapterEntity.getClassname();
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            } else {
-                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-                if (adapterFactoryClassname == null) {
-                    throw new AlgebricksException(" Unknown adapter :" + adapterName);
-                }
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            }
-
-            adapterFactory.configure(configuration, (ARecordType) itemType);
+            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
+                    (ARecordType) itemType);
 
             // check to see if dataset is indexed
             Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -602,11 +576,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             throw new AlgebricksException("Can only scan datasets of records.");
         }
 
-        if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
-                || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
-            throw new AlgebricksException(" External dataset adapter does not support read operation");
-        }
-
+        @SuppressWarnings("rawtypes")
         ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
@@ -623,33 +593,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
     }
 
-    @SuppressWarnings("rawtypes")
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(JobSpecification jobSpec,
-            IAType itemType, IParseFileSplitsDecl decl, IDataFormat format) throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
-        ARecordType rt = (ARecordType) itemType;
-        ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
-        FileSplit[] splits = decl.getSplits();
-        IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-        IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
-                scannerDesc);
-        String[] locs = new String[splits.length];
-        for (int i = 0; i < splits.length; i++) {
-            locs[i] = splits[i].getNodeName();
-        }
-        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
-    }
-
-    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> buildFeedIntakeRuntime(
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
-        IFeedAdapterFactory adapterFactory = factoryOutput.first;
+        IAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
         switch (factoryOutput.third) {
             case INTERNAL:
@@ -665,7 +613,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         }
 
         AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
-        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory>(feedIngestor,
+        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
                 partitionConstraint, adapterFactory);
     }
 
@@ -1515,7 +1463,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
 
             // Generate Output Record format
-            ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+            ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
             ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
             ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
 
@@ -2102,7 +2050,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
      * Calculate an estimate size of the bloom filter. Note that this is an
      * estimation which assumes that the data is going to be uniformly
      * distributed across all partitions.
-     *
      * @param dataset
      * @return Number of elements that will be used to create a bloom filter per
      *         dataset per partition
@@ -2147,24 +2094,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
     }
 
-    private static Map<String, String> initializeAdapterFactoryMapping() {
-        Map<String, String> adapterFactoryMapping = new HashMap<String, String>();
-        adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter",
-                "org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory");
-        adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.HDFSAdapter",
-                "org.apache.asterix.external.adapter.factory.HDFSAdapterFactory");
-        adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapter",
-                "org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapterFactory");
-        adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.RSSFeedAdapter",
-                "org.apache.asterix.external.dataset.adapter..RSSFeedAdapterFactory");
-        adapterFactoryMapping.put("org.apache.asterix.external.dataset.adapter.CNNFeedAdapter",
-                "org.apache.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
-        adapterFactoryMapping.put("org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapter",
-                "org.apache.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory");
-
-        return adapterFactoryMapping;
-    }
-
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
             throws MetadataException {
         DatasourceAdapter adapter = null;
@@ -2232,35 +2161,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return FormatUtils.getDefaultFormat();
     }
 
-    /**
-     * Add HDFS scheduler and the cluster location constraint into the scheduler
-     *
-     * @param properties
-     *            the original dataset properties
-     * @return a new map containing the original dataset properties and the
-     *         scheduler/locations
-     */
-    private static Map<String, Object> wrapProperties(Map<String, String> properties) {
-        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
-        wrappedProperties.putAll(properties);
-        // wrappedProperties.put(SCHEDULER, hdfsScheduler);
-        // wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
-        return wrappedProperties;
-    }
-
-    /**
-     * Adapt the original properties to a string-object map
-     *
-     * @param properties
-     *            the original properties
-     * @return the new stirng-object map
-     */
-    private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
-        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
-        wrappedProperties.putAll(properties);
-        return wrappedProperties;
-    }
-
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
         return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
@@ -2284,67 +2184,54 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
             JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainNull)
                     throws AlgebricksException {
-        // Get data type
-        IAType itemType = null;
         try {
+            // Get data type
+            IAType itemType = null;
             itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
-        } catch (MetadataException e) {
-            e.printStackTrace();
-            throw new AlgebricksException("Unable to get item type from metadata " + e);
-        }
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
 
-        // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
-        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
-        HDFSLookupAdapterFactory adapterFactory = new HDFSLookupAdapterFactory();
-        adapterFactory.configure(itemType, retainInput, ridIndexes, datasetDetails.getProperties(), retainNull);
+            // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getAdapterFactory(
+                    datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainNull,
+                    context.getNullWriterFactory());
 
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
-        try {
-            compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
-        } catch (MetadataException e) {
-            throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
-        }
-
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        // Create the file index data flow helper
-        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                compactionInfo.first, compactionInfo.second,
-                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
-                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
-
-        // Create the out record descriptor, appContext and fileSplitProvider for the files index
-        RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName().
-
-            concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
-        } catch (
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+            try {
+                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+            } catch (MetadataException e) {
+                throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
+            }
 
-        Exception e)
+            boolean temp = datasetDetails.isTemp();
+            // Create the file index data flow helper
+            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                    compactionInfo.first, compactionInfo.second,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+                    ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
 
-        {
+            // Create the out record descriptor, appContext and fileSplitProvider for the files index
+            RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+            // Create the operator
+            ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+                    outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+                    appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+                    retainNull, context.getNullWriterFactory());
+
+            // Return value
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
-
-        ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                : new SecondaryIndexSearchOperationCallbackFactory();
-        // Create the operator
-        ExternalLoopkupOperatorDiscriptor op = new ExternalLoopkupOperatorDiscriptor(jobSpec, adapterFactory,
-                outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
-                appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
-                metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
-                retainNull, context.getNullWriterFactory());
-
-        // Return value
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
deleted file mode 100644
index f4484cf..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapter.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class FieldExtractingAdapter implements IDatasourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private final RecordDescriptor inRecDesc;
-
-    private final RecordDescriptor outRecDesc;
-
-    private final IDatasourceAdapter wrappedAdapter;
-
-    private final FieldExtractingPushRuntime fefw;
-
-    public FieldExtractingAdapter(IHyracksTaskContext ctx, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
-            int[][] extractFields, ARecordType rType, IDatasourceAdapter wrappedAdapter) {
-        this.inRecDesc = inRecDesc;
-        this.outRecDesc = outRecDesc;
-        this.wrappedAdapter = wrappedAdapter;
-        fefw = new FieldExtractingPushRuntime(ctx, extractFields, rType);
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        fefw.setInputRecordDescriptor(0, inRecDesc);
-        fefw.setFrameWriter(0, writer, outRecDesc);
-        fefw.open();
-        try {
-            wrappedAdapter.start(partition, fefw);
-        } catch (Throwable t) {
-            fefw.fail();
-            throw t;
-        } finally {
-            fefw.close();
-        }
-    }
-
-    private static class FieldExtractingPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-
-        private final IHyracksTaskContext ctx;
-
-        private final int[][] extractFields;
-
-        private final ARecordType rType;
-
-        private final int nullBitmapSize;
-
-        private final ArrayTupleBuilder tb;
-
-        public FieldExtractingPushRuntime(IHyracksTaskContext ctx, int[][] extractFields, ARecordType rType) {
-            this.ctx = ctx;
-            this.extractFields = extractFields;
-            this.rType = rType;
-            nullBitmapSize = ARecordType.computeNullBitmapSize(rType);
-            tb = new ArrayTupleBuilder(extractFields.length + 1);
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            initAccessAppendRef(ctx);
-        }
-
-        @Override
-        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            tAccess.reset(buffer);
-            for (int i = 0; i < tAccess.getTupleCount(); ++i) {
-                tb.reset();
-                tRef.reset(tAccess, i);
-                byte[] record = tRef.getFieldData(0);
-                int recStart = tRef.getFieldStart(0);
-                int recLength = tRef.getFieldLength(0);
-                for (int f = 0; f < extractFields.length; ++f) {
-                    try {
-                        byte[] subRecord = record;
-                        int subFStart = recStart;
-                        int subFOffset = 0;
-                        boolean isNull = false;
-                        IAType subFType = rType;
-                        int subFLen = recLength;
-                        int subBitMapSize = nullBitmapSize;
-                        byte[] subRecordTmp;
-
-                        for (int j = 0; j < extractFields[f].length; j++) {
-                            //Get offset for subfield
-                            subFOffset = ARecordSerializerDeserializer.getFieldOffsetById(subRecord, subFStart,
-                                    extractFields[f][j], subBitMapSize, ((ARecordType) subFType).isOpen());
-                            if (subFOffset == 0) {
-                                tb.getDataOutput().write(ATypeTag.NULL.serialize());
-                                isNull = true;
-                                break;
-                            } else {
-                                //Get type of subfield
-                                subFType = ((ARecordType) subFType).getFieldTypes()[extractFields[f][j]];
-                                try {
-                                    //Get length of subfield
-                                    subFLen = NonTaggedFormatUtil.getFieldValueLength(subRecord,
-                                            subFStart + subFOffset, subFType.getTypeTag(), false);
-
-                                    if (j < extractFields[f].length - 1) {
-                                        subRecordTmp = new byte[subFLen + 1];
-                                        subRecordTmp[0] = subFType.getTypeTag().serialize();
-                                        System.arraycopy(subRecord, subFStart + subFOffset, subRecordTmp, 1, subFLen);
-                                        subRecord = subRecordTmp;
-                                        subFStart = 0;
-                                        subBitMapSize = ARecordType.computeNullBitmapSize((ARecordType) subFType);
-                                    }
-
-                                } catch (AsterixException e) {
-                                    throw new HyracksDataException(e);
-                                }
-                            }
-                        }
-
-                        if (!isNull) {
-                            tb.getDataOutput().write(subFType.getTypeTag().serialize());
-                            tb.getDataOutput().write(subRecord, subFStart + subFOffset, subFLen);
-                        }
-
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    tb.addFieldEndOffset();
-                }
-                tb.addField(record, recStart, tRef.getFieldLength(0));
-                appendToFrameFromTupleBuilder(tb);
-            }
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            flushIfNotFailed();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
deleted file mode 100644
index 989e4a3..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class FieldExtractingAdapterFactory implements IAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final IAdapterFactory wrappedAdapterFactory;
-
-    private final RecordDescriptor inRecDesc;
-
-    private final RecordDescriptor outRecDesc;
-
-    private final int[][] extractFields;
-
-    private final ARecordType rType;
-
-    public FieldExtractingAdapterFactory(IAdapterFactory wrappedAdapterFactory, RecordDescriptor inRecDesc,
-            RecordDescriptor outRecDesc, int[][] extractFields, ARecordType rType) {
-        this.wrappedAdapterFactory = wrappedAdapterFactory;
-        this.inRecDesc = inRecDesc;
-        this.outRecDesc = outRecDesc;
-        this.extractFields = extractFields;
-        this.rType = rType;
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return wrappedAdapterFactory.getSupportedOperations();
-    }
-
-    @Override
-    public String getName() {
-        return "FieldExtractingAdapter[ " + wrappedAdapterFactory.getName() + " ]";
-    }
-
-  
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return wrappedAdapterFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
-        return new FieldExtractingAdapter(ctx, inRecDesc, outRecDesc, extractFields, rType, wrappedAdapter);
-    }
-    
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        wrappedAdapterFactory.configure(configuration, outputType);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return wrappedAdapterFactory.getAdapterOutputType();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
deleted file mode 100644
index e0c5fc0..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapter.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.declared;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.base.AMutableUUID;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class PKGeneratingAdapter implements IDatasourceAdapter {
-
-    private static final long serialVersionUID = 1L;
-    private final RecordDescriptor inRecDesc;
-    private final RecordDescriptor outRecDesc;
-    private final IDatasourceAdapter wrappedAdapter;
-    private final PKGeneratingPushRuntime pkRuntime;
-    private final int pkIndex;
-
-    public PKGeneratingAdapter(IHyracksTaskContext ctx, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
-            ARecordType inRecType, ARecordType outRecType, IDatasourceAdapter wrappedAdapter, int pkIndex) {
-        this.inRecDesc = inRecDesc;
-        this.outRecDesc = outRecDesc;
-        this.wrappedAdapter = wrappedAdapter;
-        this.pkRuntime = new PKGeneratingPushRuntime(ctx, inRecType, outRecType);
-        this.pkIndex = pkIndex;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        pkRuntime.setInputRecordDescriptor(0, inRecDesc);
-        pkRuntime.setFrameWriter(0, writer, outRecDesc);
-        pkRuntime.open();
-        try {
-            wrappedAdapter.start(partition, pkRuntime);
-        } catch (Throwable t) {
-            pkRuntime.fail();
-            throw t;
-        } finally {
-            pkRuntime.close();
-        }
-    }
-
-    private class PKGeneratingPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-        private final IHyracksTaskContext ctx;
-        private final ARecordType outRecType;
-        private final ArrayTupleBuilder tb;
-        private final AMutableUUID aUUID = new AMutableUUID(0, 0);
-        private final byte AUUIDTag = ATypeTag.UUID.serialize();
-        private final byte[] serializedUUID = new byte[16];
-        private final PointableAllocator pa = new PointableAllocator();
-        private final ARecordVisitablePointable recordPointable;
-        private final IAType[] outClosedTypes;
-
-        private final RecordBuilder recBuilder;
-
-        public PKGeneratingPushRuntime(IHyracksTaskContext ctx, ARecordType inRecType, ARecordType outRecType) {
-            this.ctx = ctx;
-            this.outRecType = outRecType;
-            this.tb = new ArrayTupleBuilder(2);
-            this.recBuilder = new RecordBuilder();
-            this.recordPointable = (ARecordVisitablePointable) pa.allocateRecordValue(inRecType);
-            this.outClosedTypes = outRecType.getFieldTypes();
-        }
-
-        /*
-         * We write this method in low level instead of using pre-existing libraries since this will be called for each record and to avoid 
-         * size validation
-         */
-        private void serializeUUID(AUUID aUUID, byte[] serializedUUID) {
-            long v = aUUID.getLeastSignificantBits();
-            serializedUUID[0] = (byte) (v >>> 56);
-            serializedUUID[1] = (byte) (v >>> 48);
-            serializedUUID[2] = (byte) (v >>> 40);
-            serializedUUID[3] = (byte) (v >>> 32);
-            serializedUUID[4] = (byte) (v >>> 24);
-            serializedUUID[5] = (byte) (v >>> 16);
-            serializedUUID[6] = (byte) (v >>> 8);
-            serializedUUID[7] = (byte) (v >>> 0);
-            v = aUUID.getMostSignificantBits();
-            serializedUUID[8] = (byte) (v >>> 56);
-            serializedUUID[9] = (byte) (v >>> 48);
-            serializedUUID[10] = (byte) (v >>> 40);
-            serializedUUID[11] = (byte) (v >>> 32);
-            serializedUUID[12] = (byte) (v >>> 24);
-            serializedUUID[13] = (byte) (v >>> 16);
-            serializedUUID[14] = (byte) (v >>> 8);
-            serializedUUID[15] = (byte) (v >>> 0);
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            initAccessAppendRef(ctx);
-            recBuilder.reset(outRecType);
-            recBuilder.init();
-        }
-
-        @Override
-        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            try {
-                tAccess.reset(buffer);
-                for (int i = 0; i < tAccess.getTupleCount(); ++i) {
-                    tb.reset();
-                    tRef.reset(tAccess, i);
-
-                    // We need to do the following:
-                    // 1. generate a UUID
-                    // 2. fill in the first field with the UUID
-                    aUUID.nextUUID();
-                    tb.getDataOutput().writeByte(AUUIDTag);
-                    serializeUUID(aUUID, serializedUUID);
-                    tb.getDataOutput().write(serializedUUID);
-                    tb.addFieldEndOffset();
-                    // 3. fill in the second field with the record after adding to it the UUID
-                    recordPointable.set(tRef.getFieldData(0), tRef.getFieldStart(0), tRef.getFieldLength(0));
-                    // Start by closed fields
-                    int inIndex = 0;
-                    for (int f = 0; f < outClosedTypes.length; f++) {
-                        if (f == pkIndex) {
-                            recBuilder.addField(f, serializedUUID);
-                        } else {
-                            recBuilder.addField(f, recordPointable.getFieldValues().get(inIndex));
-                            inIndex++;
-                        }
-                    }
-
-                    // Add open fields
-                    if (outRecType.isOpen()) {
-                        List<IVisitablePointable> fp = recordPointable.getFieldNames();
-                        if (fp.size() >= outClosedTypes.length) {
-                            int index = outClosedTypes.length - 1;
-                            while (index < fp.size()) {
-                                recBuilder.addField(fp.get(index), recordPointable.getFieldValues().get(index));
-                                index++;
-                            }
-                        }
-                    }
-                    //write the record
-                    recBuilder.write(tb.getDataOutput(), true);
-                    tb.addFieldEndOffset();
-                    appendToFrameFromTupleBuilder(tb);
-                }
-            } catch (Exception e) {
-                throw new HyracksDataException("Error in the auto id generation and merge of the record", e);
-            }
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            flushIfNotFailed();
-        }
-    }
-
-}


Mime
View raw message