cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [3/3] add request tracing patch by David Alves; reviewed by jbellis for CASSANDRA-1123
Date Wed, 29 Aug 2012 18:18:20 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
new file mode 100644
index 0000000..7675d74
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.tracing;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ExpiringColumn;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A trace session context. Able to track and store trace sessions. A session is usually a user-initiated query, and may
+ * have multiple local and remote events before it is completed. All events and sessions are stored in the system_traces keyspace.
+ */
+public class Tracing
+{
+    public static final String TRACE_KS = "system_traces";
+    public static final String EVENTS_CF = "events";
+    public static final String SESSIONS_CF = "sessions";
+    public static final String TRACE_HEADER = "TraceSession";
+
+    private static final int TTL = 24 * 3600;
+
+    private static Tracing instance = new Tracing();
+
+    public static final Logger logger = LoggerFactory.getLogger(Tracing.class);
+
+    /**
+     * Returns the singleton trace context, which is created eagerly when this class is initialized.
+     */
+    public static Tracing instance()
+    {
+        return instance;
+    }
+
+    private InetAddress localAddress = FBUtilities.getLocalAddress();
+
+    private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>();
+
+    public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value)
+    {
+        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value.toString()), System.currentTimeMillis(), TTL));
+    }
+
+    public static void addColumn(ColumnFamily cf, ByteBuffer name, InetAddress address)
+    {
+        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(address), System.currentTimeMillis(), TTL));
+    }
+
+    public static void addColumn(ColumnFamily cf, ByteBuffer name, int value)
+    {
+        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL));
+    }
+
+    public static void addColumn(ColumnFamily cf, ByteBuffer name, long value)
+    {
+        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL));
+    }
+
+    public static void addColumn(ColumnFamily cf, ByteBuffer name, String value)
+    {
+        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL));
+    }
+
+    private void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value)
+    {
+        cf.addColumn(new ExpiringColumn(name, value, System.currentTimeMillis(), TTL));
+    }
+
+    public void addParameterColumns(ColumnFamily cf, Map<String, String> rawPayload)
+    {
+        for (Map.Entry<String, String> entry : rawPayload.entrySet())
+        {
+            cf.addColumn(new ExpiringColumn(buildName(cf.metadata(), bytes("parameters"), bytes(entry.getKey())),
+                                            bytes(entry.getValue()), System.currentTimeMillis(), TTL));
+        }
+    }
+
+    public static ByteBuffer buildName(CFMetaData meta, ByteBuffer... args)
+    {
+        ColumnNameBuilder builder = meta.getCfDef().getColumnNameBuilder();
+        for (ByteBuffer arg : args)
+            builder.add(arg);
+        return builder.build();
+    }
+
+    public UUID getSessionId()
+    {
+        assert isTracing();
+        return state.get().sessionId;
+    }
+
+    /**
+     * Indicates if the current thread's execution is being traced.
+     */
+    public static boolean isTracing()
+    {
+        return instance != null && instance.state.get() != null;
+    }
+
+    public void reset()
+    {
+        state.set(null);
+    }
+
+    public UUID newSession()
+    {
+        return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+    }
+
+    public UUID newSession(UUID sessionId)
+    {
+        assert state.get() == null;
+
+        TraceState ts = new TraceState(localAddress, sessionId);
+        state.set(ts);
+
+        return sessionId;
+    }
+
+    public void stopSession()
+    {
+        TraceState state = this.state.get();
+        if (state == null) // inline isTracing to avoid implicit two calls to state.get()
+        {
+            logger.debug("request complete");
+        }
+        else
+        {
+            final long finished_at = System.currentTimeMillis();
+            final ByteBuffer sessionIdBytes = state.sessionIdBytes;
+
+            StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+            {
+                public void runMayThrow() throws TimedOutException, UnavailableException
+                {
+                    ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf);
+                    addColumn(cf,
+                              buildName(CFMetaData.TraceSessionsCf, bytes("finished_at")),
+                              LongType.instance.decompose(finished_at));
+                    RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes);
+                    mutation.add(cf);
+                    StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
+                }
+            });
+
+            reset();
+        }
+    }
+
+    public TraceState get()
+    {
+        return state.get();
+    }
+
+    public void set(final TraceState tls)
+    {
+        state.set(tls);
+    }
+
+    public void begin(final String request, final Map<String, String> parameters)
+    {
+        assert isTracing();
+
+        final long started_at = System.currentTimeMillis();
+        final ByteBuffer sessionIdBytes = state.get().sessionIdBytes;
+
+        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+        {
+            public void runMayThrow() throws TimedOutException, UnavailableException
+            {
+                ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf);
+                addColumn(cf,
+                          buildName(CFMetaData.TraceSessionsCf, bytes("coordinator")),
+                          InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress()));
+                addColumn(cf,
+                          buildName(CFMetaData.TraceSessionsCf, bytes("request")),
+                          UTF8Type.instance.decompose(request));
+                addColumn(cf,
+                          buildName(CFMetaData.TraceSessionsCf, bytes("started_at")),
+                          LongType.instance.decompose(started_at));
+                addParameterColumns(cf, parameters);
+                RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes);
+                mutation.add(cf);
+                StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
+            }
+        });
+    }
+
+    /**
+     * Updates the current thread's trace state from a message
+     * 
+     * @param message
+     *            The internode message
+     */
+    public void initializeFromMessage(final MessageIn<?> message)
+    {
+        final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
+
+        // if the message has no session context header don't do tracing
+        if (sessionBytes == null)
+        {
+            state.set(null);
+            return;
+        }
+
+        checkState(sessionBytes.length == 16);
+        state.set(new TraceState(message.from, UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes))));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/TracingAppender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TracingAppender.java b/src/java/org/apache/cassandra/tracing/TracingAppender.java
new file mode 100644
index 0000000..25b7056
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/TracingAppender.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.tracing;
+
+import static org.apache.cassandra.tracing.Tracing.*;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class TracingAppender extends AppenderSkeleton
+{
+    protected void append(final LoggingEvent event)
+    {
+        if (Tracing.instance() == null) // instance might not be built at the time this is called
+            return;
+        
+        final TraceState state = Tracing.instance().get();
+        if (state == null) // inline isTracing to avoid implicit two calls to state.get()
+            return;
+
+        final int elapsed = state.elapsed();
+        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+        {
+            public void runMayThrow() throws TimedOutException, UnavailableException
+            {
+                ByteBuffer eventId = ByteBufferUtil.bytes(UUIDGen.makeType1UUIDFromHost(FBUtilities
+                        .getBroadcastAddress()));
+                CFMetaData cfMeta = CFMetaData.TraceEventsCf;
+                ColumnFamily cf = ColumnFamily.create(cfMeta);
+                addColumn(cf, buildName(cfMeta, eventId, bytes("source")), FBUtilities.getBroadcastAddress());
+                addColumn(cf, buildName(cfMeta, eventId, bytes("thread")), event.getThreadName());
+                addColumn(cf, buildName(cfMeta, eventId, bytes("happened_at")), event.getTimeStamp());
+                addColumn(cf, buildName(cfMeta, eventId, bytes("source_elapsed")), elapsed);
+                addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), event.getMessage());
+                RowMutation mutation = new RowMutation(Tracing.TRACE_KS, state.sessionIdBytes);
+                mutation.add(cf);
+                StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
+            }
+        });
+    }
+
+    public void close()
+    {
+    }
+
+    public boolean requiresLayout()
+    {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index f0b36bf..2b5da97 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -24,10 +24,12 @@ package org.apache.cassandra.utils;
  */
 
 import java.io.*;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.UUID;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -523,4 +525,14 @@ public class ByteBufferUtil
         }
         return 0;
     }
