cassandra-commits mailing list archives

From pa...@apache.org
Subject cassandra git commit: Skip building views during base table streams on range movements
Date Thu, 06 Apr 2017 21:19:10 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 30820eacb -> c794d2bed


Skip building views during base table streams on range movements

patch by Benjamin Roth; reviewed by Paulo Motta for CASSANDRA-13065
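
The core of the change: streaming operations are now identified by a StreamOperation enum instead of a free-form description string, and each operation declares whether streamed base-table data must be replayed through the write path so that materialized views get rebuilt. Range movements (bootstrap, unbootstrap/decommission, relocation, rebuild, removenode) set that flag to false, so their sstables are added directly and view building is skipped. A minimal standalone sketch of the resulting decision follows; the class and method names below are illustrative only, and the real logic lives in StreamOperation.java and StreamReceiveTask.java further down.

    // Illustrative sketch -- mirrors the enum flag and the receive-side check added by
    // this patch, without depending on Cassandra internals.
    public class ViewBuildDecisionSketch
    {
        enum StreamOperation
        {
            BOOTSTRAP(false), DECOMMISSION(false), RELOCATION(false), REBUILD(false),
            RESTORE_REPLICA_COUNT(false), REPAIR(true), BULK_LOAD(true), OTHER(true);

            final boolean requiresViewBuild;
            StreamOperation(boolean requiresViewBuild) { this.requiresViewBuild = requiresViewBuild; }
        }

        // The write path is only needed when the table has CDC enabled, or when it has
        // views and the operation is one that must rebuild them.
        static boolean requiresWritePath(boolean hasCDC, boolean hasViews, StreamOperation op)
        {
            return hasCDC || (op.requiresViewBuild && hasViews);
        }

        public static void main(String[] args)
        {
            System.out.println(requiresWritePath(false, true, StreamOperation.BOOTSTRAP)); // false: view build skipped
            System.out.println(requiresWritePath(false, true, StreamOperation.REPAIR));    // true: views rebuilt
        }
    }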


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c794d2be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c794d2be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c794d2be

Branch: refs/heads/trunk
Commit: c794d2bed7ca1d10e13c4da08a3d45f5c755c1d8
Parents: 30820ea
Author: brstgt <brstgt@googlemail.com>
Authored: Tue Feb 28 13:36:16 2017 +0100
Committer: Paulo Motta <paulo@apache.org>
Committed: Thu Apr 6 18:18:55 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  5 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |  2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |  7 +-
 .../apache/cassandra/dht/StreamStateStore.java  |  2 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../net/IncomingStreamingConnection.java        |  2 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  9 +-
 .../cassandra/repair/StreamingRepairTask.java   |  3 +-
 .../cassandra/service/StorageService.java       |  8 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../apache/cassandra/streaming/StreamEvent.java |  4 +-
 .../cassandra/streaming/StreamOperation.java    | 69 +++++++++++++++
 .../apache/cassandra/streaming/StreamPlan.java  | 18 ++--
 .../cassandra/streaming/StreamReceiveTask.java  | 93 ++++++++++++--------
 .../cassandra/streaming/StreamResultFuture.java | 34 +++----
 .../cassandra/streaming/StreamSession.java      |  4 +-
 .../apache/cassandra/streaming/StreamState.java |  6 +-
 .../management/StreamStateCompositeData.java    |  7 +-
 .../streaming/messages/StreamInitMessage.java   | 13 +--
 .../cassandra/tools/nodetool/NetStats.java      |  2 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  3 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |  5 +-
 .../streaming/StreamOperationTest.java          | 47 ++++++++++
 .../streaming/StreamTransferTaskTest.java       |  2 +-
 .../streaming/StreamingTransferTest.java        | 12 +--
 26 files changed, 254 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e3da3f..b7b2cc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Skip building views during base table streams on range movements (CASSANDRA-13065)
  * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
  * Remove deprecated repair JMX APIs (CASSANDRA-11530)
  * Fix version check to enable streaming keep-alive (CASSANDRA-12929)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 058b378..0d64a94 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
 
@@ -1196,7 +1197,7 @@ public final class SystemKeyspace
         availableRanges.truncateBlocking();
     }
 
-    public static synchronized void updateTransferredRanges(String description,
+    public static synchronized void updateTransferredRanges(StreamOperation streamOperation,
                                                          InetAddress peer,
                                                          String keyspace,
                                                          Collection<Range<Token>> streamedRanges)
@@ -1207,7 +1208,7 @@ public final class SystemKeyspace
         {
             rangesToUpdate.add(rangeToBytes(range));
         }
-        executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace);
+        executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer, keyspace);
     }
 
     public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 9235844..a25f867 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -73,7 +73,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
         RangeStreamer streamer = new RangeStreamer(tokenMetadata,
                                                    tokens,
                                                    address,
-                                                   "Bootstrap",
+                                                   StreamOperation.BOOTSTRAP,
                                                    useStrictConsistency,
                                                    DatabaseDescriptor.getEndpointSnitch(),
                                                    stateStore,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 4d7c903..89a96cd 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -144,7 +145,7 @@ public class RangeStreamer
     public RangeStreamer(TokenMetadata metadata,
                          Collection<Token> tokens,
                          InetAddress address,
-                         String description,
+                         StreamOperation streamOperation,
                          boolean useStrictConsistency,
                          IEndpointSnitch snitch,
                          StreamStateStore stateStore,
@@ -154,8 +155,8 @@ public class RangeStreamer
         this.metadata = metadata;
         this.tokens = tokens;
         this.address = address;
-        this.description = description;
-        this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
+        this.description = streamOperation.getDescription();
+        this.streamPlan = new StreamPlan(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
                 true, false, connectSequentially, null);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/StreamStateStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java
index 47b3072..e3ea838 100644
--- a/src/java/org/apache/cassandra/dht/StreamStateStore.java
+++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java
@@ -69,7 +69,7 @@ public class StreamStateStore implements StreamEventHandler
                 Set<String> keyspaces = se.transferredRangesPerKeyspace.keySet();
                 for (String keyspace : keyspaces)
                 {
-                    SystemKeyspace.updateTransferredRanges(se.description, se.peer, keyspace, se.transferredRangesPerKeyspace.get(keyspace));
+                    SystemKeyspace.updateTransferredRanges(se.streamOperation, se.peer, keyspace, se.transferredRangesPerKeyspace.get(keyspace));
                 }
                 for (StreamRequest request : se.requests)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7e79fa9..759fa0f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 19bf3d4..eee0042 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             // The receiving side distinguishes two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 56411d9..3dd6532 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
@@ -79,10 +80,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
             isIncremental = prs.isIncremental;
         }
         Tracing.traceRepair(message);
-        StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this)
-                                            .flushBeforeTransfer(true)
-                                            // request ranges from the remote node
-                                            .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this)
+                                                                                                                           .flushBeforeTransfer(true)
+                                                                                                                           // request ranges from the remote node
+                                                                                                                           .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
         if (!pullRepair)
         {
             // send ranges to the remote node if we are not performing a pull repair

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 6bce1fa..c5f3c95 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamOperation;
 
 /**
  * StreamingRepairTask performs data streaming between two remote replicas, neither of which is the repair coordinator.
@@ -71,7 +72,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, boolean isIncremental)
     {
-        return new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null)
+        return new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null)
                .listeners(this)
               .flushBeforeTransfer(!isIncremental) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't necessary
                .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index abbc001..735c7cf 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1124,7 +1124,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             RangeStreamer streamer = new RangeStreamer(tokenMetadata,
                                                        null,
                                                        FBUtilities.getBroadcastAddress(),
-                                                       "Rebuild",
+                                                       StreamOperation.REBUILD,
                                                        useStrictConsistency && !replacing,
                                                        DatabaseDescriptor.getEndpointSnitch(),
                                                        streamStateStore,
@@ -2560,7 +2560,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
         }
 
-        StreamPlan stream = new StreamPlan("Restore replica count");
+        StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT);
         for (String keyspaceName : rangesToFetch.keySet())
         {
             for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
@@ -3876,7 +3876,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private class RangeRelocator
     {
-        private final StreamPlan streamPlan = new StreamPlan("Relocation");
+        private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION);
 
         private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames)
         {
@@ -4690,7 +4690,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint);
         }
 
-        StreamPlan streamPlan = new StreamPlan("Unbootstrap");
+        StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION);
 
         // Link StreamStateStore to current StreamPlan to update transferred ranges per StreamSession
         streamPlan.listeners(streamStateStore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 10c5827..91f1249 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -195,7 +195,7 @@ public class ConnectionHandler
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
-                    session.description(),
+                    session.streamOperation(),
                     !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 49172fb..6ea2814 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -52,7 +52,7 @@ public abstract class StreamEvent
         public final boolean success;
         public final int sessionIndex;
         public final Set<StreamRequest> requests;
-        public final String description;
+        public final StreamOperation streamOperation;
         public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace;
 
         public SessionCompleteEvent(StreamSession session)
@@ -62,7 +62,7 @@ public abstract class StreamEvent
             this.success = session.isSuccess();
             this.sessionIndex = session.sessionIndex();
             this.requests = ImmutableSet.copyOf(session.requests);
-            this.description = session.description();
+            this.streamOperation = session.streamOperation();
             this.transferredRangesPerKeyspace = Collections.unmodifiableMap(session.transferredRangesPerKeyspace);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamOperation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOperation.java b/src/java/org/apache/cassandra/streaming/StreamOperation.java
new file mode 100644
index 0000000..8151b47
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamOperation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+public enum StreamOperation
+{
+    OTHER("Other"), // Fallback to avoid null types when deserializing from string
+    RESTORE_REPLICA_COUNT("Restore replica count", false), // Handles removeNode
+    DECOMMISSION("Unbootstrap", false),
+    RELOCATION("Relocation", false),
+    BOOTSTRAP("Bootstrap", false),
+    REBUILD("Rebuild", false),
+    BULK_LOAD("Bulk Load"),
+    REPAIR("Repair");
+
+    private final String description;
+    private final boolean requiresViewBuild;
+
+
+    StreamOperation(String description) {
+        this(description, true);
+    }
+
+    /**
+     * @param description The operation description
+     * @param requiresViewBuild Whether this operation requires views to be updated if it involves a base table
+     */
+    StreamOperation(String description, boolean requiresViewBuild) {
+        this.description = description;
+        this.requiresViewBuild = requiresViewBuild;
+    }
+
+    public static StreamOperation fromString(String text) {
+        for (StreamOperation b : StreamOperation.values()) {
+            if (b.description.equalsIgnoreCase(text)) {
+                return b;
+            }
+        }
+
+        return OTHER;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Whether this operation requires views to be updated
+     */
+    public boolean requiresViewBuild()
+    {
+        return this.requiresViewBuild;
+    }
+}
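
A short usage sketch of the enum above (assumes only this new class on the classpath): descriptions keep the old human-readable strings, and fromString is case-insensitive with an OTHER fallback, so deserializing an unknown description never produces null.

    import org.apache.cassandra.streaming.StreamOperation;

    public class StreamOperationUsageSketch
    {
        public static void main(String[] args)
        {
            // Descriptions are unchanged from the old free-form strings, so logs,
            // JMX output and the wire format stay human readable.
            System.out.println(StreamOperation.BOOTSTRAP.getDescription());       // "Bootstrap"

            // Parsing is case-insensitive and unknown input degrades to OTHER.
            System.out.println(StreamOperation.fromString("unbootstrap"));        // DECOMMISSION
            System.out.println(StreamOperation.fromString("some legacy value"));  // OTHER

            // The new flag is what drives the view-build decision while streaming.
            System.out.println(StreamOperation.BOOTSTRAP.requiresViewBuild());    // false
            System.out.println(StreamOperation.REPAIR.requiresViewBuild());       // true
        }
    }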

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 5a2ce77..faaac0e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -34,7 +34,7 @@ public class StreamPlan
 {
     public static final String[] EMPTY_COLUMN_FAMILIES = new String[0];
     private final UUID planId = UUIDGen.getTimeUUID();
-    private final String description;
+    private final StreamOperation streamOperation;
     private final List<StreamEventHandler> handlers = new ArrayList<>();
     private final long repairedAt;
     private final StreamCoordinator coordinator;
@@ -44,22 +44,22 @@ public class StreamPlan
     /**
      * Start building stream plan.
      *
-     * @param description Stream type that describes this StreamPlan
+     * @param streamOperation Stream operation that describes this StreamPlan
      */
-    public StreamPlan(String description)
+    public StreamPlan(StreamOperation streamOperation)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null);
+        this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null);
     }
 
-    public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
+    public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null);
+        this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null);
     }
 
-    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
+    public StreamPlan(StreamOperation streamOperation, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
                       boolean isIncremental, boolean connectSequentially, UUID pendingRepair)
     {
-        this.description = description;
+        this.streamOperation = streamOperation;
         this.repairedAt = repairedAt;
         this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
                                                  connectSequentially, pendingRepair);
@@ -187,7 +187,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, handlers, coordinator);
+        return StreamResultFuture.init(planId, streamOperation, handlers, coordinator);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index d0c4d50..b7e475a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -147,11 +148,61 @@ public class StreamReceiveTask extends StreamTask
             this.task = task;
         }
 
+        /*
+         * We have a special path for views and for CDC.
+         *
+         * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
+         * through the same write path as normal mutations. This also ensures any 2is are also updated.
+         *
+         * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
+         * can be archived by the CDC process on discard.
+         */
+        private boolean requiresWritePath(ColumnFamilyStore cfs) {
+            return hasCDC(cfs) || (task.session.streamOperation().requiresViewBuild() && hasViews(cfs));
+        }
+
+        private boolean hasViews(ColumnFamilyStore cfs)
+        {
+            return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
+        }
+
+        private boolean hasCDC(ColumnFamilyStore cfs)
+        {
+            return cfs.metadata().params.cdc;
+        }
+
+        Mutation createMutation(ColumnFamilyStore cfs, UnfilteredRowIterator rowIterator)
+        {
+            return new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
+        }
+
+        private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
+            boolean hasCdc = hasCDC(cfs);
+            for (SSTableReader reader : readers)
+            {
+                Keyspace ks = Keyspace.open(reader.getKeyspaceName());
+                try (ISSTableScanner scanner = reader.getScanner())
+                {
+                    while (scanner.hasNext())
+                    {
+                        try (UnfilteredRowIterator rowIterator = scanner.next())
+                        {
+                            // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
+                            // before transaction is done.
+                            //
+                            // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                            // so they get archived into the cdc_raw folder
+                            ks.apply(createMutation(cfs, rowIterator), hasCdc, true, false);
+                        }
+                    }
+                }
+            }
+        }
+
         public void run()
         {
-            boolean hasViews = false;
-            boolean hasCDC = false;
             ColumnFamilyStore cfs = null;
+            boolean requiresWritePath = false;
             try
             {
                 cfs = ColumnFamilyStore.getIfExists(task.tableId);
@@ -163,45 +214,15 @@ public class StreamReceiveTask extends StreamTask
                     task.session.taskCompleted(task);
                     return;
                 }
-                hasViews = !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
-                hasCDC = cfs.metadata().params.cdc;
 
+                requiresWritePath = requiresWritePath(cfs);
                 Collection<SSTableReader> readers = task.sstables;
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    /*
-                     * We have a special path for views and for CDC.
-                     *
-                     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
-                     * through the same write path as normal mutations. This also ensures any 2is are also updated.
-                     *
-                     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
-                     * can be archived by the CDC process on discard.
-                     */
-                    if (hasViews || hasCDC)
+                    if (requiresWritePath)
                     {
-                        for (SSTableReader reader : readers)
-                        {
-                            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
-                            try (ISSTableScanner scanner = reader.getScanner())
-                            {
-                                while (scanner.hasNext())
-                                {
-                                    try (UnfilteredRowIterator rowIterator = scanner.next())
-                                    {
-                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
-
-                                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
-                                        // before transaction is done.
-                                        //
-                                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
-                                        // so they get archived into the cdc_raw folder
-                                        ks.apply(m, hasCDC, true, false);
-                                    }
-                                }
-                            }
-                        }
+                        sendThroughWritePath(cfs, readers);
                     }
                     else
                     {
@@ -249,7 +270,7 @@ public class StreamReceiveTask extends StreamTask
             {
                 // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
                 // the streamed sstables.
-                if (hasViews || hasCDC)
+                if (requiresWritePath)
                 {
                     if (cfs != null)
                         cfs.forceBlockingFlush();
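
To make the two completion paths above concrete, here is a toy runnable sketch of what happens once a receive task finishes (class and method names are illustrative, not Cassandra APIs; the real code is StreamReceiveTask.OnCompletionRunnable).

    // Toy walk-through of the receive-side completion paths separated by this refactor.
    public class ReceiveCompletionSketch
    {
        static void complete(boolean requiresWritePath)
        {
            if (requiresWritePath)
            {
                // Streamed partitions were replayed as normal mutations, so views, 2i and
                // (for CDC tables) the commit log are up to date; the replayed data is
                // flushed and the original streamed sstables are discarded with the txn.
                System.out.println("force flush, abort stream transaction, delete streamed sstables");
            }
            else
            {
                // Fast path now used by range movements on view base tables: streamed
                // sstables are attached to the table directly, with no per-row work.
                System.out.println("finish stream transaction, add sstables to the table");
            }
        }

        public static void main(String[] args)
        {
            complete(false); // e.g. a bootstrap stream into a base table with views
            complete(true);  // e.g. a repair stream, or any table with CDC enabled
        }
    }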

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 6d0c03b..4890b63 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -48,22 +48,22 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
     private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class);
 
     public final UUID planId;
-    public final String description;
+    public final StreamOperation streamOperation;
     private final StreamCoordinator coordinator;
     private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
 
     /**
-     * Create new StreamResult of given {@code planId} and type.
+     * Create new StreamResult of given {@code planId} and streamOperation.
      *
      * Constructor is package private. You need to use {@link StreamPlan#execute()} to get the instance.
      *
      * @param planId Stream plan ID
-     * @param description Stream description
+     * @param streamOperation Stream operation
      */
-    private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator)
+    private StreamResultFuture(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
     {
         this.planId = planId;
-        this.description = description;
+        this.streamOperation = streamOperation;
         this.coordinator = coordinator;
 
         // if there is no session to listen to, we immediately set result for returning
@@ -71,22 +71,22 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair)
+    private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair)
     {
-        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair));
+        this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair));
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners,
+    static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
                                    StreamCoordinator coordinator)
     {
-        StreamResultFuture future = createAndRegister(planId, description, coordinator);
+        StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
         if (listeners != null)
         {
             for (StreamEventHandler listener : listeners)
                 future.addEventListener(listener);
         }
 
-        logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
+        logger.info("[Stream #{}] Executing streaming plan for {}", planId,  streamOperation.getDescription());
 
         // Initialize and start all sessions
         for (final StreamSession session : coordinator.getAllStreamSessions())
@@ -101,7 +101,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 
     public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
                                                                     UUID planId,
-                                                                    String description,
+                                                                    StreamOperation streamOperation,
                                                                     InetAddress from,
                                                                     IncomingStreamingConnection connection,
                                                                     boolean isForOutgoing,
@@ -113,20 +113,20 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental, pendingRepair);
+            future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, isIncremental, pendingRepair);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
-        logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
+        logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
         return future;
     }
 
-    private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator)
+    private static StreamResultFuture createAndRegister(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator)
     {
-        StreamResultFuture future = new StreamResultFuture(planId, description, coordinator);
+        StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);
         StreamManager.instance.register(future);
         return future;
     }
@@ -149,7 +149,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
      */
     public StreamState getCurrentState()
     {
-        return new StreamState(planId, description, coordinator.getAllSessionInfo());
+        return new StreamState(planId, streamOperation, coordinator.getAllSessionInfo());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index bfae0bf..62fa317 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -212,9 +212,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return index;
     }
 
-    public String description()
+    public StreamOperation streamOperation()
     {
-        return streamResult == null ? null : streamResult.description;
+        return streamResult == null ? null : streamResult.streamOperation;
     }
 
     public boolean keepSSTableLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java
index db50c2a..4ee3c8d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -30,14 +30,14 @@ import com.google.common.collect.Iterables;
 public class StreamState implements Serializable
 {
     public final UUID planId;
-    public final String description;
+    public final StreamOperation streamOperation;
     public final Set<SessionInfo> sessions;
 
-    public StreamState(UUID planId, String description, Set<SessionInfo> sessions)
+    public StreamState(UUID planId, StreamOperation streamOperation, Set<SessionInfo> sessions)
     {
         this.planId = planId;
-        this.description = description;
         this.sessions = sessions;
+        this.streamOperation = streamOperation;
     }
 
     public boolean hasFailedSession()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
index e25ab1a..de88762 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.cassandra.streaming.SessionInfo;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamOperation;
 
 /**
  */
@@ -73,7 +74,7 @@ public class StreamStateCompositeData
     {
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
-        valueMap.put(ITEM_NAMES[1], streamState.description);
+        valueMap.put(ITEM_NAMES[1], streamState.streamOperation.getDescription());
 
         CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
         Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
@@ -121,7 +122,7 @@ public class StreamStateCompositeData
         assert cd.getCompositeType().equals(COMPOSITE_TYPE);
         Object[] values = cd.getAll(ITEM_NAMES);
         UUID planId = UUID.fromString((String) values[0]);
-        String description = (String) values[1];
+        String typeString = (String) values[1];
         Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
                                                                         new Function<CompositeData, SessionInfo>()
                                                                         {
@@ -130,6 +131,6 @@ public class StreamStateCompositeData
                                                                                 return SessionInfoCompositeData.fromCompositeData(input);
                                                                             }
                                                                         }));
-        return new StreamState(planId, description, sessions);
+        return new StreamState(planId, StreamOperation.fromString(typeString), sessions);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 3b4b512..59f28e0 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -43,7 +44,7 @@ public class StreamInitMessage
     public final InetAddress from;
     public final int sessionIndex;
     public final UUID planId;
-    public final String description;
+    public final StreamOperation streamOperation;
 
     // true if this init message is to connect for outgoing message on receiving side
     public final boolean isForOutgoing;
@@ -51,12 +52,12 @@ public class StreamInitMessage
     public final boolean isIncremental;
     public final UUID pendingRepair;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
         this.planId = planId;
-        this.description = description;
+        this.streamOperation = streamOperation;
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
@@ -112,7 +113,7 @@ public class StreamInitMessage
             CompactEndpointSerializationHelper.serialize(message.from, out);
             out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
-            out.writeUTF(message.description);
+            out.writeUTF(message.streamOperation.getDescription());
             out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
             out.writeBoolean(message.isIncremental);
@@ -135,7 +136,7 @@ public class StreamInitMessage
 
             boolean isIncremental = in.readBoolean();
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
-            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair);
+            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -143,7 +144,7 @@ public class StreamInitMessage
             long size = CompactEndpointSerializationHelper.serializedSize(message.from);
             size += TypeSizes.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
-            size += TypeSizes.sizeof(message.description);
+            size += TypeSizes.sizeof(message.streamOperation.getDescription());
             size += TypeSizes.sizeof(message.isForOutgoing);
             size += TypeSizes.sizeof(message.keepSSTableLevel);
             size += TypeSizes.sizeof(message.isIncremental);
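
On the wire the operation is still carried as its description string, so values sent as the old free-form strings map back onto the enum and unknown values degrade to OTHER. A minimal round-trip sketch, assuming only the JDK and the StreamOperation enum added in this commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.cassandra.streaming.StreamOperation;

    public class StreamOperationWireSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                // Serialization writes the human-readable description, exactly as before.
                out.writeUTF(StreamOperation.BOOTSTRAP.getDescription()); // "Bootstrap"
            }

            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                // Deserialization maps the string back onto the enum.
                StreamOperation op = StreamOperation.fromString(in.readUTF());
                System.out.println(op);                     // BOOTSTRAP
                System.out.println(op.requiresViewBuild()); // false
            }
        }
    }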

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index c171a3e..1b7a63b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -47,7 +47,7 @@ public class NetStats extends NodeToolCmd
             System.out.println("Not sending any streams.");
         for (StreamState status : statuses)
         {
-            System.out.printf("%s %s%n", status.description, status.planId.toString());
+            System.out.printf("%s %s%n", status.streamOperation.getDescription(), status.planId.toString());
             for (SessionInfo info : status.sessions)
             {
                 System.out.printf("    %s", info.peer.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 9481201..a1054bb 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.locator.RackInferringSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.FBUtilities;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -98,7 +99,7 @@ public class BootStrapperTest
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
 
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
+        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 3a963b1..944e320 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -194,8 +195,8 @@ public class LegacySSTableTest
         details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
                                                                sstable.getPositionsForRanges(ranges),
                                                                sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
-        new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
-                                             .execute().get();
+        new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddress(), details)
+                                  .execute().get();
     }
 
     private static void truncateLegacyTables(String legacyVersion) throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java b/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java
new file mode 100644
index 0000000..2cc216e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamOperationTest
+{
+    @Test
+    public void testSerialization()
+    {
+        // Unknown descriptions fall back to OTHER
+        assertEquals(StreamOperation.OTHER, StreamOperation.fromString("Foobar"));
+        assertEquals(StreamOperation.OTHER, StreamOperation.fromString("Other"));
+        assertEquals(StreamOperation.RESTORE_REPLICA_COUNT, StreamOperation.fromString("Restore replica count"));
+        assertEquals(StreamOperation.DECOMMISSION, StreamOperation.fromString("Unbootstrap"));
+        assertEquals(StreamOperation.RELOCATION, StreamOperation.fromString("Relocation"));
+        assertEquals(StreamOperation.BOOTSTRAP, StreamOperation.fromString("Bootstrap"));
+        assertEquals(StreamOperation.REBUILD, StreamOperation.fromString("Rebuild"));
+        assertEquals(StreamOperation.BULK_LOAD, StreamOperation.fromString("Bulk Load"));
+        assertEquals(StreamOperation.REPAIR, StreamOperation.fromString("Repair"));
+        // Test case insensitivity
+        assertEquals(StreamOperation.REPAIR, StreamOperation.fromString("rEpair"));
+
+        // Test description
+        assertEquals("Repair", StreamOperation.REPAIR.getDescription());
+        assertEquals("Restore replica count", StreamOperation.RESTORE_REPLICA_COUNT.getDescription());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 682e039..ce8d2dd 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -121,7 +121,7 @@ public class StreamTransferTaskTest
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false, null);
-        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
         StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null);
         session.init(future);
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 1fa71f5..36329f4 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -114,14 +114,14 @@ public class StreamingTransferTest
     @Test
     public void testEmptyStreamPlan() throws Exception
     {
-        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest").execute();
+        StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER).execute();
         final UUID planId = futureResult.planId;
         Futures.addCallback(futureResult, new FutureCallback<StreamState>()
         {
             public void onSuccess(StreamState result)
             {
                 assert planId.equals(result.planId);
-                assert result.description.equals("StreamingTransferTest");
+                assert result.streamOperation == StreamOperation.OTHER;
                 assert result.sessions.isEmpty();
             }
 
@@ -143,14 +143,14 @@ public class StreamingTransferTest
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
 
-        StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest")
+        StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER)
                                                   .requestRanges(LOCAL, LOCAL, KEYSPACE2, ranges)
                                                   .execute();
 
         UUID planId = futureResult.planId;
         StreamState result = futureResult.get();
         assert planId.equals(result.planId);
-        assert result.description.equals("StreamingTransferTest");
+        assert result.streamOperation == StreamOperation.OTHER;
 
         // we should have completed session with empty transfer
         assert result.sessions.size() == 1;
@@ -238,7 +238,7 @@ public class StreamingTransferTest
         List<Range<Token>> ranges = new ArrayList<>();
         // wrapped range
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
-        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName());
+        StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName());
         streamPlan.execute().get();
         verifyConnectionsAreClosed();
 
@@ -256,7 +256,7 @@ public class StreamingTransferTest
 
     private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
-        StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
+        StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
         streamPlan.execute().get();
         verifyConnectionsAreClosed();
 

