cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1004259 - in /cassandra/trunk/src/java/org/apache/cassandra: concurrent/ db/ gms/ io/ io/sstable/ locator/ net/ service/ utils/
Date Mon, 04 Oct 2010 14:36:29 GMT
Author: jbellis
Date: Mon Oct  4 14:36:28 2010
New Revision: 1004259

URL: http://svn.apache.org/viewvc?rev=1004259&view=rev
Log:
replace one-off Timers with a central ScheduledExecutorService.  patch by jbellis; reviewed
by Stu Hood for CASSANDRA-1288

Added:
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ResourceWatcher.java

Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java?rev=1004259&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java Mon Oct  4 14:36:28 2010
@@ -0,0 +1,93 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class RetryingScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
+{
+    protected static Logger logger = Logger.getLogger(RetryingScheduledThreadPoolExecutor.class);
+
+    public RetryingScheduledThreadPoolExecutor(String threadPoolName, int priority)
+    {
+        this(1, threadPoolName, priority);
+    }
+
+    public RetryingScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority)
+    {
+        super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
+    }
+
+    public RetryingScheduledThreadPoolExecutor(String threadPoolName)
+    {
+        this(1, threadPoolName, Thread.NORM_PRIORITY);
+    }
+
+    @Override
+    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
+    {
+        return new LoggingScheduledFuture<V>(task);
+    }
+
+    private class LoggingScheduledFuture<V> implements RunnableScheduledFuture<V>
+    {
+        private final RunnableScheduledFuture<V> task;
+
+        public LoggingScheduledFuture(RunnableScheduledFuture<V> task)
+        {
+            this.task = task;
+        }
+
+        public boolean isPeriodic()
+        {
+            return task.isPeriodic();
+        }
+
+        public long getDelay(TimeUnit unit)
+        {
+            return task.getDelay(unit);
+        }
+
+        public int compareTo(Delayed o)
+        {
+            return task.compareTo(o);
+        }
+
+        public void run()
+        {
+            try
+            {
+                task.run();
+            }
+            catch (Exception e)
+            {
+                logger.error("error running scheduled task", e);
+            }
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            return task.cancel(mayInterruptIfRunning);
+        }
+
+        public boolean isCancelled()
+        {
+            return task.isCancelled();
+        }
+
+        public boolean isDone()
+        {
+            return task.isDone();
+        }
+
+        public V get() throws InterruptedException, ExecutionException
+        {
+            return task.get();
+        }
+
+        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+        {
+            return task.get(timeout, unit);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Oct  4 14:36:28 2010
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
@@ -57,8 +59,6 @@ public class Table
     /* accesses to CFS.memtable should acquire this for thread safety.  only switchMemtable should aquire the writeLock. */
     static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock(true);
 
-    private static Timer flushTimer = new Timer("FLUSH-TIMER");
-
     // This is a result of pushing down the point in time when storage directories get created.  It used to happen in
     // CassandraDaemon, but it is possible to call Table.open without a running daemon, so it made sense to ensure
     // proper directories here.
@@ -83,9 +83,9 @@ public class Table
     public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new HashMap<Integer, ColumnFamilyStore>(); // TODO make private again
     // cache application CFs since Range queries ask for them a _lot_
     private SortedSet<String> applicationColumnFamilies;
-    private final TimerTask flushTask;
     private final Object[] indexLocks;
-    
+    private ScheduledFuture<?> flushTask;
+
     public static Table open(String table)
     {
         Table tableInstance = instances.get(table);
@@ -114,7 +114,7 @@ public class Table
             Table t = instances.remove(table);
             if (t != null)
             {
-                t.flushTask.cancel();
+                t.flushTask.cancel(false);
                 for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
                     t.unloadCf(cfs);
             }
@@ -250,7 +250,7 @@ public class Table
 
         // check 10x as often as the lifetime, so we can exceed lifetime by 10% at most
         int checkMs = DatabaseDescriptor.getMemtableLifetimeMS() / 10;
-        flushTask = new TimerTask()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -260,7 +260,7 @@ public class Table
                 }
             }
         };
