asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [01/21] incubator-asterixdb git commit: First stage of external data cleanup
Date: Sun, 03 Jan 2016 17:40:59 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 1d5cf6403 -> 284590ed9


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapter.java
deleted file mode 100644
index 7dd5130..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int port;
-    private SocketFeedServer socketFeedServer;
-
-    public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputType, int port,
-            IHyracksTaskContext ctx, int partition) throws AsterixException, IOException {
-        super(parserFactory, outputType, ctx, partition);
-        this.port = port;
-        this.socketFeedServer = new SocketFeedServer(outputType, port);
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        super.start(partition, writer);
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return socketFeedServer.getInputStream();
-    }
-
-    private static class SocketFeedServer {
-        private ServerSocket serverSocket;
-        private InputStream inputStream;
-
-        public SocketFeedServer(ARecordType outputtype, int port) throws IOException, AsterixException {
-            try {
-                serverSocket = new ServerSocket(port);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("port: " + port + " unusable ");
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Feed server configured to use port: " + port);
-            }
-        }
-
-        public InputStream getInputStream() {
-            Socket socket;
-            try {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("waiting for client at " + serverSocket.getLocalPort());
-                }
-                socket = serverSocket.accept();
-                inputStream = socket.getInputStream();
-            } catch (IOException e) {
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Unable to create input stream required for feed ingestion");
-                }
-            }
-            return inputStream;
-        }
-
-        public void stop() throws IOException {
-            try {
-                serverSocket.close();
-            } catch (IOException ioe) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to close socket at " + serverSocket.getLocalPort());
-                }
-            }
-        }
-
-    }
-
-    @Override
-    public void stop() throws Exception {
-        socketFeedServer.stop();
-    }
-
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PUSH;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        try {
-            this.socketFeedServer = new SocketFeedServer((ARecordType) sourceDatatype, port);
-            return true;
-        } catch (Exception re) {
-            return false;
-        }
-    }
-
-}
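
For reference, the adapter removed above acted as a single-connection socket server: it bound a ServerSocket on the configured port, accepted one client, and handed that client's InputStream to the tuple parser inherited from StreamBasedAdapter. A minimal sketch of a matching client, assuming an adapter instance listening on localhost (the host, port, and record text below are illustrative, not values from this commit):

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class AdmSocketClientSketch {
    public static void main(String[] args) throws Exception {
        // Connect to the port the (removed) GenericSocketFeedAdapter was configured with.
        try (Socket socket = new Socket("127.0.0.1", 10001);
                OutputStream out = socket.getOutputStream()) {
            // The adapter simply parsed whatever bytes arrived; here we push one record.
            String record = "{\"id\": 1, \"message\": \"hello feed\"}\n";
            out.write(record.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}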

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
deleted file mode 100644
index 5d28f3d..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
-import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Factory class for creating @see{GenericSocketFeedAdapter} The
- * adapter listens at a port for receiving data (from external world).
- * Data received is transformed into Asterix Data Format (ADM).
- */
-public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IFeedAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private ARecordType outputType;
-
-    private List<Pair<String, Integer>> sockets;
-
-    private Mode mode = Mode.IP;
-
-    public static final String KEY_SOCKETS = "sockets";
-
-    public static final String KEY_MODE = "address-type";
-
-    public static enum Mode {
-        NC,
-        IP
-    }
-
-    @Override
-    public String getName() {
-        return "socket_adapter";
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    public List<Pair<String, Integer>> getSockets() {
-        return sockets;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.configuration = configuration;
-        this.configureSockets(configuration);
-        this.configureFormat(outputType);
-        this.outputType = (ARecordType) outputType;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        List<String> locations = new ArrayList<String>();
-        for (Pair<String, Integer> socket : sockets) {
-            locations.add(socket.first);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        Pair<String, Integer> socket = sockets.get(partition);
-        return new GenericSocketFeedAdapter(parserFactory, outputType, socket.second, ctx, partition);
-    }
-
-    private void configureSockets(Map<String, String> configuration) throws Exception {
-        sockets = new ArrayList<Pair<String, Integer>>();
-        String modeValue = configuration.get(KEY_MODE);
-        if (modeValue != null) {
-            mode = Mode.valueOf(modeValue.trim().toUpperCase());
-        }
-        String socketsValue = configuration.get(KEY_SOCKETS);
-        if (socketsValue == null) {
-            throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adapter configuration");
-        }
-        Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
-        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
-        String[] socketsArray = socketsValue.split(",");
-        Random random = new Random();
-        for (String socket : socketsArray) {
-            String[] socketTokens = socket.split(":");
-            String host = socketTokens[0].trim();
-            int port = Integer.parseInt(socketTokens[1].trim());
-            Pair<String, Integer> p = null;
-            switch (mode) {
-                case IP:
-                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
-                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
-                        throw new IllegalArgumentException("Invalid host " + host
-                                + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                + StringUtils.join(ncMap.keySet(), ", "));
-                    }
-                    String[] ncArray = ncsOnIp.toArray(new String[] {});
-                    String nc = ncArray[random.nextInt(ncArray.length)];
-                    p = new Pair<String, Integer>(nc, port);
-                    break;
-
-                case NC:
-                    p = new Pair<String, Integer>(host, port);
-                    if (!ncs.contains(host)) {
-                        throw new IllegalArgumentException("Invalid NC " + host
-                                + " as it is not part of the AsterixDB cluster. Valid choices are "
-                                + StringUtils.join(ncs, ", "));
-
-                    }
-                    break;
-            }
-            sockets.add(p);
-        }
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.UNKNOWN;
-    }
-
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
-    }
-    
-    public void setFiles(List<ExternalFile> files) throws AlgebricksException {
-        throw new AlgebricksException("files access not supported for this adapter");
-    }
-}
\ No newline at end of file
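
The factory above resolved its ingestion locations from two configuration entries: "sockets", a comma-separated list of host:port pairs, and "address-type", which says whether the host part names a node controller ("nc") or an IP address of a cluster machine ("ip"). A sketch of such a configuration map (the node names and ports are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

public class SocketAdapterConfigSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        // KEY_SOCKETS: one host:port pair per intake partition.
        configuration.put("sockets", "nc1:10001,nc2:10002");
        // KEY_MODE ("address-type"): "nc" = node controller names, "ip" = cluster IP addresses.
        configuration.put("address-type", "nc");
        System.out.println(configuration);
    }
}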

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
deleted file mode 100644
index 679f1af..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * An adapter that simulates a feed from the contents of a source file. The file
- * can be on the local file system or on HDFS. The feed ends when the content of
- * the source file has been ingested.
- */
-
-public class RateControlledFileSystemBasedAdapter extends FileSystemBasedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-    private FileSystemBasedAdapter coreAdapter;
-
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration,
-            FileSystemBasedAdapter coreAdapter, String format, ITupleParserFactory parserFactory,
-            IHyracksTaskContext ctx) throws Exception {
-        super(parserFactory, atype, ctx);
-        this.coreAdapter = coreAdapter;
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return coreAdapter.getInputStream(partition);
-    }
-
-    @Override
-    public void stop() {
-       // ((RateControlledTupleParser) tupleParser).stop();
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PULL;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
deleted file mode 100644
index a8c77ac..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IAdapterFactory;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
-import org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
-import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The
- * adapter simulates a feed from the contents of a source file. The file can be
- * on the local file system or on HDFS. The feed ends when the content of the
- * source file has been ingested.
- */
-public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory implements
-        IFeedAdapterFactory {
-    private static final long serialVersionUID = 1L;
-
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
-    public static final String KEY_PATH = "path";
-    public static final String KEY_FORMAT = "format";
-
-    private IAdapterFactory adapterFactory;
-    private String format;
-    private ARecordType atype;
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        FileSystemBasedAdapter coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(ctx, partition);
-        return new RateControlledFileSystemBasedAdapter(atype, configuration, coreAdapter, format, parserFactory, ctx);
-    }
-
-    @Override
-    public String getName() {
-        return "file_feed";
-    }
-
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.configuration = configuration;
-        checkRequiredArgs(configuration);
-        String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = NCFileSystemAdapterFactory.class.getName();
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = HDFSAdapterFactory.class.getName();
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        this.atype = outputType;
-        format = configuration.get(KEY_FORMAT);
-        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
-        adapterFactory.configure(configuration, outputType);
-        configureFormat(outputType);
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return adapterFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return atype;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.UNKNOWN;
-    }
-
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
-    }
-
-}
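
Per checkRequiredArgs() above, the removed "file_feed" factory insisted on four configuration entries: "fs" (either "localfs" or "hdfs"), "type-name", "path", and "format". A sketch of a minimal configuration map (the path and type name are illustrative assumptions; the exact path syntax depends on the delegated file-system adapter):

import java.util.HashMap;
import java.util.Map;

public class FileFeedConfigSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("fs", "localfs");          // or "hdfs"
        configuration.put("type-name", "TweetType"); // the declared record type
        configuration.put("path", "nc1://data/tweets.adm");
        configuration.put("format", "adm");
        System.out.println(configuration);
    }
}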

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
deleted file mode 100644
index e537ef7..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapter.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-public class SocketClientAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(SocketClientAdapter.class.getName());
-
-    private static final String LOCALHOST = "127.0.0.1";
-
-    private static final long RECONNECT_PERIOD = 2000;
-
-    private final String localFile;
-
-    private final int port;
-
-    private boolean continueStreaming = true;
-
-    public SocketClientAdapter(Integer port, String localFile) {
-        this.localFile = localFile;
-        this.port = port;
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        Socket socket = waitForReceiver();
-        OutputStream os = socket.getOutputStream();
-        FileInputStream fin = new FileInputStream(new File(localFile));
-        byte[] chunk = new byte[1024];
-        int read;
-        try {
-            while (continueStreaming) {
-                read = fin.read(chunk);
-                if (read > 0) {
-                    os.write(chunk, 0, read);
-                } else {
-                    break;
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Finished streaming file " + localFile + "to port [" + port + "]");
-            }
-
-        } finally {
-            socket.close();
-            fin.close();
-        }
-
-    }
-
-    private Socket waitForReceiver() throws Exception {
-        Socket socket = null;
-        while (socket == null) {
-            try {
-                socket = new Socket(LOCALHOST, port);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Receiver not ready, would wait for " + (RECONNECT_PERIOD / 1000)
-                            + " seconds before reconnecting");
-                }
-                Thread.sleep(RECONNECT_PERIOD);
-            }
-        }
-        return socket;
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PUSH;
-    }
-
-    @Override
-    public void stop() throws Exception {
-        continueStreaming = false;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        return false;
-    }
-
-}
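
The client adapter above inverted the usual direction: it connected out to 127.0.0.1 on the configured port, retrying every two seconds until a receiver was listening, and then streamed the local file in 1 KB chunks. A minimal receiver to exercise it against could look like this (the port is an illustrative assumption and must match the adapter's configuration):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class FeedReceiverSketch {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(10001);
                Socket client = server.accept();
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Each line is part of the file streamed by the (removed) SocketClientAdapter.
                System.out.println(line);
            }
        }
    }
}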

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
deleted file mode 100644
index 3d2f5af..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class SocketClientAdapterFactory implements IFeedAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private ARecordType outputType;
-
-    private GenericSocketFeedAdapterFactory genericSocketAdapterFactory;
-
-    private String[] fileSplits;
-
-    public static final String KEY_FILE_SPLITS = "file_splits";
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        this.outputType = outputType;
-        String fileSplitsValue = configuration.get(KEY_FILE_SPLITS);
-        if (fileSplitsValue == null) {
-            throw new IllegalArgumentException(
-                    "File splits not specified. File split is specified as a comma separated list of paths");
-        }
-        fileSplits = fileSplitsValue.trim().split(",");
-        genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
-        genericSocketAdapterFactory.configure(configuration, outputType);
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public String getName() {
-        return "socket_client";
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return genericSocketAdapterFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        Pair<String, Integer> socket = genericSocketAdapterFactory.getSockets().get(partition);
-        return new SocketClientAdapter(socket.second, fileSplits[partition]);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-    @Override
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    @Override
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        return null;
-    }
-
-}
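
This factory layered one extra setting, "file_splits", on top of the socket configuration it delegated to GenericSocketFeedAdapterFactory: a comma-separated list of local file paths, one per socket/partition. A sketch of the combined configuration (paths, node names, and ports are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

public class SocketClientConfigSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        // Delegated to GenericSocketFeedAdapterFactory: where the receivers listen.
        configuration.put("sockets", "nc1:10001,nc2:10002");
        configuration.put("address-type", "nc");
        // KEY_FILE_SPLITS: one source file per partition, matched by position.
        configuration.put("file_splits", "/data/part0.adm,/data/part1.adm");
        System.out.println(configuration);
    }
}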

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TweetGenerator.java
deleted file mode 100644
index 85195fb..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TweetGenerator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.tools.external.data.DataGenerator.InitializationInfo;
-import org.apache.asterix.tools.external.data.DataGenerator.TweetMessage;
-import org.apache.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
-
-public class TweetGenerator {
-
-    private static Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
-
-    public static final String KEY_DURATION = "duration";
-    public static final String KEY_TPS = "tps";
-    public static final String KEY_VERBOSE = "verbose";
-    public static final String KEY_FIELDS = "fields";
-    public static final int INFINITY = 0;
-
-    private static final int DEFAULT_DURATION = INFINITY;
-
-    private int duration;
-    private TweetMessageIterator tweetIterator = null;
-    private int partition;
-    private long tweetCount = 0;
-    private int frameTweetCount = 0;
-    private int numFlushedTweets = 0;
-    private DataGenerator dataGenerator = null;
-    private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
-    private String[] fields;
-    private final List<OutputStream> subscribers;
-    private final Object lock = new Object();
-    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
-
-    public TweetGenerator(Map<String, String> configuration, int partition) throws Exception {
-        this.partition = partition;
-        String value = configuration.get(KEY_DURATION);
-        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-        dataGenerator = new DataGenerator(new InitializationInfo());
-        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
-        this.fields = configuration.get(KEY_FIELDS) != null ? configuration.get(KEY_FIELDS).split(",") : null;
-        this.subscribers = new ArrayList<OutputStream>();
-    }
-
-    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
-        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
-        System.out.println(tweet);
-        tweetCount++;
-        byte[] b = tweet.getBytes();
-        if (outputBuffer.position() + b.length > outputBuffer.limit()) {
-            flush();
-            numFlushedTweets += frameTweetCount;
-            frameTweetCount = 0;
-            outputBuffer.put(b);
-        } else {
-            outputBuffer.put(b);
-        }
-        frameTweetCount++;
-    }
-
-    private void flush() throws IOException {
-        outputBuffer.flip();
-        synchronized (lock) {
-            for (OutputStream os : subscribers) {
-                try {
-                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
-                } catch (Exception e) {
-                    subscribersForRemoval.add(os);
-                }
-            }
-            if (!subscribersForRemoval.isEmpty()) {
-                subscribers.removeAll(subscribersForRemoval);
-                subscribersForRemoval.clear();
-            }
-        }
-        outputBuffer.position(0);
-        outputBuffer.limit(32 * 1024);
-    }
-
-    public boolean generateNextBatch(int numTweets) throws Exception {
-        boolean moreData = tweetIterator.hasNext();
-        if (!moreData) {
-            if (outputBuffer.position() > 0) {
-                flush();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
-            }
-            return false;
-        } else {
-            int count = 0;
-            while (count < numTweets) {
-                writeTweetString(tweetIterator.next());
-                count++;
-            }
-            return true;
-        }
-    }
-
-    public int getNumFlushedTweets() {
-        return numFlushedTweets;
-    }
-
-    public void registerSubscriber(OutputStream os) {
-        synchronized (lock) {
-            subscribers.add(os);
-        }
-    }
-
-    public void deregisterSubscribers(OutputStream os) {
-        synchronized (lock) {
-            subscribers.remove(os);
-        }
-    }
-
-    public void close() throws IOException {
-        synchronized (lock) {
-            for (OutputStream os : subscribers) {
-                os.close();
-            }
-        }
-    }
-
-    public boolean isSubscribed() {
-        return !subscribers.isEmpty();
-    }
-
-    public long getTweetCount() {
-        return tweetCount;
-    }
-
-}
\ No newline at end of file
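
TweetGenerator above was a plain data pump: it drew synthetic tweets from DataGenerator, buffered them in a 32 KB frame, and flushed the frame to every registered OutputStream subscriber. A sketch of how it was driven, assuming the pre-cleanup classes were still on the classpath (the duration and batch size are illustrative assumptions):

package org.apache.asterix.tools.external.data;

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

public class TweetGeneratorSketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("duration", "5"); // seconds of simulated data; 0 meant unbounded

        TweetGenerator generator = new TweetGenerator(configuration, 0); // partition 0
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        generator.registerSubscriber(sink);

        while (generator.generateNextBatch(100)) {
            // keep pumping 100-tweet batches; each call may flush a frame to the subscriber
        }
        generator.close();
        System.out.println("generated " + generator.getTweetCount() + " tweets");
    }
}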

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
deleted file mode 100644
index fffbc17..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedAdapter;
-import org.apache.asterix.external.dataset.adapter.StreamBasedAdapter;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * A simulator of the Twitter Firehose. Generates meaningful tweets
- * at a configurable rate
- */
-public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
-
-    private ExecutorService executorService = Executors.newCachedThreadPool();
-
-    private PipedOutputStream outputStream = new PipedOutputStream();
-
-    private PipedInputStream inputStream = new PipedInputStream(outputStream);
-
-    private final TwitterServer twitterServer;
-
-    public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
-            ARecordType outputtype, IHyracksTaskContext ctx, int partition) throws Exception {
-        super(parserFactory, outputtype, ctx, partition);
-        this.twitterServer = new TwitterServer(configuration, partition, outputtype, outputStream, executorService);
-    }
-
-    @Override
-    public void start(int partition, IFrameWriter writer) throws Exception {
-        twitterServer.start();
-        super.start(partition, writer);
-    }
-
-    @Override
-    public InputStream getInputStream(int partition) throws IOException {
-        return inputStream;
-    }
-
-    private static class TwitterServer {
-        private final DataProvider dataProvider;
-        private final ExecutorService executorService;
-
-        public TwitterServer(Map<String, String> configuration, int partition, ARecordType outputtype, OutputStream os,
-                ExecutorService executorService) throws Exception {
-            dataProvider = new DataProvider(configuration, outputtype, partition, os);
-            this.executorService = executorService;
-        }
-
-        public void stop() throws IOException {
-            dataProvider.stop();
-        }
-
-        public void start() {
-            executorService.execute(dataProvider);
-        }
-
-    }
-
-    private static class DataProvider implements Runnable {
-
-        public static final String KEY_MODE = "mode";
-
-        private TweetGenerator tweetGenerator;
-        private boolean continuePush = true;
-        private int batchSize;
-        private final Mode mode;
-        private final OutputStream os;
-
-        public static enum Mode {
-            AGGRESSIVE,
-            CONTROLLED
-        }
-
-        public DataProvider(Map<String, String> configuration, ARecordType outputtype, int partition, OutputStream os)
-                throws Exception {
-            this.tweetGenerator = new TweetGenerator(configuration, partition);
-            this.tweetGenerator.registerSubscriber(os);
-            this.os = os;
-            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
-                    : Mode.AGGRESSIVE;
-            switch (mode) {
-                case CONTROLLED:
-                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
-                    if (tpsValue == null) {
-                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
-                    }
-                    batchSize = Integer.parseInt(tpsValue);
-                    break;
-                case AGGRESSIVE:
-                    batchSize = 5000;
-                    break;
-            }
-        }
-
-        @Override
-        public void run() {
-            boolean moreData = true;
-            long startBatch;
-            long endBatch;
-
-            while (true) {
-                try {
-                    while (moreData && continuePush) {
-                        switch (mode) {
-                            case AGGRESSIVE:
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                break;
-                            case CONTROLLED:
-                                startBatch = System.currentTimeMillis();
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                endBatch = System.currentTimeMillis();
-                                if (endBatch - startBatch < 1000) {
-                                    Thread.sleep(1000 - (endBatch - startBatch));
-                                }
-                                break;
-                        }
-                    }
-                    os.close();
-                    break;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adaptor " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public void stop() {
-            continuePush = false;
-        }
-
-    }
-
-    @Override
-    public void stop() throws Exception {
-        twitterServer.stop();
-    }
-
-    @Override
-    public DataExchangeMode getDataExchangeMode() {
-        return DataExchangeMode.PUSH;
-    }
-
-    @Override
-    public boolean handleException(Exception e) {
-        try {
-            twitterServer.stop();
-        } catch (Exception re) {
-            re.printStackTrace();
-            return false;
-        }
-        twitterServer.start();
-        return true;
-    }
-
-}
\ No newline at end of file
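
The firehose adapter above connected its generator thread to its parser through a Java pipe: the TwitterServer/DataProvider pair wrote tweets into a PipedOutputStream on an executor thread, while getInputStream() handed the matching PipedInputStream to the parser. A self-contained sketch of that producer/consumer wiring (the record text is an illustrative stand-in for generated tweets):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipedFeedSketch {
    public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        ExecutorService executor = Executors.newCachedThreadPool();

        // Producer: stands in for DataProvider, which pushed generated tweets into the pipe.
        executor.execute(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    out.write(("{\"tweetid\": " + i + "}\n").getBytes(StandardCharsets.UTF_8));
                }
                out.close(); // closing the pipe signals end-of-stream to the reader
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Consumer: stands in for the tuple parser reading from getInputStream().
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        executor.shutdown();
    }
}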

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
deleted file mode 100644
index f7e79f7..0000000
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.tools.external.data;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
-import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
- * simulates a twitter firehose with tweets being "pushed" into Asterix at a
- * configurable rate measured in terms of TPS (tweets/second). The stream of
- * tweets lasts for a configurable duration (measured in seconds).
- */
-public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory implements IFeedAdapterFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Degree of parallelism for feed ingestion activity. Defaults to 1. This
-     * determines the count constraint for the ingestion operator.
-     **/
-    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
-
-    /**
-     * The absolute locations where ingestion operator instances will be placed.
-     **/
-    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
-
-    private ARecordType outputType;
-
-    @Override
-    public String getName() {
-        return "twitter_firehose";
-    }
-
-    @Override
-    public SupportedOperation getSupportedOperations() {
-        return SupportedOperation.READ;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
-        configuration.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_ADM);
-        this.configuration = configuration;
-        this.outputType = outputType;
-        this.configureFormat(outputType);
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
-        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
-        String[] locations = null;
-        if (ingestionLocationParam != null) {
-            locations = ingestionLocationParam.split(",");
-        }
-        int count = locations != null ? locations.length : 1;
-        if (ingestionCardinalityParam != null) {
-            count = Integer.parseInt(ingestionCardinalityParam);
-        }
-
-        List<String> chosenLocations = new ArrayList<String>();
-        String[] availableLocations = locations != null ? locations
-                : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
-        for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
-            chosenLocations.add(availableLocations[k]);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
-    }
-
-    @Override
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx, partition);
-    }
-
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
-    @Override
-    public InputDataFormat getInputDataFormat() {
-        return InputDataFormat.ADM;
-    }
-
-    @Override
-    public boolean isRecordTrackingEnabled() {
-        return false;
-    }
-
-    @Override
-    public IIntakeProgressTracker createIntakeProgressTracker() {
-        throw new UnsupportedOperationException("Tracking of ingested records not enabled");
-    }
-
-}
\ No newline at end of file
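
Taken together with the adapter and TweetGenerator above, the "twitter_firehose" factory understood a handful of configuration entries: "ingestion-cardinality" and "ingestion-location" for operator placement, plus "mode", "tps", and "duration" consumed further down the stack. A sketch of a controlled-rate configuration (the values are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

public class FirehoseConfigSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("ingestion-cardinality", "2");    // two ingestion operator instances
        configuration.put("ingestion-location", "nc1,nc2"); // pin them to these nodes
        configuration.put("mode", "controlled");            // or "aggressive" (5000-tweet batches)
        configuration.put("tps", "1000");                   // tweets per second in controlled mode
        configuration.put("duration", "60");                // seconds of generated data; 0 = unbounded
        System.out.println(configuration);
    }
}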

