storm-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [storm] d2r commented on a change in pull request #3050: STORM-3434: server: fix all checkstyle warnings
Date Sat, 10 Aug 2019 17:38:24 GMT
d2r commented on a change in pull request #3050: STORM-3434: server: fix all checkstyle warnings
URL: https://github.com/apache/storm/pull/3050#discussion_r312709830
 
 

 ##########
 File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
 ##########
 @@ -1277,6 +1277,132 @@ private static void validatePortAvailable(Map<String, Object>
conf) throws IOExc
         }
     }
 
+    @VisibleForTesting
+    public void launchServer() throws Exception {
+        try {
+            IStormClusterState state = stormClusterState;
+            NimbusInfo hpi = nimbusHostPortInfo;
+
+            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
+            validator.prepare(conf);
+
+            //add to nimbuses
+            state.addNimbusHost(hpi.getHost(),
+                    new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(),
false, STORM_VERSION));
+            leaderElector.addToLeaderLockQueue();
+            this.blobStore.startSyncBlobs();
+
+            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
+                exec.prepare();
+            }
+
+            if (isLeader()) {
+                for (String topoId : state.activeStorms()) {
+                    transition(topoId, TopologyActions.STARTUP, null);
+                }
+                clusterMetricSet.setActive(true);
+            }
+
+            final boolean doNotReassign = (Boolean) conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN,
false);
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)),
+                () -> {
+                    try {
+                        if (!doNotReassign) {
+                            mkAssignments();
+                        }
+                        doCleanup();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+            // Schedule Nimbus inbox cleaner
+            final int jarExpSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_INBOX_JAR_EXPIRATION_SECS));
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CLEANUP_INBOX_FREQ_SECS)),
+                () -> {
+                    try {
+                        cleanInbox(getInbox(), jarExpSecs);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+
+            // Schedule topology history cleaner
+            Integer interval = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS),
null);
+            if (interval != null) {
+                final int lvCleanupAgeMins = ObjectReader.getInt(conf.get(DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS));
+                timer.scheduleRecurring(0, interval,
+                    () -> {
+                        try {
+                            cleanTopologyHistory(lvCleanupAgeMins);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+            }
+
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CREDENTIAL_RENEW_FREQ_SECS)),
+                () -> {
+                    try {
+                        renewCredentials();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+            metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", ()
-> nodeIdToResources.get().values()
+                    .parallelStream()
+                    .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(),
0))
+                    .sum());
+            metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> nodeIdToResources.get().values()
+                    .parallelStream()
+                    .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableCpu(),
0))
+                    .sum());
+            metricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+                    .parallelStream()
+                    .mapToDouble(SupervisorResources::getTotalMem)
+                    .sum());
+            metricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+                    .parallelStream()
+                    .mapToDouble(SupervisorResources::getTotalCpu)
+                    .sum());
+            metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+                //We want to update longest scheduling time in real time in case scheduler
get stuck
+                // Get current time before startTime to avoid potential race with scheduler's
Timer
+                Long currTime = Time.nanoTime();
+                Long startTime = schedulingStartTimeNs.get();
+                return TimeUnit.NANOSECONDS.toMillis(startTime == null
+                        ? longestSchedulingTime.get()
+                        : Math.max(currTime - startTime, longestSchedulingTime.get()));
+            });
+            metricsRegistry.registerMeter("nimbus:num-launched").mark();
+
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+                () -> {
+                    try {
+                        if (isLeader()) {
+                            sendClusterMetricsToExecutors();
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+            timer.scheduleRecurring(5, 5, clusterMetricSet);
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
+                throw e;
+            }
+
+            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
+                throw e;
+            }
+            LOG.error("Error on initialization of nimbus", e);
+            Utils.exitProcess(13, "Error on initialization of nimbus");
+        }
+    }
+
 
 Review comment:
   FYI: This method was moved and modified. The modifications were all whitespace changes, except for
reformatting one use of the ternary operator so that the `?` starts the following line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message