cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Adds binary protocol events for schema changes
Date Fri, 05 Oct 2012 17:47:17 GMT
Updated Branches:
  refs/heads/trunk c2a8f1288 -> 11f7d7253


Adds binary protocol events for schema changes

patch by slebresne; reviewed by jbellis for CASSANDRA-4684


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/11f7d725
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/11f7d725
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/11f7d725

Branch: refs/heads/trunk
Commit: 11f7d72536c3acf2ecdb255ddbcee545f29d6742
Parents: c2a8f12
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Oct 5 19:46:10 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Oct 5 19:46:10 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 doc/native_protocol.spec                           |   28 +++++-
 .../cql3/statements/AlterKeyspaceStatement.java    |   12 +++
 .../cql3/statements/AlterTableStatement.java       |    5 +
 .../statements/CreateColumnFamilyStatement.java    |    6 +
 .../cql3/statements/CreateIndexStatement.java      |    7 ++
 .../cql3/statements/CreateKeyspaceStatement.java   |   12 +++
 .../cql3/statements/DropColumnFamilyStatement.java |    6 +
 .../cql3/statements/DropIndexStatement.java        |    7 ++
 .../cql3/statements/DropKeyspaceStatement.java     |   12 +++
 .../cql3/statements/SchemaAlteringStatement.java   |    5 +-
 src/java/org/apache/cassandra/db/DefsTable.java    |   16 +++
 .../cassandra/service/IMigrationListener.java      |   30 ++++++
 .../apache/cassandra/service/MigrationManager.java |   53 ++++++++++
 .../apache/cassandra/service/StorageService.java   |    2 +-
 src/java/org/apache/cassandra/transport/Event.java |   48 +++++++++-
 .../org/apache/cassandra/transport/Server.java     |   38 +++++++-
 .../transport/messages/ResultMessage.java          |   74 ++++++++++++++-
 18 files changed, 355 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7cfc311..cab1425 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,7 @@
  * Pluggable Thrift transport factories for CLI (CASSANDRA-4609)
  * Backport adding AlterKeyspace statement (CASSANDRA-4611)
  * (CQL3) Correcty accept upper-case data types (CASSANDRA-4770)
+ * Add binary protocol events for schema changes (CASSANDRA-4684)
 Merged from 1.0:
  * Switch from NBHM to CHM in MessagingService's callback map, which
    prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 2e03a02..71a7c71 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -31,6 +31,7 @@ Table of Contents
         4.2.5.2. Rows
         4.2.5.3. Set_keyspace
         4.2.5.4. Prepared
+        4.2.5.5. Schema_change
       4.2.6. EVENT
   5. Compression
   6. Collection types
@@ -328,7 +329,8 @@ Table of Contents
     0x0001    Void: for results carrying no information.
     0x0002    Rows: for results to select queries, returning a set of rows.
     0x0003    Set_keyspace: the result to a `use` query.
-    0x0004    Prepared: result to a PREPARE message
+    0x0004    Prepared: result to a PREPARE message.
+    0x0005    Schema_change: the result to a schema altering query.
 
   The body for each kind (after the [int] kind) is defined below.
 
@@ -416,6 +418,24 @@ Table of Contents
     - <id> is [short bytes] representing the prepared query ID.
     - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2).
 
+4.2.5.5. Schema_change
+
+  The result to a schema altering query (creation/update/drop of a
+  keyspace/table/index). The body (after the kind [int]) is composed of 3
+  [string]:
+    <change><keyspace><table>
+  where:
+    - <change> describe the type of change that has occured. It can be one of
+      "CREATED", "UPDATED" or "DROPPED".
+    - <keyspace> is the name of the affected keyspace or the keyspace of the
+      affected table.
+    - <table> is the name of the affected table. <table> will be empty (i.e.
+      the empty string "") if the change was affecting a keyspace and not a
+      table.
+
+  Note that queries to create and drop an index are considered as change
+  updating the table the index is on.
+
 
 4.2.6. EVENT
 
@@ -434,6 +454,12 @@ Table of Contents
       consists of a [string] and an [inet], corresponding respectively to the
       type of status change ("UP" or "DOWN") followed by the address of the
       concerned node.
+    - "SCHEMA_CHANGE": events related to schema change. The body of the message
+      (after the event type) consists of 3 [string] corresponding respectively
+      to the type of schema change ("CREATED", "UPDATED" or "DROPPED"),
+      followed by the name of the affected keyspace and the name of the
+      affected table within that keyspace. For changes that affect a keyspace
+      directly, the table name will be empty (i.e. the empty string "").
 
   All EVENT message have a streamId of -1 (Section 2.3).
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 13c9e44..52c422a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class AlterKeyspaceStatement extends SchemaAlteringStatement
 {
@@ -41,6 +42,12 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
         this.attrs = attrs;
     }
 
