phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject phoenix git commit: PHOENIX-1596 Turning tracing on causes region servers to crash
Date Fri, 30 Jan 2015 23:18:09 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.2 93aa244b8 -> 907f9c25c


PHOENIX-1596 Turning tracing on causes region servers to crash


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/907f9c25
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/907f9c25
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/907f9c25

Branch: refs/heads/4.2
Commit: 907f9c25c0a4aec71c5c0cc767b0c4b21f9faee0
Parents: 93aa244
Author: Samarth <samarth.jain@salesforce.com>
Authored: Fri Jan 30 15:17:45 2015 -0800
Committer: Samarth <samarth.jain@salesforce.com>
Committed: Fri Jan 30 15:17:45 2015 -0800

----------------------------------------------------------------------
 .../coprocessor/BaseScannerRegionObserver.java  |  30 ++-
 .../org/apache/phoenix/hbase/index/Indexer.java | 225 +++++++++----------
 .../org/apache/phoenix/trace/util/Tracing.java  |  67 +-----
 3 files changed, 133 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/907f9c25/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 68fa3d1..8fc7534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -138,24 +139,39 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 return s;
             }
             boolean success =false;
-            // turn on tracing, if its enabled
-            final Span child = Tracing.childOnServer(scan, rawConf, SCANNER_OPENED_TRACE_INFO);
+            // Save the current span. When done with the child span, reset the span back to
+            // what it was. Otherwise, this causes the thread local storing the current span
+            // to not be reset back to null causing catastrophic infinite loops
+            // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596
+            // TraceScope can't be used here because closing the scope will end up calling
+            // currentSpan.stop() and that should happen only when we are closing the scanner.
+            final Span savedSpan = Trace.currentSpan();
+            final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan();
             try {
                 RegionScanner scanner = doPostScannerOpen(c, scan, s);
                 scanner = new DelegateRegionScanner(scanner) {
+                    // This isn't very obvious but close() could be called in a thread
+                    // that is different from the thread that created the scanner.
                     @Override
                     public void close() throws IOException {
-                        if (child != null) {
-                            child.stop();
+                        try {
+                            delegate.close();
+                        } finally {
+                            if (child != null) {
+                                child.stop();
+                            }
                         }
-                        delegate.close();
                     }
                 };
                 success = true;
                 return scanner;
             } finally {
-                if (!success && child != null) {
-                    child.stop();
+                try {
+                    if (!success && child != null) {
+                        child.stop();
+                    }
+                } finally {
+                    Trace.continueSpan(savedSpan);
                 }
             }
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907f9c25/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index b841410..a4fc96b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -210,79 +210,76 @@ public class Indexer extends BaseRegionObserver {
   }
 
  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
-    // first group all the updates for a single row into a single update to be processed
-    Map<ImmutableBytesPtr, MultiMutation> mutations =
-        new HashMap<ImmutableBytesPtr, MultiMutation>();
+      // first group all the updates for a single row into a single update to be processed
+      Map<ImmutableBytesPtr, MultiMutation> mutations =
+              new HashMap<ImmutableBytesPtr, MultiMutation>();
 
-    Durability defaultDurability = Durability.SYNC_WAL;
-    if(c.getEnvironment().getRegion() != null) {
-    	defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
-    	defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ? 
-    			Durability.SYNC_WAL : defaultDurability;
-    }
-    Durability durability = Durability.SKIP_WAL;
-    for (int i = 0; i < miniBatchOp.size(); i++) {
-      Mutation m = miniBatchOp.getOperation(i);
-      // skip this mutation if we aren't enabling indexing
-      // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
-      // should be indexed, which means we need to expose another method on the builder. Such is the
-      // way optimization go though.
-      if (!this.builder.isEnabled(m)) {
-        continue;
-      }
-      
-      Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? 
-    		  defaultDurability : m.getDurability();
-      if (effectiveDurablity.ordinal() > durability.ordinal()) {
-        durability = effectiveDurablity;
+      Durability defaultDurability = Durability.SYNC_WAL;
+      if(c.getEnvironment().getRegion() != null) {
+          defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
+          defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ? 
+                  Durability.SYNC_WAL : defaultDurability;
       }
+      Durability durability = Durability.SKIP_WAL;
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          // skip this mutation if we aren't enabling indexing
+          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+          // should be indexed, which means we need to expose another method on the builder. Such is the
+          // way optimization go though.
+          if (!this.builder.isEnabled(m)) {
+              continue;
+          }
+
+          Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
+                  defaultDurability : m.getDurability();
+          if (effectiveDurablity.ordinal() > durability.ordinal()) {
+              durability = effectiveDurablity;
+          }
 
-      // add the mutation to the batch set
-      ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-      MultiMutation stored = mutations.get(row);
-      // we haven't seen this row before, so add it
-      if (stored == null) {
-        stored = new MultiMutation(row);
-        mutations.put(row, stored);
+          // add the mutation to the batch set
+          ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+          MultiMutation stored = mutations.get(row);
+          // we haven't seen this row before, so add it
+          if (stored == null) {
+              stored = new MultiMutation(row);
+              mutations.put(row, stored);
+          }
+          stored.addAll(m);
       }
-      stored.addAll(m);
-    }
-    
-    // early exit if it turns out we don't have any edits
-    if (mutations.entrySet().size() == 0) {
-      return;
-    }
 
-    // dump all the index updates into a single WAL. They will get combined in the end anyways, so
-    // don't worry which one we get
-    WALEdit edit = miniBatchOp.getWalEdit(0);
-    if (edit == null) {
-        edit = new WALEdit();
-        miniBatchOp.setWalEdit(0, edit);
-    }
+      // early exit if it turns out we don't have any edits
+      if (mutations.entrySet().size() == 0) {
+          return;
+      }
 
-        // get the current span, or just use a null-span to avoid a bunch of if statements
-    TraceScope scope = Trace.startSpan("Starting to build index updates");
-        Span current = scope.getSpan();
-        if (current == null) {
-            current = NullSpan.INSTANCE;
-        }
+      // dump all the index updates into a single WAL. They will get combined in the end anyways, so
+      // don't worry which one we get
+      WALEdit edit = miniBatchOp.getWalEdit(0);
+      if (edit == null) {
+          edit = new WALEdit();
+          miniBatchOp.setWalEdit(0, edit);
+      }
 
-    // get the index updates for all elements in this batch
-    Collection<Pair<Mutation, byte[]>> indexUpdates =
-        this.builder.getIndexUpdate(miniBatchOp, mutations.values());
+      // get the current span, or just use a null-span to avoid a bunch of if statements
+      try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+          Span current = scope.getSpan();
+          if (current == null) {
+              current = NullSpan.INSTANCE;
+          }
 
-        current.addTimelineAnnotation("Built index updates, doing preStep");
-        TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+          // get the index updates for all elements in this batch
+          Collection<Pair<Mutation, byte[]>> indexUpdates =
+                  this.builder.getIndexUpdate(miniBatchOp, mutations.values());
 
-    // write them, either to WAL or the index tables
-    doPre(indexUpdates, edit, durability);
+          current.addTimelineAnnotation("Built index updates, doing preStep");
+          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
 
-        // close the span
-        current.stop();
-        scope.close();
+          // write them, either to WAL or the index tables
+          doPre(indexUpdates, edit, durability);
+      }
   }
 
   private class MultiMutation extends Mutation {
@@ -416,65 +413,59 @@ public class Indexer extends BaseRegionObserver {
   }
 
   private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
-      throws Exception {
-    //short circuit, if we don't need to do any work
-    if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
-      // already did the index update in prePut, so we are done
-      return;
-    }
-
-        // get the current span, or just use a null-span to avoid a bunch of if statements
-    TraceScope scope = Trace.startSpan("Completing index writes");
-        Span current = scope.getSpan();
-        if (current == null) {
-            current = NullSpan.INSTANCE;
-        }
-
-    // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
-    // and then do it again later when getting out the index updates. This should be pretty minor
-    // though, compared to the rest of the runtime
-    IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
-
-    /*
-     * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
-     * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
-     * no index updates.
-     */
-    if (ikv == null) {
-            current.stop();
-            scope.close();
-      return;
-    }
-
-    /*
-     * only write the update if we haven't already seen this batch. We only want to write the batch
-     * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
-     * lead to writing all the index updates for each Put/Delete).
-     */
-    if (!ikv.getBatchFinished()) {
-      Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
-
-      // the WAL edit is kept in memory and we already specified the factory when we created the
-      // references originally - therefore, we just pass in a null factory here and use the ones
-      // already specified on each reference
-      try {
-                current.addTimelineAnnotation("Actually doing index update for first time");
-          writer.writeAndKillYourselfOnFailure(indexUpdates);
-      } finally {
-        // With a custom kill policy, we may throw instead of kill the server.
-        // Without doing this in a finally block (at least with the mini cluster),
-        // the region server never goes down.
+          throws Exception {
+      //short circuit, if we don't need to do any work
+      if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
+          // already did the index update in prePut, so we are done
+          return;
+      }
 
-        // mark the batch as having been written. In the single-update case, this never gets check
-        // again, but in the batch case, we will check it again (see above).
-        ikv.markBatchFinished();
+      // get the current span, or just use a null-span to avoid a bunch of if statements
+      try (TraceScope scope = Trace.startSpan("Completing index writes")) {
+          Span current = scope.getSpan();
+          if (current == null) {
+              current = NullSpan.INSTANCE;
+          }
 
-                // finish the span
+          // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
+          // and then do it again later when getting out the index updates. This should be pretty minor
+          // though, compared to the rest of the runtime
+          IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
+
+          /*
+           * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
+           * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
+           * no index updates.
+           */
+          if (ikv == null) {
+              return;
+          }
 
-                current.stop();
-                scope.close();
+          /*
+           * only write the update if we haven't already seen this batch. We only want to write the batch
+           * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
+           * lead to writing all the index updates for each Put/Delete).
+           */
+          if (!ikv.getBatchFinished()) {
+              Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
+
+              // the WAL edit is kept in memory and we already specified the factory when we created the
+              // references originally - therefore, we just pass in a null factory here and use the ones
+              // already specified on each reference
+              try {
+                  current.addTimelineAnnotation("Actually doing index update for first time");
+                  writer.writeAndKillYourselfOnFailure(indexUpdates);
+              } finally {
+                  // With a custom kill policy, we may throw instead of kill the server.
+                  // Without doing this in a finally block (at least with the mini cluster),
+                  // the region server never goes down.
+
+                  // mark the batch as having been written. In the single-update case, this never gets check
+                  // again, but in the batch case, we will check it again (see above).
+                  ikv.markBatchFinished();
+              }
+          }
       }
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/907f9c25/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
index b093b9c..7e1df72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
@@ -28,9 +28,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.call.CallWrapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -40,7 +37,6 @@ import org.apache.phoenix.trace.TraceMetricSource;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.TraceScope;
 import org.cloudera.htrace.Tracer;
 import org.cloudera.htrace.impl.ProbabilitySampler;
@@ -62,15 +58,10 @@ public class Tracing {
     // Constants for tracing across the wire
     public static final String TRACE_ID_ATTRIBUTE_KEY = "phoenix.trace.traceid";
     public static final String SPAN_ID_ATTRIBUTE_KEY = "phoenix.trace.spanid";
-    private static final String START_SPAN_MESSAGE = "Span received on server. Starting child";
-
+    
     // Constants for passing into the metrics system
     private static final String TRACE_METRIC_PREFIX = "phoenix.trace.instance";
-    /**
-     * We always trace on the server, assuming the client has requested tracing on the request
-     */
-    private static Sampler<?> SERVER_TRACE_LEVEL = Sampler.ALWAYS;
-
+    
     /**
      * Manage the types of frequencies that we support. By default, we never turn on tracing.
      */
@@ -169,60 +160,6 @@ public class Tracing {
                 + SEPARATOR + span.getSpanId();
     }
 
-    /**
-     * Check to see if tracing is current enabled. The trace for this thread is returned, if we are
-     * already tracing. Otherwise, checks to see if mutation has tracing enabled, and if so, starts
-     * a new span with the {@link Mutation}'s specified span as its parent.
-     * <p>
-     * This should only be run on the server-side as we base tracing on if we are currently tracing
-     * (started higher in the call-stack) or if the {@link Mutation} has the tracing attributes
-     * defined. As such, we would expect to continue the trace on the server-side based on the
-     * original sampling parameters.
-     * @param scan {@link Mutation} to check
-     * @param conf {@link Configuration} to read for the current sampler
-     * @param description description of the child span to start
-     * @return <tt>null</tt> if tracing is not enabled, or the parent {@link Span}
-     */
-    public static Span childOnServer(OperationWithAttributes scan, Configuration conf,
-            String description) {
-        // check to see if we are currently tracing. Generally, this will only work when we go to
-        // 0.96. CPs should always be setting up and tearing down their own tracing
-        Span current = Trace.currentSpan();
-        if (current == null) {
-            // its not tracing yet, but maybe it should be.
-            current = enable(scan, conf, description);
-        } else {
-            current = Trace.startSpan(description, current).getSpan();
-        }
-        return current;
-    }
-
-    /**
-     * Check to see if this mutation has tracing enabled, and if so, get a new span with the
-     * {@link Mutation}'s specified span as its parent.
-     * @param map mutation to check
-     * @param conf {@link Configuration} to check for the {@link Sampler} configuration, if we are
-     *            tracing
-     * @param description on the child to start
-     * @return a child span of the mutation, or <tt>null</tt> if tracing is not enabled.
-     */
-    @SuppressWarnings("unchecked")
-    private static Span enable(OperationWithAttributes map, Configuration conf, String description) {
-        byte[] traceid = map.getAttribute(TRACE_ID_ATTRIBUTE_KEY);
-        if (traceid == null) {
-            return NullSpan.INSTANCE;
-        }
-        byte[] spanid = map.getAttribute(SPAN_ID_ATTRIBUTE_KEY);
-        if (spanid == null) {
-            LOG.error("TraceID set to " + Bytes.toLong(traceid) + ", but span id was not set!");
-            return NullSpan.INSTANCE;
-        }
-        Sampler<?> sampler = SERVER_TRACE_LEVEL;
-        TraceInfo parent = new TraceInfo(Bytes.toLong(traceid), Bytes.toLong(spanid));
-        return Trace.startSpan(START_SPAN_MESSAGE + ": " + description,
-            (Sampler<TraceInfo>) sampler, parent).getSpan();
-    }
-
     public static Span child(Span s, String d) {
         if (s == null) {
             return NullSpan.INSTANCE;


Mime
View raw message