storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [05/21] storm git commit: STORM-2018: Supervisor V2
Date Thu, 27 Oct 2016 16:05:26 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
new file mode 100644
index 0000000..5e89eea
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Supervisor daemon. It heartbeats supervisor state to the cluster state
+ * backend, synchronizes topology assignments (via {@link ReadClusterState}),
+ * keeps local blobs up to date, and manages the lifecycle of local workers.
+ * <p>
+ * Instances are created either for a distributed daemon via {@link #main(String[])}
+ * or programmatically (e.g. local mode) via the public constructor.
+ */
+public class Supervisor implements DaemonCommon, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+    // Daemon configuration, read once at construction.
+    private final Map<String, Object> conf;
+    // Messaging context shared with in-process workers (may be null in distributed mode).
+    private final IContext sharedContext;
+    // Flipped to false in close(); read by isWaiting().
+    private volatile boolean active;
+    private final ISupervisor iSupervisor;
+    private final Utils.UptimeComputer upTime;
+    private final String stormVersion;
+    private final IStormClusterState stormClusterState;
+    // Supervisor-local persisted state (worker heartbeats, approved workers, ...).
+    private final LocalState localState;
+    private final String supervisorId;
+    private final String assignmentId;
+    private final String hostName;
+    // used for reporting used ports when heartbeating
+    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
+    private final StormTimer heartbeatTimer;
+    private final StormTimer eventTimer;
+    private final StormTimer blobUpdateTimer;
+    private final Localizer localizer;
+    private final ILocalizer asyncLocalizer;
+    // Set in launch(); null until then — see the NOTE on isWaiting().
+    private EventManager eventManager;
+    // Set in launch(); null until then.
+    private ReadClusterState readState;
+    
+    /** Build a supervisor from the default storm config (used by main()). */
+    private Supervisor(ISupervisor iSupervisor) throws IOException {
+        this(Utils.readStormConfig(), null, iSupervisor);
+    }
+    
+    /**
+     * Create a supervisor. Connects to the cluster state backend and prepares
+     * local state and localizers, but does not start any timers or workers
+     * until {@link #launch()} is called.
+     *
+     * @param conf the supervisor/daemon config
+     * @param sharedContext messaging context shared with local workers, or null
+     * @param iSupervisor pluggable supervisor implementation (ids, port confirmation)
+     * @throws IOException if local state or localizer setup fails
+     */
+    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
+        this.conf = conf;
+        this.iSupervisor = iSupervisor;
+        this.active = true;
+        this.upTime = Utils.makeUptimeComputer();
+        this.stormVersion = VersionInfo.getVersion();
+        this.sharedContext = sharedContext;
+        
+        iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+        
+        // Only restrict ZK ACLs when ZK auth is configured for the server.
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = SupervisorUtils.supervisorZkAcls();
+        }
+
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+        } catch (Exception e) {
+            LOG.error("supervisor can't create stormClusterState");
+            throw Utils.wrapInRuntime(e);
+        }
+
+        try {
+            this.localState = ConfigUtils.supervisorState(conf);
+            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
+            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+        } catch (IOException e) {
+            // NOTE(review): declared "throws IOException" but IOException is wrapped
+            // unchecked here, so the throws clause is effectively dead — confirm intent.
+            throw Utils.wrapInRuntime(e);
+        }
+        this.supervisorId = iSupervisor.getSupervisorId();
+        this.assignmentId = iSupervisor.getAssignmentId();
+
+        try {
+            this.hostName = Utils.hostname(conf);
+        } catch (UnknownHostException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
+
+        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
+    }
+    
+    /** @return the unique id of this supervisor */
+    public String getId() {
+        return supervisorId;
+    }
+    
+    IContext getSharedContext() {
+        return sharedContext;
+    }
+
+    public Map<String, Object> getConf() {
+        return conf;
+    }
+
+    public ISupervisor getiSupervisor() {
+        return iSupervisor;
+    }
+
+    public Utils.UptimeComputer getUpTime() {
+        return upTime;
+    }
+
+    public String getStormVersion() {
+        return stormVersion;
+    }
+
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    LocalState getLocalState() {
+        return localState;
+    }
+
+    public String getAssignmentId() {
+        return assignmentId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    /** @return mutable holder mapping port -> current assignment, shared with heartbeating */
+    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
+        return currAssignment;
+    }
+
+    public Localizer getLocalizer() {
+        return localizer;
+    }
+    
+    ILocalizer getAsyncLocalizer() {
+        return asyncLocalizer;
+    }
+    
+    // NOTE(review): method name is misspelled ("Manger" -> "Manager"); renaming
+    // would touch callers, so flagging only.
+    EventManager getEventManger() {
+        return eventManager;
+    }
+    
+    /**
+     * Launch the supervisor: clean the tmp dir, start heartbeating, start
+     * assignment synchronization, register blob references for already
+     * downloaded topologies, and (if {@code supervisor.enable}) schedule the
+     * recurring sync, blob-update and health-check tasks.
+     */
+    public void launch() throws Exception {
+        LOG.info("Starting Supervisor with conf {}", conf);
+        String path = ConfigUtils.supervisorTmpDir(conf);
+        FileUtils.cleanDirectory(new File(path));
+
+        Localizer localizer = getLocalizer();
+
+        // Heartbeat once synchronously so our state is registered before anything else runs.
+        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
+        hb.run();
+        // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+        Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+        heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
+
+        this.eventManager = new EventManagerImp(false);
+        this.readState = new ReadClusterState(this);
+        
+        // Re-register blob references for topologies already on disk so the
+        // localizer cache survives a supervisor restart.
+        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
+        for (String topoId : downloadedTopoIds) {
+            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        }
+        // do this after adding the references so we don't try to clean things being used
+        localizer.startCleaner();
+
+        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+
+        // NOTE(review): unchecked (Boolean) cast NPEs if supervisor.enable is unset
+        // and CCEs on a String value — consider Utils.getBoolean with a default.
+        if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+            // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
+            // to date even if callbacks don't all work exactly right
+            eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
+
+            // Blob update thread. Starts with 30 seconds delay, every 30 seconds
+            blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
+
+            // supervisor health check
+            eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
+        }
+        LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
+    }
+
+    /**
+     * Start the distributed (non-local-mode) supervisor daemon: launch the
+     * supervisor, install a shutdown hook and start metrics reporting.
+     * Exits the JVM on any startup failure.
+     */
+    private void launchDaemon() {
+        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+        try {
+            Map<String, Object> conf = getConf();
+            if (ConfigUtils.isLocalMode(conf)) {
+                throw new IllegalArgumentException("Cannot start server in local mode!");
+            }
+            launch();
+            Utils.addShutdownHookWithForceKillIn1Sec(() -> {this.close();});
+            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
+            StormMetricsRegistry.startMetricsReporters(conf);
+        } catch (Exception e) {
+            LOG.error("Failed to start supervisor\n", e);
+            System.exit(1);
+        }
+    }
+
+    /** Register a gauge reporting the number of worker dirs currently present on disk. */
+    private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
+        StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
+                return pids.size();
+            }
+        });
+    }
+    
+    /**
+     * Shut down timers, event manager, assignment sync and the cluster state
+     * connection. Swallows and logs any exception — this is best-effort cleanup.
+     */
+    @Override
+    public void close() {
+        try {
+            LOG.info("Shutting down supervisor {}", getId());
+            this.active = false;
+            heartbeatTimer.close();
+            eventTimer.close();
+            blobUpdateTimer.close();
+            if (eventManager != null) {
+                eventManager.close();
+            }
+            if (readState != null) {
+                readState.close();
+            }
+            getStormClusterState().disconnect();
+        } catch (Exception e) {
+            LOG.error("Error Shutting down", e);
+        }
+    }
+    
+    /**
+     * Kill the given workers: first a graceful kill, then after a configured
+     * sleep a repeated force-kill until all processes are dead (giving up on a
+     * worker after 10 seconds), cleaning up each container afterwards.
+     * Per-worker failures are logged and do not abort the rest.
+     *
+     * @param workerIds ids of the workers to kill
+     * @param launcher used to recover a Killable handle for each worker
+     */
+    void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
+        HashSet<Killable> containers = new HashSet<>();
+        for (String workerId : workerIds) {
+            try {
+                Killable k = launcher.recoverContainer(workerId, localState);
+                if (!k.areAllProcessesDead()) {
+                    k.kill();
+                    containers.add(k);
+                } else {
+                    k.cleanUp();
+                }
+            } catch (Exception e) {
+                LOG.error("Error trying to kill {}", workerId, e);
+            }
+        }
+        // Give workers a chance to exit gracefully before force-killing.
+        int shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1);
+        if (!containers.isEmpty()) {
+            Time.sleepSecs(shutdownSleepSecs);
+        }
+        for (Killable k: containers) {
+            try {
+                k.forceKill();
+                long start = Time.currentTimeMillis();
+                while(!k.areAllProcessesDead()) {
+                    if ((Time.currentTimeMillis() - start) > 10_000) {
+                        throw new RuntimeException("Giving up on killing " + k 
+                                + " after " + (Time.currentTimeMillis() - start) + " ms");
+                    }
+                    Time.sleep(100);
+                    k.forceKill();
+                }
+                k.cleanUp();
+            } catch (Exception e) {
+                LOG.error("Error trying to clean up {}", k, e);
+            }
+        }
+    }
+
+    /**
+     * Kill all workers on this node. Delegates to the running ReadClusterState
+     * when available; otherwise recovers containers directly from the worker
+     * root on disk and kills them.
+     */
+    public void shutdownAllWorkers() {
+        if (readState != null) {
+            readState.shutdownAllWorkers();
+        } else {
+            try {
+                ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
+                killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
+            } catch (Exception e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    /** @return true when shut down, or when all timers and the event manager are idle */
+    @Override
+    public boolean isWaiting() {
+        if (!active) {
+            return true;
+        }
+
+        // NOTE(review): eventManager is only assigned in launch(); calling this on an
+        // active supervisor before launch() would NPE on eventManager.waiting() — confirm
+        // callers only invoke this after launch.
+        if (heartbeatTimer.isTimerWaiting() && eventTimer.isTimerWaiting() && eventManager.waiting()) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Entry point for the distributed supervisor daemon.
+     *
+     * @param args command line arguments (unused)
+     */
+    public static void main(String[] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        @SuppressWarnings("resource")
+        Supervisor instance = new Supervisor(new StandaloneSupervisor());
+        instance.launchDaemon();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
new file mode 100644
index 0000000..0784631
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * Static helper methods used by the supervisor daemon: launching (possibly
+ * user-switched) processes, reading worker heartbeats from local state, and
+ * blob/localizer bookkeeping.
+ * <p>
+ * The static methods that read state delegate through a swappable {@code _instance}
+ * so tests can substitute a mock via {@link #setInstance(SupervisorUtils)}.
+ */
+public class SupervisorUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+    // Test hook: static entry points dispatch through this instance.
+    private static SupervisorUtils _instance = INSTANCE;
+    public static void setInstance(SupervisorUtils u) {
+        _instance = u;
+    }
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    /**
+     * Launch a process through the worker-launcher binary as the given user.
+     * The command executed is: [commandPrefix...] worker-launcher user args...
+     *
+     * @param conf daemon config (read for supervisor.worker.launcher override)
+     * @param user the user to run as; must not be blank
+     * @param commandPrefix optional command prefix (may be null)
+     * @param args arguments passed to the worker-launcher
+     * @param environment environment for the child process (may be null)
+     * @param logPreFix prefix for log lines from the child's output
+     * @param exitCodeCallback invoked with the child's exit code (may be null)
+     * @param dir working directory for the child (may be null)
+     * @return the started process
+     * @throws IllegalArgumentException if user is blank
+     */
+    static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+                                          final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
+        if (StringUtils.isBlank(user)) {
+            throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
+        }
+        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+        String wl;
+        if (StringUtils.isNotBlank(wlinitial)) {
+            wl = wlinitial;
+        } else {
+            // Default to the worker-launcher binary shipped under storm.home.
+            wl = stormHome + "/bin/worker-launcher";
+        }
+        List<String> commands = new ArrayList<>();
+        if (commandPrefix != null){
+            commands.addAll(commandPrefix);
+        }
+        commands.add(wl);
+        commands.add(user);
+        commands.addAll(args);
+        LOG.info("Running as user: {} command: {}", user, commands);
+        return SupervisorUtils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+    }
+
+    /**
+     * Launch a worker-launcher process as the given user and block until it exits.
+     *
+     * @return the process exit code
+     */
+    public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+            throws IOException {
+        int ret = 0;
+        Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
+        if (StringUtils.isNotBlank(logPreFix))
+            Utils.readAndLogStream(logPreFix, process.getInputStream());
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            LOG.info("{} interrupted.", logPreFix);
+        }
+        // NOTE(review): if waitFor() was interrupted the process may still be running,
+        // and exitValue() then throws IllegalThreadStateException; the interrupt flag is
+        // also not restored — confirm this is acceptable here.
+        ret = process.exitValue();
+        return ret;
+    }
+
+    /**
+     * When running workers as their submitting user, invoke the worker-launcher
+     * "code-dir" command to fix ownership/permissions of the topology code dir.
+     */
+    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "setup conf for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("code-dir");
+            commands.add(dir);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    /**
+     * Recursively delete a path via the worker-launcher, running as the path's owner.
+     *
+     * @throws RuntimeException if the path still exists afterwards
+     */
+    public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
+        String user = Utils.getFileOwner(path);
+        String logPreFix = "rmr " + id;
+        List<String> commands = new ArrayList<>();
+        commands.add("rmr");
+        commands.add(path);
+        SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
+        if (Utils.checkFileExists(path)) {
+            throw new RuntimeException(path + " was not deleted.");
+        }
+    }
+
+    /**
+     * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
+     * returns false
+     * 
+     * @param blobInfo the blob's config map
+     * @return whether the blob should be uncompressed after download
+     */
+    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
+        return Utils.getBoolean(blobInfo.get("uncompress"), false);
+    }
+
+    /**
+     * Returns a list of LocalResources based on the blobstore-map passed in
+     * 
+     * @param blobstoreMap blob key -> blob config; may be null (yields empty list)
+     * @return one LocalResource per blob entry
+     */
+    public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
+        List<LocalResource> localResourceList = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
+                LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
+                localResourceList.add(localResource);
+            }
+        }
+        return localResourceList;
+    }
+
+    /**
+     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
+     * 
+     * @param localizer the localizer holding the blob cache
+     * @param stormId the topology id
+     * @param conf daemon config used to locate the topology's storm conf
+     */
+    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException {
+        Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        if (blobstoreMap != null) {
+            localizer.addReferences(localresources, user, topoName);
+        }
+    }
+
+    /**
+     * Read the ids of topologies whose code has been downloaded to this node,
+     * by listing the supervisor's stormdist root.
+     */
+    public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
+        Set<String> stormIds = new HashSet<>();
+        String path = ConfigUtils.supervisorStormDistRoot(conf);
+        Collection<String> rets = Utils.readDirContents(path);
+        for (String ret : rets) {
+            // NOTE(review): single-arg URLDecoder.decode is deprecated and uses the
+            // platform default charset — consider decode(ret, "UTF-8").
+            stormIds.add(URLDecoder.decode(ret));
+        }
+        return stormIds;
+    }
+
+    /** List the worker ids present under the supervisor's worker root on disk. */
+    public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
+        String workerRoot = ConfigUtils.workerRoot(conf);
+        return Utils.readDirContents(workerRoot);
+    }
+
+    /**
+     * Check that all files required to launch the topology exist locally:
+     * the dist root, stormcode, stormconf, and (unless local mode) the jar.
+     */
+    static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+        if (!Utils.checkFileExists(stormroot))
+            return false;
+        if (!Utils.checkFileExists(stormcodepath))
+            return false;
+        if (!Utils.checkFileExists(stormconfpath))
+            return false;
+        if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
+            return true;
+        return false;
+    }
+
+    /**
+     * map from worker id to heartbeat
+     *
+     * @param conf daemon config used to locate worker state
+     * @return worker id -> heartbeat; a value may be null if the heartbeat could not be read
+     * @throws Exception on failure listing workers or reading state
+     */
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
+        return _instance.readWorkerHeartbeatsImpl(conf);
+    }
+
+    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
+        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
+
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+
+        for (String workerId : workerIds) {
+            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
+            // ATTENTION: whb can be null
+            workerHeartbeats.put(workerId, whb);
+        }
+        return workerHeartbeats;
+    }
+
+
+    /**
+     * get worker heartbeat by workerId
+     *
+     * @param conf daemon config used to locate the worker's local state
+     * @param workerId id of the worker
+     * @return the heartbeat, or null if it could not be read
+     */
+    private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
+        return _instance.readWorkerHeartbeatImpl(conf, workerId);
+    }
+
+    protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> conf, String workerId) {
+        try {
+            LocalState localState = ConfigUtils.workerState(conf, workerId);
+            return localState.getWorkerHeartBeat();
+        } catch (Exception e) {
+            // Deliberately best-effort: a worker with an unreadable heartbeat is
+            // reported as null rather than failing the whole scan.
+            LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
+            return null;
+        }
+    }
+
+    /** @return true if the heartbeat is older than supervisor.worker.timeout.secs relative to {@code now} */
+    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
+        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+    }
+
+    private  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
+        return (now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+    }
+    
+    /**
+     * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
+     * callback.
+     * @param command the command to be executed in the new process
+     * @param environment the environment to be applied to the process. Can be
+     *                    null.
+     * @param logPrefix a prefix for log entries from the output of the process.
+     *                  Can be null.
+     * @param exitCodeCallback code to be called passing the exit code value
+     *                         when the process completes
+     * @param dir the working directory of the new process
+     * @return the new process
+     * @throws IOException
+     * @see java.lang.ProcessBuilder
+     */
+    public static Process launchProcess(List<String> command,
+                                        Map<String,String> environment,
+                                        final String logPrefix,
+                                        final ExitCodeCallback exitCodeCallback,
+                                        File dir)
+            throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        Map<String,String> procEnv = builder.environment();
+        if (dir != null) {
+            builder.directory(dir);
+        }
+        // Merge stderr into stdout so one reader thread captures everything.
+        builder.redirectErrorStream(true);
+        if (environment != null) {
+            procEnv.putAll(environment);
+        }
+        final Process process = builder.start();
+        if (logPrefix != null || exitCodeCallback != null) {
+            // Background loop: drain/log the child's output and report its exit code.
+            Utils.asyncLoop(new Callable<Object>() {
+                public Object call() {
+                    if (logPrefix != null ) {
+                        Utils.readAndLogStream(logPrefix,
+                                process.getInputStream());
+                    }
+                    if (exitCodeCallback != null) {
+                        try {
+                            process.waitFor();
+                            exitCodeCallback.call(process.exitValue());
+                        } catch (InterruptedException ie) {
+                            LOG.info("{} interrupted", logPrefix);
+                            // -1 signals "interrupted before exit" to the callback.
+                            exitCodeCallback.call(-1);
+                        }
+                    }
+                    return null; // Run only once.
+                }
+            });
+        }
+        return process;
+    }
+    
+    /**
+     * ZK ACLs for supervisor nodes: full control for the creator, plus
+     * READ|CREATE for everyone.
+     */
+    static List<ACL> supervisorZkAcls() {
+        final List<ACL> acls = new ArrayList<>();
+        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+        // NOTE(review): XOR of distinct permission bits equals OR here, but '|' would
+        // state the intent; worth normalizing when this code is touched next.
+        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        return acls;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
new file mode 100644
index 0000000..0017092
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.Map;
+
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.Supervisor;
+
+/**
+ * Recurring timer task that runs the configured node health-check scripts and,
+ * if any report the node unhealthy (non-zero code), shuts down all workers on
+ * this supervisor so they can be rescheduled elsewhere.
+ */
+public class SupervisorHealthCheck implements Runnable {
+    private final Supervisor supervisor;
+
+    public SupervisorHealthCheck(Supervisor supervisor) {
+        this.supervisor = supervisor;
+    }
+
+    /** Run the health check once; kill all local workers on failure. */
+    @Override
+    public void run() {
+        Map<String, Object> conf = supervisor.getConf();
+        int healthCode = HealthCheck.healthCheck(conf);
+        if (healthCode != 0) {
+            supervisor.shutdownAllWorkers();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
new file mode 100644
index 0000000..34c5682
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
+
+     private final IStormClusterState stormClusterState;
+     private final String supervisorId;
+     private final Map<String, Object> conf;
+     private final Supervisor supervisor;
+
+    public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
+        this.stormClusterState = supervisor.getStormClusterState();
+        this.supervisorId = supervisor.getId();
+        this.supervisor = supervisor;
+        this.conf = conf;
+    }
+
+    private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) {
+        SupervisorInfo supervisorInfo = new SupervisorInfo();
+        supervisorInfo.set_time_secs(Time.currentTimeSecs());
+        supervisorInfo.set_hostname(supervisor.getHostName());
+        supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
+
+        List<Long> usedPorts = new ArrayList<>();
+        usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
+        supervisorInfo.set_used_ports(usedPorts);
+        List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
+        List<Long> portList = new ArrayList<>();
+        if (metaDatas != null){
+            for (Object data : metaDatas){
+                Integer port = Utils.getInt(data);
+                if (port != null)
+                    portList.add(port.longValue());
+            }
+        }
+
+        supervisorInfo.set_meta(portList);
+        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+        supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+        supervisorInfo.set_version(supervisor.getStormVersion());
+        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
+        return supervisorInfo;
+    }
+
+    private Map<String, Double> mkSupervisorCapacities(Map conf) {
+        Map<String, Double> ret = new HashMap<String, Double>();
+        Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+        Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY));
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+        return ret;
+    }
+
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor);
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
new file mode 100644
index 0000000..0b6d996
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
+ * Runnable is intended to be run periodically by a timer, created elsewhere.
+ */
+public class UpdateBlobs implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
+
+    private Supervisor supervisor;
+
+    public UpdateBlobs(Supervisor supervisor) {
+        this.supervisor = supervisor;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<String, Object> conf = supervisor.getConf();
+            Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
+            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
+            Set<String> assignedStormIds = new HashSet<>();
+            for (LocalAssignment localAssignment : newAssignment.get().values()) {
+                assignedStormIds.add(localAssignment.get_topology_id());
+            }
+            for (String stormId : downloadedStormIds) {
+                if (assignedStormIds.contains(stormId)) {
+                    String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+                    LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
+                    updateBlobsForTopology(conf, stormId, supervisor.getLocalizer());
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                LOG.error("Network error while updating blobs, will retry again later", e);
+            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    /**
+     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+     * 
+     * @param conf
+     * @param stormId
+     * @param localizer
+     * @throws IOException
+     */
+    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        try {
+            localizer.updateBlobs(localresources, user);
+        } catch (AuthorizationException authExp) {
+            LOG.error("AuthorizationException error", authExp);
+        } catch (KeyNotFoundException knf) {
+            LOG.error("KeyNotFoundException error", knf);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/event/EventManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/event/EventManager.java b/storm-core/src/jvm/org/apache/storm/event/EventManager.java
new file mode 100644
index 0000000..64536c1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/event/EventManager.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.event;
+
/**
 * Manages a queue of events (Runnables) that are processed asynchronously
 * on a background thread. Closing the manager stops the processing thread.
 */
public interface EventManager extends AutoCloseable {
    /**
     * Enqueue an event to be run by the manager.
     * @param eventFn the event to run
     */
    void add(Runnable eventFn);

    /**
     * @return true if the manager has processed everything added so far
     *     (i.e. it is idle/waiting for more events)
     */
    boolean waiting();
}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java b/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
new file mode 100644
index 0000000..42e6d6b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.event;
+
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EventManagerImp implements EventManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
+
+    private AtomicInteger added;
+    private AtomicInteger processed;
+    private AtomicBoolean running;
+    private Thread runner;
+
+    private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+    public EventManagerImp(boolean isDaemon) {
+        added = new AtomicInteger();
+        processed = new AtomicInteger();
+        running = new AtomicBoolean(true);
+        runner = new Thread() {
+            @Override
+            public void run() {
+                while (running.get()) {
+                    try {
+                        Runnable r = queue.take();
+                        if (r == null) {
+                            return;
+                        }
+
+                        r.run();
+                        proccessInc();
+                    } catch (Throwable t) {
+                        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
+                            LOG.info("Event manager interrupted while doing IO");
+                        } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+                            LOG.info("Event manager interrupted");
+                        } else {
+                            LOG.error("{} Error when processing event", t);
+                            Utils.exitProcess(20, "Error when processing an event");
+                        }
+                    }
+                }
+            }
+        };
+        runner.setDaemon(isDaemon);
+        runner.start();
+    }
+
+    public void proccessInc() {
+        processed.incrementAndGet();
+    }
+
+    @Override
+    public void add(Runnable eventFn) {
+        if (!running.get()) {
+            throw new RuntimeException("Cannot add events to a shutdown event manager");
+        }
+        added.incrementAndGet();
+        queue.add(eventFn);
+    }
+
+    @Override
+    public boolean waiting() {
+        return (Time.isThreadWaiting(runner) || (processed.get() == added.get()));
+    }
+
+    @Override
+    public void close() throws Exception {
+        running.set(false);
+        runner.interrupt();
+        runner.join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
new file mode 100644
index 0000000..7887281
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ */
public class AsyncLocalizer implements ILocalizer, Shutdownable {
    /**
     * A future that has already completed.
     */
    private static class AllDoneFuture implements Future<Void> {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public Void get() {
            return null;
        }

        @Override
        public Void get(long timeout, TimeUnit unit) {
            return null;
        }

    }

    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);

    private final Localizer _localizer;
    // Single-threaded executor, so downloads are serialized.
    private final ExecutorService _execService;
    private final boolean _isLocalMode;
    private final Map<String, Object> _conf;
    // topology-id -> in-flight/completed download of the base blobs (jar/code/conf).
    // Guarded by synchronizing on this.
    private final Map<String, LocalDownloadedResource> _basicPending;
    // topology-id -> in-flight/completed download of the other topology blobs.
    // Guarded by synchronizing on this.
    private final Map<String, LocalDownloadedResource> _blobPending;
    private final AdvancedFSOps _fsOps;

    /**
     * Downloads the base blobs (storm.jar, storm code, storm conf) for a
     * topology in distributed mode, staging them in a tmp dir and then
     * moving them into the final dist root.
     */
    private class DownloadBaseBlobsDistributed implements Callable<Void> {
        protected final String _topologyId;
        protected final File _stormRoot;
        
        public DownloadBaseBlobsDistributed(String topologyId) throws IOException {
            _topologyId = topologyId;
            _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
        }
        
        // Download jar/code/conf from the blob store into tmproot and extract
        // the jar's resources subdirectory.
        protected void downloadBaseBlobs(File tmproot) throws Exception {
            String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
            String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
            String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
            String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
            _fsOps.forceMkdir(tmproot);
            _fsOps.restrictDirectoryPermissions(tmproot);
            ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(_conf);
            try {
                Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
                Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
                Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
            } finally {
                blobStore.shutdown();
            }
            Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
        }
        
        @Override
        public Void call() throws Exception {
            try {
                if (_fsOps.fileExists(_stormRoot)) {
                    // If the filesystem cannot move directories atomically, an
                    // existing dist root may be a partial download; wipe and redo it.
                    if (!_fsOps.supportsAtomicDirectoryMove()) {
                        LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
                        _fsOps.deleteIfExists(_stormRoot);
                    } else {
                        LOG.warn("{} already downloaded blobs, skipping", _topologyId);
                        return null;
                    }
                }
                boolean deleteAll = true;
                String tmproot = ConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
                File tr = new File(tmproot);
                try {
                    downloadBaseBlobs(tr);
                    _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
                    deleteAll = false;
                } finally {
                    // On any failure clean up both the tmp dir and the (possibly
                    // half-moved) dist root so a later retry starts clean.
                    if (deleteAll) {
                        LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
                        _fsOps.deleteIfExists(tr);
                        _fsOps.deleteIfExists(_stormRoot);
                    }
                }
                return null;
            } catch (Exception e) {
                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw e;
            }
        }
    }
    
    /**
     * Local-mode variant: reads code/conf directly from the nimbus blob store
     * and copies/extracts topology resources from the classpath instead of
     * downloading a storm.jar.
     */
    private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {

        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
            super(topologyId);
        }
        
        @Override
        protected void downloadBaseBlobs(File tmproot) throws Exception {
            _fsOps.forceMkdir(tmproot);
            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
            File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
            File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
            BlobStore blobStore = Utils.getNimbusBlobStore(_conf, null);
            try {
                try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){
                    blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
                }
                try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) {
                    blobStore.readBlobTo(stormConfKey, confOutStream, null);
                }
            } finally {
                blobStore.shutdown();
            }

            ClassLoader classloader = Thread.currentThread().getContextClassLoader();
            String resourcesJar = AsyncLocalizer.resourcesJar();
            URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);

            String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;

            if (resourcesJar != null) {
                LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
                // NOTE(review): the jar branches extract into _stormRoot (the final
                // dist root) while the log message and targetDir refer to the tmp
                // dir — confirm this is intended; resources appear to land in the
                // final directory before the atomic move.
                Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, _stormRoot);
            } else if (url != null) {
                LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
                if ("jar".equals(url.getProtocol())) {
                    JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
                    Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, _stormRoot);
                } else {
                    _fsOps.copyDirectory(new File(url.getFile()), new File(targetDir));
                }
            }
        }
    }
    
    /**
     * Downloads the non-base blobs (blobstore map entries plus dependency
     * jars/artifacts) for a topology and symlinks them into the dist root.
     */
    private class DownloadBlobs implements Callable<Void> {
        private final String _topologyId;

        public DownloadBlobs(String topologyId) {
            _topologyId = topologyId;
        }

        @Override
        public Void call() throws Exception {
            try {
                String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
                Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);

                @SuppressWarnings("unchecked")
                Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
                String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
                String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);

                List<LocalResource> localResourceList = new ArrayList<>();
                if (blobstoreMap != null) {
                    List<LocalResource> tmp = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
                    if (tmp != null) {
                        localResourceList.addAll(tmp);
                    }
                }

                StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
                // Dependency jars/artifacts are downloaded uncompressed.
                List<String> dependencies = new ArrayList<>();
                if (stormCode.is_set_dependency_jars()) {
                    dependencies.addAll(stormCode.get_dependency_jars());
                }
                if (stormCode.is_set_dependency_artifacts()) {
                    dependencies.addAll(stormCode.get_dependency_artifacts());
                }
                for (String dependency : dependencies) {
                    localResourceList.add(new LocalResource(dependency, false));
                }

                if (!localResourceList.isEmpty()) {
                    File userDir = _localizer.getLocalUserFileCacheDir(user);
                    if (!_fsOps.fileExists(userDir)) {
                        _fsOps.forceMkdir(userDir);
                    }
                    List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir);
                    _fsOps.setupBlobPermissions(userDir, user);
                    for (LocalizedResource localizedResource : localizedResources) {
                        String keyName = localizedResource.getKey();
                        //The sym link we are pointing to
                        File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath());

                        // Symlink name is the blob's "localname" if given,
                        // otherwise the blob key itself.
                        String symlinkName = null;
                        if (blobstoreMap != null) {
                            Map<String, Object> blobInfo = blobstoreMap.get(keyName);
                            if (blobInfo != null && blobInfo.containsKey("localname")) {
                                symlinkName = (String) blobInfo.get("localname");
                            } else {
                                symlinkName = keyName;
                            }
                        } else {
                            // all things are from dependencies
                            symlinkName = keyName;
                        }
                        _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
                    }
                }

                return null;
            } catch (Exception e) {
                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw e;
            }
        }
    }
    
    //Visible for testing
    AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
        _conf = conf;
        _isLocalMode = ConfigUtils.isLocalMode(conf);
        _localizer = localizer;
        _execService = Executors.newFixedThreadPool(1,  
                new ThreadFactoryBuilder()
                .setNameFormat("Async Localizer")
                .build());
        _basicPending = new HashMap<>();
        _blobPending = new HashMap<>();
        _fsOps = ops;
    }
    
    public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
        this(conf, localizer, AdvancedFSOps.make(conf));
    }

    /**
     * Schedule (or join an in-flight) download of the base blobs for the
     * assignment's topology and add a reference for the given port.
     */
    @Override
    public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
        final String topologyId = assignment.get_topology_id();
        LocalDownloadedResource localResource = _basicPending.get(topologyId);
        if (localResource == null) {
            Callable<Void> c;
            if (_isLocalMode) {
                c = new DownloadBaseBlobsLocal(topologyId);
            } else {
                c = new DownloadBaseBlobsDistributed(topologyId);
            }
            localResource = new LocalDownloadedResource(_execService.submit(c));
            _basicPending.put(topologyId, localResource);
        }
        Future<Void> ret = localResource.reserve(port, assignment);
        LOG.debug("Reserved basic {} {}", topologyId, localResource);
        return ret;
    }

    // Find the first jar on the current classpath that contains a topology
    // resources subdirectory, or null if there is none.
    private static String resourcesJar() throws IOException {
        String path = Utils.currentClasspath();
        if (path == null) {
            return null;
        }
        
        for (String jpath : path.split(File.pathSeparator)) {
            if (jpath.endsWith(".jar")) {
                if (Utils.zipDoesContainDir(jpath, ConfigUtils.RESOURCES_SUBDIR)) {
                    return jpath;
                }
            }
        }
        return null;
    }
    
    /**
     * Mark an already-downloaded topology as in use by the given port without
     * re-downloading: both the basic and blob entries get an already-completed
     * future and a reservation.
     */
    @Override
    public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
        final String topologyId = assignment.get_topology_id();
        LocalDownloadedResource localResource = _basicPending.get(topologyId);
        if (localResource == null) {
            localResource = new LocalDownloadedResource(new AllDoneFuture());
            _basicPending.put(topologyId, localResource);
        }
        localResource.reserve(port, assignment);
        LOG.debug("Recovered basic {} {}", topologyId, localResource);
        
        localResource = _blobPending.get(topologyId);
        if (localResource == null) {
            localResource = new LocalDownloadedResource(new AllDoneFuture());
            _blobPending.put(topologyId, localResource);
        }
        localResource.reserve(port, assignment);
        LOG.debug("Recovered blobs {} {}", topologyId, localResource);
    }
    
    /**
     * Schedule (or join an in-flight) download of the topology's non-base
     * blobs and add a reference for the given port.
     */
    @Override
    public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
        final String topologyId = assignment.get_topology_id();
        LocalDownloadedResource localResource = _blobPending.get(topologyId);
        if (localResource == null) {
            Callable<Void> c = new DownloadBlobs(topologyId);
            localResource = new LocalDownloadedResource(_execService.submit(c));
            _blobPending.put(topologyId, localResource);
        }
        Future<Void> ret = localResource.reserve(port, assignment);
        LOG.debug("Reserved blobs {} {}", topologyId, localResource);
        return ret;
    }

    /**
     * Drop this port's reference to the topology's blob and basic downloads;
     * when the last reference is released the blob references and the dist
     * root on disk are cleaned up.
     */
    @Override
    public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
        final String topologyId = assignment.get_topology_id();
        LOG.debug("Releasing slot for {} {}", topologyId, port);
        LocalDownloadedResource localResource = _blobPending.get(topologyId);
        if (localResource == null || !localResource.release(port, assignment)) {
            LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
        } else if (localResource.isDone()){
            LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
            _blobPending.remove(topologyId);
            Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId);
            @SuppressWarnings("unchecked")
            Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
            if (blobstoreMap != null) {
                String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
                String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
                
                for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
                    String key = entry.getKey();
                    Map<String, Object> blobInfo = entry.getValue();
                    try {
                        _localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                }
            }
        } else {
            LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
        }
        
        localResource = _basicPending.get(topologyId);
        if (localResource == null || !localResource.release(port, assignment)) {
            LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
        } else if (localResource.isDone()){
            LOG.info("Released blob reference {} {} Cleaning up basic files...", topologyId, port);
            _basicPending.remove(topologyId);
            String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId);
            _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
        } else {
            LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
        }
    }

    /**
     * Delete dist-root directories of topologies that no longer have any
     * pending or reserved downloads.
     */
    @Override
    public synchronized void cleanupUnusedTopologies() throws IOException {
        File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
        LOG.info("Cleaning up unused topologies in {}", distRoot);
        File[] children = distRoot.listFiles();
        if (children != null) {
            for (File topoDir : children) {
                String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
                if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
                    _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
                }
            }
        }
    }

    @Override
    public void shutdown() {
        _execService.shutdown();
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
new file mode 100644
index 0000000..7105095
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+import org.apache.storm.generated.LocalAssignment;
+
+/**
+ * Download blobs from the blob store and keep them up to date.
+ */
+public interface ILocalizer {
+
+    /**
+     * Recover a running topology by incrementing references for what it has already downloaded.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running in.
+     */
+    void recoverRunningTopology(LocalAssignment assignment, int port);
+    
+    /**
+     * Download storm.jar, storm.conf, and storm.ser for this topology if not done so already,
+     * and inc a reference count on them.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     * @throws IOException on error 
+     */
+    Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException;
+
+    /**
+     * Download the blobs for this topology (reading in list in from the config)
+     * and inc reference count for these blobs.
+     * PRECONDITION: requestDownloadBaseTopologyBlobs has completed downloading.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     */
+    Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port);
+    
+    /**
+     * dec reference count on all blobs associated with this topology.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @throws IOException on any error
+     */
+    void releaseSlotFor(LocalAssignment assignment, int port) throws IOException;
+    
+    /**
+     * Clean up any topologies that are not in use right now.
+     * @throws IOException on any error.
+     */
+    void cleanupUnusedTopologies() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
new file mode 100644
index 0000000..570c951
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalDownloadedResource {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDownloadedResource.class);
+    private static class NoCancelFuture<T> implements Future<T> {
+        private final Future<T> _wrapped;
+        
+        public NoCancelFuture(Future<T> wrapped) {
+            _wrapped = wrapped;
+        }
+        
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            //cancel not currently supported
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return _wrapped.isDone();
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            return _wrapped.get();
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return _wrapped.get(timeout, unit);
+        }
+    }
+    private static class PortNAssignment {
+        private final int _port;
+        private final LocalAssignment _assignment;
+        
+        public PortNAssignment(int port, LocalAssignment assignment) {
+            _port = port;
+            _assignment = assignment;
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof PortNAssignment)) {
+                return false;
+            }
+            PortNAssignment pna = (PortNAssignment) other;
+            return pna._port == _port && _assignment.equals(pna._assignment); 
+        }
+        
+        @Override
+        public int hashCode() {
+            return (17 * _port) + _assignment.hashCode();
+        }
+        
+        @Override
+        public String toString() {
+            return "{"+ _port + " " + _assignment +"}";
+        }
+    }
+    private final Future<Void> _pending;
+    private final Set<PortNAssignment> _references;
+    private boolean _isDone;
+    
+    
+    public LocalDownloadedResource(Future<Void> pending) {
+        _pending = new NoCancelFuture<>(pending);
+        _references = new HashSet<>();
+        _isDone = false;
+    }
+
+    /**
+     * Reserve the resources
+     * @param port the port this is for
+     * @param la the assignment this is for
+     * @return a future that can be used to track it being downloaded.
+     */
+    public synchronized Future<Void> reserve(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        if (!_references.add(pna)) {
+            LOG.warn("Resources {} already reserved {} for this topology", pna, _references);
+        }
+        return _pending;
+    }
+    
+    /**
+     * Release a port from the reference count, and update isDone if all is done.
+     * @param port the port to release
+     * @param la the assignment to release
+     * @return true if the port was being counted else false
+     */
+    public synchronized boolean release(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        boolean ret = _references.remove(pna);
+        if (ret && _references.isEmpty()) {
+            _isDone = true;
+        }
+        return ret;
+    }
+    
+    /**
+     * Whether this has been cleaned up completely.
+     * @return true if it is done else false
+     */
+    public synchronized boolean isDone() {
+        return _isDone;
+    }
+
+    @Override
+    public String toString() {
+        return _references.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 380e777..9f42b47 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -93,7 +93,7 @@ public class LocalizedResourceRetentionSet {
           i.remove();
         } else {
           // since it failed to delete add it back so it gets retried
-          set.addResource(resource.getKey(), resource, resource.isUncompressed());
+          set.add(resource.getKey(), resource, resource.isUncompressed());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
index b5f00c3..62d5b2f 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
@@ -57,7 +57,7 @@ public class LocalizedResourceSet {
     return _localrsrcFiles.get(name);
   }
 
-  public void updateResource(String resourceName, LocalizedResource updatedResource,
+  public void putIfAbsent(String resourceName, LocalizedResource updatedResource,
                             boolean uncompress) {
     if (uncompress) {
       _localrsrcArchives.putIfAbsent(resourceName, updatedResource);
@@ -66,7 +66,7 @@ public class LocalizedResourceSet {
     }
   }
 
-  public void addResource(String resourceName, LocalizedResource newResource, boolean uncompress) {
+  public void add(String resourceName, LocalizedResource newResource, boolean uncompress) {
     if (uncompress) {
       _localrsrcArchives.put(resourceName, newResource);
     } else {
@@ -76,9 +76,9 @@ public class LocalizedResourceSet {
 
   public boolean exists(String resourceName, boolean uncompress) {
     if (uncompress) {
-      return (_localrsrcArchives.get(resourceName) != null);
+      return _localrsrcArchives.containsKey(resourceName);
     }
-    return (_localrsrcFiles.get(resourceName) != null);
+    return _localrsrcFiles.containsKey(resourceName);
   }
 
   public boolean remove(LocalizedResource resource) {

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
index b91cecb..0135397 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
@@ -63,20 +63,6 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
  */
 public class Localizer {
   public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-
-  private Map _conf;
-  private int _threadPoolSize;
-  // thread pool for initial download
-  private ExecutorService _execService;
-  // thread pool for updates
-  private ExecutorService _updateExecService;
-  private int _blobDownloadRetries;
-
-  // track resources - user to resourceSet
-  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
-      ConcurrentHashMap<String, LocalizedResourceSet>();
-
-  private String _localBaseDir;
   public static final String USERCACHE = "usercache";
   public static final String FILECACHE = "filecache";
 
@@ -85,13 +71,29 @@ public class Localizer {
   public static final String ARCHIVESDIR = "archives";
 
   private static final String TO_UNCOMPRESS = "_tmp_";
+  
+  
+  
+  private final Map<String, Object> _conf;
+  private final int _threadPoolSize;
+  // thread pool for initial download
+  private final ExecutorService _execService;
+  // thread pool for updates
+  private final ExecutorService _updateExecService;
+  private final int _blobDownloadRetries;
+
+  // track resources - user to resourceSet
+  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
+      ConcurrentHashMap<String, LocalizedResourceSet>();
+
+  private final String _localBaseDir;
 
   // cleanup
   private long _cacheTargetSize;
   private long _cacheCleanupPeriod;
   private ScheduledExecutorService _cacheCleanupService;
 
-  public Localizer(Map conf, String baseDir) {
+  public Localizer(Map<String, Object> conf, String baseDir) {
     _conf = conf;
     _localBaseDir = baseDir;
     // default cache size 10GB, converted to Bytes
@@ -189,7 +191,7 @@ public class Localizer {
         LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
         LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
             uncompress);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress);
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
       }
     }
   }
@@ -369,7 +371,7 @@ public class Localizer {
           if (newlrsrcSet == null) {
             newlrsrcSet = newSet;
           }
-          newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+          newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
           results.add(lrsrc);
         }
         catch (ExecutionException e) {
@@ -451,7 +453,7 @@ public class Localizer {
       for (Future<LocalizedResource> futureRsrc: futures) {
         LocalizedResource lrsrc = futureRsrc.get();
         lrsrc.addReference(topo);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
         results.add(lrsrc);
       }
     } catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
new file mode 100644
index 0000000..d433920
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.codahale.metrics.*;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unchecked")
+public class StormMetricsRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
+    public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry();
+
+    public static Meter registerMeter(String name) {
+        Meter meter = new Meter();
+        return register(name, meter);
+    }
+
+    // TODO: should replace Callable with Gauge<Integer> when nimbus.clj is translated to java
+    public static Gauge<Integer> registerGauge(final String name, final Callable fn) {
+        Gauge<Integer> gauge = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                try {
+                    return (Integer) fn.call();
+                } catch (Exception e) {
+                    LOG.error("Error getting gauge value for {}", name, e);
+                }
+                return 0;
+            }
+        };
+        return register(name, gauge);
+    }
+
+    public static Histogram registerHistogram(String name, Reservoir reservoir) {
+        Histogram histogram = new Histogram(reservoir);
+        return register(name, histogram);
+    }
+
+    public static void startMetricsReporters(Map stormConf) {
+        for (PreparableReporter reporter : MetricsUtils.getPreparableReporters(stormConf)) {
+            startMetricsReporter(reporter, stormConf);
+        }
+    }
+
+    private static void startMetricsReporter(PreparableReporter reporter, Map stormConf) {
+        reporter.prepare(StormMetricsRegistry.DEFAULT_REGISTRY, stormConf);
+        reporter.start();
+        LOG.info("Started statistics report plugin...");
+    }
+
+    private static <T extends Metric> T register(String name, T metric) {
+        T ret;
+        try {
+            ret = DEFAULT_REGISTRY.register(name, metric);
+        } catch (IllegalArgumentException e) {
+            // swallow IllegalArgumentException when the metric exists already
+            ret = (T) DEFAULT_REGISTRY.getMetrics().get(name);
+            if (ret == null) {
+                throw e;
+            } else {
+                LOG.warn("Metric {} has already been registered", name);
+            }
+        }
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
index 3c729ec..e8789df 100644
--- a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
@@ -37,14 +37,14 @@ public interface ILeaderElector extends Closeable {
      * check isLeader() to perform any leadership action. This method can be called
      * multiple times so it needs to be idempotent.
      */
-    void addToLeaderLockQueue();
+    void addToLeaderLockQueue() throws Exception;
 
     /**
      * Removes the caller from the leader lock queue. If the caller is leader
      * also releases the lock. This method can be called multiple times so it needs
      * to be idempotent.
      */
-    void removeFromLeaderLockQueue();
+    void removeFromLeaderLockQueue() throws Exception;
 
     /**
      *
@@ -62,7 +62,7 @@ public interface ILeaderElector extends Closeable {
      *
      * @return list of current nimbus addresses, includes leader.
      */
-    List<NimbusInfo> getAllNimbuses();
+    List<NimbusInfo> getAllNimbuses() throws Exception;
 
     /**
      * Method called to allow for cleanup. once close this object can not be reused.

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index 32d28fe..8415ce3 100644
--- a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -134,6 +134,21 @@ public class SerializationFactory {
         Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
         Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
 
+        /**
+         * "{:a 1  :b 2} -> {1 :a  2 :b}"
+         *
+         * Note: Only one key wins if there are duplicate values.
+         *       Which key wins is indeterminate:
+         * "{:a 1  :b 1} -> {1 :a} *or* {1 :b}"
+         */
+        private static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+            Map<V, K> ret = new HashMap<V, K>();
+            for (Map.Entry<K, V> entry : map.entrySet()) {
+                ret.put(entry.getValue(), entry.getKey());
+            }
+            return ret;
+        }
+
         public IdDictionary(StormTopology topology) {
             List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
             componentNames.addAll(topology.get_bolts().keySet());
@@ -143,7 +158,7 @@ public class SerializationFactory {
                 ComponentCommon common = Utils.getComponentCommon(topology, name);
                 List<String> streams = new ArrayList<>(common.get_streams().keySet());
                 streamNametoId.put(name, idify(streams));
-                streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
+                streamIdToName.put(name, simpleReverseMap(streamNametoId.get(name)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/318ab5f3/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
index 2f06102..0284725 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -17,17 +17,18 @@
  */
 package org.apache.storm.testing;
 
-import org.apache.storm.topology.OutputFieldsDeclarer;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.InprocMessaging;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
 
 
 public class FeederSpout extends BaseRichSpout {
@@ -51,7 +52,15 @@ public class FeederSpout extends BaseRichSpout {
 
     public void feed(List<Object> tuple, Object msgId) {
         InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
-    }    
+    }
+    
+    public void feedNoWait(List<Object> tuple, Object msgId) {
+        InprocMessaging.sendMessageNoWait(_id, new Values(tuple, msgId));
+    }
+    
+    public void waitForReader() {
+        InprocMessaging.waitForReader(_id);
+    }
     
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
@@ -63,17 +72,11 @@ public class FeederSpout extends BaseRichSpout {
 
     public void nextTuple() {
         List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
-        if(toEmit!=null) {
+        if (toEmit!=null) {
             List<Object> tuple = (List<Object>) toEmit.get(0);
             Object msgId = toEmit.get(1);
             
             _collector.emit(tuple, msgId);
-        } else {
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
         }
     }
 


Mime
View raw message