cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r931200 - in /cassandra/trunk: src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/ test/conf/
Date Tue, 06 Apr 2010 16:01:57 GMT
Author: gdusbabek
Date: Tue Apr  6 16:01:57 2010
New Revision: 931200

URL: http://svn.apache.org/viewvc?rev=931200&view=rev
Log:
Gossip metadata version and request updates. Patch by Gary Dusbabek, reviewed by Jonthan Ellis.
CASSANDRA-827

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/conf/storage-conf.xml

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Apr  6
16:01:57 2010
@@ -47,6 +47,7 @@ public class StageManager
     public static final String RESPONSE_STAGE = "RESPONSE-STAGE";
     public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
     private static final String LOADBALANCE_STAGE = "LOAD-BALANCER-STAGE";
+    public static final String MIGRATION_STAGE = "MIGRATION-STAGE";
 
     static
     {
@@ -58,6 +59,7 @@ public class StageManager
         stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
         stages.put(AE_SERVICE_STAGE, new JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
         stages.put(LOADBALANCE_STAGE, new JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
+        stages.put(MIGRATION_STAGE, new JMXEnabledThreadPoolExecutor(MIGRATION_STAGE));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(String name, int numThreads)

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Apr 
6 16:01:57 2010
@@ -132,7 +132,8 @@ public class DatabaseDescriptor
 
     private final static String STORAGE_CONF_FILE = "storage-conf.xml";
 
-    private static UUID defsVersion = null;
+    private static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set
to 1, everything else to zero.
+    private static UUID defsVersion = INITIAL_VERSION;
 
     /**
      * Try the storage-config system property, and then inspect the classpath.

Added: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=931200&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Tue
Apr  6 16:01:57 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.MigrationManager;
+
+import java.util.UUID;
+
+public class DefinitionsAnnounceVerbHandler implements IVerbHandler
+{
+    
+    /** someone is announcing their schema version. */
+    public void doVerb(Message message)
+    {
+        UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
+        MigrationManager.rectify(theirVersion, message.getFrom());
+    } 
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=931200&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Tue Apr  6 16:01:57 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+
+public class DefinitionsUpdateResponseVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
+
+    /** someone sent me their data definitions */
+    public void doVerb(final Message message)
+    {
+        try
+        {
+            // these are the serialized row mutations that I must apply.
+            // check versions at every step along the way to make sure migrations are not
applied out of order.
+            Collection<Column> cols = MigrationManager.makeColumns(message);
+            for (Column col : cols)
+            {
+                final UUID version = UUIDGen.makeType1UUID(col.name());
+                if (version.timestamp() > DatabaseDescriptor.getDefsVersion().timestamp())
+                {
+                    final Migration m = Migration.deserialize(new ByteArrayInputStream(col.value()));
+                    assert m.getVersion().equals(version);
+                    StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new WrappedRunnable()
+                    {
+                        @Override
+                        protected void runMayThrow() throws Exception
+                        {
+                            // check to make sure the current version is before this one.
+                            if (DatabaseDescriptor.getDefsVersion().timestamp() >= version.timestamp())
+                                logger.debug("Not applying " + version.toString());
+                            else
+                            {
+                                logger.debug("Applying {} from {}", m.getClass().getSimpleName(),
message.getFrom());
+                                m.apply();
+                                m.announce();
+                            }
+                        }
+                    });
+                }
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Apr  6 16:01:57
2010
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.io.IOError;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.util.*;
@@ -40,9 +39,7 @@ import org.apache.cassandra.db.commitlog
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -106,6 +103,8 @@ public class StorageService implements I
         GOSSIP_DIGEST_SYN,
         GOSSIP_DIGEST_ACK,
         GOSSIP_DIGEST_ACK2,
+        DEFINITIONS_ANNOUNCE,
+        DEFINITIONS_UPDATE_RESPONSE,
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
@@ -151,6 +150,7 @@ public class StorageService implements I
     private boolean isClientMode;
     private boolean initialized;
     private String operationMode;
+    private MigrationManager migrationManager = new MigrationManager();
 
     public void addBootstrapSource(InetAddress s, String table)
     {
@@ -226,14 +226,13 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler());
+        
+        MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new DefinitionsAnnounceVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
new DefinitionsUpdateResponseVerbHandler());
 
         replicationStrategies = new HashMap<String, AbstractReplicationStrategy>();
         for (String table : DatabaseDescriptor.getNonSystemTables())
-        {
-            AbstractReplicationStrategy strat = getReplicationStrategy(tokenMetadata_, table);
-            replicationStrategies.put(table, strat);
-        }
-        replicationStrategies = Collections.unmodifiableMap(replicationStrategies);
+            initReplicationStrategy(table);
 
         // spin up the streaming serivice so it is available for jmx tools.
         if (StreamingService.instance == null)
@@ -281,6 +280,7 @@ public class StorageService implements I
 
     public void stopClient()
     {
+        Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
         MessagingService.shutdown();
@@ -336,6 +336,7 @@ public class StorageService implements I
         // for bootstrap to get the load info it needs.
         // (we won't be part of the storage ring though until we add a nodeId to our state,
below.)
         Gossiper.instance.register(this);
+        Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
// needed for node-ring gathering.
 
         if (DatabaseDescriptor.isAutoBootstrap()
@@ -357,7 +358,17 @@ public class StorageService implements I
             }
             setMode("Joining: getting bootstrap token", true);
             Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo());
-            startBootstrap(token);
+            // don't bootstrap if there are no tables defined.
+            if (DatabaseDescriptor.getNonSystemTables().size() > 0)
+                startBootstrap(token);
+            else
+            {
+                isBootstrapMode = false;
+                SystemTable.setBootstrapped(true);
+                tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
+                Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL
+ Delimiter + partitioner_.getTokenFactory().toString(token)));
+                setMode("Normal", false);
+            }
             // don't finish startup (enabling thrift) until after bootstrap is done
             while (isBootstrapMode)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Apr  6 16:01:57
2010
@@ -530,14 +530,12 @@ public class CassandraServer implements 
     {
         Map<String, Map<String, String>> columnFamiliesMap = new HashMap<String,
Map<String, String>>();
 
-        Map<String, CFMetaData> tableMetaData = DatabaseDescriptor.getTableMetaData(table);
-        // table doesn't exist
-        if (tableMetaData == null)
-        {
+        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(table); 
+        if (ksm == null)
             throw new NotFoundException();
-        }
+        
 
-        for (Map.Entry<String, CFMetaData> stringCFMetaDataEntry : tableMetaData.entrySet())
+        for (Map.Entry<String, CFMetaData> stringCFMetaDataEntry : ksm.cfMetaData().entrySet())
         {
             CFMetaData columnFamilyMetaData = stringCFMetaDataEntry.getValue();
 
@@ -679,3 +677,4 @@ public class CassandraServer implements 
     
     // main method moved to CassandraDaemon
 }
+    
\ No newline at end of file

Modified: cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/storage-conf.xml?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/test/conf/storage-conf.xml (original)
+++ cassandra/trunk/test/conf/storage-conf.xml Tue Apr  6 16:01:57 2010
@@ -77,7 +77,7 @@
      </Keyspace>
    </Keyspaces>
    <Seeds>
-     <!-- Add names of hosts that are deemed contact points -->
-     <Seed>127.0.0.1</Seed>
+     <!-- we don't want this node to think it is a seed. -->
+     <Seed>127.0.0.2</Seed>
    </Seeds>
 </Storage>



Mime
View raw message