+    @Override
+    public String keyspace()
+    {
+        return name;
+    }
+
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
         state.hasKeyspaceAccess(name, Permission.ALTER);
@@ -83,4 +90,9 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
 
         MigrationManager.announceKeyspaceUpdate(attrs.asKSMetadataUpdate(ksm));
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.UPDATED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index a33ff12..40eb8f8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
@@ -195,4 +196,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
                              validator);
     }
 
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.UPDATED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 1cf3137..1775398 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 /** A <code>CREATE COLUMNFAMILY</code> parsed from a CQL query statement. */
 public class CreateColumnFamilyStatement extends SchemaAlteringStatement
@@ -100,6 +101,11 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
         MigrationManager.announceNewColumnFamily(getCFMetaData());
     }
 
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.CREATED;
+    }
+
     /**
      * Returns a CFMetaData instance based on the parameters parsed from this
      * <code>CREATE</code> statement, or defaults where applicable.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index fedbf27..710de11 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 /** A <code>CREATE INDEX</code> statement parsed from a CQL query. */
 public class CreateIndexStatement extends SchemaAlteringStatement
@@ -111,4 +112,10 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         cfm.addDefaultIndexNames();
         MigrationManager.announceColumnFamilyUpdate(cfm);
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        // Creating an index is akin to updating the CF
+        return ResultMessage.SchemaChange.Change.UPDATED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index 5933292..378a8c7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 /** A <code>CREATE KEYSPACE</code> statement parsed from a CQL query. */
 public class CreateKeyspaceStatement extends SchemaAlteringStatement
@@ -55,6 +56,12 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
         this.attrs = attrs;
     }
 
+    @Override
+    public String keyspace()
+    {
+        return name;
+    }
+
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
         state.hasKeyspaceAccess(name, Permission.CREATE);
@@ -96,4 +103,9 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
     {
         MigrationManager.announceNewKeyspace(attrs.asKSMetadata(name));
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.CREATED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java
b/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java
index cba5944..7321642 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class DropColumnFamilyStatement extends SchemaAlteringStatement
 {
@@ -41,4 +42,9 @@ public class DropColumnFamilyStatement extends SchemaAlteringStatement
     {
         MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily());
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.DROPPED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 3ed391f..d7f966c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class DropIndexStatement extends SchemaAlteringStatement
 {
@@ -79,4 +80,10 @@ public class DropIndexStatement extends SchemaAlteringStatement
 
         return null;
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        // Dropping an index is akin to updating the CF
+        return ResultMessage.SchemaChange.Change.UPDATED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index 79f23aa..710e750 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class DropKeyspaceStatement extends SchemaAlteringStatement
 {
@@ -42,6 +43,12 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement
     }
 
     @Override
+    public String keyspace()
+    {
+        return keyspace;
+    }
+
+    @Override
     public void validate(ClientState state) throws RequestValidationException
     {
         super.validate(state);
@@ -52,4 +59,9 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement
     {
         MigrationManager.announceKeyspaceDrop(keyspace);
     }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        return ResultMessage.SchemaChange.Change.DROPPED;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 36407b4..95fc473 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -69,6 +69,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements
CQL
         return new Prepared(this);
     }
 
+    public abstract ResultMessage.SchemaChange.Change changeType();
+
     public abstract void announceMigration() throws RequestValidationException;
 
     @Override
@@ -80,6 +82,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements
CQL
         try
         {
             announceMigration();
+            String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();
+            return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
         }
         catch (ConfigurationException e)
         {
@@ -87,7 +91,6 @@ public abstract class SchemaAlteringStatement extends CFStatement implements
CQL
             ex.initCause(e);
             throw ex;
         }
-        return null;
     }
 
     public ResultMessage executeInternal(ClientState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index e696ad5..a012168 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.migration.avro.KsDef;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -486,7 +487,10 @@ public class DefsTable
         Schema.instance.load(ksm);
 
         if (!StorageService.instance.isClientMode())
+        {
             Table.open(ksm.name);
+            MigrationManager.instance.notifyCreateKeyspace(ksm);
+        }
     }
 
     private static void addColumnFamily(CFMetaData cfm)
@@ -504,7 +508,10 @@ public class DefsTable
         Schema.instance.setTableDefinition(ksm);
 
         if (!StorageService.instance.isClientMode())
+        {
             Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
+            MigrationManager.instance.notifyCreateColumnFamily(cfm);
+        }
     }
 
     private static void updateKeyspace(KSMetaData newState)
@@ -518,7 +525,10 @@ public class DefsTable
         try
         {
             if (!StorageService.instance.isClientMode())
+            {
                 Table.open(newState.name).createReplicationStrategy(newKsm);
+                MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+            }
         }
         catch (ConfigurationException e)
         {
@@ -537,6 +547,7 @@ public class DefsTable
         {
             Table table = Table.open(cfm.ksName);
             table.getColumnFamilyStore(cfm.cfName).reload();
+            MigrationManager.instance.notifyUpdateColumnFamily(cfm);
         }
     }
 
@@ -565,6 +576,10 @@ public class DefsTable
         // remove the table from the static instances.
         Table.clear(ksm.name);
         Schema.instance.clearTableDefinition(ksm);
+        if (!StorageService.instance.isClientMode())
+        {
+            MigrationManager.instance.notifyDropKeyspace(ksm);
+        }
     }
 
     private static void dropColumnFamily(String ksName, String cfName) throws IOException
