cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r886253 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: config/DatabaseDescriptor.java db/Table.java gms/EndPointState.java service/CassandraServer.java service/StorageService.java
Date Wed, 02 Dec 2009 18:51:53 GMT
Author: jbellis
Date: Wed Dec  2 18:51:45 2009
New Revision: 886253

URL: http://svn.apache.org/viewvc?rev=886253&view=rev
Log:
add StorageService.initClient, which starts up Gossiper without setting a token or anything
other application state.
patch by gdusbabek and jbellis for CASSANDRA-535

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Dec  2 18:51:45 2009
@@ -390,22 +390,9 @@
                 columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
             }
 
-            /* data file directory */
+            /* data file and commit log directories. they get created later, when they're
needed. */
             dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
-            if (dataFileDirectories_.length == 0)
-            {
-                throw new ConfigurationException("At least one DataFileDirectory must be
specified");
-            }
-            for ( String dataFileDirectory : dataFileDirectories_ )
-                FileUtils.createDirectory(dataFileDirectory);
-
-            /* commit log directory */
             logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
-            if (logFileDirectory_ == null)
-            {
-                throw new ConfigurationException("CommitLogDirectory must be specified");
-            }
-            FileUtils.createDirectory(logFileDirectory_);
 
             /* threshold after which commit log should be rotated. */
             String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
@@ -547,9 +534,6 @@
             tableToCFMetaDataMap_.put(Table.SYSTEM_TABLE, systemMetadata);
             tableKeysCachedFractions_.put(Table.SYSTEM_TABLE, 0.0);
 
-            /* make sure we have a directory for each table */
-            createTableDirectories();
-
             /* Load the seeds for node contact points */
             String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
             if (seeds.length <= 0)
@@ -603,11 +587,31 @@
     }
 
     /**
-     * Create the table directory in each data directory
+     * Creates all storage-related directories.
+     * @throws IOException when a disk problem is encountered.
      */
