Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C28361816D for ; Sun, 3 Jan 2016 17:41:17 +0000 (UTC) Received: (qmail 40636 invoked by uid 500); 3 Jan 2016 17:41:17 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 40555 invoked by uid 500); 3 Jan 2016 17:41:17 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 40539 invoked by uid 99); 3 Jan 2016 17:41:17 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2016 17:41:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 17ADEC6D2F for ; Sun, 3 Jan 2016 17:41:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.779 X-Spam-Level: * X-Spam-Status: No, score=1.779 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id ds9jKAb8AnZJ for ; Sun, 3 Jan 2016 17:41:11 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id C39F323056 for ; Sun, 3 Jan 2016 17:40:59 +0000 (UTC) Received: (qmail 39848 invoked by uid 99); 3 Jan 2016 17:40:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2016 17:40:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7F2AAE0AC0; Sun, 3 Jan 2016 17:40:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.incubator.apache.org Date: Sun, 03 Jan 2016 17:41:18 -0000 Message-Id: <6bc81bf4cf094e45add266e9d37282ff@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/21] incubator-asterixdb git commit: First stage of external data cleanup http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java index 822390a..4067508 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.common.feeds.api; -public interface ITupleTrackingFeedAdapter extends IFeedAdapter { +public interface ITupleTrackingFeedAdapter extends IDataSourceAdapter { public void tuplePersistedTimeCallback(long timestamp); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java deleted file mode 100644 index 87f4c58..0000000 --- a/asterix-common/src/main/java/org/apache/asterix/common/parse/IAsterixTupleParser.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.parse; - -import java.util.Map; - -import org.apache.hyracks.dataflow.std.file.ITupleParser; - -public interface IAsterixTupleParser extends ITupleParser{ - - public void configure(Map configuration); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java deleted file mode 100644 index df5a983..0000000 --- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwardPolicy.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.parse; - -import java.util.Map; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; - -public interface ITupleForwardPolicy { - - public static final String PARSER_POLICY = "parser-policy"; - - public enum TupleForwardPolicyType { - FRAME_FULL, - COUNTER_TIMER_EXPIRED, - RATE_CONTROLLED - } - - public void configure(Map configuration); - - public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException; - - public TupleForwardPolicyType getType(); - - public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException; - - public void close() throws HyracksDataException; - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java new file mode 100644 index 0000000..5ee065a --- /dev/null +++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.parse; + +import java.util.Map; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; + +public interface ITupleForwarder { + + public static final String FORWARD_POLICY = "forward-policy"; + + public enum TupleForwardPolicy { + FRAME_FULL, + COUNTER_TIMER_EXPIRED, + RATE_CONTROLLED + } + + public void configure(Map configuration); + + public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException; + + public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException; + + public void close() throws HyracksDataException; + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java index d8147b6..6afe692 100644 --- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java +++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java @@ -29,7 +29,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URL; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -63,12 +62,12 @@ public class TestExecutor { private static final long MAX_URL_LENGTH = 2000l; private static Method managixExecuteMethod = null; - private static String host; - private static int port; + private String host; + private int port; public TestExecutor() { - this.host = "127.0.0.1"; - this.port = 19002; + host = "127.0.0.1"; + port = 19002; } public TestExecutor(String host, int port) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java ---------------------------------------------------------------------- diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java index 96fb6ec..a10b5ea 100644 --- a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java +++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java @@ -29,6 +29,10 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.event.error.EventException; +import org.apache.asterix.event.model.AsterixInstance; +import org.apache.asterix.installer.schema.conf.Configuration; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -38,10 +42,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.event.error.EventException; -import org.apache.asterix.event.model.AsterixInstance; -import org.apache.asterix.installer.schema.conf.Configuration; public class ZooKeeperService implements ILookupService { @@ -63,6 +63,7 @@ public class ZooKeeperService implements ILookupService { private LinkedBlockingQueue msgQ = new LinkedBlockingQueue(); private ZooKeeperWatcher watcher = new ZooKeeperWatcher(msgQ); + @Override public boolean isRunning(Configuration conf) throws Exception { List servers = conf.getZookeeper().getServers().getServer(); int clientPort = conf.getZookeeper().getClientPort().intValue(); @@ -92,6 +93,7 @@ public class ZooKeeperService implements ILookupService { return isRunning; } + @Override public void startService(Configuration conf) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Starting ZooKeeper at " + zkConnectionString); @@ -107,22 +109,29 @@ public class ZooKeeperService implements ILookupService { for (String zkServer : zkServers) { cmdBuffer.append(zkServer + " "); } - Runtime.getRuntime().exec(cmdBuffer.toString()); + //TODO: Create a better way to interact with zookeeper + Process zkProcess = Runtime.getRuntime().exec(cmdBuffer.toString()); + int output = zkProcess.waitFor(); + if (output != 0) { + throw new Exception("Error starting zookeeper server. output code = " + output); + } zk = new ZooKeeper(zkConnectionString, ZOOKEEPER_SESSION_TIME_OUT, watcher); - String head = msgQ.poll(10, TimeUnit.SECONDS); + String head = msgQ.poll(60, TimeUnit.SECONDS); if (head == null) { StringBuilder msg = new StringBuilder( "Unable to start Zookeeper Service. This could be because of the following reasons.\n"); msg.append("1) Managix is incorrectly configured. Please run " + "managix validate" + " to run a validation test and correct the errors reported."); - msg.append("\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration (" - + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")"); + msg.append( + "\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration (" + + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")"); throw new Exception(msg.toString()); } msgQ.take(); createRootIfNotExist(); } + @Override public void stopService(Configuration conf) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Stopping ZooKeeper running at " + zkConnectionString); @@ -141,6 +150,7 @@ public class ZooKeeperService implements ILookupService { } } + @Override public void writeAsterixInstance(AsterixInstance asterixInstance) throws Exception { String instanceBasePath = ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstance.getName(); ByteArrayOutputStream b = new ByteArrayOutputStream(); @@ -166,6 +176,7 @@ public class ZooKeeperService implements ILookupService { } } + @Override public AsterixInstance getAsterixInstance(String name) throws Exception { String path = ASTERIX_INSTANCE_BASE_PATH + File.separator + name; Stat stat = zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, false); @@ -176,10 +187,12 @@ public class ZooKeeperService implements ILookupService { return readAsterixInstanceObject(asterixInstanceBytes); } + @Override public boolean exists(String path) throws Exception { return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + path, false) != null; } + @Override public void removeAsterixInstance(String name) throws Exception { if (!exists(name)) { throw new EventException("Asterix instance by name " + name + " does not exists."); @@ -195,6 +208,7 @@ public class ZooKeeperService implements ILookupService { zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, DEFAULT_NODE_VERSION); } + @Override public List getAsterixInstances() throws Exception { List instanceNames = zk.getChildren(ASTERIX_INSTANCE_BASE_PATH, false); List asterixInstances = new ArrayList(); @@ -207,13 +221,14 @@ public class ZooKeeperService implements ILookupService { return asterixInstances; } - private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes) throws IOException, - ClassNotFoundException { + private AsterixInstance readAsterixInstanceObject(byte[] asterixInstanceBytes) + throws IOException, ClassNotFoundException { ByteArrayInputStream b = new ByteArrayInputStream(asterixInstanceBytes); ObjectInputStream ois = new ObjectInputStream(b); return (AsterixInstance) ois.readObject(); } + @Override public void updateAsterixInstance(AsterixInstance updatedInstance) throws Exception { removeAsterixInstance(updatedInstance.getName()); writeAsterixInstance(updatedInstance); @@ -249,6 +264,7 @@ class ZooKeeperWatcher implements Watcher { this.msgQ = msgQ; } + @Override public void process(WatchedEvent wEvent) { if (wEvent.getState() == KeeperState.SyncConnected) { msgQ.add("connected"); @@ -276,7 +292,8 @@ class ZookeeperUtil { List servers = conf.getZookeeper().getServers().getServer(); int serverId = 1; for (String server : servers) { - buffer.append("server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n"); + buffer.append( + "server" + "." + serverId + "=" + server + ":" + leaderConnPort + ":" + leaderElecPort + "\n"); serverId++; } AsterixEventServiceUtil.dumpToFile(zooKeeperConfigPath, buffer.toString()); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/pom.xml ---------------------------------------------------------------------- diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml index 4062b23..867c96b 100644 --- a/asterix-external-data/pom.xml +++ b/asterix-external-data/pom.xml @@ -35,6 +35,43 @@ + org.apache.asterix + lexer-generator-maven-plugin + 0.8.8-SNAPSHOT + + src/main/resources/adm.grammar + ${project.build.directory}/generated-sources/org/apache/asterix/runtime/operators/file/adm + + + + generate-lexer + generate-sources + + generate-lexer + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.9 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/ + + + + + + org.jvnet.jaxb2.maven2 maven-jaxb2-plugin 0.9.0 @@ -91,6 +128,50 @@ + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.asterix + lexer-generator-maven-plugin + [0.1,) + + generate-lexer + + + + + false + + + + + + org.codehaus.mojo + build-helper-maven-plugin + [1.7,) + + add-source + + + + + + + + + + + + @@ -139,6 +220,10 @@ compile + org.apache.hadoop + hadoop-hdfs + + net.java.dev.rome rome-fetcher 1.0.0 @@ -186,5 +271,11 @@ jdo2-api 2.3-20090302111651 + + com.e-movimento.tinytools + privilegedaccessor + 1.2.2 + test + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java deleted file mode 100644 index 8b7b6d5..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.common.feeds.api.IIntakeProgressTracker; -import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -/** - * A factory class for creating the @see {CNNFeedAdapter}. - */ -public class CNNFeedAdapterFactory implements IFeedAdapterFactory { - private static final long serialVersionUID = 1L; - - private Map configuration; - - private List feedURLs = new ArrayList(); - private static Map topicFeeds = new HashMap(); - private ARecordType recordType; - public static final String KEY_RSS_URL = "topic"; - public static final String KEY_INTERVAL = "interval"; - public static final String TOP_STORIES = "topstories"; - public static final String WORLD = "world"; - public static final String US = "us"; - public static final String SPORTS = "sports"; - public static final String BUSINESS = "business"; - public static final String POLITICS = "politics"; - public static final String CRIME = "crime"; - public static final String TECHNOLOGY = "technology"; - public static final String HEALTH = "health"; - public static final String ENTERNTAINMENT = "entertainemnt"; - public static final String TRAVEL = "travel"; - public static final String LIVING = "living"; - public static final String VIDEO = "video"; - public static final String STUDENT = "student"; - public static final String POPULAR = "popular"; - public static final String RECENT = "recent"; - - private void initTopics() { - topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss"); - topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss"); - topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss"); - topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss"); - topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss"); - topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss"); - topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss"); - topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss"); - topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss"); - topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss"); - topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss"); - topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss"); - topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss"); - topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss"); - topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss"); - topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss"); - } - - @Override - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - RSSFeedAdapter cnnFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx); - return cnnFeedAdapter; - } - - @Override - public String getName() { - return "cnn_feed"; - } - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - this.configuration = configuration; - String rssURLProperty = configuration.get(KEY_RSS_URL); - if (rssURLProperty == null) { - throw new IllegalArgumentException("no rss url provided"); - } - initializeFeedURLs(rssURLProperty); - this.recordType = outputType; - } - - private void initializeFeedURLs(String rssURLProperty) { - feedURLs.clear(); - String[] rssTopics = rssURLProperty.split(","); - initTopics(); - for (String topic : rssTopics) { - String feedURL = topicFeeds.get(topic); - if (feedURL == null) { - throw new IllegalArgumentException( - " unknown topic :" + topic + " please choose from the following " + getValidTopics()); - } - feedURLs.add(feedURL); - } - } - - private static String getValidTopics() { - StringBuilder builder = new StringBuilder(); - for (String key : topicFeeds.keySet()) { - builder.append(key); - builder.append(" "); - } - return new String(builder); - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return new AlgebricksCountPartitionConstraint(feedURLs.size()); - } - - @Override - public SupportedOperation getSupportedOperations() { - return SupportedOperation.READ; - } - - @Override - public ARecordType getAdapterOutputType() { - return recordType; - } - - @Override - public boolean isRecordTrackingEnabled() { - return false; - } - - @Override - public IIntakeProgressTracker createIntakeProgressTracker() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java new file mode 100644 index 0000000..2e7158d --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.adapter.factory; + +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.feeds.api.IDataSourceAdapter; +import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.IDataFlowController; +import org.apache.asterix.external.api.IDataParserFactory; +import org.apache.asterix.external.api.IExternalDataSourceFactory; +import org.apache.asterix.external.api.IIndexibleExternalDataSource; +import org.apache.asterix.external.api.IIndexingAdapterFactory; +import org.apache.asterix.external.dataset.adapter.GenericAdapter; +import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.external.provider.DataflowControllerProvider; +import org.apache.asterix.external.provider.DatasourceFactoryProvider; +import org.apache.asterix.external.provider.ParserFactoryProvider; +import org.apache.asterix.external.util.ExternalDataCompatibilityUtils; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.api.context.IHyracksTaskContext; + +public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory { + + private static final long serialVersionUID = 1L; + private IExternalDataSourceFactory dataSourceFactory; + private IDataParserFactory dataParserFactory; + private ARecordType recordType; + private Map configuration; + private List files; + private boolean indexingOp; + + @Override + public void setSnapshot(List files, boolean indexingOp) { + this.files = files; + this.indexingOp = indexingOp; + } + + @Override + public String getAlias() { + return ExternalDataConstants.ALIAS_GENERIC_ADAPTER; + } + + @Override + public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + return dataSourceFactory.getPartitionConstraint(); + } + + /** + * Runs on each node controller (after serialization-deserialization) + */ + @Override + public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { + IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition, + dataSourceFactory, dataParserFactory, configuration, indexingOp); + return new GenericAdapter(controller); + } + + @Override + public void configure(Map configuration, ARecordType outputType) throws Exception { + this.recordType = outputType; + this.configuration = configuration; + dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration); + dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration); + prepare(); + ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory); + } + + private void prepare() throws Exception { + if (dataSourceFactory.isIndexible() && (files != null)) { + ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp); + } + dataSourceFactory.configure(configuration); + dataParserFactory.setRecordType(recordType); + dataParserFactory.configure(configuration); + } + + @Override + public ARecordType getAdapterOutputType() { + return recordType; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java deleted file mode 100644 index c4a96f4..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.external.dataset.adapter.HDFSAdapter; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.api.context.ICCContext; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.hdfs.dataflow.ConfFactory; -import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory; -import org.apache.hyracks.hdfs.scheduler.Scheduler; - -/** - * A factory class for creating an instance of HDFSAdapter - */ -public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory { - private static final long serialVersionUID = 1L; - - public static final String HDFS_ADAPTER_NAME = "hdfs"; - public static final String CLUSTER_LOCATIONS = "cluster-locations"; - public static transient String SCHEDULER = "hdfs-scheduler"; - - public static final String KEY_HDFS_URL = "hdfs"; - public static final String KEY_PATH = "path"; - public static final String KEY_INPUT_FORMAT = "input-format"; - public static final String INPUT_FORMAT_TEXT = "text-input-format"; - public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format"; - // New - public static final String KEY_PARSER = "parser"; - public static final String PARSER_HIVE = "hive-parser"; - public static final String INPUT_FORMAT_RC = "rc-input-format"; - public static final String FORMAT_BINARY = "binary"; - - public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path"; - - // Hadoop property names constants - public static final String CLASS_NAME_TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; - public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat"; - public static final String CLASS_NAME_RC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; - public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem"; - public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS"; - public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl"; - public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir"; - public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class"; - public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit"; - public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path"; - - private transient AlgebricksPartitionConstraint clusterLocations; - private String[] readSchedule; - private boolean executed[]; - private InputSplitsFactory inputSplitsFactory; - private ConfFactory confFactory; - private IAType atype; - private boolean configured = false; - public static Scheduler hdfsScheduler; - private static boolean initialized = false; - protected List files; - - private static Scheduler initializeHDFSScheduler() { - ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext(); - Scheduler scheduler = null; - try { - scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), - ccContext.getClusterControllerInfo().getClientNetPort()); - } catch (HyracksException e) { - throw new IllegalStateException("Cannot obtain hdfs scheduler"); - } - return scheduler; - } - - protected static final Map formatClassNames = initInputFormatMap(); - - protected static Map initInputFormatMap() { - Map formatClassNames = new HashMap(); - formatClassNames.put(INPUT_FORMAT_TEXT, CLASS_NAME_TEXT_INPUT_FORMAT); - formatClassNames.put(INPUT_FORMAT_SEQUENCE, CLASS_NAME_SEQUENCE_INPUT_FORMAT); - formatClassNames.put(INPUT_FORMAT_RC, CLASS_NAME_RC_INPUT_FORMAT); - return formatClassNames; - } - - public JobConf getJobConf() throws HyracksDataException { - return confFactory.getConf(); - } - - @Override - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - JobConf conf = confFactory.getConf(); - InputSplit[] inputSplits = inputSplitsFactory.getSplits(); - String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId(); - HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName, - parserFactory, ctx, configuration, files); - return hdfsAdapter; - } - - @Override - public String getName() { - return HDFS_ADAPTER_NAME; - } - - public static JobConf configureJobConf(Map configuration) throws Exception { - JobConf conf = new JobConf(); - String formatClassName = formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()); - String localShortCircuitSocketPath = configuration.get(KEY_LOCAL_SOCKET_PATH); - if (formatClassName == null) { - formatClassName = configuration.get(KEY_INPUT_FORMAT).trim(); - } - conf.set(KEY_HADOOP_FILESYSTEM_URI, configuration.get(KEY_HDFS_URL).trim()); - conf.set(KEY_HADOOP_FILESYSTEM_CLASS, CLASS_NAME_HDFS_FILESYSTEM); - conf.setClassLoader(HDFSAdapter.class.getClassLoader()); - conf.set(KEY_HADOOP_INPUT_DIR, configuration.get(KEY_PATH).trim()); - conf.set(KEY_HADOOP_INPUT_FORMAT, formatClassName); - - // Enable local short circuit reads if user supplied the parameters - if (localShortCircuitSocketPath != null) { - conf.set(KEY_HADOOP_SHORT_CIRCUIT, "true"); - conf.set(KEY_HADOOP_SOCKET_PATH, localShortCircuitSocketPath.trim()); - } - return conf; - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - if (!configured) { - throw new IllegalStateException("Adapter factory has not been configured yet"); - } - return clusterLocations; - } - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - if (!initialized) { - hdfsScheduler = initializeHDFSScheduler(); - initialized = true; - } - this.configuration = configuration; - JobConf conf = configureJobConf(configuration); - confFactory = new ConfFactory(conf); - - clusterLocations = getClusterLocations(); - int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length; - - // if files list was set, we restrict the splits to the list since this dataset is indexed - InputSplit[] inputSplits; - if (files == null) { - inputSplits = conf.getInputFormat().getSplits(conf, numPartitions); - } else { - inputSplits = getSplits(conf); - } - inputSplitsFactory = new InputSplitsFactory(inputSplits); - - readSchedule = hdfsScheduler.getLocationConstraints(inputSplits); - executed = new boolean[readSchedule.length]; - Arrays.fill(executed, false); - configured = true; - - atype = outputType; - configureFormat(atype); - } - - @Override - public SupportedOperation getSupportedOperations() { - return SupportedOperation.READ; - } - - public static AlgebricksPartitionConstraint getClusterLocations() { - ArrayList locs = new ArrayList(); - Map stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores(); - for (String i : stores.keySet()) { - String[] nodeStores = stores.get(i); - for (int j = 0; j < nodeStores.length; j++) { - //two readers per partition - locs.add(i); - locs.add(i); - } - } - String[] cluster = new String[locs.size()]; - cluster = locs.toArray(cluster); - return new AlgebricksAbsolutePartitionConstraint(cluster); - } - - @Override - public ARecordType getAdapterOutputType() { - return (ARecordType) atype; - } - - @Override - public InputDataFormat getInputDataFormat() { - return InputDataFormat.UNKNOWN; - } - - /* - * This method is overridden to do the following: - * if data is text data (adm or delimited text), it will use a text tuple parser, - * otherwise it will use hdfs record object parser - */ - @Override - protected void configureFormat(IAType sourceDatatype) throws Exception { - String specifiedFormat = configuration.get(AsterixTupleParserFactory.KEY_FORMAT); - if (specifiedFormat == null) { - throw new IllegalArgumentException(" Unspecified data format"); - } - - if (AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)) { - parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration); - } else { - InputDataFormat inputFormat = InputDataFormat.UNKNOWN; - if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) { - inputFormat = InputDataFormat.DELIMITED; - } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) { - inputFormat = InputDataFormat.ADM; - } - parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, inputFormat); - } - - } - - /** - * Instead of creating the split using the input format, we do it manually - * This function returns fileSplits (1 per hdfs file block) irrespective of the number of partitions - * and the produced splits only cover intersection between current files in hdfs and files stored internally - * in AsterixDB - * 1. NoOp means appended file - * 2. AddOp means new file - * 3. UpdateOp means the delta of a file - * - * @return - * @throws IOException - */ - protected InputSplit[] getSplits(JobConf conf) throws IOException { - ArrayList fileSplits = new ArrayList(); - ArrayList orderedExternalFiles = new ArrayList(); - // Create file system object - try (FileSystem fs = FileSystem.get(conf)) { - // Create files splits - for (ExternalFile file : files) { - Path filePath = new Path(file.getFileName()); - FileStatus fileStatus; - try { - fileStatus = fs.getFileStatus(filePath); - } catch (FileNotFoundException e) { - // file was deleted at some point, skip to next file - continue; - } - if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP - && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) { - // Get its information from HDFS name node - BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize()); - // Create a split per block - for (BlockLocation block : fileBlocks) { - if (block.getOffset() < file.getSize()) { - fileSplits.add(new FileSplit(filePath, - block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize() - ? block.getLength() : (file.getSize() - block.getOffset()), - block.getHosts())); - orderedExternalFiles.add(file); - } - } - } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP - && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) { - long oldSize = 0L; - long newSize = file.getSize(); - for (int i = 0; i < files.size(); i++) { - if (files.get(i).getFileName() == file.getFileName() - && files.get(i).getSize() != file.getSize()) { - newSize = files.get(i).getSize(); - oldSize = file.getSize(); - break; - } - } - - // Get its information from HDFS name node - BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize); - // Create a split per block - for (BlockLocation block : fileBlocks) { - if (block.getOffset() + block.getLength() > oldSize) { - if (block.getOffset() < newSize) { - // Block interact with delta -> Create a split - long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset(); - long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L - : block.getOffset() + block.getLength() - newSize; - long splitLength = block.getLength() - startCut - endCut; - fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength, - block.getHosts())); - orderedExternalFiles.add(file); - } - } - } - } - } - } - files = orderedExternalFiles; - return fileSplits.toArray(new FileSplit[fileSplits.size()]); - } - - // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side - public void setFiles(List files) { - this.files = files; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java deleted file mode 100644 index 8bf6d93..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.external.dataset.adapter.HDFSIndexingAdapter; -import org.apache.asterix.external.indexing.dataflow.HDFSIndexingParserFactory; -import org.apache.asterix.external.indexing.dataflow.IndexingScheduler; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.AUnionType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.AsterixAppContextInfo; -import org.apache.asterix.om.util.NonTaggedFormatUtil; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.DelimitedDataParser; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; -import org.apache.hyracks.api.context.ICCContext; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; -import org.apache.hyracks.hdfs.dataflow.ConfFactory; -import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory; - -public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory { - - private static final long serialVersionUID = 1L; - - private transient AlgebricksPartitionConstraint clusterLocations; - private String[] readSchedule; - private boolean executed[]; - private InputSplitsFactory inputSplitsFactory; - private ConfFactory confFactory; - private IAType atype; - private boolean configured = false; - public static IndexingScheduler hdfsScheduler; - private static boolean initialized = false; - private Map configuration; - - public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter"; - - private static IndexingScheduler initializeHDFSScheduler() { - ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext(); - IndexingScheduler scheduler = null; - try { - scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), - ccContext.getClusterControllerInfo().getClientNetPort()); - } catch (HyracksException e) { - throw new IllegalStateException("Cannot obtain hdfs scheduler"); - } - return scheduler; - } - - @Override - public SupportedOperation getSupportedOperations() { - return SupportedOperation.READ; - } - - @Override - public String getName() { - return HDFS_INDEXING_ADAPTER; - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - if (!configured) { - throw new IllegalStateException("Adapter factory has not been configured yet"); - } - return clusterLocations; - } - - @Override - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - JobConf conf = confFactory.getConf(); - InputSplit[] inputSplits = inputSplitsFactory.getSplits(); - String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId(); - ((HDFSIndexingParserFactory) parserFactory).setJobConf(conf); - ((HDFSIndexingParserFactory) parserFactory).setArguments(configuration); - HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, - conf, clusterLocations, files, parserFactory, ctx, nodeName, - (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), - (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT)); - return hdfsIndexingAdapter; - } - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - if (!initialized) { - hdfsScheduler = initializeHDFSScheduler(); - initialized = true; - } - this.configuration = configuration; - JobConf conf = HDFSAdapterFactory.configureJobConf(configuration); - confFactory = new ConfFactory(conf); - clusterLocations = getClusterLocations(); - InputSplit[] inputSplits = getSplits(conf); - inputSplitsFactory = new InputSplitsFactory(inputSplits); - readSchedule = hdfsScheduler.getLocationConstraints(inputSplits); - executed = new boolean[readSchedule.length]; - Arrays.fill(executed, false); - configured = true; - atype = outputType; - // The function below is overwritten to create indexing adapter factory instead of regular adapter factory - configureFormat(atype); - } - - @Override - protected void configureFormat(IAType sourceDatatype) throws Exception { - - char delimiter = AsterixTupleParserFactory.getDelimiter(configuration); - char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter); - - parserFactory = new HDFSIndexingParserFactory((ARecordType) atype, - configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), - configuration.get(AsterixTupleParserFactory.KEY_FORMAT), delimiter, quote, - configuration.get(HDFSAdapterFactory.KEY_PARSER)); - } - - /** - * A static function that creates and return delimited text data parser - * - * @param recordType - * (the record type to be parsed) - * @param delimiter - * (the delimiter value) - * @return - */ - public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote) { - int n = recordType.getFieldTypes().length; - IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n]; - for (int i = 0; i < n; i++) { - ATypeTag tag = null; - if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) { - if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) { - throw new NotImplementedException("Non-optional UNION type is not supported."); - } - tag = ((AUnionType) recordType.getFieldTypes()[i]).getNullableType().getTypeTag(); - } else { - tag = recordType.getFieldTypes()[i].getTypeTag(); - } - if (tag == null) { - throw new NotImplementedException("Failed to get the type information for field " + i + "."); - } - IValueParserFactory vpf = valueParserFactoryMap.get(tag); - if (vpf == null) { - throw new NotImplementedException("No value parser factory for delimited fields of type " + tag); - } - fieldParserFactories[i] = vpf; - } - return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, false); - } - - public static AlgebricksPartitionConstraint getClusterLocations() { - ArrayList locs = new ArrayList(); - Map stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores(); - for (String i : stores.keySet()) { - String[] nodeStores = stores.get(i); - for (int j = 0; j < nodeStores.length; j++) { - locs.add(i); - } - } - String[] cluster = new String[locs.size()]; - cluster = locs.toArray(cluster); - return new AlgebricksAbsolutePartitionConstraint(cluster); - } - - private static Map valueParserFactoryMap = initializeValueParserFactoryMap(); - - private static Map initializeValueParserFactoryMap() { - Map m = new HashMap(); - m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE); - m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE); - m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE); - m.put(ATypeTag.INT64, LongParserFactory.INSTANCE); - m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE); - return m; - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java deleted file mode 100644 index 553682e..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.external.dataset.adapter.HDFSAdapter; -import org.apache.asterix.external.dataset.adapter.HiveAdapter; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -/** - * A factory class for creating an instance of HiveAdapter - */ -public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory { - private static final long serialVersionUID = 1L; - - public static final String HIVE_DATABASE = "database"; - public static final String HIVE_TABLE = "table"; - public static final String HIVE_HOME = "hive-home"; - public static final String HIVE_METASTORE_URI = "metastore-uri"; - public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir"; - public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl"; - - private HDFSAdapterFactory hdfsAdapterFactory; - private HDFSAdapter hdfsAdapter; - private boolean configured = false; - private IAType atype; - - public HiveAdapterFactory() { - hdfsAdapterFactory = new HDFSAdapterFactory(); - } - - @Override - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - hdfsAdapter = (HDFSAdapter) hdfsAdapterFactory.createAdapter(ctx, partition); - HiveAdapter hiveAdapter = new HiveAdapter(atype, hdfsAdapter, parserFactory, ctx); - return hiveAdapter; - } - - @Override - public String getName() { - return "hive"; - } - - @Override - public SupportedOperation getSupportedOperations() { - return SupportedOperation.READ; - } - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - if (!configured) { - populateConfiguration(configuration); - hdfsAdapterFactory.configure(configuration, outputType); - this.atype = outputType; - } - } - - public static void populateConfiguration(Map configuration) throws Exception { - /** configure hive */ - String database = configuration.get(HIVE_DATABASE); - String tablePath = null; - if (database == null) { - tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE); - } else { - tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/" - + configuration.get(HIVE_TABLE); - } - configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath); - if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT) - .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) { - throw new IllegalArgumentException( - "format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported"); - } - - if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) - || configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) - .equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) { - throw new IllegalArgumentException( - "file input format" + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported"); - } - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return hdfsAdapterFactory.getPartitionConstraint(); - } - - @Override - public ARecordType getAdapterOutputType() { - return (ARecordType) atype; - } - - @Override - public InputDataFormat getInputDataFormat() { - return InputDataFormat.UNKNOWN; - } - - public void setFiles(List files) { - hdfsAdapterFactory.setFiles(files); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java deleted file mode 100644 index b8005cd..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.io.Serializable; -import java.util.Map; - -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -/** - * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory. - * Acts as a marker interface indicating that the implementation provides functionality - * for creating an adapter. - */ -public interface IAdapterFactory extends Serializable { - - public static final String KEY_TYPE_NAME = "type-name"; - - public enum SupportedOperation { - READ, - WRITE, - READ_WRITE - } - - /** - * Returns the type of adapter indicating if the adapter can be used for - * reading from an external data source or writing to an external data - * source or can be used for both purposes. - * - * @see SupportedOperation - * @return - */ - public SupportedOperation getSupportedOperations(); - - /** - * Returns the display name corresponding to the Adapter type that is created by the factory. - * - * @return the display name - */ - public String getName(); - - /** - * Gets a list of partition constraints. A partition constraint can be a - * requirement to execute at a particular location or could be cardinality - * constraints indicating the number of instances that need to run in - * parallel. example, a IDatasourceAdapter implementation written for data - * residing on the local file system of a node cannot run on any other node - * and thus has a location partition constraint. The location partition - * constraint can be expressed as a node IP address or a node controller id. - * In the former case, the IP address is translated to a node controller id - * running on the node with the given IP address. - */ - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception; - - /** - * Creates an instance of IDatasourceAdapter. - * - * @param HyracksTaskContext - * @param partition - * @return An instance of IDatasourceAdapter. - * @throws Exception - */ - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception; - - /** - * @param configuration - * @param outputType - * @throws Exception - */ - public void configure(Map configuration, ARecordType outputType) throws Exception; - - /** - * Gets the record type associated with the output of the adapter - * - * @return - */ - public ARecordType getAdapterOutputType(); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java deleted file mode 100644 index 0de6fad..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.io.Serializable; -import java.util.Map; - -import org.apache.asterix.external.dataset.adapter.IControlledAdapter; -import org.apache.asterix.external.indexing.ExternalFileIndexAccessor; -import org.apache.asterix.om.types.IAType; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; - -public interface IControlledAdapterFactory extends Serializable { - public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor, - RecordDescriptor inRecDesc); - - public void configure(IAType atype, boolean propagateInput, int[] ridFields, - Map adapterConfiguration, boolean retainNull); -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java deleted file mode 100644 index 9358a52..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import org.apache.asterix.common.feeds.api.IIntakeProgressTracker; - -public interface IFeedAdapterFactory extends IAdapterFactory { - - public boolean isRecordTrackingEnabled(); - - public IIntakeProgressTracker createIntakeProgressTracker(); - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java new file mode 100644 index 0000000..866910b --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.adapter.factory; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.asterix.external.api.ILookupReaderFactory; +import org.apache.asterix.external.api.ILookupRecordReader; +import org.apache.asterix.external.api.IRecordDataParser; +import org.apache.asterix.external.api.IRecordDataParserFactory; +import org.apache.asterix.external.dataset.adapter.LookupAdapter; +import org.apache.asterix.external.indexing.ExternalFileIndexAccessor; +import org.apache.asterix.external.indexing.RecordIdReader; +import org.apache.asterix.external.indexing.RecordIdReaderFactory; +import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider; +import org.apache.asterix.external.provider.ParserFactoryProvider; +import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class LookupAdapterFactory implements Serializable { + + private static final long serialVersionUID = 1L; + private IRecordDataParserFactory dataParserFactory; + private ILookupReaderFactory readerFactory; + private ARecordType recordType; + private int[] ridFields; + private Map configuration; + private boolean retainInput; + private boolean retainNull; + private int[] propagatedFields; + private INullWriterFactory iNullWriterFactory; + + public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull, + INullWriterFactory iNullWriterFactory) { + this.recordType = recordType; + this.ridFields = ridFields; + this.retainInput = retainInput; + this.retainNull = retainNull; + this.iNullWriterFactory = iNullWriterFactory; + } + + public LookupAdapter createAdapter(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecDesc, + ExternalFileIndexAccessor snapshotAccessor, IFrameWriter writer) throws HyracksDataException { + try { + IRecordDataParser dataParser = dataParserFactory.createRecordParser(ctx); + dataParser.configure(configuration, recordType); + ILookupRecordReader reader = readerFactory.createRecordReader(ctx, partition, + snapshotAccessor); + reader.configure(configuration); + RecordIdReader ridReader = RecordIdReaderFactory.create(configuration, ridFields); + configurePropagatedFields(inRecDesc); + return new LookupAdapter(dataParser, reader, inRecDesc, ridReader, retainInput, propagatedFields, + retainNull, iNullWriterFactory, ctx, writer); + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + + public void configure(Map configuration) throws Exception { + this.configuration = configuration; + readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration); + dataParserFactory = (IRecordDataParserFactory) ParserFactoryProvider.getDataParserFactory(configuration); + dataParserFactory.setRecordType(recordType); + readerFactory.configure(configuration); + dataParserFactory.configure(configuration); + } + + private void configurePropagatedFields(RecordDescriptor inRecDesc) { + int ptr = 0; + boolean skip = false; + propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length]; + for (int i = 0; i < inRecDesc.getFieldCount(); i++) { + if (ptr < ridFields.length) { + skip = false; + for (int j = 0; j < ridFields.length; j++) { + if (ridFields[j] == i) { + ptr++; + skip = true; + break; + } + } + if (!skip) + propagatedFields[i - ptr] = i; + } else { + propagatedFields[i - ptr] = i; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java deleted file mode 100644 index 251d69a..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.adapter.factory; - -import java.io.File; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.feeds.api.IDatasourceAdapter; -import org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.util.DNSResolverFactory; -import org.apache.asterix.external.util.INodeResolver; -import org.apache.asterix.external.util.INodeResolverFactory; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.std.file.FileSplit; - -/** - * Factory class for creating an instance of NCFileSystemAdapter. An - * NCFileSystemAdapter reads external data residing on the local file system of - * an NC. - */ -public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory { - private static final long serialVersionUID = 1L; - - public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs"; - - private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver(); - - private IAType sourceDatatype; - private FileSplit[] fileSplits; - private ARecordType outputType; - - @Override - public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx); - return fsAdapter; - } - - @Override - public String getName() { - return NC_FILE_SYSTEM_ADAPTER_NAME; - } - - @Override - public SupportedOperation getSupportedOperations() { - return SupportedOperation.READ; - } - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - this.configuration = configuration; - this.outputType = outputType; - String[] splits = configuration.get(AsterixTupleParserFactory.KEY_PATH).split(","); - IAType sourceDatatype = outputType; - configureFileSplits(splits); - configureFormat(sourceDatatype); - - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return configurePartitionConstraint(); - } - - private void configureFileSplits(String[] splits) throws AsterixException { - if (fileSplits == null) { - fileSplits = new FileSplit[splits.length]; - String nodeName; - String nodeLocalPath; - int count = 0; - String trimmedValue; - for (String splitPath : splits) { - trimmedValue = splitPath.trim(); - if (!trimmedValue.contains("://")) { - throw new AsterixException( - "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\""); - } - nodeName = trimmedValue.split(":")[0]; - nodeLocalPath = trimmedValue.split("://")[1]; - FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath))); - fileSplits[count++] = fileSplit; - } - } - } - - private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException { - String[] locs = new String[fileSplits.length]; - String location; - for (int i = 0; i < fileSplits.length; i++) { - location = getNodeResolver().resolveNode(fileSplits[i].getNodeName()); - locs[i] = location; - } - return new AlgebricksAbsolutePartitionConstraint(locs); - } - - protected INodeResolver getNodeResolver() { - if (nodeResolver == null) { - nodeResolver = initializeNodeResolver(); - } - return nodeResolver; - } - - private static INodeResolver initializeNodeResolver() { - INodeResolver nodeResolver = null; - String configuredNodeResolverFactory = System - .getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY); - if (configuredNodeResolverFactory != null) { - try { - nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance())) - .createNodeResolver(); - - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname " - + configuredNodeResolverFactory + "\n" + e.getMessage()); - } - nodeResolver = DEFAULT_NODE_RESOLVER; - } - } else { - nodeResolver = DEFAULT_NODE_RESOLVER; - } - return nodeResolver; - } - - @Override - public ARecordType getAdapterOutputType() { - return outputType; - } - - @Override - public InputDataFormat getInputDataFormat() { - return InputDataFormat.UNKNOWN; - } - - public void setFiles(List files) throws AlgebricksException { - throw new AlgebricksException("can't set files for this Adapter"); - } - -}