@@ -587,6 +602,7 @@ public class DefsTable
             if (DatabaseDescriptor.isAutoSnapshot())
                 cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily));
             Table.open(ksm.name).dropCf(cfm.cfId);
+            MigrationManager.instance.notifyDropColumnFamily(cfm);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
new file mode 100644
index 0000000..1a6854b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+public interface IMigrationListener
+{
+    public void onCreateKeyspace(String ksName);
+    public void onCreateColumnFamly(String ksName, String cfName);
+
+    public void onUpdateKeyspace(String ksName);
+    public void onUpdateColumnFamly(String ksName, String cfName);
+
+    public void onDropKeyspace(String ksName);
+    public void onDropColumnFamly(String ksName, String cfName);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index f7ba701..d1987c2 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.ArrayList;
@@ -55,6 +56,22 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     private static final ByteBuffer LAST_MIGRATION_KEY = ByteBufferUtil.bytes("Last Migration");
 
+    public static final MigrationManager instance = new MigrationManager();
+
+    private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
+
+    private MigrationManager() {}
+
+    public void register(IMigrationListener listener)
+    {
+        listeners.add(listener);
+    }
+
+    public void unregister(IMigrationListener listener)
+    {
+        listeners.remove(listener);
+    }
+
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {}
 
@@ -107,6 +124,42 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
     }
 
+    public void notifyCreateKeyspace(KSMetaData ksm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onCreateKeyspace(ksm.name);
+    }
+
+    public void notifyCreateColumnFamily(CFMetaData cfm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onCreateColumnFamly(cfm.ksName, cfm.cfName);
+    }
+
+    public void notifyUpdateKeyspace(KSMetaData ksm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateKeyspace(ksm.name);
+    }
+
+    public void notifyUpdateColumnFamily(CFMetaData cfm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateColumnFamly(cfm.ksName, cfm.cfName);
+    }
+
+    public void notifyDropKeyspace(KSMetaData ksm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropKeyspace(ksm.name);
+    }
+
+    public void notifyDropColumnFamily(CFMetaData cfm)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropColumnFamly(cfm.ksName, cfm.cfName);
+    }
+
     public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
     {
         ksm.validate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 47c4c92..7d92fbe 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -166,7 +166,7 @@ public class StorageService implements IEndpointStateChangeSubscriber,
StorageSe
     private static enum Mode { NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING,
DRAINING, DRAINED, RELOCATING }
     private Mode operationMode;
 
-    private final MigrationManager migrationManager = new MigrationManager();
+    private final MigrationManager migrationManager = MigrationManager.instance;
 
     /* Used for tracking drain progress */
     private volatile int totalCFs, remainingCFs;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 849caff..855049d 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -25,7 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers;
 
 public abstract class Event
 {
-    public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE }
+    public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE, SCHEMA_CHANGE }
 
     public final Type type;
 
@@ -42,6 +42,8 @@ public abstract class Event
                 return TopologyChange.deserializeEvent(cb);
             case STATUS_CHANGE:
                 return StatusChange.deserializeEvent(cb);
+            case SCHEMA_CHANGE:
+                return SchemaChange.deserializeEvent(cb);
         }
         throw new AssertionError();
     }
@@ -143,4 +145,48 @@ public abstract class Event
             return status + " " + node;
         }
     }
