asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Till Westmann (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: ASTERIXDB-1176: remove unused feeds code
Date Fri, 13 Nov 2015 17:43:07 GMT
Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/494

Change subject: ASTERIXDB-1176: remove unused feeds code
......................................................................

ASTERIXDB-1176: remove unused feeds code

Change-Id: I2de2d7c7fd816ddbd53a80c855f64923c02efe35
---
D asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
D asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
D asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
D asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
D asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
D asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
6 files changed, 0 insertions(+), 473 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/94/494/1

diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
deleted file mode 100644
index 46dd029..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedMessageService.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- */
-public class FeedMessageService implements IFeedMessageService {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
-    private final LinkedBlockingQueue<String> inbox;
-    private final FeedMessageHandler mesgHandler;
-    private final String nodeId;
-    private ExecutorService executor;
-
-    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String
ccClusterIp) {
-        this.inbox = new LinkedBlockingQueue<String>();
-        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
-        this.nodeId = nodeId;
-        this.executor = Executors.newSingleThreadExecutor();
-    }
-
-    public void start() throws Exception {
-        executor.execute(mesgHandler);
-    }
-
-    public void stop() {
-        synchronized (mesgHandler.getLock()) {
-            executor.shutdownNow();
-        }
-        mesgHandler.stop();
-    }
-
-    @Override
-    public void sendMessage(IFeedMessage message) {
-        try {
-            JSONObject obj = message.toJSON();
-            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
-            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
-            inbox.add(obj.toString());
-        } catch (JSONException jse) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JSON Exception in parsing message " + message);
-            }
-        }
-    }
-
-    private static class FeedMessageHandler implements Runnable {
-
-        private final LinkedBlockingQueue<String> inbox;
-        private final String host;
-        private final int port;
-
-        private Socket cfmSocket;
-        private Object lock;
-
-        private static final byte[] EOL = "\n".getBytes();
-
-        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int
port) {
-            this.inbox = inbox;
-            this.host = host;
-            this.port = port;
-            this.lock = new Object();
-        }
-
-        public void run() {
-            try {
-                cfmSocket = new Socket(host, port);
-                if (cfmSocket != null) {
-                    while (true) {
-                        String message = inbox.take();
-                        synchronized (lock) {
-                            cfmSocket.getOutputStream().write(message.getBytes());
-                            cfmSocket.getOutputStream().write(EOL);
-                        }
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to start feed message service");
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
-                }
-            } finally {
-                stop();
-            }
-
-        }
-
-        public void stop() {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Stopping feed message handler");
-            }
-            if (cfmSocket != null) {
-                try {
-                    cfmSocket.close();
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in closing socket " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public Object getLock() {
-            return lock;
-        }
-
-    }
-
-}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
deleted file mode 100644
index 4c48a5e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private int intakePartition;
-    private int base;
-    private byte[] commitAcks;
-
-    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition,
int base, byte[] commitAcks) {
-        super(MessageType.COMMIT_ACK);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.BASE, base);
-        String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
-        obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
-        return obj;
-    }
-
-    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int base = obj.getInt(FeedConstants.MessageConstants.BASE);
-        String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
-        byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
-        return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-    public byte[] getCommitAcks() {
-        return commitAcks;
-    }
-
-    public void reset(int intakePartition, int base, byte[] commitAcks) {
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    public int getBase() {
-        return base;
-    }
-
-    public void setBase(int base) {
-        this.base = base;
-    }
-
-}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index f68f5ea..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final int intakePartition;
-    private final int maxWindowAcked;
-
-    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition,
int maxWindowAcked) {
-        super(MessageType.COMMIT_ACK_RESPONSE);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.maxWindowAcked = maxWindowAcked;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
-        return obj;
-    }
-
-    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException
{
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
-        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getMaxWindowAcked() {
-        return maxWindowAcked;
-    }
-
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
deleted file mode 100644
index 3f59859..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/EndFeedMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator invovled in the feed pipeline.
- */
-public class EndFeedMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId feedId;
-
-    public EndFeedMessage(FeedConnectionId feedId) {
-        super(MessageType.END, feedId);
-        this.feedId = feedId;
-    }
-
-    @Override
-    public String toString() {
-        return MessageType.END.name() + feedId;
-    }
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
deleted file mode 100644
index 3dd7ca0..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedManagerElectMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A feed control message containing the altered values for
- * adapter configuration parameters. This message is dispatched
- * to all runtime instances of the feed's adapter.
- */
-public class FeedManagerElectMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String host;
-    private final String nodeId;
-    private final int port;
-
-    public FeedManagerElectMessage(String host, String nodeId, int port, FeedConnectionId
feedId) {
-        super(MessageType.SUPER_FEED_MANAGER_ELECT, feedId);
-        this.host = host;
-        this.port = port;
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public MessageType getMessageType() {
-        return MessageType.SUPER_FEED_MANAGER_ELECT;
-    }
-
-    @Override
-    public String toString() {
-        return MessageType.SUPER_FEED_MANAGER_ELECT.name() + " " + host + "_" + nodeId +
"[" + port + "]";
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
deleted file mode 100644
index 37d929b..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-/**
- * A control message that can be sent to the runtime instance of a
- * feed's adapter.
- */
-public class FeedMessage implements IFeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    protected final MessageType messageType;
-    protected final FeedConnectionId feedId;
-
-    public FeedMessage(MessageType messageType, FeedConnectionId feedId) {
-        this.messageType = messageType;
-        this.feedId = feedId;
-    }
-
-    public MessageType getMessageType() {
-        return messageType;
-    }
-
-    public FeedConnectionId getFeedId() {
-        return feedId;
-    }
-
-}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/494
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2de2d7c7fd816ddbd53a80c855f64923c02efe35
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <tillw@apache.org>

Mime
View raw message