asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [20/21] incubator-asterixdb git commit: First stage of external data cleanup
Date Sun, 03 Jan 2016 17:41:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
index 822390a..4067508 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.common.feeds.api;
 
-public interface ITupleTrackingFeedAdapter extends IFeedAdapter {
+public interface ITupleTrackingFeedAdapter extends IDataSourceAdapter {
 
     public void tuplePersistedTimeCallback(long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
deleted file mode 100644
index 87f4c58..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.parse;
-
-import java.util.Map;
-
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-public interface IAsterixTupleParser extends ITupleParser{
-
-    public void configure(Map<String, String> configuration);
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
deleted file mode 100644
index df5a983..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.parse;
-
-import java.util.Map;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface ITupleForwardPolicy {
-
-    public static final String PARSER_POLICY = "parser-policy";
-    
-    public enum TupleForwardPolicyType {
-        FRAME_FULL,
-        COUNTER_TIMER_EXPIRED,
-        RATE_CONTROLLED
-    }
-
-    public void configure(Map<String, String> configuration);
-
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
-
-    public TupleForwardPolicyType getType();
-
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
-
-    public void close() throws HyracksDataException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
new file mode 100644
index 0000000..5ee065a
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.parse;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface ITupleForwarder {
+
+    public static final String FORWARD_POLICY = "forward-policy";
+
+    public enum TupleForwardPolicy {
+        FRAME_FULL,
+        COUNTER_TIMER_EXPIRED,
+        RATE_CONTROLLED
+    }
+
+    public void configure(Map<String, String> configuration);
+
+    public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+    public void close() throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d8147b6..6afe692 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -29,7 +29,6 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.URL;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -63,12 +62,12 @@ public class TestExecutor {
     private static final long MAX_URL_LENGTH = 2000l;
     private static Method managixExecuteMethod = null;
 
-    private static String host;
-    private static int port;
+    private String host;
+    private int port;
 
     public TestExecutor() {
-        this.host = "127.0.0.1";
-        this.port = 19002;
+        host = "127.0.0.1";
+        port = 19002;
     }
 
     public TestExecutor(String host, int port) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
index 96fb6ec..a10b5ea 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
@@ -29,6 +29,10 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.event.error.EventException;
+import org.apache.asterix.event.model.AsterixInstance;
+import org.apache.asterix.installer.schema.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,10 +42,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.event.error.EventException;
-import org.apache.asterix.event.model.AsterixInstance;
-import org.apache.asterix.installer.schema.conf.Configuration;
 
 public class ZooKeeperService implements ILookupService {
 
@@ -63,6 +63,7 @@ public class ZooKeeperService implements ILookupService {
     private LinkedBlockingQueue<String> msgQ = new LinkedBlockingQueue<String>();
     private ZooKeeperWatcher watcher = new ZooKeeperWatcher(msgQ);
 
+    @Override
     public boolean isRunning(Configuration conf) throws Exception {
         List<String> servers = conf.getZookeeper().getServers().getServer();
         int clientPort = conf.getZookeeper().getClientPort().intValue();
@@ -92,6 +93,7 @@ public class ZooKeeperService implements ILookupService {
         return isRunning;
     }
 
+    @Override
     public void startService(Configuration conf) throws Exception {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Starting ZooKeeper at " + zkConnectionString);
@@ -107,22 +109,29 @@ public class ZooKeeperService implements ILookupService {
         for (String zkServer : zkServers) {
             cmdBuffer.append(zkServer + " ");
         }
-        Runtime.getRuntime().exec(cmdBuffer.toString());
+        //TODO: Create a better way to interact with zookeeper
+        Process zkProcess = Runtime.getRuntime().exec(cmdBuffer.toString());
+        int output = zkProcess.waitFor();
+        if (output != 0) {
+            throw new Exception("Error starting zookeeper server. output code = " + output);
+        }
         zk = new ZooKeeper(zkConnectionString, ZOOKEEPER_SESSION_TIME_OUT, watcher);
-        String head = msgQ.poll(10, TimeUnit.SECONDS);
+        String head = msgQ.poll(60, TimeUnit.SECONDS);
         if (head == null) {
             StringBuilder msg = new StringBuilder(
                     "Unable to start Zookeeper Service. This could be because of the following reasons.\n");
             msg.append("1) Managix is incorrectly configured. Please run " + "managix validate"
                     + " to run a validation test and correct the errors reported.");
-            msg.append("\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
-                    + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
+            msg.append(
+                    "\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
+                            + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
             throw new Exception(msg.toString());
         }
         msgQ.take();
         createRootIfNotExist();
     }
 
+    @Override
     public void stopService(Configuration conf) throws Exception {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Stopping ZooKeeper running at " + zkConnectionString);
@@ -141,6 +150,7 @@ public class ZooKeeperService implements ILookupService {
         }
     }
 
+    @Override
     public void writeAsterixInstance(AsterixInstance asterixInstance) throws Exception {
         String instanceBasePath = ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstance.getName();
         ByteArrayOutputStream b = new ByteArrayOutputStream();
@@ -166,6 +176,7 @@ public class ZooKeeperService implements ILookupService {
         }
     }
 
+    @Override
     public AsterixInstance getAsterixInstance(String name) throws Exception {
         String path = ASTERIX_INSTANCE_BASE_PATH + File.separator + name;
         Stat stat = zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, false);
@@ -176,10 +187,12 @@ public class ZooKeeperService implements ILookupService {
         return readAsterixInstanceObject(asterixInstanceBytes);
     }
 
+    @Override
     public boolean exists(String path) throws Exception {
         return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + path, false) != null;
     }
 
+    @Override
     public void removeAsterixInstance(String name) throws Exception {
         if (!exists(name)) {
             throw new EventException("Asterix instance by name " + name + " does not exists.");
@@ -195,6 +208,7 @@ public class ZooKeeperService implements ILookupService {
         zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, DEFAULT_NODE_VERSION);
     }
 
+    @Override
     public List<AsterixInstance> getAsterixInstances() throws Exception {
         List<String> instanceNames = zk.getChildren(ASTERIX_INSTANCE_BASE_PATH, false);
         List<AsterixInstance> asterixInstances = new ArrayList<AsterixInstance>();
@@ -207,13 +221,14 @@ public class ZooKeeperService implements ILookupService {
         return asterixInstances;
     }
 
-    private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes) throws IOException,
-            ClassNotFoundException {
+    private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes)
+            throws IOException, ClassNotFoundException {
         ByteArrayInputStream b = new ByteArrayInputStream(asterixInstanceBytes);
         ObjectInputStream ois = new ObjectInputStream(b);
         return (AsterixInstance) ois.readObject();
     }
 
+    @Override
     public void updateAsterixInstance(AsterixInstance updatedInstance) throws Exception {
         removeAsterixInstance(updatedInstance.getName());
         writeAsterixInstance(updatedInstance);
@@ -249,6 +264,7 @@ class ZooKeeperWatcher implements Watcher {
         this.msgQ = msgQ;
     }
 
+    @Override
     public void process(WatchedEvent wEvent) {
         if (wEvent.getState() == KeeperState.SyncConnected) {
             msgQ.add("connected");
@@ -276,7 +292,8 @@ class ZookeeperUtil {
         List<String> servers = conf.getZookeeper().getServers().getServer();
         int serverId = 1;
         for (String server : servers) {
-            buffer.append("server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n");
+            buffer.append(
+                    "server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n");
             serverId++;
         }
         AsterixEventServiceUtil.dumpToFile(zooKeeperConfigPath, buffer.toString());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 4062b23..867c96b 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -35,6 +35,43 @@
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.asterix</groupId>
+                <artifactId>lexer-generator-maven-plugin</artifactId>
+                <version>0.8.8-SNAPSHOT</version>
+                <configuration>
+                    <grammarFile>src/main/resources/adm.grammar</grammarFile>
+                    <outputDir>${project.build.directory}/generated-sources/org/apache/asterix/runtime/operators/file/adm</outputDir>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>generate-lexer</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>generate-lexer</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.9</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>org.jvnet.jaxb2.maven2</groupId>
                 <artifactId>maven-jaxb2-plugin</artifactId>
                 <version>0.9.0</version>
@@ -91,6 +128,50 @@
                 </executions>
             </plugin>
         </plugins>
+        <pluginManagement>
+            <plugins>
+                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId> org.apache.asterix</groupId>
+                                        <artifactId> lexer-generator-maven-plugin</artifactId>
+                                        <versionRange>[0.1,)</versionRange>
+                                        <goals>
+                                            <goal>generate-lexer</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <execute>
+                                            <runOnIncremental>false</runOnIncremental>
+                                        </execute>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId> org.codehaus.mojo</groupId>
+                                        <artifactId>build-helper-maven-plugin</artifactId>
+                                        <versionRange>[1.7,)</versionRange>
+                                        <goals>
+                                            <goal>add-source</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore />
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
     </build>
     <dependencies>
         <dependency>
@@ -139,6 +220,10 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
             <groupId>net.java.dev.rome</groupId>
             <artifactId>rome-fetcher</artifactId>
             <version>1.0.0</version>
@@ -186,5 +271,11 @@
             <artifactId>jdo2-api</artifactId>
             <version>2.3-20090302111651</version>
         </dependency>
+        <dependency>
+            <groupId>com.e-movimento.tinytools</groupId>
+            <artifactId>privilegedaccessor</artifactId>
+            <version>1.2.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
deleted file mode 100644
index 8b7b6d5..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * A factory class for creating the @see {CNNFeedAdapter}.
- */
-public class CNNFeedAdapterFactory implements IFeedAdapterFactory {
-    private static final long serialVersionUID = 1L;
-
-    private Map<String, String> configuration;
-
-    private List<String> feedURLs = new ArrayList<String>();
-    private static Map<String, String> topicFeeds = new HashMap<String, String>();
-    private ARecordType recordType;
-    public static final String KEY_RSS_URL = "topic";
-    public static final String KEY_INTERVAL = "interval";
-    public static final String TOP_STORIES = "topstories";
-    public static final String WORLD = "world";
-    public static final String US = "us";
-    public static final String SPORTS = "sports";
-    public static final String BUSINESS = "business";
-    public static final String POLITICS = "politics";
-    public static final String CRIME = "crime";
-    public static final String TECHNOLOGY = "technology";
-    public static final String HEALTH = "health";
-    public static final String ENTERNTAINMENT = "entertainemnt";
-    public static final String TRAVEL = "travel";
-    public static final String LIVING = "living";
-    public static final String VIDEO = "video";
-    public static final String STUDENT = "student";
-    public static final String POPULAR = "popular";
-    public static final String RECENT = "recent";
-
-    private void initTopics() {
-        topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
-        topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
-        topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
-        topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
-        topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
-        topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
-        topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
-        topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
-        topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
-        topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
-        topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
-        topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
-        topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
-        topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
-        topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
-        topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        RSSFeedAdapter cnnFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
-        return cnnFeedAdapter;
-    }
-
-    @Override
-    public String getName() {
-        return "cnn_feed";
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.configuration = configuration;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
-        if (rssURLProperty == null) {
-            throw new IllegalArgumentException("no rss url provided");
-        }
-        initializeFeedURLs(rssURLProperty);
-        this.recordType = outputType;
-    }
-
-    private void initializeFeedURLs(String rssURLProperty) {
-        feedURLs.clear();
-        String[] rssTopics = rssURLProperty.split(",");
-        initTopics();
-        for (String topic : rssTopics) {
-            String feedURL = topicFeeds.get(topic);
-            if (feedURL == null) {
-                throw new IllegalArgumentException(
-                        " unknown topic :" + topic + " please choose from the following " + getValidTopics());
-            }
-            feedURLs.add(feedURL);
-        }
-    }
-
-    private static String getValidTopics() {
-        StringBuilder builder = new StringBuilder();
-        for (String key : topicFeeds.keySet()) {
-            builder.append(key);
-            builder.append(" ");
-        }
-        return new String(builder);
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(feedURLs.size());
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return recordType;
-    }
-
-    @Override
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    @Override
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
new file mode 100644
index 0000000..2e7158d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.GenericAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.provider.DataflowControllerProvider;
+import org.apache.asterix.external.provider.DatasourceFactoryProvider;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private IExternalDataSourceFactory dataSourceFactory;
+    private IDataParserFactory dataParserFactory;
+    private ARecordType recordType;
+    private Map<String, String> configuration;
+    private List<ExternalFile> files;
+    private boolean indexingOp;
+
+    @Override
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
+        this.files = files;
+        this.indexingOp = indexingOp;
+    }
+
+    @Override
+    public String getAlias() {
+        return ExternalDataConstants.ALIAS_GENERIC_ADAPTER;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return dataSourceFactory.getPartitionConstraint();
+    }
+
+    /**
+     * Runs on each node controller (after serialization-deserialization)
+     */
+    @Override
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
+                dataSourceFactory, dataParserFactory, configuration, indexingOp);
+        return new GenericAdapter(controller);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.recordType = outputType;
+        this.configuration = configuration;
+        dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
+        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
+        prepare();
+        ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
+    }
+
+    private void prepare() throws Exception {
+        if (dataSourceFactory.isIndexible() && (files != null)) {
+            ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
+        }
+        dataSourceFactory.configure(configuration);
+        dataParserFactory.setRecordType(recordType);
+        dataParserFactory.configure(configuration);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
deleted file mode 100644
index c4a96f4..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
-
-/**
- * A factory class for creating an instance of HDFSAdapter
- */
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
-    private static final long serialVersionUID = 1L;
-
-    public static final String HDFS_ADAPTER_NAME = "hdfs";
-    public static final String CLUSTER_LOCATIONS = "cluster-locations";
-    public static transient String SCHEDULER = "hdfs-scheduler";
-
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_PATH = "path";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-    // New
-    public static final String KEY_PARSER = "parser";
-    public static final String PARSER_HIVE = "hive-parser";
-    public static final String INPUT_FORMAT_RC = "rc-input-format";
-    public static final String FORMAT_BINARY = "binary";
-
-    public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
-
-    // Hadoop property names constants
-    public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
-    public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
-    public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
-    public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
-    public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
-    public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
-    public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
-    public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
-    public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
-    public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
-
-    private transient AlgebricksPartitionConstraint clusterLocations;
-    private String[] readSchedule;
-    private boolean executed[];
-    private InputSplitsFactory inputSplitsFactory;
-    private ConfFactory confFactory;
-    private IAType atype;
-    private boolean configured = false;
-    public static Scheduler hdfsScheduler;
-    private static boolean initialized = false;
-    protected List<ExternalFile> files;
-
-    private static Scheduler initializeHDFSScheduler() {
-        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
-        Scheduler scheduler = null;
-        try {
-            scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
-        } catch (HyracksException e) {
-            throw new IllegalStateException("Cannot obtain hdfs scheduler");
-        }
-        return scheduler;
-    }
-
-    protected static final Map<String, String> formatClassNames = initInputFormatMap();
-
-    protected static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, CLASS_NAME_TEXT_INPUT_FORMAT);
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, CLASS_NAME_SEQUENCE_INPUT_FORMAT);
-        formatClassNames.put(INPUT_FORMAT_RC, CLASS_NAME_RC_INPUT_FORMAT);
-        return formatClassNames;
-    }
-
-    public JobConf getJobConf() throws HyracksDataException {
-        return confFactory.getConf();
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        JobConf conf = confFactory.getConf();
-        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName,
-                parserFactory, ctx, configuration, files);
-        return hdfsAdapter;
-    }
-
-    @Override
-    public String getName() {
-        return HDFS_ADAPTER_NAME;
-    }
-
-    public static JobConf configureJobConf(Map<String, String> configuration) throws Exception {
-        JobConf conf = new JobConf();
-        String formatClassName = formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim());
-        String localShortCircuitSocketPath = configuration.get(KEY_LOCAL_SOCKET_PATH);
-        if (formatClassName == null) {
-            formatClassName = configuration.get(KEY_INPUT_FORMAT).trim();
-        }
-        conf.set(KEY_HADOOP_FILESYSTEM_URI, configuration.get(KEY_HDFS_URL).trim());
-        conf.set(KEY_HADOOP_FILESYSTEM_CLASS, CLASS_NAME_HDFS_FILESYSTEM);
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set(KEY_HADOOP_INPUT_DIR, configuration.get(KEY_PATH).trim());
-        conf.set(KEY_HADOOP_INPUT_FORMAT, formatClassName);
-
-        // Enable local short circuit reads if user supplied the parameters
-        if (localShortCircuitSocketPath != null) {
-            conf.set(KEY_HADOOP_SHORT_CIRCUIT, "true");
-            conf.set(KEY_HADOOP_SOCKET_PATH, localShortCircuitSocketPath.trim());
-        }
-        return conf;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (!configured) {
-            throw new IllegalStateException("Adapter factory has not been configured yet");
-        }
-        return clusterLocations;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        if (!initialized) {
-            hdfsScheduler = initializeHDFSScheduler();
-            initialized = true;
-        }
-        this.configuration = configuration;
-        JobConf conf = configureJobConf(configuration);
-        confFactory = new ConfFactory(conf);
-
-        clusterLocations = getClusterLocations();
-        int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
-
-        // if files list was set, we restrict the splits to the list since this dataset is indexed
-        InputSplit[] inputSplits;
-        if (files == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
-        } else {
-            inputSplits = getSplits(conf);
-        }
-        inputSplitsFactory = new InputSplitsFactory(inputSplits);
-
-        readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
-        executed = new boolean[readSchedule.length];
-        Arrays.fill(executed, false);
-        configured = true;
-
-        atype = outputType;
-        configureFormat(atype);
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    public static AlgebricksPartitionConstraint getClusterLocations() {
-        ArrayList<String> locs = new ArrayList<String>();
-        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                //two readers per partition
-                locs.add(i);
-                locs.add(i);
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return (ARecordType) atype;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.UNKNOWN;
-    }
-
-    /*
-     * This method is overridden to do the following:
-     * if data is text data (adm or delimited text), it will use a text tuple parser,
-     * otherwise it will use hdfs record object parser
-     */
-    @Override
-    protected void configureFormat(IAType sourceDatatype) throws Exception {
-        String specifiedFormat = configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
-        if (specifiedFormat == null) {
-            throw new IllegalArgumentException(" Unspecified data format");
-        }
-
-        if (AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)) {
-            parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
-        } else {
-            InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
-            if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                inputFormat = InputDataFormat.DELIMITED;
-            } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
-                inputFormat = InputDataFormat.ADM;
-            }
-            parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, inputFormat);
-        }
-
-    }
-
-    /**
-     * Instead of creating the split using the input format, we do it manually
-     * This function returns fileSplits (1 per hdfs file block) irrespective of the number of partitions
-     * and the produced splits only cover intersection between current files in hdfs and files stored internally
-     * in AsterixDB
-     * 1. NoOp means appended file
-     * 2. AddOp means new file
-     * 3. UpdateOp means the delta of a file
-     *
-     * @return
-     * @throws IOException
-     */
-    protected InputSplit[] getSplits(JobConf conf) throws IOException {
-        ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
-        ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
-        // Create file system object
-        try (FileSystem fs = FileSystem.get(conf)) {
-            // Create files splits
-            for (ExternalFile file : files) {
-                Path filePath = new Path(file.getFileName());
-                FileStatus fileStatus;
-                try {
-                    fileStatus = fs.getFileStatus(filePath);
-                } catch (FileNotFoundException e) {
-                    // file was deleted at some point, skip to next file
-                    continue;
-                }
-                if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
-                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                    // Get its information from HDFS name node
-                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
-                    // Create a split per block
-                    for (BlockLocation block : fileBlocks) {
-                        if (block.getOffset() < file.getSize()) {
-                            fileSplits.add(new FileSplit(filePath,
-                                    block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                            ? block.getLength() : (file.getSize() - block.getOffset()),
-                                    block.getHosts()));
-                            orderedExternalFiles.add(file);
-                        }
-                    }
-                } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
-                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                    long oldSize = 0L;
-                    long newSize = file.getSize();
-                    for (int i = 0; i < files.size(); i++) {
-                        if (files.get(i).getFileName() == file.getFileName()
-                                && files.get(i).getSize() != file.getSize()) {
-                            newSize = files.get(i).getSize();
-                            oldSize = file.getSize();
-                            break;
-                        }
-                    }
-
-                    // Get its information from HDFS name node
-                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
-                    // Create a split per block
-                    for (BlockLocation block : fileBlocks) {
-                        if (block.getOffset() + block.getLength() > oldSize) {
-                            if (block.getOffset() < newSize) {
-                                // Block interact with delta -> Create a split
-                                long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
-                                long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
-                                        : block.getOffset() + block.getLength() - newSize;
-                                long splitLength = block.getLength() - startCut - endCut;
-                                fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
-                                        block.getHosts()));
-                                orderedExternalFiles.add(file);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        files = orderedExternalFiles;
-        return fileSplits.toArray(new FileSplit[fileSplits.size()]);
-    }
-
-    // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side
-    public void setFiles(List<ExternalFile> files) {
-        this.files = files;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
deleted file mode 100644
index 8bf6d93..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSIndexingAdapter;
-import org.apache.asterix.external.indexing.dataflow.HDFSIndexingParserFactory;
-import org.apache.asterix.external.indexing.dataflow.IndexingScheduler;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
-
-public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private transient AlgebricksPartitionConstraint clusterLocations;
-    private String[] readSchedule;
-    private boolean executed[];
-    private InputSplitsFactory inputSplitsFactory;
-    private ConfFactory confFactory;
-    private IAType atype;
-    private boolean configured = false;
-    public static IndexingScheduler hdfsScheduler;
-    private static boolean initialized = false;
-    private Map<String, String> configuration;
-
-    public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
-
-    private static IndexingScheduler initializeHDFSScheduler() {
-        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
-        IndexingScheduler scheduler = null;
-        try {
-            scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
-        } catch (HyracksException e) {
-            throw new IllegalStateException("Cannot obtain hdfs scheduler");
-        }
-        return scheduler;
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public String getName() {
-        return HDFS_INDEXING_ADAPTER;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (!configured) {
-            throw new IllegalStateException("Adapter factory has not been configured yet");
-        }
-        return clusterLocations;
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        JobConf conf = confFactory.getConf();
-        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
-        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
-        ((HDFSIndexingParserFactory) parserFactory).setJobConf(conf);
-        ((HDFSIndexingParserFactory) parserFactory).setArguments(configuration);
-        HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits,
-                conf, clusterLocations, files, parserFactory, ctx, nodeName,
-                (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
-                (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT));
-        return hdfsIndexingAdapter;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        if (!initialized) {
-            hdfsScheduler = initializeHDFSScheduler();
-            initialized = true;
-        }
-        this.configuration = configuration;
-        JobConf conf = HDFSAdapterFactory.configureJobConf(configuration);
-        confFactory = new ConfFactory(conf);
-        clusterLocations = getClusterLocations();
-        InputSplit[] inputSplits = getSplits(conf);
-        inputSplitsFactory = new InputSplitsFactory(inputSplits);
-        readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
-        executed = new boolean[readSchedule.length];
-        Arrays.fill(executed, false);
-        configured = true;
-        atype = outputType;
-        // The function below is overwritten to create indexing adapter factory instead of regular adapter factory
-        configureFormat(atype);
-    }
-
-    @Override
-    protected void configureFormat(IAType sourceDatatype) throws Exception {
-
-        char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
-        char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
-
-        parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
-                configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
-                configuration.get(AsterixTupleParserFactory.KEY_FORMAT), delimiter, quote,
-                configuration.get(HDFSAdapterFactory.KEY_PARSER));
-    }
-
-    /**
-     * A static function that creates and return delimited text data parser
-     *
-     * @param recordType
-     *            (the record type to be parsed)
-     * @param delimiter
-     *            (the delimiter value)
-     * @return
-     */
-    public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote) {
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = null;
-            if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
-                if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
-                    throw new NotImplementedException("Non-optional UNION type is not supported.");
-                }
-                tag = ((AUnionType) recordType.getFieldTypes()[i]).getNullableType().getTypeTag();
-            } else {
-                tag = recordType.getFieldTypes()[i].getTypeTag();
-            }
-            if (tag == null) {
-                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
-            }
-            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-        }
-        return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, false);
-    }
-
-    public static AlgebricksPartitionConstraint getClusterLocations() {
-        ArrayList<String> locs = new ArrayList<String>();
-        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                locs.add(i);
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
-    }
-
-    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
-    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
-        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-        return m;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
deleted file mode 100644
index 553682e..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-import org.apache.asterix.external.dataset.adapter.HiveAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * A factory class for creating an instance of HiveAdapter
- */
-public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
-    private static final long serialVersionUID = 1L;
-
-    public static final String HIVE_DATABASE = "database";
-    public static final String HIVE_TABLE = "table";
-    public static final String HIVE_HOME = "hive-home";
-    public static final String HIVE_METASTORE_URI = "metastore-uri";
-    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
-    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
-
-    private HDFSAdapterFactory hdfsAdapterFactory;
-    private HDFSAdapter hdfsAdapter;
-    private boolean configured = false;
-    private IAType atype;
-
-    public HiveAdapterFactory() {
-        hdfsAdapterFactory = new HDFSAdapterFactory();
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        hdfsAdapter = (HDFSAdapter) hdfsAdapterFactory.createAdapter(ctx, partition);
-        HiveAdapter hiveAdapter = new HiveAdapter(atype, hdfsAdapter, parserFactory, ctx);
-        return hiveAdapter;
-    }
-
-    @Override
-    public String getName() {
-        return "hive";
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        if (!configured) {
-            populateConfiguration(configuration);
-            hdfsAdapterFactory.configure(configuration, outputType);
-            this.atype = outputType;
-        }
-    }
-
-    public static void populateConfiguration(Map<String, String> configuration) throws Exception {
-        /** configure hive */
-        String database = configuration.get(HIVE_DATABASE);
-        String tablePath = null;
-        if (database == null) {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
-        } else {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
-                    + configuration.get(HIVE_TABLE);
-        }
-        configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
-        if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
-                .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException(
-                    "format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
-        }
-
-        if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)
-                || configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT)
-                        .equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
-            throw new IllegalArgumentException(
-                    "file input format" + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
-        }
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return hdfsAdapterFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return (ARecordType) atype;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.UNKNOWN;
-    }
-
-    public void setFiles(List<ExternalFile> files) {
-        hdfsAdapterFactory.setFiles(files);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
deleted file mode 100644
index b8005cd..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
- * Acts as a marker interface indicating that the implementation provides functionality
- * for creating an adapter.
- */
-public interface IAdapterFactory extends Serializable {
-
-    public static final String KEY_TYPE_NAME = "type-name";
-
-    public enum SupportedOperation {
-        READ,
-        WRITE,
-        READ_WRITE
-    }
-
-    /**
-     * Returns the type of adapter indicating if the adapter can be used for
-     * reading from an external data source or writing to an external data
-     * source or can be used for both purposes.
-     * 
-     * @see SupportedOperation
-     * @return
-     */
-    public SupportedOperation getSupportedOperations();
-
-    /**
-     * Returns the display name corresponding to the Adapter type that is created by the factory.
-     * 
-     * @return the display name
-     */
-    public String getName();
-
-    /**
-     * Gets a list of partition constraints. A partition constraint can be a
-     * requirement to execute at a particular location or could be cardinality
-     * constraints indicating the number of instances that need to run in
-     * parallel. example, a IDatasourceAdapter implementation written for data
-     * residing on the local file system of a node cannot run on any other node
-     * and thus has a location partition constraint. The location partition
-     * constraint can be expressed as a node IP address or a node controller id.
-     * In the former case, the IP address is translated to a node controller id
-     * running on the node with the given IP address.
-     */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
-    /**
-     * Creates an instance of IDatasourceAdapter.
-     * 
-     * @param HyracksTaskContext
-     * @param partition
-     * @return An instance of IDatasourceAdapter.
-     * @throws Exception
-     */
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
-
-    /**
-     * @param configuration
-     * @param outputType
-     * @throws Exception
-     */
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
-
-    /**
-     * Gets the record type associated with the output of the adapter
-     * 
-     * @return
-     */
-    public ARecordType getAdapterOutputType();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
deleted file mode 100644
index 0de6fad..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public interface IControlledAdapterFactory extends Serializable {
-    public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor,
-            RecordDescriptor inRecDesc);
-
-    public void configure(IAType atype, boolean propagateInput, int[] ridFields,
-            Map<String, String> adapterConfiguration, boolean retainNull);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
deleted file mode 100644
index 9358a52..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-
-public interface IFeedAdapterFactory extends IAdapterFactory {
-
-    public boolean isRecordTrackingEnabled();
-
-    public IIntakeProgressTracker createIntakeProgressTracker();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
new file mode 100644
index 0000000..866910b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.dataset.adapter.LookupAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordIdReader;
+import org.apache.asterix.external.indexing.RecordIdReaderFactory;
+import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class LookupAdapterFactory<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private IRecordDataParserFactory dataParserFactory;
+    private ILookupReaderFactory readerFactory;
+    private ARecordType recordType;
+    private int[] ridFields;
+    private Map<String, String> configuration;
+    private boolean retainInput;
+    private boolean retainNull;
+    private int[] propagatedFields;
+    private INullWriterFactory iNullWriterFactory;
+
+    public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
+            INullWriterFactory iNullWriterFactory) {
+        this.recordType = recordType;
+        this.ridFields = ridFields;
+        this.retainInput = retainInput;
+        this.retainNull = retainNull;
+        this.iNullWriterFactory = iNullWriterFactory;
+    }
+
+    public LookupAdapter<T> createAdapter(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecDesc,
+            ExternalFileIndexAccessor snapshotAccessor, IFrameWriter writer) throws HyracksDataException {
+        try {
+            IRecordDataParser<T> dataParser = dataParserFactory.createRecordParser(ctx);
+            dataParser.configure(configuration, recordType);
+            ILookupRecordReader<? extends T> reader = readerFactory.createRecordReader(ctx, partition,
+                    snapshotAccessor);
+            reader.configure(configuration);
+            RecordIdReader ridReader = RecordIdReaderFactory.create(configuration, ridFields);
+            configurePropagatedFields(inRecDesc);
+            return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, propagatedFields,
+                    retainNull, iNullWriterFactory, ctx, writer);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration);
+        dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
+        dataParserFactory.setRecordType(recordType);
+        readerFactory.configure(configuration);
+        dataParserFactory.configure(configuration);
+    }
+
+    private void configurePropagatedFields(RecordDescriptor inRecDesc) {
+        int ptr = 0;
+        boolean skip = false;
+        propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length];
+        for (int i = 0; i < inRecDesc.getFieldCount(); i++) {
+            if (ptr < ridFields.length) {
+                skip = false;
+                for (int j = 0; j < ridFields.length; j++) {
+                    if (ridFields[j] == i) {
+                        ptr++;
+                        skip = true;
+                        break;
+                    }
+                }
+                if (!skip)
+                    propagatedFields[i - ptr] = i;
+            } else {
+                propagatedFields[i - ptr] = i;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
deleted file mode 100644
index 251d69a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.adapter.factory;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.util.DNSResolverFactory;
-import org.apache.asterix.external.util.INodeResolver;
-import org.apache.asterix.external.util.INodeResolverFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-
-/**
- * Factory class for creating an instance of NCFileSystemAdapter. An
- * NCFileSystemAdapter reads external data residing on the local file system of
- * an NC.
- */
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
-    private static final long serialVersionUID = 1L;
-
-    public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
-
-    private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
-
-    private IAType sourceDatatype;
-    private FileSplit[] fileSplits;
-    private ARecordType outputType;
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
-        return fsAdapter;
-    }
-
-    @Override
-    public String getName() {
-        return NC_FILE_SYSTEM_ADAPTER_NAME;
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.configuration = configuration;
-        this.outputType = outputType;
-        String[] splits = configuration.get(AsterixTupleParserFactory.KEY_PATH).split(",");
-        IAType sourceDatatype = outputType;
-        configureFileSplits(splits);
-        configureFormat(sourceDatatype);
-
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return configurePartitionConstraint();
-    }
-
-    private void configureFileSplits(String[] splits) throws AsterixException {
-        if (fileSplits == null) {
-            fileSplits = new FileSplit[splits.length];
-            String nodeName;
-            String nodeLocalPath;
-            int count = 0;
-            String trimmedValue;
-            for (String splitPath : splits) {
-                trimmedValue = splitPath.trim();
-                if (!trimmedValue.contains("://")) {
-                    throw new AsterixException(
-                            "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
-                }
-                nodeName = trimmedValue.split(":")[0];
-                nodeLocalPath = trimmedValue.split("://")[1];
-                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
-                fileSplits[count++] = fileSplit;
-            }
-        }
-    }
-
-    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
-        String[] locs = new String[fileSplits.length];
-        String location;
-        for (int i = 0; i < fileSplits.length; i++) {
-            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
-            locs[i] = location;
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locs);
-    }
-
-    protected INodeResolver getNodeResolver() {
-        if (nodeResolver == null) {
-            nodeResolver = initializeNodeResolver();
-        }
-        return nodeResolver;
-    }
-
-    private static INodeResolver initializeNodeResolver() {
-        INodeResolver nodeResolver = null;
-        String configuredNodeResolverFactory = System
-                .getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
-        if (configuredNodeResolverFactory != null) {
-            try {
-                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
-                        .createNodeResolver();
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
-                            + configuredNodeResolverFactory + "\n" + e.getMessage());
-                }
-                nodeResolver = DEFAULT_NODE_RESOLVER;
-            }
-        } else {
-            nodeResolver = DEFAULT_NODE_RESOLVER;
-        }
-        return nodeResolver;
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.UNKNOWN;
-    }
-
-    public void setFiles(List<ExternalFile> files) throws AlgebricksException {
-        throw new AlgebricksException("can't set files for this Adapter");
-    }
-
-}



Mime
View raw message