+
+    public static ByteBuffer bytes(InetAddress address)
+    {
+        return ByteBuffer.wrap(address.getAddress());
+    }
+
+    public static ByteBuffer bytes(UUID uuid)
+    {
+        return ByteBuffer.wrap(UUIDGen.decompose(uuid));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 196d731..4a93775 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.HashMap;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -129,8 +128,8 @@ public class CFMetaDataTest extends SchemaLoader
 
         // Test schema conversion
         RowMutation rm = cfm.toSchema(System.currentTimeMillis());
-        ColumnFamily serializedCf = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_TABLE, SystemTable.SCHEMA_COLUMNFAMILIES_CF));
-        ColumnFamily serializedCD = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_TABLE, SystemTable.SCHEMA_COLUMNS_CF));
+        ColumnFamily serializedCf = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_KS, SystemTable.SCHEMA_COLUMNFAMILIES_CF));
+        ColumnFamily serializedCD = rm.getColumnFamily(Schema.instance.getId(Table.SYSTEM_KS, SystemTable.SCHEMA_COLUMNS_CF));
         UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", new Row(k, serializedCf)).one();
         CFMetaData newCfm = CFMetaData.addColumnDefinitionSchema(CFMetaData.fromSchemaNoColumns(result), new Row(k, serializedCD));
         assert cfm.equals(newCfm) : String.format("\n%s\n!=\n%s", cfm, newCfm);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index cca87fb..b01ffc4 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -49,8 +49,8 @@ public class DefsTest extends SchemaLoader
     @Test
     public void ensureStaticCFMIdsAreLessThan1000()
     {
-        assert CFMetaData.OldStatusCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_TABLE, SystemTable.OLD_STATUS_CF));
-        assert CFMetaData.OldHintsCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_TABLE, SystemTable.OLD_HINTS_CF));
+        assert CFMetaData.OldStatusCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_KS, SystemTable.OLD_STATUS_CF));
+        assert CFMetaData.OldHintsCf.cfId.equals(CFMetaData.getId(Table.SYSTEM_KS, SystemTable.OLD_HINTS_CF));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 0bcaff4..e43124d 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.junit.Test;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -72,7 +73,7 @@ public class StorageServiceServerTest
     public void testColumnFamilySnapshot() throws IOException
     {
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
-        StorageService.instance.takeColumnFamilySnapshot("system", "Schema", "cf_snapshot");
+        StorageService.instance.takeColumnFamilySnapshot(Table.SYSTEM_KS, "Schema", "cf_snapshot");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index dbe1951..5763f37 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -117,6 +117,7 @@ public class Session implements Serializable
     private boolean ignoreErrors  = false;
     private boolean enable_cql    = false;
     private boolean use_prepared  = false;
+    private boolean trace         = false;
 
     private final String outFileName;
 
@@ -139,6 +140,7 @@ public class Session implements Serializable
     public final InetAddress sendToDaemon;
     public final String comparator;
     public final boolean timeUUIDComparator;
+    public double traceProbability = 0.0;
 
     public Session(String[] arguments) throws IllegalArgumentException
     {


Mime
View raw message