accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/3] accumulo git commit: Merge branch '1.7'
Date Sat, 31 Oct 2015 19:39:30 GMT
Merge branch '1.7'

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31a4a7e7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31a4a7e7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31a4a7e7

Branch: refs/heads/master
Commit: 31a4a7e78d18328c500d85bf8f633d39ecb6afe1
Parents: 5a3082d 0eaece7
Author: Josh Elser <elserj@apache.org>
Authored: Sat Oct 31 15:39:10 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Sat Oct 31 15:39:10 2015 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java | 11 +++--
 .../apache/accumulo/tserver/TabletServer.java   | 16 +++++--
 .../tserver/log/TabletServerLogger.java         | 44 +++++++++++++++-----
 .../accumulo/test/TabletServerGivesUpIT.java    |  2 +
 4 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/31a4a7e7/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31a4a7e7/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31a4a7e7/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 386de35,13c742a..1102ecb
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -39,8 -36,9 +39,10 @@@ import org.apache.accumulo.core.data.Mu
  import org.apache.accumulo.core.data.impl.KeyExtent;
  import org.apache.accumulo.core.protobuf.ProtobufUtil;
  import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 -import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.fate.util.LoggingRunnable;
+ import org.apache.accumulo.fate.zookeeper.Retry;
+ import org.apache.accumulo.fate.zookeeper.RetryFactory;
  import org.apache.accumulo.server.conf.TableConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.replication.StatusUtil;
@@@ -57,10 -54,6 +59,8 @@@ import org.apache.hadoop.fs.Path
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import com.google.common.cache.Cache;
- import com.google.common.cache.CacheBuilder;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
  /**
   * Central logging facility for the TServerInfo.
   *
@@@ -151,19 -141,20 +150,19 @@@ public class TabletServerLogger 
      this.maxSize = maxSize;
      this.syncCounter = syncCounter;
      this.flushCounter = flushCounter;
-     this.toleratedFailures = toleratedWalCreationFailures;
-     this.walErrors = CacheBuilder.newBuilder().maximumSize(toleratedFailures).expireAfterWrite(toleratedFailuresPeriodMillis, TimeUnit.MILLISECONDS).build();
+     this.retryFactory = retryFactory;
+     this.retry = null;
    }
  
 -  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
 -    final int[] result = {-1};
 -    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +  private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
 +    final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>();
 +    testLockAndRun(logIdLock, new TestCallWithWriteLock() {
        @Override
        boolean test() {
 -        copy.clear();
 -        copy.addAll(loggers);
 -        if (!loggers.isEmpty())
 -          result[0] = logSetId.get();
 -        return loggers.isEmpty();
 +        result.set(currentLog);
 +        if (currentLog != null)
 +          logIdOut.set(logId.get());
 +        return currentLog == null;
        }
  
        @Override
@@@ -210,24 -196,41 +209,49 @@@
      }
  
      try {
 -      DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
 -      alog.open(tserver.getClientAddressString());
 -      loggers.add(alog);
 -      logSetId.incrementAndGet();
 -
 -      // When we successfully create a WAL, make sure to reset the Retry.
 -      if (null != retry) {
 -        retry = null;
 +      startLogMaker();
 +      Object next = nextLog.take();
 +      if (next instanceof Exception) {
 +        throw (Exception) next;
        }
 +      if (next instanceof DfsLogger) {
 +        currentLog = (DfsLogger) next;
 +        logId.incrementAndGet();
 +        log.info("Using next log " + currentLog.getFileName());
++
++        // When we successfully create a WAL, make sure to reset the Retry.
++        if (null != retry) {
++          retry = null;
++        }
+ 
 -      return;
 +        return;
 +      } else {
 +        throw new RuntimeException("Error: unexpected type seen: " + next);
 +      }
      } catch (Exception t) {
-       walErrors.put(System.currentTimeMillis(), "");
-       if (walErrors.size() > toleratedFailures) {
+       if (null == retry) {
+         retry = retryFactory.create();
+       }
+ 
+       // We have more retries or we exceeded the maximum number of accepted failures
+       if (retry.canRetry()) {
+         // Use the retry and record the time in which we did so
+         retry.useRetry();
+ 
+         try {
+           // Backoff
+           retry.waitForNextAttempt();
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           throw new RuntimeException(e);
+         }
+       } else {
+         log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t);
+         // We didn't have retries or we failed too many times.
          Halt.halt("Experienced too many errors creating WALs, giving up");
        }
+ 
+       // The exception will trigger the log creation to be re-attempted.
        throw new RuntimeException(t);
      }
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31a4a7e7/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index 944b310,0000000..f7b11f6
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@@ -1,75 -1,0 +1,77 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +// ACCUMULO-2480
 +public class TabletServerGivesUpIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.useMiniDFS(true);
 +    cfg.setNumTservers(1);
 +    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15");
++    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s");
 +  }
 +
 +  @Test(timeout = 30 * 1000)
 +  public void test() throws Exception {
 +    final Connector conn = this.getConnector();
 +    // Yes, there's a tabletserver
 +    assertEquals(1, conn.instanceOperations().getTabletServers().size());
 +    final String tableName = getUniqueNames(1)[0];
 +    conn.tableOperations().create(tableName);
 +    // Kill dfs
 +    cluster.getMiniDfs().shutdown();
 +    // ask the tserver to do something
 +    final AtomicReference<Exception> ex = new AtomicReference<>();
 +    Thread splitter = new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          TreeSet<Text> splits = new TreeSet<>();
 +          splits.add(new Text("X"));
 +          conn.tableOperations().addSplits(tableName, splits);
 +        } catch (Exception e) {
 +          ex.set(e);
 +        }
 +      }
 +    };
 +    splitter.start();
 +    // wait for the tserver to give up on writing to the WAL
 +    while (conn.instanceOperations().getTabletServers().size() == 1) {
 +      sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    }
 +  }
 +
 +}


Mime
View raw message