+
+    public static class SchemaChange extends Event
+    {
+        public enum Change { CREATED, UPDATED, DROPPED }
+
+        public final Change change;
+        public final String keyspace;
+        public final String table;
+
+        public SchemaChange(Change change, String keyspace, String table)
+        {
+            super(Type.SCHEMA_CHANGE);
+            this.change = change;
+            this.keyspace = keyspace;
+            this.table = table;
+        }
+
+        public SchemaChange(Change change, String keyspace)
+        {
+            this(change, keyspace, "");
+        }
+
+        // Assumes the type has already by been deserialized
+        private static SchemaChange deserializeEvent(ChannelBuffer cb)
+        {
+            Change change = Enum.valueOf(Change.class, CBUtil.readString(cb).toUpperCase());
+            String keyspace = CBUtil.readString(cb);
+            String table = CBUtil.readString(cb);
+            return new SchemaChange(change, keyspace, table);
+        }
+
+        protected ChannelBuffer serializeEvent()
+        {
+            return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(change.toString()),
+                                                CBUtil.stringToCB(keyspace),
+                                                CBUtil.stringToCB(table));
+        }
+
+        @Override
+        public String toString()
+        {
+            return change + " " + keyspace + (table.isEmpty() ? "" : "." + table);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index ab91b19..e820554 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.EventMessage;
 
@@ -60,7 +62,9 @@ public class Server implements CassandraDaemon.Server
     public Server(InetSocketAddress socket)
     {
         this.socket = socket;
-        Gossiper.instance.register(new EventNotifier(this));
+        EventNotifier notifier = new EventNotifier(this);
+        Gossiper.instance.register(notifier);
+        MigrationManager.instance.register(notifier);
     }
 
     public Server(String hostname, int port)
@@ -199,7 +203,7 @@ public class Server implements CassandraDaemon.Server
       }
     }
 
-    private static class EventNotifier implements IEndpointStateChangeSubscriber
+    private static class EventNotifier implements IEndpointStateChangeSubscriber, IMigrationListener
     {
         private final Server server;
 
@@ -251,5 +255,35 @@ public class Server implements CassandraDaemon.Server
         {
             server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint),
server.socket.getPort()));
         }
+
+        public void onCreateKeyspace(String ksName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
ksName));
+        }
+
+        public void onCreateColumnFamly(String ksName, String cfName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
ksName, cfName));
+        }
+
+        public void onUpdateKeyspace(String ksName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
ksName));
+        }
+
+        public void onUpdateColumnFamly(String ksName, String cfName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
ksName, cfName));
+        }
+
+        public void onDropKeyspace(String ksName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
ksName));
+        }
+
+        public void onDropColumnFamly(String ksName, String cfName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
ksName, cfName));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 95fd333..955abc6 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -56,7 +56,8 @@ public abstract class ResultMessage extends Message.Response
         VOID         (1, Void.subcodec),
         ROWS         (2, Rows.subcodec),
         SET_KEYSPACE (3, SetKeyspace.subcodec),
-        PREPARED     (4, Prepared.subcodec);
+        PREPARED     (4, Prepared.subcodec),
+        SCHEMA_CHANGE(5, SchemaChange.subcodec);
 
         public final int id;
         public final Message.Codec<ResultMessage> subcodec;
@@ -314,4 +315,75 @@ public abstract class ResultMessage extends Message.Response
             return "RESULT PREPARED " + statementId + " " + metadata;
         }
     }
+
+    public static class SchemaChange extends ResultMessage
+    {
+        public enum Change { CREATED, UPDATED, DROPPED }
+
+        public final Change change;
+        public final String keyspace;
+        public final String columnFamily;
+
+        public SchemaChange(Change change, String keyspace)
+        {
+            this(change, keyspace, "");
+        }
+
+        public SchemaChange(Change change, String keyspace, String columnFamily)
+        {
+            super(Kind.SCHEMA_CHANGE);
+            this.change = change;
+            this.keyspace = keyspace;
+            this.columnFamily = columnFamily;
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                String cStr = CBUtil.readString(body);
+                Change change = null;
+                try
+                {
+                    change = Enum.valueOf(Change.class, cStr.toUpperCase());
+                }
+                catch (IllegalStateException e)
+                {
+                    throw new ProtocolException("Unknown Schema change action: " + cStr);
+                }
+
+                String keyspace = CBUtil.readString(body);
+                String columnFamily = CBUtil.readString(body);
+                return new SchemaChange(change, keyspace, columnFamily);
+
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof SchemaChange;
+                SchemaChange scm = (SchemaChange)msg;
+
+                ChannelBuffer a = CBUtil.stringToCB(scm.change.toString());
+                ChannelBuffer k = CBUtil.stringToCB(scm.keyspace);
+                ChannelBuffer c = CBUtil.stringToCB(scm.columnFamily);
+                return ChannelBuffers.wrappedBuffer(a, k, c);
+            }
+        };
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT schema change " + change + " on " + keyspace + (columnFamily.isEmpty()
? "" : "." + columnFamily);
+        }
+    }
 }


Mime
View raw message