From: imaxon@apache.org
To: commits@asterixdb.incubator.apache.org
Reply-To: dev@asterixdb.incubator.apache.org
Date: Thu, 07 Apr 2016 14:59:50 -0000
Message-Id: <6596ae9eeec94ce6815524416c94707a@git.apache.org>
In-Reply-To: <54f3ff538fc347d59763ae613b6b0ccf@git.apache.org>
References: <54f3ff538fc347d59763ae613b6b0ccf@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [15/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00c1eb7,0000000..3c3b8fb
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@@ -1,182 -1,0 +1,178 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
- import java.nio.file.Path;
- import java.util.Map;
 +
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.asterix.external.util.FileSystemWatcher;
- import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.log4j.Logger;
 +
 +public class LocalFSInputStream extends AsterixInputStream {
 +
 +    private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
-     private final Path path;
 +    private final FileSystemWatcher watcher;
 +    private FileInputStream in;
 +    private byte lastByte;
 +    private File currentFile;
 +
-     public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
-             final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
-             throws IOException {
-         this.path = fileSplits[partition].getLocalFile().getFile().toPath();
-         this.watcher = new FileSystemWatcher(path, expression, isFeed);
-         this.watcher.init();
-     }
-
-     @Override
-     public void setFeedLogManager(FeedLogManager logManager) {
-         super.setFeedLogManager(logManager);
-         watcher.setFeedLogManager(logManager);
++    public LocalFSInputStream(FileSystemWatcher watcher) {
++        this.watcher = watcher;
 +    }
 +
 +    @Override
 +    public void setController(AbstractFeedDataFlowController controller) {
 +        super.setController(controller);
-         watcher.setController(controller);
 +    }
 +
 +    @Override
++    public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
++        super.setFeedLogManager(logManager);
++        watcher.setFeedLogManager(logManager);
++    }
++
++    @Override
 +    public void close() throws IOException {
 +        IOException ioe = null;
 +        if (in != null) {
 +            try {
 +                closeFile();
 +            } catch (Exception e) {
 +                ioe = new IOException(e);
 +            }
 +        }
 +        try {
 +            watcher.close();
 +        } catch (Exception e) {
 +            if (ioe == null) {
 +                throw e;
 +            }
 +            ioe.addSuppressed(e);
 +            throw ioe;
 +        }
 +    }
 +
 +    private void closeFile() throws IOException {
 +        if (in != null) {
++            if (logManager != null) {
++                logManager.endPartition(currentFile.getAbsolutePath());
++            }
 +            try {
 +                in.close();
 +            } finally {
 +                in = null;
 +                currentFile = null;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Closes the current input stream and opens the next one, if any.
 +     */
 +    private boolean advance() throws IOException {
 +        closeFile();
-         if (watcher.hasNext()) {
-             currentFile = watcher.next();
++        currentFile = watcher.poll();
++        if (currentFile == null) {
++            if (controller != null) {
++                controller.flush();
++            }
++            currentFile = watcher.take();
++        }
++        if (currentFile != null) {
 +            in = new FileInputStream(currentFile);
++            if (notificationHandler != null) {
++                notificationHandler.notifyNewSource();
++            }
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public int read() throws IOException {
 +        throw new HyracksDataException(
 +                "read() is not supported with this stream. Use read(byte[] b, int off, int len)");
 +    }
 +
 +    @Override
 +    public int read(byte[] b, int off, int len) throws IOException {
 +        if (in == null) {
 +            if (!advance()) {
 +                return -1;
 +            }
 +        }
 +        int result = in.read(b, off, len);
 +        while ((result < 0) && advance()) {
 +            // return a new line at the end of every file <--Might create problems for some cases
 +            // depending on the parser implementation-->
 +            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) {
 +                lastByte = ExternalDataConstants.BYTE_LF;
 +                b[off] = ExternalDataConstants.BYTE_LF;
 +                return 1;
 +            }
 +            // read from the newly opened file
 +            result = in.read(b, off, len);
 +        }
 +        if (result > 0) {
 +            lastByte = b[(off + result) - 1];
 +        }
 +        return result;
 +    }
 +
 +    @Override
 +    public boolean stop() throws Exception {
++        closeFile();
 +        watcher.close();
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean handleException(Throwable th) {
 +        if (in == null) {
 +            return false;
 +        }
 +        if (th instanceof IOException) {
 +            // TODO: Change from string check to exception type
 +            if (th.getCause().getMessage().contains("Malformed input stream")) {
 +                if (currentFile != null) {
 +                    try {
 +                        logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
 +                    } catch (IOException e) {
 +                        LOGGER.warn("Failed to write to feed log file", e);
 +                    }
 +                    LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
 +                }
 +                try {
 +                    advance();
 +                    return true;
 +                } catch (Exception e) {
-                     return false;
-                 }
-             } else {
-                 try {
-                     watcher.init();
-                 } catch (IOException e) {
-                     LOGGER.warn("Failed to initialize watcher during failure recovery", e);
-                     return false;
++                    LOGGER.warn("An exception was thrown while trying to skip a file", e);
 +                }
 +            }
-             return true;
 +        }
++        LOGGER.warn("Failed to recover from failure", th);
 +        return false;
 +    }
 +}
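The read(byte[], int, int) loop above stitches the files handed out by the watcher into one logical stream, injecting a single LF at each file boundary so that a record ending exactly at end-of-file is still terminated. A minimal, self-contained sketch of that boundary handling (plain JDK only; the list of byte arrays stands in for FileSystemWatcher, and all class names here are illustrative, not AsterixDB classes):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Standalone sketch of the end-of-file newline injection done by read(byte[], int, int). */
public class NewlineInjectingStream extends InputStream {
    private final Iterator<byte[]> sources; // stands in for FileSystemWatcher
    private InputStream in;
    private byte lastByte;

    NewlineInjectingStream(List<byte[]> chunks) {
        this.sources = chunks.iterator();
    }

    private boolean advance() {
        // Open the next "file", or signal exhaustion.
        in = sources.hasNext() ? new ByteArrayInputStream(sources.next()) : null;
        return in != null;
    }

    @Override
    public int read() throws IOException {
        throw new IOException("use read(byte[], int, int)");
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (in == null && !advance()) {
            return -1;
        }
        int result = in.read(b, off, len);
        while (result < 0 && advance()) {
            // Separate records that would otherwise run across a file boundary:
            // emit one LF unless the previous file already ended with a line break.
            if (lastByte != '\n' && lastByte != '\r') {
                lastByte = '\n';
                b[off] = '\n';
                return 1;
            }
            result = in.read(b, off, len);
        }
        if (result > 0) {
            lastByte = b[off + result - 1];
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        NewlineInjectingStream s = new NewlineInjectingStream(
                Arrays.asList("a,1".getBytes(), "b,2\n".getBytes(), "c,3".getBytes()));
        byte[] buf = new byte[16];
        int n;
        StringBuilder out = new StringBuilder();
        while ((n = s.read(buf, 0, buf.length)) >= 0) {
            out.append(new String(buf, 0, n));
        }
        System.out.println(out); // a,1\nb,2\nc,3
    }
}

Running main shows exactly one LF inserted between the first two chunks, and none after the chunk that already ends with a newline.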
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 85d0e41,0000000..ae012f3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@@ -1,158 -1,0 +1,164 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.input.stream.factory;
 +
 +import java.io.File;
- import java.io.IOException;
++import java.nio.file.Path;
++import java.util.ArrayList;
 +import java.util.Map;
++import java.util.Set;
++import java.util.TreeSet;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.INodeResolver;
 +import org.apache.asterix.external.api.INodeResolverFactory;
 +import org.apache.asterix.external.input.stream.LocalFSInputStream;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
- import org.apache.asterix.external.util.FeedUtils;
++import org.apache.asterix.external.util.FileSystemWatcher;
 +import org.apache.asterix.external.util.NodeResolverFactory;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class LocalFSInputStreamFactory implements IInputStreamFactory {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
 +    protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamFactory.class.getName());
 +    protected static INodeResolver nodeResolver;
 +    protected Map<String, String> configuration;
 +    protected FileSplit[] inputFileSplits;
-     protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
 +    protected boolean isFeed;
 +    protected String expression;
 +    // transient fields (they don't need to be serialized and transferred)
 +    private transient AlgebricksAbsolutePartitionConstraint constraints;
++    private transient FileSystemWatcher watcher;
 +
 +    @Override
-     public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-         try {
-             return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
-         } catch (IOException e) {
-             throw new HyracksDataException(e);
++    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
++            throws HyracksDataException {
++        if (watcher == null) {
++            String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
++            ArrayList<Path> inputResources = new ArrayList<>();
++            for (int i = 0; i < inputFileSplits.length; i++) {
++                if (inputFileSplits[i].getNodeName().equals(nodeName)) {
++                    inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
++                }
++            }
++            watcher = new FileSystemWatcher(inputResources, expression, isFeed);
 +        }
++        return new LocalFSInputStream(watcher);
 +    }
 +
 +    @Override
 +    public DataSourceType getDataSourceType() {
 +        return DataSourceType.STREAM;
 +    }
 +
 +    @Override
 +    public boolean isIndexible() {
 +        return false;
 +    }
 +
 +    @Override
 +    public void configure(Map<String, String> configuration) throws AsterixException {
 +        this.configuration = configuration;
 +        String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
 +        configureFileSplits(splits);
 +        configurePartitionConstraint();
 +        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
-         if (isFeed) {
-             feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
-                     ExternalDataUtils.getFeedName(configuration), constraints);
-         }
 +        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
 +    }
 +
 +    @Override
 +    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
 +        return constraints;
 +    }
 +
 +    private void configureFileSplits(String[] splits) throws AsterixException {
++        INodeResolver resolver = getNodeResolver();
 +        if (inputFileSplits == null) {
 +            inputFileSplits = new FileSplit[splits.length];
 +            String nodeName;
 +            String nodeLocalPath;
 +            int count = 0;
 +            String trimmedValue;
 +            for (String splitPath : splits) {
 +                trimmedValue = splitPath.trim();
 +                if (!trimmedValue.contains("://")) {
 +                    throw new AsterixException(
 +                            "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
 +                }
-                 nodeName = trimmedValue.split(":")[0];
++                nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
 +                nodeLocalPath = trimmedValue.split("://")[1];
 +                FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
 +                inputFileSplits[count++] = fileSplit;
 +            }
 +        }
 +    }
 +
 +    private void configurePartitionConstraint() throws AsterixException {
-         String[] locs = new String[inputFileSplits.length];
-         String location;
++        Set<String> locs = new TreeSet<>();
 +        for (int i = 0; i < inputFileSplits.length; i++) {
-             location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
-             locs[i] = location;
++            String location = inputFileSplits[i].getNodeName();
++            locs.add(location);
 +        }
-         constraints = new AlgebricksAbsolutePartitionConstraint(locs);
++        constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
 +    }
 +
 +    protected INodeResolver getNodeResolver() {
 +        if (nodeResolver == null) {
 +            synchronized (DEFAULT_NODE_RESOLVER) {
 +                if (nodeResolver == null) {
 +                    nodeResolver = initializeNodeResolver();
 +                }
 +            }
 +        }
 +        return nodeResolver;
 +    }
 +
 +    private static INodeResolver initializeNodeResolver() {
 +        INodeResolver nodeResolver = null;
 +        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
 +        if (configuredNodeResolverFactory != null) {
 +            try {
 +                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
 +                        .createNodeResolver();
 +
 +            } catch (Exception e) {
 +                if (LOGGER.isLoggable(Level.WARNING)) {
 +                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
 +                            + configuredNodeResolverFactory + "\n" + e.getMessage());
 +                }
 +                nodeResolver = DEFAULT_NODE_RESOLVER;
 +            }
 +        } else {
 +            nodeResolver = DEFAULT_NODE_RESOLVER;
 +        }
 +        return nodeResolver;
 +    }
 +}
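Under the new scheme the factory lazily creates one FileSystemWatcher per NC and hands the same instance to every partition scheduled on that node, which is why configurePartitionConstraint now emits one location per distinct node rather than one per split. A sketch of that deduplication, assuming nothing beyond the JDK (a plain String[] stands in for AlgebricksAbsolutePartitionConstraint):

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Sketch: collapsing per-split node names into one constraint entry per node. */
public class PartitionConstraintSketch {
    public static String[] locationsFor(String[] splitNodeNames) {
        // A TreeSet keeps one entry per node, so a node holding several
        // input splits still gets exactly one partition (and one watcher).
        Set<String> locs = new TreeSet<>(Arrays.asList(splitNodeNames));
        return locs.toArray(new String[locs.size()]);
    }

    public static void main(String[] args) {
        String[] nodes = { "nc2", "nc1", "nc1", "nc2" };
        System.out.println(Arrays.toString(locationsFor(nodes))); // [nc1, nc2]
    }
}

A TreeSet rather than a HashSet also keeps the locations in a deterministic order, so the resulting constraint is stable across runs.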
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index d362201,0000000..6ba27d8
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@@ -1,129 -1,0 +1,124 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.AsterixInputStream;
 +import org.apache.asterix.external.api.IDataFlowController;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IIndexingDatasource;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordDataParser;
 +import org.apache.asterix.external.api.IRecordDataParserFactory;
 +import org.apache.asterix.external.api.IRecordReader;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.api.IRecordWithPKDataParser;
 +import org.apache.asterix.external.api.IStreamDataParser;
 +import org.apache.asterix.external.api.IStreamDataParserFactory;
 +import org.apache.asterix.external.dataflow.ChangeFeedDataFlowController;
 +import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 +import org.apache.asterix.external.dataflow.FeedTupleForwarder;
 +import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 +import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 +import org.apache.asterix.external.dataflow.RecordDataFlowController;
 +import org.apache.asterix.external.dataflow.StreamDataFlowController;
 +import org.apache.asterix.external.parser.RecordWithMetadataParser;
 +import org.apache.asterix.external.util.DataflowUtils;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +import org.apache.asterix.external.util.FeedLogManager;
 +import org.apache.asterix.external.util.FeedUtils;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.hyracks.api.context.IHyracksTaskContext;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
 +
 +public class DataflowControllerProvider {
 +
 +    // TODO: Instead, use a factory just like data source and data parser.
 +    @SuppressWarnings({ "rawtypes", "unchecked" })
 +    public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
 +            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
-             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
-             throws HyracksDataException {
++            Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
++            throws HyracksDataException {
 +        try {
-             FeedLogManager feedLogManager = null;
-             if (isFeed) {
-                 feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
-             }
 +            switch (dataSourceFactory.getDataSourceType()) {
 +                case RECORDS:
 +                    IRecordReaderFactory recordReaderFactory = (IRecordReaderFactory) dataSourceFactory;
 +                    IRecordReader recordReader = recordReaderFactory.createRecordReader(ctx, partition);
 +                    IRecordDataParserFactory recordParserFactory = (IRecordDataParserFactory) dataParserFactory;
 +                    IRecordDataParser dataParser = recordParserFactory.createRecordParser(ctx);
 +                    if (indexingOp) {
 +                        return new IndexingDataFlowController(ctx,
 +                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
 +                                recordReader, ((IIndexingDatasource) recordReader).getIndexer());
 +                    } else if (isFeed) {
 +                        FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils
 +                                .getTupleForwarder(configuration, feedLogManager);
 +                        boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
 +                        boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
 +                        if (isRecordWithMeta) {
 +                            if (isChangeFeed) {
 +                                int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
 +                                return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
 +                                        numOfKeys + 2, (RecordWithMetadataParser) dataParser, recordReader);
 +                            } else {
 +                                return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
 +                                        (RecordWithMetadataParser) dataParser, recordReader);
 +                            }
 +                        } else if (isChangeFeed) {
 +                            int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
 +                            return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
 +                                    (IRecordWithPKDataParser) dataParser, recordReader);
 +                        } else {
 +                            return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
 +                                    recordReader);
 +                        }
 +                    } else {
 +                        return new RecordDataFlowController(ctx,
 +                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
 +                                recordReader, 1);
 +                    }
 +                case STREAM:
 +                    IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
 +                    AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
 +                    IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
 +                    IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
 +                    streamParser.setInputStream(stream);
 +                    if (isFeed) {
 +                        return new FeedStreamDataFlowController(ctx,
 +                                (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
 +                                feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
 +                    } else {
 +                        return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
 +                                streamParser);
 +                    }
 +                default:
 +                    throw new HyracksDataException(
 +                            "Unknown data source type: " + dataSourceFactory.getDataSourceType());
 +            }
 +        } catch (IOException | AsterixException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +}
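getDataflowController picks a controller from the data source type plus the indexing/feed/meta/change-feed flags. The decision, reduced to a plain function for reference (controller names are returned as strings so the sketch compiles on its own; the real method also wires in forwarders, parsers, and readers):

/** Sketch of the dispatch implemented by getDataflowController, decision inputs only. */
public class ControllerDispatchSketch {
    enum SourceType { RECORDS, STREAM }

    static String pick(SourceType type, boolean indexingOp, boolean isFeed,
            boolean isRecordWithMeta, boolean isChangeFeed) {
        if (type == SourceType.STREAM) {
            return isFeed ? "FeedStreamDataFlowController" : "StreamDataFlowController";
        }
        if (indexingOp) {
            return "IndexingDataFlowController";
        }
        if (!isFeed) {
            return "RecordDataFlowController";
        }
        if (isRecordWithMeta) {
            return isChangeFeed ? "ChangeFeedWithMetaDataFlowController"
                    : "FeedWithMetaDataFlowController";
        }
        return isChangeFeed ? "ChangeFeedDataFlowController" : "FeedRecordDataFlowController";
    }

    public static void main(String[] args) {
        // A change feed over records, not indexed: ChangeFeedDataFlowController
        System.out.println(pick(SourceType.RECORDS, false, true, false, true));
    }
}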
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0,0000000..0f24f91
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@@ -1,149 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.util.Map;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory;
 +import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 +import org.apache.asterix.external.api.IInputStreamFactory;
 +import org.apache.asterix.external.api.IRecordReaderFactory;
 +import org.apache.asterix.external.input.HDFSDataSourceFactory;
 +import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
 +import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
 +import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
- import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
++import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 +import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 +import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
++import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 +import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 +import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +
 +public class DatasourceFactoryProvider {
 +
 +    public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
 +            throws AsterixException {
 +        if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
 +            String reader = configuration.get(ExternalDataConstants.KEY_READER);
 +            return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration);
 +        } else {
 +            // get stream source
 +            String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE);
 +            return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration);
 +        }
 +    }
 +
-     public static IInputStreamFactory getInputStreamFactory(String streamSource,
-             Map<String, String> configuration) throws AsterixException {
++    public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
++            throws AsterixException {
 +        IInputStreamFactory streamSourceFactory;
 +        if (ExternalDataUtils.isExternal(streamSource)) {
 +            String dataverse = ExternalDataUtils.getDataverse(configuration);
 +            streamSourceFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
 +        } else {
 +            switch (streamSource) {
-                 case ExternalDataConstants.STREAM_HDFS:
-                     streamSourceFactory = new HDFSDataSourceFactory();
-                     break;
 +                case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
 +                    streamSourceFactory = new LocalFSInputStreamFactory();
 +                    break;
-                 case ExternalDataConstants.STREAM_SOCKET:
++                case ExternalDataConstants.SOCKET:
 +                case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
 +                    streamSourceFactory = new SocketServerInputStreamFactory();
 +                    break;
 +                case ExternalDataConstants.STREAM_SOCKET_CLIENT:
 +                    streamSourceFactory = new SocketClientInputStreamFactory();
 +                    break;
 +                case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
 +                    streamSourceFactory = new TwitterFirehoseStreamFactory();
 +                    break;
 +                default:
 +                    throw new AsterixException("unknown input stream factory: " + streamSource);
 +            }
 +        }
 +        return streamSourceFactory;
 +    }
 +
 +    public static IRecordReaderFactory getRecordReaderFactory(String reader, Map<String, String> configuration)
 +            throws AsterixException {
 +        if (reader.equals(ExternalDataConstants.EXTERNAL)) {
 +            return ExternalDataUtils.createExternalRecordReaderFactory(configuration);
 +        }
-         String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
-         IInputStreamFactory inputStreamFactory;
-         switch (parser) {
-             case ExternalDataConstants.FORMAT_ADM:
-             case ExternalDataConstants.FORMAT_JSON:
-             case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-             case ExternalDataConstants.FORMAT_CSV:
-                 inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                 return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
-                 switch (reader) {
-                     case ExternalDataConstants.READER_KV:
-                         return new KVReaderFactory();
-                     case ExternalDataConstants.READER_KV_TEST:
-                         return new KVTestReaderFactory();
-                 }
-         }
-         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
-         if (format != null) {
-             switch (format) {
-                 case ExternalDataConstants.FORMAT_ADM:
-                 case ExternalDataConstants.FORMAT_JSON:
-                 case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-                 case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new EmptyLineSeparatedRecordReaderFactory()
-                             .setInputStreamFactoryProvider(inputStreamFactory);
-                 case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                 case ExternalDataConstants.FORMAT_CSV:
-                     inputStreamFactory = DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                     return new LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-             }
-         }
 +        switch (reader) {
++            case ExternalDataConstants.READER_KV:
++                return new KVReaderFactory();
++            case ExternalDataConstants.READER_KV_TEST:
++                return new KVTestReaderFactory();
 +            case ExternalDataConstants.READER_HDFS:
 +                return new HDFSDataSourceFactory();
++            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
++                return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
 +            case ExternalDataConstants.READER_TWITTER_PULL:
 +            case ExternalDataConstants.READER_TWITTER_PUSH:
++            case ExternalDataConstants.READER_PUSH_TWITTER:
++            case ExternalDataConstants.READER_PULL_TWITTER:
 +                return new TwitterRecordReaderFactory();
-             case ExternalDataConstants.READER_KV:
-                 return new KVReaderFactory();
-             case ExternalDataConstants.READER_KV_TEST:
-                 return new KVTestReaderFactory();
 +            case ExternalDataConstants.TEST_RECORD_WITH_PK:
 +                return new RecordWithPKTestReaderFactory();
++            case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
++                return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
++            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
++            case ExternalDataConstants.SOCKET:
++                return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
++            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
++                return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
 +            default:
 +                throw new AsterixException("unknown record reader factory: " + reader);
 +        }
 +    }
 +}
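The rewritten getRecordReaderFactory drops the per-format reader classes in favor of one composition: a generic StreamRecordReaderFactory wrapping whichever input stream factory the alias selects. A reduced sketch of that pattern using stand-in types (not the real AsterixDB interfaces):

/** Sketch: stream-backed reader aliases resolved through one wrapper factory. */
public class ReaderFactoryCompositionSketch {
    interface InputStreamFactory { String describe(); }

    static class LocalFs implements InputStreamFactory {
        public String describe() { return "localfs stream"; }
    }

    static class SocketServer implements InputStreamFactory {
        public String describe() { return "socket-server stream"; }
    }

    static class StreamRecordReaderFactory {
        private final InputStreamFactory streams;
        StreamRecordReaderFactory(InputStreamFactory streams) { this.streams = streams; }
        String describe() { return "records over " + streams.describe(); }
    }

    static StreamRecordReaderFactory forAlias(String alias) {
        switch (alias) {
            case "localfs":
                return new StreamRecordReaderFactory(new LocalFs());
            case "socket":
            case "socket_adapter":
                return new StreamRecordReaderFactory(new SocketServer());
            default:
                throw new IllegalArgumentException("unknown record reader factory: " + alias);
        }
    }

    public static void main(String[] args) {
        System.out.println(forAlias("localfs").describe()); // records over localfs stream
    }
}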
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 06928b3,0000000..682fb89
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@@ -1,76 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.provider;
 +
 +import java.util.Map;
 +
 +import javax.annotation.Nonnull;
 +
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.external.api.IDataParserFactory;
 +import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
 +import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
 +import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
 +import org.apache.asterix.external.parser.factory.RSSParserFactory;
 +import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
 +import org.apache.asterix.external.parser.factory.TestRecordWithPKParserFactory;
 +import org.apache.asterix.external.parser.factory.TweetParserFactory;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +
 +public class ParserFactoryProvider {
 +    public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException {
 +        IDataParserFactory parserFactory = null;
 +        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
 +        if ((parserFactoryName != null) && ExternalDataUtils.isExternal(parserFactoryName)) {
 +            return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
 +                    parserFactoryName);
 +        } else {
 +            parserFactory = ParserFactoryProvider
 +                    .getDataParserFactory(ExternalDataUtils.getRecordFormat(configuration));
 +        }
 +        return parserFactory;
 +    }
 +
 +    @SuppressWarnings("rawtypes")
 +    public static IDataParserFactory getDataParserFactory(@Nonnull String parser) throws AsterixException {
 +        switch (parser) {
 +            case ExternalDataConstants.FORMAT_ADM:
 +            case ExternalDataConstants.FORMAT_JSON:
 +            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
 +                return new ADMDataParserFactory();
 +            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
 +            case ExternalDataConstants.FORMAT_CSV:
 +                return new DelimitedDataParserFactory();
 +            case ExternalDataConstants.FORMAT_HIVE:
 +            case ExternalDataConstants.PARSER_HIVE:
 +                return new HiveDataParserFactory();
 +            case ExternalDataConstants.FORMAT_TWEET:
 +                return new TweetParserFactory();
 +            case ExternalDataConstants.FORMAT_RSS:
 +                return new RSSParserFactory();
 +            case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
 +                return new RecordWithMetadataParserFactory();
 +            case ExternalDataConstants.TEST_RECORD_WITH_PK:
 +                return new TestRecordWithPKParserFactory();
 +            default:
-                 throw new AsterixException("Unknown parser " + parser);
++                throw new AsterixException("Unknown format: " + parser);
 +        }
 +    }
 +}
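ParserFactoryProvider resolves a factory in two steps: an explicitly configured external parser (recognized by the '#' library separator) wins, otherwise the record format chosen by ExternalDataUtils.getRecordFormat selects a builtin factory. A condensed sketch of that lookup order (string return values stand in for the factory classes):

import java.util.HashMap;
import java.util.Map;

/** Sketch of the parser factory lookup order, with illustrative keys and names. */
public class ParserLookupSketch {
    static String resolve(Map<String, String> conf) {
        String parser = conf.get("parser");
        if (parser != null && parser.contains("#")) {
            return "external:" + parser; // dataverse library class, loaded reflectively
        }
        String format = parser != null ? parser : conf.get("format");
        if (format == null) {
            throw new IllegalArgumentException("no parser/format configured");
        }
        switch (format) {
            case "adm":
            case "json":
            case "semi-structured":
                return "ADMDataParserFactory";
            case "delimited-text":
            case "csv":
                return "DelimitedDataParserFactory";
            default:
                throw new IllegalArgumentException("Unknown format: " + format);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("format", "csv");
        System.out.println(resolve(conf)); // DelimitedDataParserFactory
    }
}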
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a02152b,0000000..b5ec27a
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@@ -1,232 -1,0 +1,232 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 +import org.apache.hadoop.mapred.SequenceFileInputFormat;
 +import org.apache.hadoop.mapred.TextInputFormat;
 +
 +public class ExternalDataConstants {
 +    // TODO: Remove unused variables.
 +    /**
 +     * Keys
 +     */
 +    // used to specify the stream factory for an adapter that has a stream data source
 +    public static final String KEY_STREAM = "stream";
 +    // used to specify the dataverse of the adapter
 +    public static final String KEY_DATAVERSE = "dataverse";
 +    // used to specify the socket addresses when reading data from sockets
 +    public static final String KEY_SOCKETS = "sockets";
 +    // specify whether the socket address points to an NC or an IP
 +    public static final String KEY_MODE = "address-type";
 +    // specify the HDFS name node address when reading HDFS data
 +    public static final String KEY_HDFS_URL = "hdfs";
 +    // specify the path when reading from a file system
 +    public static final String KEY_PATH = "path";
 +    // specify the HDFS input format when reading data from HDFS
 +    public static final String KEY_INPUT_FORMAT = "input-format";
 +    // specifies the filesystem (localfs or HDFS) when using a filesystem data source
 +    public static final String KEY_FILESYSTEM = "fs";
 +    // specifies the address of the HDFS name node
 +    public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
 +    // specifies the class implementation of the accessed instance of HDFS
 +    public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
 +    public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
 +    public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
 +    public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
 +    public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
 +    public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
 +    public static final String KEY_SOURCE_DATATYPE = "type-name";
 +    public static final String KEY_DELIMITER = "delimiter";
 +    public static final String KEY_PARSER_FACTORY = "tuple-parser";
 +    public static final String KEY_DATA_PARSER = "parser";
 +    public static final String KEY_HEADER = "header";
 +    public static final String KEY_READER = "reader";
 +    public static final String KEY_READER_STREAM = "stream";
 +    public static final String KEY_TYPE_NAME = "type-name";
 +    public static final String KEY_RECORD_START = "record-start";
 +    public static final String KEY_RECORD_END = "record-end";
 +    public static final String KEY_EXPRESSION = "expression";
 +    public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
 +    public static final String KEY_FORMAT = "format";
 +    public static final String KEY_QUOTE = "quote";
 +    public static final String KEY_PARSER = "parser";
 +    public static final String KEY_DATASET_RECORD = "dataset-record";
 +    public static final String KEY_HIVE_SERDE = "hive-serde";
 +    public static final String KEY_RSS_URL = "url";
 +    public static final String KEY_INTERVAL = "interval";
-     public static final String KEY_PULL = "pull";
-     public static final String KEY_PUSH = "push";
 +    public static final String KEY_IS_FEED = "is-feed";
 +    public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
 +    public static final String KEY_FEED_NAME = "feed";
 +    // a string representing external bucket name
 +    public static final String KEY_BUCKET = "bucket";
 +    // a comma delimited list of nodes
 +    public static final String KEY_NODES = "nodes";
 +    // a string representing the password used to authenticate with the external data source
 +    public static final String KEY_PASSWORD = "password";
 +    // an integer representing the number of raw records that can be buffered in the parsing queue
 +    public static final String KEY_QUEUE_SIZE = "queue-size";
 +    // a comma delimited integers representing the indexes of the meta fields in the raw record (i,e: "3,1,0,2" denotes that the first meta field is in index 3 in the actual record)
 +    public static final String KEY_META_INDEXES = "meta-indexes";
 +    // an integer representing the index of the value field in the data type
 +    public static final String KEY_VALUE_INDEX = "value-index";
 +    // a string representing the format of the raw record in the value field in the data type
 +    public static final String KEY_VALUE_FORMAT = "value-format";
 +    // a boolean indicating whether the feed is a change feed
 +    public static final String KEY_IS_CHANGE_FEED = "change-feed";
 +    // an integer representing the number of keys in a change feed
 +    public static final String KEY_KEY_SIZE = "key-size";
 +    // a boolean indicating whether the feed produces records with metadata
 +    public static final String FORMAT_RECORD_WITH_METADATA = "record-with-metadata";
 +    // a string representing the format of the record (for adapters which produces records with additional information like pk or metadata)
 +    public static final String KEY_RECORD_FORMAT = "record-format";
 +    public static final String KEY_META_TYPE_NAME = "meta-type-name";
 +    public static final String READER_STREAM = "stream";
 +    /**
 +     * HDFS class names
 +     */
 +    public static final String CLASS_NAME_TEXT_INPUT_FORMAT = TextInputFormat.class.getName();
 +    public static final String CLASS_NAME_SEQUENCE_INPUT_FORMAT = SequenceFileInputFormat.class.getName();
 +    public static final String CLASS_NAME_RC_INPUT_FORMAT = RCFileInputFormat.class.getName();
 +    public static final String CLASS_NAME_HDFS_FILESYSTEM = DistributedFileSystem.class.getName();
 +    /**
 +     * input formats aliases
 +     */
 +    public static final String INPUT_FORMAT_TEXT = "text-input-format";
 +    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
 +    public static final String INPUT_FORMAT_RC = "rc-input-format";
 +    /**
 +     * Builtin streams
 +     */
 +
 +    /**
 +     * Builtin record readers
 +     */
 +    public static final String READER_HDFS = "hdfs";
 +    public static final String READER_KV = "key-value";
-     public static final String READER_TWITTER_PUSH = "twitter-push";
-     public static final String READER_TWITTER_PULL = "twitter-pull";
++    public static final String READER_TWITTER_PUSH = "twitter_push";
++    public static final String READER_PUSH_TWITTER = "push_twitter";
++    public static final String READER_TWITTER_PULL = "twitter_pull";
++    public static final String READER_PULL_TWITTER = "pull_twitter";
 +
 +    public static final String CLUSTER_LOCATIONS = "cluster-locations";
 +    public static final String SCHEDULER = "hdfs-scheduler";
 +    public static final String PARSER_HIVE = "hive-parser";
 +    public static final String HAS_HEADER = "has.header";
 +    public static final String TIME_TRACKING = "time.tracking";
 +    public static final String DEFAULT_QUOTE = "\"";
 +    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
 +    public static final String DEFAULT_DELIMITER = ",";
 +    public static final String EXTERNAL_LIBRARY_SEPARATOR = "#";
 +    public static final String HDFS_INDEXING_ADAPTER = "hdfs-indexing-adapter";
 +    /**
 +     * supported builtin record formats
 +     */
 +    public static final String FORMAT_HIVE = "hive";
 +    public static final String FORMAT_BINARY = "binary";
 +    public static final String FORMAT_ADM = "adm";
 +    public static final String FORMAT_JSON = "json";
 +    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
 +    public static final String FORMAT_TWEET = "twitter-status";
 +    public static final String FORMAT_RSS = "rss";
 +    public static final String FORMAT_SEMISTRUCTURED = "semi-structured";
 +    public static final String FORMAT_LINE_SEPARATED = "line-separated";
 +    public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
 +    public static final String FORMAT_KV = "kv";
 +
 +    /**
 +     * input streams
 +     */
 +    public static final String STREAM_HDFS = "hdfs";
 +    public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
-     public static final String STREAM_SOCKET = "socket";
++    public static final String SOCKET = "socket";
 +    public static final String STREAM_SOCKET_CLIENT = "socket-client";
 +
 +    /**
 +     * adapter aliases
 +     */
 +    public static final String ALIAS_GENERIC_ADAPTER = "adapter";
 +    public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
 +    public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
 +    public static final String ALIAS_HDFS_ADAPTER = "hdfs";
 +    public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
 +    public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose";
 +    public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
 +    public static final String ALIAS_RSS_ADAPTER = "rss_feed";
 +    public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
 +    public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
 +    public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
 +    public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
 +    public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
 +    public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = "change_feed_with_meta";
 +    // for testing purposes
 +    public static final String ALIAS_TEST_CHANGE_ADAPTER = "test_change_feed";
 +
 +    /**
 +     * Constant String values
 +     */
 +    public static final String TRUE = "true";
 +    public static final String FALSE = "false";
 +
 +    /**
 +     * Constant characters
 +     */
 +    public static final char ESCAPE = '\\';
 +    public static final char QUOTE = '"';
 +    public static final char SPACE = ' ';
 +    public static final char TAB = '\t';
 +    public static final char LF = '\n';
 +    public static final char CR = '\r';
 +    public static final char DEFAULT_RECORD_START = '{';
 +    public static final char DEFAULT_RECORD_END = '}';
 +
 +    /**
 +     * Constant byte characters
 +     */
 +    public static final byte BYTE_LF = '\n';
 +    public static final byte BYTE_CR = '\r';
 +    /**
 +     * Size default values
 +     */
 +    public static final int DEFAULT_BUFFER_SIZE = 4096;
 +    public static final int DEFAULT_BUFFER_INCREMENT = 2048;
 +    public static final int DEFAULT_QUEUE_SIZE = 64;
 +    public static final int MAX_RECORD_SIZE = 32000000;
 +
 +    /**
 +     * Expected parameter values
 +     */
 +    public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1";
 +    public static final String LARGE_RECORD_ERROR_MESSAGE = "Record is too large";
 +    public static final String KEY_RECORD_INDEX = "record-index";
 +    public static final String FORMAT_DCP = "dcp";
 +    public static final String KEY_KEY_INDEXES = "key-indexes";
 +    public static final String KEY_KEY_INDICATORS = "key-indicators";
 +    public static final String KEY_STREAM_SOURCE = "stream-source";
 +    public static final String EXTERNAL = "external";
 +    public static final String KEY_READER_FACTORY = "reader-factory";
 +    public static final String READER_KV_TEST = "kv_test";
 +    public static final String READER_RSS = "rss";
 +    public static final String FORMAT_CSV = "csv";
 +    public static final String TEST_RECORD_WITH_PK = "test-record-with-pk";
 +
 +    public static final String ERROR_LARGE_RECORD = "Record is too large";
 +    public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
 +}
+ */ +package org.apache.asterix.external.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.api.IDataParserFactory; +import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType; +import org.apache.asterix.external.api.IInputStreamFactory; +import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AUnionType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; +import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; + +public class ExternalDataUtils { + + // Get a delimiter from the given configuration + public static char getDelimiter(Map configuration) throws AsterixException { + String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER); + if (delimiterValue == null) { + delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER; + } else if (delimiterValue.length() != 1) { + throw new AsterixException( + "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1."); + } + return delimiterValue.charAt(0); + } + + // Get a quote from the given configuration when the delimiter is given + // Need to pass delimiter to check whether they share the same character + public static char getQuote(Map configuration, char delimiter) throws AsterixException { + String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE); + if (quoteValue == null) { + quoteValue = ExternalDataConstants.DEFAULT_QUOTE; + } else if (quoteValue.length() != 1) { + throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1."); + } + + // Since delimiter (char type value) can't be null, + // we only check whether delimiter and quote use the same character + if (quoteValue.charAt(0) == delimiter) { + throw new AsterixException( + "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. 
"); + } + + return quoteValue.charAt(0); + } + + // Get the header flag + public static boolean getHasHeader(Map configuration) { + return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER)); + } + + public static void validateParameters(Map configuration) throws AsterixException { + String reader = configuration.get(ExternalDataConstants.KEY_READER); + if (reader == null) { + throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified."); + } - String parser = configuration.get(ExternalDataConstants.KEY_PARSER); ++ String parser = configuration.get(ExternalDataConstants.KEY_FORMAT); + if (parser == null) { - throw new AsterixException("The parameter " + ExternalDataConstants.KEY_PARSER + " must be specified."); ++ throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified."); + } + } + + public static DataSourceType getDataSourceType(Map configuration) { + String reader = configuration.get(ExternalDataConstants.KEY_READER); + if ((reader != null) && reader.equals(ExternalDataConstants.READER_STREAM)) { + return DataSourceType.STREAM; + } else { + return DataSourceType.RECORDS; + } + } + + public static boolean isExternal(String aString) { + return ((aString != null) && aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + && (aString.trim().length() > 1)); + } + + public static ClassLoader getClassLoader(String dataverse, String library) { + return ExternalLibraryManager.getLibraryClassLoader(dataverse, library); + } + + public static String getLibraryName(String aString) { + return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; + } + + public static String getExternalClassName(String aString) { + return aString.trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[1]; + } + + public static IInputStreamFactory createExternalInputStreamFactory(String dataverse, String stream) + throws AsterixException { + try { + String libraryName = getLibraryName(stream); + String className = getExternalClassName(stream); + ClassLoader classLoader = getClassLoader(dataverse, libraryName); + return ((IInputStreamFactory) (classLoader.loadClass(className).newInstance())); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new AsterixException("Failed to create stream factory", e); + } + } + + public static String getDataverse(Map configuration) { + return configuration.get(ExternalDataConstants.KEY_DATAVERSE); + } + + public static String getRecordFormat(Map configuration) { + String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER); + return parserFormat != null ? 
+
+    public static void setRecordFormat(Map<String, String> configuration, String format) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
+            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
+        }
+    }
+
+    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<>();
+        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        return m;
+    }
+
+    public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
+        int n = recordType.getFieldTypes().length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = null;
+            if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
+                List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
+                // An optional field is a UNION of exactly two types whose first member is NULL.
+                if ((unionTypes.size() != 2) || (unionTypes.get(0).getTypeTag() != ATypeTag.NULL)) {
+                    throw new NotImplementedException("Non-optional UNION type is not supported.");
+                }
+                tag = unionTypes.get(1).getTypeTag();
+            } else {
+                tag = recordType.getFieldTypes()[i].getTypeTag();
+            }
+            if (tag == null) {
+                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+            }
+            fieldParserFactories[i] = getParserFactory(tag);
+        }
+        return fieldParserFactories;
+    }
+
+    public static IValueParserFactory getParserFactory(ATypeTag tag) {
+        IValueParserFactory vpf = valueParserFactoryMap.get(tag);
+        if (vpf == null) {
+            throw new NotImplementedException("No value parser factory for fields of type " + tag);
+        }
+        return vpf;
+    }
+
+    public static String getRecordReaderStreamName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
+    }
+
+    public static boolean hasHeader(Map<String, String> configuration) {
+        String value = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (value != null) {
+            return Boolean.valueOf(value);
+        }
+        return false;
+    }
+
-    public static boolean isPull(Map<String, String> configuration) {
-        String pull = configuration.get(ExternalDataConstants.KEY_PULL);
-        if (pull == null) {
-            return false;
-        }
-        return Boolean.parseBoolean(pull);
-    }
-
-    public static boolean isPush(Map<String, String> configuration) {
-        String push = configuration.get(ExternalDataConstants.KEY_PUSH);
-        if (push == null) {
-            return false;
-        }
-        return Boolean.parseBoolean(push);
-    }
-
+    public static IRecordReaderFactory createExternalRecordReaderFactory(Map<String, String> configuration)
+            throws AsterixException {
+        String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
+        if (readerFactory == null) {
+            throw new AsterixException("To use the " + ExternalDataConstants.EXTERNAL + " reader, the parameter "
+                    + ExternalDataConstants.KEY_READER_FACTORY + " must be specified.");
+        }
+        String[] libraryAndFactory = readerFactory.split(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR);
+        if (libraryAndFactory.length != 2) {
+            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
+                    + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
+        }
+        String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
+        if (dataverseAndLibrary.length != 2) {
+            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
+                    + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
+        }
+
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseAndLibrary[0],
+                dataverseAndLibrary[1]);
+        try {
+            return (IRecordReaderFactory) classLoader.loadClass(libraryAndFactory[1]).newInstance();
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AsterixException("Failed to create record reader factory", e);
+        }
+    }
+
+    public static IDataParserFactory createExternalParserFactory(String dataverse, String parserFactoryName)
+            throws AsterixException {
+        try {
+            String library = parserFactoryName.substring(0,
+                    parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
+            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverse, library);
+            return (IDataParserFactory) classLoader
+                    .loadClass(parserFactoryName
+                            .substring(parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + 1))
+                    .newInstance();
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new AsterixException("Failed to create an external parser factory", e);
+        }
+    }
+
+    public static boolean isFeed(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            return false;
+        } else {
+            return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_FEED));
+        }
+    }
+
+    public static void prepareFeed(Map<String, String> configuration, String dataverseName, String feedName) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
+            configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
+        }
+        configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataverseName);
+        configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
+    }
+
+    public static boolean keepDataSourceOpen(Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_WAIT_FOR_DATA)) {
+            return true;
+        }
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_WAIT_FOR_DATA));
+    }
+
+    public static String getFeedName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+    }
+
+    public static int getQueueSize(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)
+                ? Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
+                : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
+    }
+
+    public static boolean isRecordWithMeta(Map<String, String> configuration) {
+        return configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME);
+    }
+
+    public static void setRecordWithMeta(Map<String, String> configuration, String booleanString) {
+        configuration.put(ExternalDataConstants.FORMAT_RECORD_WITH_METADATA, booleanString);
+    }
+
+    public static boolean isChangeFeed(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_IS_CHANGE_FEED));
+    }
+
+    public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
+        String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
+        if (keyIndexes == null) {
+            throw new AsterixException(
+                    "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+        }
+        return keyIndexes.split(",").length;
+    }
+
+    public static void setNumberOfKeys(Map<String, String> configuration, int value) {
+        configuration.put(ExternalDataConstants.KEY_KEY_SIZE, String.valueOf(value));
+    }
+
+    public static void setChangeFeed(Map<String, String> configuration, String booleanString) {
+        configuration.put(ExternalDataConstants.KEY_IS_CHANGE_FEED, booleanString);
+    }
+
+    public static int[] getPKIndexes(Map<String, String> configuration) {
+        String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
+        String[] stringIndexes = keyIndexes.split(",");
+        int[] intIndexes = new int[stringIndexes.length];
+        for (int i = 0; i < stringIndexes.length; i++) {
+            intIndexes[i] = Integer.parseInt(stringIndexes[i]);
+        }
+        return intIndexes;
+    }
+
+    public static int[] getPKSourceIndicators(Map<String, String> configuration) {
+        String keyIndicators = configuration.get(ExternalDataConstants.KEY_KEY_INDICATORS);
+        String[] stringIndicators = keyIndicators.split(",");
+        int[] intIndicators = new int[stringIndicators.length];
+        for (int i = 0; i < stringIndicators.length; i++) {
+            intIndicators[i] = Integer.parseInt(stringIndicators[i]);
+        }
+        return intIndicators;
+    }
+}
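For reference, a minimal sketch of how a delimited-text configuration would exercise the helpers above. The keys come from ExternalDataConstants as used in this file; the literal values ("stream", "delimited-text", "|", "'") are illustrative only:

    Map<String, String> conf = new HashMap<>();
    conf.put(ExternalDataConstants.KEY_READER, "stream");           // illustrative reader name
    conf.put(ExternalDataConstants.KEY_FORMAT, "delimited-text");   // illustrative format name
    conf.put(ExternalDataConstants.KEY_DELIMITER, "|");
    conf.put(ExternalDataConstants.KEY_QUOTE, "'");

    ExternalDataUtils.validateParameters(conf);               // both "reader" and "format" must be present
    char delimiter = ExternalDataUtils.getDelimiter(conf);    // '|'
    char quote = ExternalDataUtils.getQuote(conf, delimiter); // throws if quote == delimiter

getQuote takes the already-resolved delimiter rather than re-reading it from the map so that a defaulted delimiter is checked against the quote as well.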

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index fc15d3c,0000000..5bb8ec3
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@@ -1,172 -1,0 +1,180 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.TreeSet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedLogManager {
+
+    public enum LogEntryType {
+        START, // partition start
+        END, // partition end
+        COMMIT, // a record commit within a partition
-        SNAPSHOT // an identifier that partitions with identifiers before this one should be
-                 // ignored
++        SNAPSHOT // a marker indicating that partitions whose identifiers precede it should be ignored
+    }
+
+    public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
+    public static final String ERROR_LOG_FILE_NAME = "error.log";
+    public static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
+    public static final String START_PREFIX = "s:";
+    public static final String END_PREFIX = "e:";
+    public static final int PREFIX_SIZE = 2;
+    private String currentPartition;
+    private final TreeSet<String> completed;
+    private final Path dir;
+    private BufferedWriter progressLogger;
+    private BufferedWriter errorLogger;
+    private BufferedWriter recordLogger;
+    private final StringBuilder stringBuilder = new StringBuilder();
++    private int count = 0;
+
+    public FeedLogManager(File file) throws HyracksDataException {
+        try {
+            this.dir = file.toPath();
+            this.completed = new TreeSet<>();
+            if (!exists()) {
+                create();
+            }
+            open();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
-    public void endPartition() throws IOException {
++    public synchronized void touch() {
++        count++;
++    }
++
++    public synchronized void endPartition() throws IOException {
+        logProgress(END_PREFIX + currentPartition);
+        completed.add(currentPartition);
+    }
+
-    public void endPartition(String partition) throws IOException {
++    public synchronized void endPartition(String partition) throws IOException {
+        currentPartition = partition;
+        logProgress(END_PREFIX + currentPartition);
+        completed.add(currentPartition);
+    }
+
-    public void startPartition(String partition) throws IOException {
++    public synchronized void startPartition(String partition) throws IOException {
+        currentPartition = partition;
+        logProgress(START_PREFIX + currentPartition);
+    }
+
+    public boolean exists() {
+        return Files.exists(dir);
+    }
+
-    public void open() throws IOException {
++    public synchronized void open() throws IOException {
+        // Read the contents of the progress log.
+        BufferedReader reader = Files.newBufferedReader(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
+        String log = reader.readLine();
+        while (log != null) {
+            if (log.startsWith(END_PREFIX)) {
+                completed.add(getSplitId(log));
+            }
+            log = reader.readLine();
+        }
+        reader.close();
+
+        progressLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+        errorLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + ERROR_LOG_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+        recordLogger = Files.newBufferedWriter(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME),
+                StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+    }
+
-    public void close() throws IOException {
++    public synchronized void close() throws IOException {
++        // Reference-counted close: the writers are shared, so only the last user closes them.
++        count--;
++        if (count > 0) {
++            return;
++        }
+        progressLogger.close();
+        errorLogger.close();
+        recordLogger.close();
+    }
+
-    public boolean create() throws IOException {
++    public synchronized boolean create() throws IOException {
+        File f = dir.toFile();
+        f.mkdirs();
+        new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
+        new File(f, ERROR_LOG_FILE_NAME).createNewFile();
+        new File(f, BAD_RECORDS_FILE_NAME).createNewFile();
+        return true;
+    }
+
-    public boolean destroy() throws IOException {
++    public synchronized boolean destroy() throws IOException {
+        File f = dir.toFile();
+        FileUtils.deleteDirectory(f);
+        return true;
+    }
+
-    public void logProgress(String log) throws IOException {
++    public synchronized void logProgress(String log) throws IOException {
+        stringBuilder.setLength(0);
+        stringBuilder.append(log);
+        stringBuilder.append(ExternalDataConstants.LF);
+        progressLogger.write(stringBuilder.toString());
+        progressLogger.flush();
+    }
+
-    public void logError(String error, Throwable th) throws IOException {
++    public synchronized void logError(String error, Throwable th) throws IOException {
+        stringBuilder.setLength(0);
+        stringBuilder.append(error);
+        stringBuilder.append(ExternalDataConstants.LF);
+        stringBuilder.append(th.toString());
+        stringBuilder.append(ExternalDataConstants.LF);
+        errorLogger.write(stringBuilder.toString());
+        errorLogger.flush();
+    }
+
-    public void logRecord(String record, String errorMessage) throws IOException {
++    public synchronized void logRecord(String record, String errorMessage) throws IOException {
+        stringBuilder.setLength(0);
+        stringBuilder.append(record);
+        stringBuilder.append(ExternalDataConstants.LF);
+        stringBuilder.append(errorMessage);
+        stringBuilder.append(ExternalDataConstants.LF);
+        recordLogger.write(stringBuilder.toString());
+        recordLogger.flush();
+    }
+
+    public static String getSplitId(String log) {
+        return log.substring(PREFIX_SIZE);
+    }
+
-    public boolean isSplitRead(String split) {
++    public synchronized boolean isSplitRead(String split) {
+        return completed.contains(split);
+    }
+}
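A minimal sketch of the intended lifecycle, assuming one FeedLogManager shared by all adapter instances on a node (the path and partition names are illustrative). Each user calls touch() before use and close() when done, so the underlying writers are closed only by the last user, and partitions already recorded with an "e:" entry in progress.log are skipped after a restart:

    FeedLogManager logManager = new FeedLogManager(new File("storage/partition_0/dv/feed/nc1"));
    logManager.touch();                           // register this user of the shared instance
    if (!logManager.isSplitRead("input0.csv")) {  // already ended according to progress.log?
        logManager.startPartition("input0.csv");  // appends "s:input0.csv"
        // ... ingest records; failures go to logRecord(record, errorMessage) ...
        logManager.endPartition();                // appends "e:input0.csv" and marks it completed
    }
    logManager.close();                           // decrements the count; the last close() closes the writers

Note that the constructor does not call touch(), so a caller that skips touch() and later calls close() would close the shared writers out from under the other users; callers must pair touch() with close().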

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af,0000000..502a432
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@@ -1,123 -1,0 +1,111 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+public class FeedUtils {
+    private static String prepareDataverseFeedName(String dataverseName, String feedName) {
+        return dataverseName + File.separator + feedName;
+    }
+
-    public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition,
-            ClusterPartition[] nodePartitions) {
++    public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
++            ClusterPartition partition) {
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        ClusterPartition nodePartition = nodePartitions[0];
+        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                nodePartition.getPartitionId());
-        // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
-        return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f);
++                partition.getPartitionId());
++        // Note: feed adapter instances on a single node share the feed logger.
++        // format: 'storage dir name'/partition_#/dataverse/feed/node
++        File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
++        return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
+    }
+
+    public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
+            AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
+        if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
+            throw new AsterixException("Cannot create file splits for an adapter with count partitioning constraints.");
+        }
-        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
-        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
++        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+        List<FileSplit> splits = new ArrayList<>();
-        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        int i = 0;
+        for (String nd : locations) {
-            // Always get the first partition
-            ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
-            String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                    nodePartition.getPartitionId());
-            // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
-            File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
-                    + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
-            splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
-            i++;
++            // Delegate to the single-split variant, always using the node's first partition.
++            splits.add(splitsForAdapter(dataverseName, feedName, nd,
++                    AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
+    public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
+        return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
+    }
+
+    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
+            FileSplit[] feedLogFileSplits) throws HyracksDataException {
+        return new FeedLogManager(
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
+                        feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile());
+    }
+
+    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
+            throws HyracksDataException {
+        return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getLocalFile().getFile().getPath(),
+                feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
+    }
+
+    public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
+        // Read the last tuple in the frame as the message and reduce the frame's tuple count by one.
+        fta.reset(input);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        message.clear();
+        message.put(input.array(), offset, len);
+        message.flip();
+        IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
+    }
+
+    public static int getNumOfFields(Map<String, String> configuration) {
+        return 1;
+    }
+
+    public static String getFeedMetaTypeName(Map<String, String> configuration) {
+        return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
+    }
+}
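To make the path layout concrete, a small self-contained sketch of the directory a feed's log split resolves to under the scheme above; the storage directory name, partition id, dataverse, feed, and node names are all made up, and the exact "partition_#" prefix is assumed from the comment in splitsForAdapter:

    import java.io.File;

    public class FeedLogPathExample {
        public static void main(String[] args) {
            // Mirrors: 'storage dir name'/partition_#/dataverse/feed/node
            String storageDir = "storage"; // illustrative; the real name comes from AsterixClusterProperties
            int partitionId = 0;           // illustrative ClusterPartition id
            String path = storageDir + File.separator + "partition_" + partitionId
                    + File.separator + "Default" + File.separator + "myFeed" + File.separator + "nc1";
            System.out.println(path);      // e.g. storage/partition_0/Default/myFeed/nc1 on Unix
        }
    }

Because every adapter instance on a node now maps to the same node-named directory, the FeedLogManager obtained from getFeedLogManager is shared across those instances, which is exactly what the touch()/close() reference counting in FeedLogManager accounts for.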