Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2021
Change subject: [NO ISSUE] Minor active refactoring
......................................................................
[NO ISSUE] Minor active refactoring
- Remove unused ActiveRuntimeManager
- Rename StatsRequestMessage -> ActiveStatsRequestMessage
- Add ActiveManager API to return all active runtimes
Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
D asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
9 files changed, 36 insertions(+), 116 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/2021/1
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index df59dca..1d771a7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.active;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -32,7 +34,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -54,7 +56,7 @@
private volatile boolean shutdown;
public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget,
int frameSize,
- INCServiceContext serviceCtx) throws HyracksDataException {
+ INCServiceContext serviceCtx) throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
@@ -77,6 +79,10 @@
runtimes.remove(id);
}
+ public Set<ActiveRuntimeId> getRuntimeIds() {
+ return Collections.unmodifiableSet(runtimes.keySet());
+ }
+
public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
return runtimes.get(runtimeId);
}
@@ -93,14 +99,14 @@
stopRuntime(message);
break;
case REQUEST_STATS:
- requestStats((StatsRequestMessage) message);
+ requestStats((ActiveStatsRequestMessage) message);
break;
default:
LOGGER.warning("Unknown message type received: " + message.getKind());
}
}
- private void requestStats(StatsRequestMessage message) throws HyracksDataException {
+ private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException
{
try {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -111,13 +117,13 @@
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(
JavaSerializationUtils
- .serialize(new ActiveStatsResponse(reqId, null, new
RuntimeDataException(
- ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
runtimeId.toString()))),
- null);
+ .serialize(new ActiveStatsResponse(reqId, nodeId,
null,
+ new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
+ runtimeId.toString()))), null);
return;
}
String stats = runtime.getStats();
- ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
+ ActiveStatsResponse response = new ActiveStatsResponse(reqId, nodeId, stats,
null);
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(JavaSerializationUtils.serialize(response),
null);
} catch (Exception e) {
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
deleted file mode 100644
index 18368ae..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ActiveRuntimeManager {
-
- private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
- private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes;
-
- private final ExecutorService executorService;
-
- public ActiveRuntimeManager() {
- this.activeRuntimes = new ConcurrentHashMap<>();
- this.executorService = Executors.newCachedThreadPool();
- }
-
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdown();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName());
- }
- try {
- executorService.awaitTermination(10L, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName()
- + " was interrupted while waiting for runtime managers to shutdown",
e);
- }
- if (!executorService.isTerminated()) {
- LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
- + " failed to shutdown successfully. Will be forced to shutdown");
- executorService.shutdownNow();
- }
- }
- }
-
- public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) {
- return activeRuntimes.get(runtimeId);
- }
-
- public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable
feedRuntime)
- throws HyracksDataException {
- if (activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED,
runtimeId);
- }
- activeRuntimes.put(runtimeId, feedRuntime);
- }
-
- public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException
{
- if (!activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
- }
- activeRuntimes.remove(runtimeId);
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public Set<ActiveRuntimeId> getFeedRuntimes() {
- return activeRuntimes.keySet();
- }
-
-}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
similarity index 88%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index d43f00e..0dbba52 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -20,11 +20,11 @@
import java.io.Serializable;
-public class StatsRequestMessage extends ActiveManagerMessage {
+public class ActiveStatsRequestMessage extends ActiveManagerMessage {
private static final long serialVersionUID = 1L;
private final long reqId;
- public StatsRequestMessage(Serializable payload, long reqId) {
+ public ActiveStatsRequestMessage(Serializable payload, long reqId) {
super(Kind.REQUEST_STATS, payload);
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
index 8738a06..b8ba271 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
@@ -27,17 +27,20 @@
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INcResponse;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ActiveStatsResponse implements ICcAddressedMessage, INcResponse {
private static final long serialVersionUID = 1L;
private final long reqId;
+ private final String nodeId;
private final String stats;
private final Exception failure;
- public ActiveStatsResponse(long reqId, String stats, Exception failure) {
+ public ActiveStatsResponse(long reqId, String nodeId, String stats, Exception failure)
{
this.reqId = reqId;
+ this.nodeId = nodeId;
this.stats = stats;
this.failure = failure;
}
@@ -54,13 +57,13 @@
switch (responseState) {
case UNINITIALIZED:
// First to arrive
- result.setRight(new ArrayList<String>());
+ result.setRight(new ArrayList<Pair<String, String>>());
// No failure, change state to success
result.setLeft(ResponseState.SUCCESS);
- // Fallthrough
+ // fall-through
case SUCCESS:
- List<String> response = (List<String>) result.getRight();
- response.add(stats);
+ List<Pair<String, String>> response = (List<Pair<String,
String>>) result.getRight();
+ response.add(Pair.of(nodeId, stats));
break;
default:
break;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c1e772c..0d27d6c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,7 +38,7 @@
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -290,7 +290,7 @@
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
for (int i = 0; i < ncs.size(); i++) {
- requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName,
i), reqId));
+ requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName,
i), reqId));
}
try {
List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId,
ncs, requests, timeout);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index e3424ec..8bc6eb0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -26,7 +26,6 @@
}
enum ClusterState {
- STARTING, // the initial state
UNUSABLE, // one or more cluster partitions are inactive or max id resources
have not been reported
PENDING, // the metadata node has not yet joined & initialized
RECOVERING, // global recovery has not yet completed
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
index f823404..c51e2cf 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
@@ -29,7 +29,7 @@
private final ZooKeeper zk;
private String clusterStatePath;
private boolean done = false;
- private ClusterState clusterState = ClusterState.STARTING;
+ private ClusterState clusterState = ClusterState.UNUSABLE;
private boolean failed = false;
private Exception failureCause = null;
private static Logger LOGGER = Logger.getLogger(ClusterStateWatcher.class.getName());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index c85e236..824f51a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -36,6 +36,9 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController
{
+ public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count";
+ public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count";
+
public enum State {
CREATED,
STARTED,
@@ -278,7 +281,7 @@
@Override
public String getStats() {
- return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\":
"
- + failedRecordsCount + "}";
+ return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + incomingRecordsCount
+ ", \"" +
+ FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + failedRecordsCount +
"}";
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 334b683..51c87b4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -165,6 +165,10 @@
@Override
public synchronized void refreshState() throws HyracksDataException {
+ if (state == ClusterState.SHUTTING_DOWN) {
+ LOGGER.log(Level.INFO, "Not refreshing final state %s", state);
+ return;
+ }
resetClusterPartitionConstraint();
if (clusterPartitions.isEmpty()) {
LOGGER.info("Cluster does not have any registered partitions");
--
To view, visit https://asterix-gerrit.ics.uci.edu/2021
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mblow@apache.org>
|