Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 41A7E18301 for ; Thu, 14 Jan 2016 20:32:01 +0000 (UTC) Received: (qmail 3426 invoked by uid 500); 14 Jan 2016 20:32:01 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 3385 invoked by uid 500); 14 Jan 2016 20:32:00 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 3368 invoked by uid 99); 14 Jan 2016 20:32:00 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jan 2016 20:32:00 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 7CD0CC0423 for ; Thu, 14 Jan 2016 20:32:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.779 X-Spam-Level: * X-Spam-Status: No, score=1.779 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id ll5an12teuSW for ; Thu, 14 Jan 2016 20:31:58 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 4E3A731AE3 for ; Thu, 14 Jan 2016 20:31:56 +0000 (UTC) Received: (qmail 3310 invoked by uid 99); 14 Jan 2016 20:31:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jan 2016 20:31:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 33991E3893; Thu, 14 Jan 2016 20:31:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.incubator.apache.org Date: Thu, 14 Jan 2016 20:32:02 -0000 Message-Id: <7d815cb12f54481f886e350d25ae2b5d@git.apache.org> In-Reply-To: <37f4aec6a47e4af4835cbc81b4634719@git.apache.org> References: <37f4aec6a47e4af4835cbc81b4634719@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/26] incubator-asterixdb git commit: Feed Fixes and Cleanup http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java deleted file mode 100644 index db38c12..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapter.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.runtime; - -import java.io.File; -import java.io.FileInputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.feeds.api.IDataSourceAdapter; -import org.apache.hyracks.api.comm.IFrameWriter; - -public class SocketClientAdapter implements IDataSourceAdapter { - - private static final long serialVersionUID = 1L; - - private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName()); - - private static final String LOCALHOST = "127.0.0.1"; - - private static final long RECONNECT_PERIOD = 2000; - - private final String localFile; - - private final int port; - - private boolean continueStreaming = true; - - public SocketClientAdapter(Integer port, String localFile) { - this.localFile = localFile; - this.port = port; - } - - @Override - public void start(int partition, IFrameWriter writer) throws Exception { - Socket socket = waitForReceiver(); - OutputStream os = socket.getOutputStream(); - FileInputStream fin = new FileInputStream(new File(localFile)); - byte[] chunk = new byte[1024]; - int read; - try { - while (continueStreaming) { - read = fin.read(chunk); - if (read > 0) { - os.write(chunk, 0, read); - } else { - break; - } - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Finished streaming file " + localFile + "to port [" + port + "]"); - } - - } finally { - socket.close(); - fin.close(); - } - - } - - private Socket waitForReceiver() throws Exception { - Socket socket = null; - while (socket == null) { - try { - socket = new Socket(LOCALHOST, port); - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000) - + " seconds before reconnecting"); - } - Thread.sleep(RECONNECT_PERIOD); - } - } - return socket; - } - - @Override - public boolean stop() throws Exception { - continueStreaming = false; - return true; - } - - @Override - public boolean handleException(Throwable e) { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java deleted file mode 100644 index a1e90a8..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/SocketClientAdapterFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.runtime; - -import java.util.Map; - -import org.apache.asterix.common.feeds.api.IDataSourceAdapter; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -public class SocketClientAdapterFactory implements IAdapterFactory { - - private static final long serialVersionUID = 1L; - - private ARecordType outputType; - - private GenericSocketFeedAdapterFactory genericSocketAdapterFactory; - - private String[] fileSplits; - - public static final String KEY_FILE_SPLITS = "file_splits"; - - @Override - public void configure(Map configuration, ARecordType outputType) throws Exception { - this.outputType = outputType; - String fileSplitsValue = configuration.get(KEY_FILE_SPLITS); - if (fileSplitsValue == null) { - throw new IllegalArgumentException( - "File splits not specified. File split is specified as a comma separated list of paths"); - } - fileSplits = fileSplitsValue.trim().split(","); - genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory(); - genericSocketAdapterFactory.configure(configuration, outputType); - } - - @Override - public String getAlias() { - return ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER; - } - - @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return genericSocketAdapterFactory.getPartitionConstraint(); - } - - @Override - public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception { - Pair socket = genericSocketAdapterFactory.getSockets().get(partition); - return new SocketClientAdapter(socket.second, fileSplits[partition]); - } - - @Override - public ARecordType getAdapterOutputType() { - return outputType; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java deleted file mode 100644 index b5fd454..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/runtime/TweetGenerator.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.runtime; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.external.runtime.DataGenerator.InitializationInfo; -import org.apache.asterix.external.runtime.DataGenerator.TweetMessage; -import org.apache.asterix.external.runtime.DataGenerator.TweetMessageIterator; - -public class TweetGenerator { - - private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName()); - - public static final String KEY_DURATION = "duration"; - public static final String KEY_TPS = "tps"; - public static final String KEY_VERBOSE = "verbose"; - public static final String KEY_FIELDS = "fields"; - public static final int INFINITY = 0; - - private static final int DEFAULT_DURATION = INFINITY; - - private int duration; - private TweetMessageIterator tweetIterator = null; - private int partition; - private long tweetCount = 0; - private int frameTweetCount = 0; - private int numFlushedTweets = 0; - private DataGenerator dataGenerator = null; - private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024); - private String[] fields; - private final List subscribers; - private final Object lock = new Object(); - private final List subscribersForRemoval = new ArrayList(); - - public TweetGenerator(Map configuration, int partition) throws Exception { - this.partition = partition; - String value = configuration.get(KEY_DURATION); - this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION; - dataGenerator = new DataGenerator(new InitializationInfo()); - tweetIterator = dataGenerator.new TweetMessageIterator(duration); - this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null; - this.subscribers = new ArrayList(); - } - - private void writeTweetString(TweetMessage tweetMessage) throws IOException { - String tweet = tweetMessage.getAdmEquivalent(fields) + "\n"; - System.out.println(tweet); - tweetCount++; - byte[] b = tweet.getBytes(); - if (outputBuffer.position() + b.length > outputBuffer.limit()) { - flush(); - numFlushedTweets += frameTweetCount; - frameTweetCount = 0; - outputBuffer.put(b); - } else { - outputBuffer.put(b); - } - frameTweetCount++; - } - - private void flush() throws IOException { - outputBuffer.flip(); - synchronized (lock) { - for (OutputStream os : subscribers) { - try { - os.write(outputBuffer.array(), 0, outputBuffer.limit()); - } catch (Exception e) { - subscribersForRemoval.add(os); - } - } - if (!subscribersForRemoval.isEmpty()) { - subscribers.removeAll(subscribersForRemoval); - subscribersForRemoval.clear(); - } - } - outputBuffer.position(0); - outputBuffer.limit(32 * 1024); - } - - public boolean generateNextBatch(int numTweets) throws Exception { - boolean moreData = tweetIterator.hasNext(); - if (!moreData) { - if (outputBuffer.position() > 0) { - flush(); - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount); - } - return false; - } else { - int count = 0; - while (count < numTweets) { - writeTweetString(tweetIterator.next()); - count++; - } - return true; - } - } - - public int getNumFlushedTweets() { - return numFlushedTweets; - } - - public void registerSubscriber(OutputStream os) { - synchronized (lock) { - subscribers.add(os); - } - } - - public void deregisterSubscribers(OutputStream os) { - synchronized (lock) { - subscribers.remove(os); - } - } - - public void close() throws IOException { - synchronized (lock) { - for (OutputStream os : subscribers) { - os.close(); - } - } - } - - public boolean isSubscribed() { - return !subscribers.isEmpty(); - } - - public long getTweetCount() { - return tweetCount; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java deleted file mode 100644 index f8585bb..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DNSResolverFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.util; - -import org.apache.asterix.external.api.INodeResolver; -import org.apache.asterix.external.api.INodeResolverFactory; - -/** - * Factory for creating instance of {@link NodeResolver} - */ -public class DNSResolverFactory implements INodeResolverFactory { - - private static final INodeResolver INSTANCE = new NodeResolver(); - - @Override - public INodeResolver createNodeResolver() { - return INSTANCE; - } - -}