storm-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roshannaik <...@git.apache.org>
Subject [GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Date Thu, 27 Jul 2017 01:28:39 GMT
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129736039
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -155,134 +150,141 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId,
topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds,
initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map topologyConf, IStateStorage stateStorage, IStormClusterState
stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, port, workerId,
topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP so that supervisor
knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that supervisor
knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that slow-loading tasks don't
cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
() -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks don't cause
the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
() -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, initCreds)
    +        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                newExecutors.add(
    +                        LocalExecutor.mkExecutor(workerState, e, initCreds)
                                     .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    +            } else {
    +                newExecutors.add(
    +                        Executor.mkExecutor(workerState, e, initCreds)
                                     .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd)
-> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>)
packets, seqId, batchEnd);
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for remote tasks to
remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        JCQueue.Consumer tupleHandler = workerState;
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler =
    -                    mkDisruptorBackpressureHandler(workerState);
    -                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    -                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger,
workerState, backpressureCallback);
    -                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE))
{
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs,
workerState::refreshThrottle);
    -                }
    +        // This thread will send the messages destined for remote tasks (out of process)
    +        transferThread = Utils.asyncLoop(() -> {
    +            int x = workerState.transferQueue.consume(tupleHandler);
    +            if(x==0)
    +                return 1L;
    +            return 0L;
    +        });
    +        transferThread.setName("Worker-Transfer");
     
    -                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
     
    -                establishLogSettingCallback();
    +        establishLogSettingCallback();
    +
    +        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable()
{
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable()
{
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE))
{
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS,
10), new Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS,
10), new Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the version files to
be ignored
    -                                    LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at different times,
and avoids thundering herd
    -                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING))
{
    -                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500,
workerState::refreshLoad);
    -                }
    +                    }
    +                });
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +        // The jitter allows the clients to get the data at different times, and avoids
thundering herd
    +        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING))
{
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +        }
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
     
    -                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf,
Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId,
topologyId, assignmentId, port);
    -                return this;
    -            };
    -        });
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
    +
    +        setupFlushTupleTimer(newExecutors);
    +
    +        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId,
assignmentId, port);
    +        return this;
    +    }
     
    +    private void setupFlushTupleTimer(final List<IRunningExecutor> executors) {
    +//                StormTimer timerTask = workerState.getUserTimer();
    +        Integer batchSize = ObjectReader.getInt(conf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Long flushIntervalMs = ObjectReader.getLong( conf.get(Config.TOPOLOGY_FLUSH_TUPLE_FREQ_MILLIS)
);
    +        if(batchSize==1 || flushIntervalMs==0)
    +            return;
    +
    +        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMs, flushIntervalMs,
new Runnable() {
    +            @Override
    +            public void run() {
    +                for (int i = 0; i < executors.size(); i++) {
    +                    IRunningExecutor exec = executors.get(i);
    +                    if(exec.getExecutorId().get(0) != -1) // dont send to system bolt
    --- End diff --
    
    Agree.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message