cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifesdj...@apache.org
Subject [1/3] cassandra git commit: Introduce in-jvm distributed tests
Date Fri, 16 Nov 2018 18:42:45 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 787703508 -> f22fec927


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
new file mode 100644
index 0000000..f488e08
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+
+public class MessageFilters
+{
+    private final TestCluster cluster;
+    private final Set<Filter> filters = new CopyOnWriteArraySet<>();
+
+    public MessageFilters(TestCluster cluster)
+    {
+        this.cluster = cluster;
+    }
+
+    BiConsumer<InetAddressAndPort, Message> filter(BiConsumer<InetAddressAndPort,
Message> applyIfNotFiltered)
+    {
+        return (toAddress, message) ->
+        {
+            int from = cluster.get(message.from).config.num;
+            int to = cluster.get(toAddress).config.num;
+            int verb = message.verb;
+            for (Filter filter : filters)
+            {
+                if (filter.matches(from, to, verb))
+                    return;
+            }
+
+            applyIfNotFiltered.accept(toAddress, message);
+        };
+    }
+
+    public class Filter
+    {
+        final int[] from;
+        final int[] to;
+        final int[] verbs;
+
+        Filter(int[] from, int[] to, int[] verbs)
+        {
+            if (from != null)
+            {
+                from = from.clone();
+                Arrays.sort(from);
+            }
+            if (to != null)
+            {
+                to = to.clone();
+                Arrays.sort(to);
+            }
+            if (verbs != null)
+            {
+                verbs = verbs.clone();
+                Arrays.sort(verbs);
+            }
+            this.from = from;
+            this.to = to;
+            this.verbs = verbs;
+        }
+
+        public int hashCode()
+        {
+            return (from == null ? 0 : Arrays.hashCode(from))
+                    + (to == null ? 0 : Arrays.hashCode(to))
+                    + (verbs == null ? 0 : Arrays.hashCode(verbs));
+        }
+
+        public boolean equals(Object that)
+        {
+            return that instanceof Filter && equals((Filter) that);
+        }
+
+        public boolean equals(Filter that)
+        {
+            return Arrays.equals(from, that.from)
+                    && Arrays.equals(to, that.to)
+                    && Arrays.equals(verbs, that.verbs);
+        }
+
+        public boolean matches(int from, int to, int verb)
+        {
+            return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
+                    && (this.to == null || Arrays.binarySearch(this.to, to) >=
0)
+                    && (this.verbs == null || Arrays.binarySearch(this.verbs, verb)
>= 0);
+        }
+
+        public Filter restore()
+        {
+            filters.remove(this);
+            return this;
+        }
+
+        public Filter drop()
+        {
+            filters.add(this);
+            return this;
+        }
+    }
+
+    public class Builder
+    {
+        int[] from;
+        int[] to;
+        int[] verbs;
+
+        private Builder(int[] verbs)
+        {
+            this.verbs = verbs;
+        }
+
+        public Builder from(int ... nums)
+        {
+            from = nums;
+            return this;
+        }
+
+        public Builder to(int ... nums)
+        {
+            to = nums;
+            return this;
+        }
+
+        public Filter ready()
+        {
+            return new Filter(from, to, verbs);
+        }
+
+        public Filter drop()
+        {
+            return ready().drop();
+        }
+    }
+
+    public Builder verbs(MessagingService.Verb ... verbs)
+    {
+        int[] ids = new int[verbs.length];
+        for (int i = 0 ; i < verbs.length ; ++i)
+            ids[i] = verbs[i].getId();
+        return new Builder(ids);
+    }
+
+    public Builder allVerbs()
+    {
+        return new Builder(null);
+    }
+
+    public void reset()
+    {
+        filters.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/RowUtil.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
new file mode 100644
index 0000000..bce896d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+public class RowUtil
+{
+    public static Object[][] toObjects(ResultMessage.Rows rows)
+    {
+        Object[][] result = new Object[rows.result.rows.size()][];
+        List<ColumnSpecification> specs = rows.result.metadata.names;
+        for (int i = 0; i < rows.result.rows.size(); i++)
+        {
+            List<ByteBuffer> row = rows.result.rows.get(i);
+            result[i] = new Object[row.size()];
+            for (int j = 0; j < row.size(); j++)
+            {
+                ByteBuffer bb = row.get(j);
+
+                if (bb != null)
+                    result[i][j] = specs.get(j).type.getSerializer().deserialize(bb);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/TestCluster.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
new file mode 100644
index 0000000..2b979ee
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaEvent;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+/**
+ * TestCluster creates, initializes and manages Cassandra instances ({@link Instance}.
+ *
+ * All instances created under the same cluster will have a shared ClassLoader that'll preload
+ * common classes required for configuration and communication (byte buffers, primitives,
config
+ * objects etc). Shared classes are listed in {@link InstanceClassLoader#commonClasses}.
+ *
+ * Each instance has its own class loader that will load logging, yaml libraries and all
non-shared
+ * Cassandra package classes. The rule of thumb is that we'd like to have all Cassandra-specific
things
+ * (unless explitily shared through the common classloader) on a per-classloader basis in
order to
+ * allow creating more than one instance of DatabaseDescriptor and other Cassandra singletones.
+ *
+ * All actions (reading, writing, schema changes, etc) are executed by serializing lambda/runnables,
+ * transferring them to instance-specific classloaders, deserializing and running them there.
Most of
+ * the things can be simply captured in closure or passed through `apply` method of the wrapped
serializable
+ * function/callable. You can use {@link InvokableInstance#{applies|runs|consumes}OnInstance}
for executing
+ * code on specific instance.
+ *
+ * Each instance has its own logger. Each instance log line will contain INSTANCE{instance_id}.
+ *
+ * As of today, messaging is faked by hooking into MessagingService, so we're not using usual
Cassandra
+ * handlers for internode to have more control over it. Messaging is wired by passing verbs
manually.
+ * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
+ */
+public class TestCluster implements AutoCloseable
+{
+    private static ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
+
+    private final File root;
+    private final List<Instance> instances;
+    private final Coordinator coordinator;
+    private final Map<InetAddressAndPort, Instance> instanceMap;
+    private final MessageFilters filters;
+
+    private TestCluster(File root, List<Instance> instances)
+    {
+        this.root = root;
+        this.instances = instances;
+        this.instanceMap = new HashMap<>();
+        this.coordinator = new Coordinator(instances.get(0));
+        this.filters = new MessageFilters(this);
+    }
+
+    void launch()
+    {
+        FBUtilities.waitOnFutures(instances.stream()
+                .map(i -> exec.submit(() -> i.launch(this)))
+                .collect(Collectors.toList())
+        );
+        for (Instance instance : instances)
+            instanceMap.put(instance.getBroadcastAddress(), instance);
+    }
+
+    public int size()
+    {
+        return instances.size();
+    }
+
+    public Coordinator coordinator()
+    {
+        return coordinator;
+    }
+
+    /**
+     * WARNING: we index from 1 here, for consistency with inet address!
+     */
+    public Instance get(int idx)
+    {
+        return instances.get(idx - 1);
+    }
+
+    public Instance get(InetAddressAndPort addr)
+    {
+        return instanceMap.get(addr);
+    }
+
+    MessageFilters filters()
+    {
+        return filters;
+    }
+
+    MessageFilters.Builder verbs(MessagingService.Verb ... verbs)
+    {
+        return filters.verbs(verbs);
+    }
+
+    public void disableAutoCompaction(String keyspace)
+    {
+        for (Instance instance : instances)
+        {
+            instance.runOnInstance(() -> {
+                for (ColumnFamilyStore cs : Keyspace.open(keyspace).getColumnFamilyStores())
+                    cs.disableAutoCompaction();
+            });
+        }
+    }
+
+    public void schemaChange(String query)
+    {
+        try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
+        {
+            // execute the schema change
+            coordinator().execute(query, ConsistencyLevel.ALL);
+            monitor.waitForAgreement();
+        }
+    }
+
+    /**
+     * Will wait for a schema change AND agreement that occurs after it is created
+     * (and precedes the invocation to waitForAgreement)
+     *
+     * Works by simply checking if all UUIDs agree after any schema version change event,
+     * so long as the waitForAgreement method has been entered (indicating the change has
+     * taken place on the coordinator)
+     *
+     * This could perhaps be made a little more robust, but this should more than suffice.
+     */
+    public class SchemaChangeMonitor implements AutoCloseable
+    {
+        final List<Runnable> cleanup;
+        volatile boolean schemaHasChanged;
+        final SimpleCondition agreement = new SimpleCondition();
+
+        public SchemaChangeMonitor()
+        {
+            this.cleanup = new ArrayList<>(instances.size());
+            for (Instance instance : instances)
+            {
+                cleanup.add(
+                        instance.appliesOnInstance(
+                                (Runnable runnable) -> {
+                                    Consumer<SchemaEvent> consumer = event -> runnable.run();
+                                    DiagnosticEventService.instance().subscribe(SchemaEvent.class,
SchemaEvent.SchemaEventType.VERSION_UPDATED, consumer);
+                                    return (Runnable) () -> DiagnosticEventService.instance().unsubscribe(SchemaEvent.class,
consumer);
+                                }
+                        ).apply(this::signal)
+                );
+            }
+        }
+
+        private void signal()
+        {
+            if (schemaHasChanged && 1 == instances.stream().map(Instance::getSchemaVersion).distinct().count())
+                agreement.signalAll();
+        }
+
+        @Override
+        public void close()
+        {
+            for (Runnable runnable : cleanup)
+                runnable.run();
+        }
+
+        public void waitForAgreement()
+        {
+            schemaHasChanged = true;
+            signal();
+            try
+            {
+                agreement.await(1L, TimeUnit.MINUTES);
+            } catch (InterruptedException e)
+            {
+                throw new IllegalStateException("Schema agreement not reached");
+            }
+        }
+    }
+
+    public void schemaChange(String statement, int instance)
+    {
+        get(instance).schemaChange(statement);
+    }
+
+    public static TestCluster create(int nodeCount) throws Throwable
+    {
+        return create(nodeCount, Files.createTempDirectory("dtests").toFile());
+    }
+
+    public static TestCluster create(int nodeCount, File root)
+    {
+        root.mkdirs();
+        setupLogging(root);
+
+        IntFunction<ClassLoader> classLoaderFactory = InstanceClassLoader.createFactory(
+                (URLClassLoader) Thread.currentThread().getContextClassLoader());
+        List<Instance> instances = new ArrayList<>();
+        long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
+        for (int i = 0 ; i < nodeCount ; ++i)
+        {
+            InstanceConfig instanceConfig = InstanceConfig.generate(i + 1, root, String.valueOf(token));
+            instances.add(new Instance(instanceConfig, classLoaderFactory.apply(i + 1)));
+            token += increment;
+        }
+
+        TestCluster cluster = new TestCluster(root, instances);
+        cluster.launch();
+        return cluster;
+    }
+
+    private static void setupLogging(File root)
+    {
+        try
+        {
+            String testConfPath = "test/conf/logback-dtest.xml";
+            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+            if (!logConfPath.toFile().exists())
+            {
+                Files.copy(new File(testConfPath).toPath(),
+                           logConfPath);
+            }
+            System.setProperty("logback.configurationFile", "file://" + logConfPath);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        List<Future<?>> futures = instances.stream()
+                .map(i -> exec.submit(i::shutdown))
+                .collect(Collectors.toList());
+
+//        withThreadLeakCheck(futures);
+
+        // Make sure to only delete directory when threads are stopped
+        exec.submit(() -> {
+            FBUtilities.waitOnFutures(futures);
+            FileUtils.deleteRecursive(root);
+        });
+    }
+
+    // We do not want this check to run every time until we fix problems with tread stops
+    private void withThreadLeakCheck(List<Future<?>> futures)
+    {
+        FBUtilities.waitOnFutures(futures);
+
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        threadSet = Sets.difference(threadSet, Collections.singletonMap(Thread.currentThread(),
null).keySet());
+        if (!threadSet.isEmpty())
+        {
+            for (Thread thread : threadSet)
+            {
+                System.out.println(thread);
+                System.out.println(Arrays.toString(thread.getStackTrace()));
+            }
+            throw new RuntimeException(String.format("Not all threads have shut down. %d
threads are still running: %s", threadSet.size(), threadSet));
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 4607d5c..18d17e8 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -78,7 +78,7 @@ public class NettyFactoryTest
     }
 
     @After
-    public void tearDown()
+    public void tearDown() throws Exception
     {
         if (factory != null)
             factory.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
index fff7b17..e274f27 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -127,9 +127,5 @@ public class StreamCompressionSerializerTest
         {
             return true;
         }
-
-        @Override
-        public void close()
-        {   }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message