-        flushTimer.schedule(flushTask, checkMs, checkMs);
+        flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, checkMs, checkMs, TimeUnit.MILLISECONDS);
     }
     
     public void dropCf(Integer cfId) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Oct  4 14:36:28 2010
@@ -18,22 +18,22 @@
 
 package org.apache.cassandra.gms;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.net.InetAddress;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
  * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -48,8 +48,9 @@ import org.slf4j.LoggerFactory;
 public class Gossiper implements IFailureDetectionEventListener
 {
     static final ApplicationState[] STATES = ApplicationState.values();
+    private ScheduledFuture<?> scheduledGossipTask;
 
-    private class GossipTimerTask extends TimerTask
+    private class GossipTask implements Runnable
     {
         public void run()
         {
@@ -107,7 +108,6 @@ public class Gossiper implements IFailur
     private static Logger logger_ = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
-    private Timer gossipTimer_;
     private InetAddress localEndpoint_;
     private long aVeryLongTime_;
     private long FatClientTimeout_;
@@ -143,7 +143,6 @@ public class Gossiper implements IFailur
 
     private Gossiper()
     {
-        gossipTimer_ = new Timer(false);
         // 3 days
         aVeryLongTime_ = 259200 * 1000;
         // 1 hour
@@ -859,8 +858,10 @@ public class Gossiper implements IFailur
             endpointStateMap_.put(localEndpoint_, localState);
         }
 
-        /* starts a timer thread */
-        gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
+        scheduledGossipTask = StorageService.scheduledTasks.scheduleWithFixedDelay(new GossipTask(),
+                                                                                   Gossiper.intervalInMillis_,
+                                                                                   Gossiper.intervalInMillis_,
+                                                                                   TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -890,8 +891,7 @@ public class Gossiper implements IFailur
 
     public void stop()
     {
-        gossipTimer_.cancel();
-        gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
+        scheduledGossipTask.cancel(false);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java Mon Oct  4 14:36:28 2010
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
-
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.WrappedRunnable;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java Mon Oct  4 14:36:28 2010
@@ -20,27 +20,22 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.IOError;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
 
 public class SSTableDeletingReference extends PhantomReference<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingReference.class);
 
-    private static final Timer timer = new Timer("SSTABLE-CLEANUP-TIMER");
     public static final int RETRY_DELAY = 10000;
 
     private final SSTableTracker tracker;
@@ -70,15 +65,14 @@ public class SSTableDeletingReference ex
             // this is tricky because the mmapping might not have been finalized yet,
             // and delete will fail (on Windows) until it is.  additionally, we need to make sure to
             // delete the data file first, so on restart the others will be recognized as GCable
-            timer.schedule(new CleanupTask(), RETRY_DELAY);
+            StorageService.scheduledTasks.schedule(new CleanupTask(), RETRY_DELAY, TimeUnit.MILLISECONDS);
         }
     }
 
-    private class CleanupTask extends TimerTask
+    private class CleanupTask implements Runnable
     {
         int attempts = 0;
 
-        @Override
         public void run()
         {
             // retry until we can successfully delete the DATA component: see above
@@ -87,12 +81,11 @@ public class SSTableDeletingReference ex
             {
                 if (attempts++ < DeletionService.MAX_RETRIES)
                 {
-                    timer.schedule(new CleanupTask(), RETRY_DELAY); // re-using TimerTasks is not allowed
+                    StorageService.scheduledTasks.schedule(this, RETRY_DELAY, TimeUnit.MILLISECONDS);
                     return;
                 }
                 else
                 {
-                    // don't throw an exception; it will prevent any future tasks from running in this Timer
                     logger.error("Unable to delete " + datafile + " (it will be removed on server restart)");
                     return;
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Mon Oct  4 14:36:28 2010
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import java.lang.management.ManagementFactory;
@@ -53,14 +54,14 @@ public class DynamicEndpointSnitch exten
     public DynamicEndpointSnitch(IEndpointSnitch snitch)
     {
         subsnitch = snitch;
-        TimerTask update = new TimerTask()
+        Runnable update = new Runnable()
         {
             public void run()
             {
                 updateScores();
             }
         };
-        TimerTask reset = new TimerTask()
+        Runnable reset = new Runnable()
         {
             public void run()
             {
@@ -69,9 +70,8 @@ public class DynamicEndpointSnitch exten
                 reset();
             }
         };
-        Timer timer = new Timer("DynamicEndpointSnitch");
-        timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS);
-        timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS);
+        StorageService.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        StorageService.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Oct  4 14:36:28 2010
@@ -29,7 +29,9 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.*;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -104,15 +105,14 @@ public class MessagingService implements
         taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()));
 
         streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", DatabaseDescriptor.getCompactionThreadPriority());
-        TimerTask logDropped = new TimerTask()
+        Runnable logDropped = new Runnable()
         {
             public void run()
             {
                 logDroppedMessages();
             }
         };
-        Timer timer = new Timer("DroppedMessagesLogger");
-        timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
+        StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Mon Oct  4 14:36:28 2010
@@ -20,25 +20,25 @@ package org.apache.cassandra.service;
  * 
  */
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
-import org.apache.cassandra.concurrent.IExecutorMBean;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.net.MessagingService;
-
-import java.lang.management.MemoryUsage;
-import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.IExecutorMBean;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.net.MessagingService;
 
 public class GCInspector
 {
@@ -90,14 +90,14 @@ public class GCInspector
         // don't bother starting a thread that will do nothing.
         if (beans.size() == 0)
             return;         
-        TimerTask t = new TimerTask()
+        Runnable t = new Runnable()
         {
             public void run()
             {
                 logIntervalGCStats();
             }
         };
-        new Timer("GC inspection").schedule(t, INTERVAL_IN_MS, INTERVAL_IN_MS);
+        StorageService.scheduledTasks.scheduleWithFixedDelay(t, INTERVAL_IN_MS, INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
     }
 
     private void logIntervalGCStats()

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Oct  4 14:36:28 2010
@@ -18,18 +18,15 @@
 
 package org.apache.cassandra.service;
 
+import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.cassandra.gms.*;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-
-import java.net.InetAddress;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -178,9 +175,6 @@ public class StorageLoadBalancer impleme
     /* This map is a clone of the one above and is used for various calculations during LB operation */
     private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress, Double>();
 
-    /* Timer is used to disseminate load information */
-    private Timer loadTimer_ = new Timer(false);
-
     private StorageLoadBalancer()
     {
         Gossiper.instance.register(this);
@@ -347,7 +341,17 @@ public class StorageLoadBalancer impleme
     {
         // send the first broadcast "right away" (i.e., in 2 gossip heartbeats, when we should have someone to talk to);
         // after that send every BROADCAST_INTERVAL.
-        loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL);
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Disseminating load info ...");
+                Gossiper.instance.addLocalApplicationState(ApplicationState.LOAD,
+                                                           StorageService.valueFactory.load(StorageService.instance.getLoad()));
+            }
+        };
+        StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Oct  4 14:36:28 2010
@@ -36,10 +36,7 @@ import org.apache.commons.lang.StringUti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.migration.AddKeyspace;
@@ -140,6 +137,8 @@ public class StorageService implements I
     private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
     public static VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
 
+    public static RetryingScheduledThreadPoolExecutor scheduledTasks = new RetryingScheduledThreadPoolExecutor("ScheduledTasks");
+
     public static final StorageService instance = new StorageService();
 
     public static IPartitioner getPartitioner() {

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ResourceWatcher.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ResourceWatcher.java?rev=1004259&r1=1004258&r2=1004259&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ResourceWatcher.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ResourceWatcher.java Mon Oct  4 14:36:28 2010
@@ -22,22 +22,21 @@ package org.apache.cassandra.utils;
 
 
 import java.io.File;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.service.StorageService;
+
 public class ResourceWatcher
 {
-    private static Timer timer = new Timer("RESOURCE-WATCHER");
-
     public static void watch(String resource, Runnable callback, int period)
     {
-        timer.schedule(new WatchedResource(resource, callback), period, period);
+        StorageService.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS);
     }
     
-    public static class WatchedResource extends TimerTask
+    public static class WatchedResource implements Runnable
     {
         private static Logger logger = LoggerFactory.getLogger(WatchedResource.class);
         private String resource;



Mime
View raw message