-    public static void createTableDirectories() throws IOException
+    public static void createAllDirectories() throws IOException
     {
-        for (String dataFile : dataFileDirectories_) 
+        try {
+            if (dataFileDirectories_.length == 0)
+            {
+                throw new ConfigurationException("At least one DataFileDirectory must be
specified");
+            }
+            for ( String dataFileDirectory : dataFileDirectories_ )
+                FileUtils.createDirectory(dataFileDirectory);
+            if (logFileDirectory_ == null)
+            {
+                throw new ConfigurationException("CommitLogDirectory must be specified");
+            }
+            FileUtils.createDirectory(logFileDirectory_);
+        }
+        catch (ConfigurationException ex) {
+            logger_.error("Fatal error: " + ex.getMessage());
+            System.err.println("Bad configuration; unable to start server");
+            System.exit(1);
+        }
+        /* make sure we have a directory for each table */
+        for (String dataFile : dataFileDirectories_)
         {
             FileUtils.createDirectory(dataFile + File.separator + Table.SYSTEM_TABLE);
             for (String table : tables_)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Dec  2 18:51:45
2009
@@ -29,14 +29,8 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.io.DataOutputBuffer;
 import java.net.InetAddress;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.IStreamComplete;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.db.filter.*;
 
@@ -53,6 +47,21 @@
 
     private static Timer flushTimer_ = new Timer("FLUSH-TIMER");
 
+    // This is a result of pushing down the point in time when storage directories get created.
 It used to happen in
+    // CassandraDaemon, but it is possible to call Table.open without a running daemon, so
it made sense to ensure
+    // proper directories here.
+    static
+    {
+        try
+        {
+            DatabaseDescriptor.createAllDirectories();
+        }
+        catch (IOException ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+
     /*
      * This class represents the metadata of this Table. The metadata
      * is basically the column family name and the ID associated with

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Wed Dec
 2 18:51:45 2009
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.*;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
 
 import org.apache.log4j.Logger;
 
@@ -82,7 +83,8 @@
     }
     
     void addApplicationState(String key, ApplicationState appState)
-    {        
+    {
+        assert !StorageService.instance().isClientMode();
         applicationState_.put(key, appState);        
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed
Dec  2 18:51:45 2009
@@ -66,7 +66,7 @@
 		LogUtil.init();
 		//LogUtil.setLogLevel("com.facebook", "DEBUG");
 		// Start the storage service
-		storageService.start();
+		storageService.initServer();
 	}
 
     protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand> commands,
int consistency_level)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed
Dec  2 18:51:45 2009
@@ -32,6 +32,7 @@
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.locator.*;
@@ -149,6 +150,8 @@
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
     private Set<InetAddress> bootstrapSet;
+    /* when intialized as a client, we shouldn't write to the system table. */
+    private boolean isClientMode;
   
     public synchronized void addBootstrapSource(InetAddress s)
     {
@@ -185,8 +188,9 @@
     }
 
     /**
-     * for bulk loading clients to be able to use tokenmetadata/messagingservice
-     * without fully starting storageservice / systemtable.
+     * Intended for operation in client-only (non-storage mode). E.g.: for bulk loading clients
+     * to be able to use tokenmetadata/messagingservice without fully starting storageservice
/ systemtable,
+     * or java clients that wish to bypase Thrift entirely.
      */
     public void updateForeignTokenUnsafe(Token token, InetAddress endpoint)
     {
@@ -251,10 +255,34 @@
         }
         return replicationStrategy;
     }
+
+    public void stopClient()
+    {
+        Gossiper.instance().unregister(this);
+        Gossiper.instance().stop();
+        MessagingService.shutdown();
+    }
+
+    public void initClient() throws IOException
+    {
+        isClientMode = true;
+        logger_.info("Starting up client gossip");
+        MessagingService.instance().listen(FBUtilities.getLocalAddress());
+        MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+
+        SelectorManager.getSelectorManager().start();
+        SelectorManager.getUdpSelectorManager().start();
+
+        Gossiper.instance().register(this);
+        Gossiper.instance().start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis()
/ 1000)); // needed for node-ring gathering.
+    }
     
-    public void start() throws IOException
+    public void initServer() throws IOException
     {
+        isClientMode = false;
         storageMetadata_ = SystemTable.initMetadata();
+        DatabaseDescriptor.createAllDirectories();
+        logger_.info("Starting up server gossip");
 
         /* Listen for application messages */
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -270,7 +298,7 @@
         // for bootstrap to get the load info it needs.
         // (we won't be part of the storage ring though until we add a nodeId to our state,
below.)
         Gossiper.instance().register(this);
-        Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
+        Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
// needed for node-ring gathering.
 
         if (DatabaseDescriptor.isAutoBootstrap()
             && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
|| SystemTable.isBootstrapped()))
@@ -284,7 +312,7 @@
             while (isBootstrapMode)
             {
                 try
-                {   
+                {
                     Thread.sleep(100);
                 }
                 catch (InterruptedException e)
@@ -405,7 +433,10 @@
             Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state normal, token " + token);
-            updateForeignToken(token, endpoint);
+            if (isClientMode)
+                updateForeignTokenUnsafe(token, endpoint);
+            else
+                updateForeignToken(token, endpoint);
             replicationStrategy_.removeObsoletePendingRanges();
         }
         else if (STATE_LEAVING.equals(stateName))
@@ -499,7 +530,8 @@
 
     public void onAlive(InetAddress endpoint, EndPointState state)
     {
-        deliverHints(endpoint);
+        if (!isClientMode)
+            deliverHints(endpoint);
     }
 
     public void onDead(InetAddress endpoint, EndPointState state) {}
@@ -983,6 +1015,7 @@
             public void run()
             {
                 Gossiper.instance().stop();
+                MessagingService.shutdown();
                 logger_.info("DECOMMISSION FINISHED.");
                 // let op be responsible for killing the process
             }
@@ -1111,4 +1144,9 @@
     {
         tokenMetadata_.clearPendingRanges();
     }
+
+    public boolean isClientMode()
+    {
+        return isClientMode;
+    }
 }



Mime
View raw message