accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: fixes #800 avoid importing files for completed bulk transactions (#837)
Date Wed, 02 Jan 2019 18:59:32 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new db5f18b  fixes #800 avoid importing files for completed bulk transactions (#837)
db5f18b is described below

commit db5f18b1fc44a03898e80c213cf75e3168349aca
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jan 2 13:59:27 2019 -0500

    fixes #800 avoid importing files for completed bulk transactions (#837)
---
 .../server/client/ClientServiceHandler.java        |  21 +-
 .../accumulo/master/tableOps/BulkImport.java       |  13 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  63 +++--
 .../accumulo/test/functional/BulkFailureIT.java    | 265 +++++++++++++++++++++
 .../apache/accumulo/test/functional/BulkIT.java    |   1 -
 5 files changed, 319 insertions(+), 44 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 6af00f4..3d76cca 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -26,9 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
@@ -342,18 +340,13 @@ public class ClientServiceHandler implements ClientService.Iface {
             SecurityErrorCode.PERMISSION_DENIED);
       bulkImportStatus.updateBulkImportStatus(files, BulkImportState.INITIAL);
       log.debug("Got request to bulk import files to table(" + tableId + "): " + files);
-      return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid,
-          new Callable<List<String>>() {
-            @Override
-            public List<String> call() throws Exception {
-              bulkImportStatus.updateBulkImportStatus(files, BulkImportState.PROCESSING);
-              try {
-                return BulkImporter.bulkLoad(context, tid, tableId, files, errorDir, setTime);
-              } finally {
-                bulkImportStatus.removeBulkImportStatus(files);
-              }
-            }
-          });
+
+      bulkImportStatus.updateBulkImportStatus(files, BulkImportState.PROCESSING);
+      try {
+        return BulkImporter.bulkLoad(context, tid, tableId, files, errorDir, setTime);
+      } finally {
+        bulkImportStatus.removeBulkImportStatus(files);
+      }
     } catch (AccumuloSecurityException e) {
       throw e.asThriftException();
     } catch (Exception ex) {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 2152737..10ffeb0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tablets.UniqueNameAllocator;
@@ -49,6 +50,8 @@ import org.apache.hadoop.io.MapFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /*
  * Bulk import makes requests of tablet servers, and those requests can take a
  * long time. Our communications to the tablet server may fail, so we won't know
@@ -146,7 +149,8 @@ public class BulkImport extends MasterRepo {
     }
   }
 
-  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+  private static Path createNewBulkDir(VolumeManager fs, String sourceDir, String tableId)
+      throws IOException {
     Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
     if (tempPath == null)
       throw new IOException(sourceDir + " is not in a volume configured for Accumulo");
@@ -177,9 +181,10 @@ public class BulkImport extends MasterRepo {
     }
   }
 
-  private String prepareBulkImport(Master master, final VolumeManager fs, String dir,
-      String tableId) throws Exception {
-    final Path bulkDir = createNewBulkDir(fs, tableId);
+  @VisibleForTesting
+  public static String prepareBulkImport(AccumuloServerContext master, final VolumeManager fs,
+      String dir, String tableId) throws Exception {
+    final Path bulkDir = createNewBulkDir(fs, dir, tableId);
 
     MetadataTableUtil.addBulkLoadInProgressFlag(master,
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bd41541..b1d4947 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -48,6 +48,7 @@ import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -460,42 +461,54 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     }
 
     @Override
-    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid,
-        Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
+        final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
         throws ThriftSecurityException {
 
       if (!security.canPerformSystemActions(credentials))
         throw new ThriftSecurityException(credentials.getPrincipal(),
             SecurityErrorCode.PERMISSION_DENIED);
 
-      List<TKeyExtent> failures = new ArrayList<>();
+      try {
+        return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
 
-      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
-        TKeyExtent tke = entry.getKey();
-        Map<String,MapFileInfo> fileMap = entry.getValue();
-        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<>();
-        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
-          Path path = new Path(mapping.getKey());
-          FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-          path = ns.makeQualified(path);
-          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
-        }
+          @Override
+          public List<TKeyExtent> call() throws Exception {
+            List<TKeyExtent> failures = new ArrayList<>();
+
+            for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+              TKeyExtent tke = entry.getKey();
+              Map<String,MapFileInfo> fileMap = entry.getValue();
+              Map<FileRef,MapFileInfo> fileRefMap = new HashMap<>();
+              for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+                Path path = new Path(mapping.getKey());
+                FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
+                path = ns.makeQualified(path);
+                fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
+              }
 
-        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+              Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
 
-        if (importTablet == null) {
-          failures.add(tke);
-        } else {
-          try {
-            importTablet.importMapFiles(tid, fileRefMap, setTime);
-          } catch (IOException ioe) {
-            log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
-                ioe.getMessage());
-            failures.add(tke);
+              if (importTablet == null) {
+                failures.add(tke);
+              } else {
+                try {
+                  importTablet.importMapFiles(tid, fileRefMap, setTime);
+                } catch (IOException ioe) {
+                  log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
+                      ioe.getMessage());
+                  failures.add(tke);
+                }
+              }
+            }
+            return failures;
           }
-        }
+        });
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-      return failures;
     }
 
     @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
new file mode 100644
index 0000000..ed97c07
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.master.tableOps.BulkImport;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TServiceClient;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class BulkFailureIT extends AccumuloClusterHarness {
+
+  /**
+   * This test verifies two things. First it ensures that after a bulk imported file is compacted
+   * that import request are ignored. Second it ensures that after the bulk import transaction is
+   * canceled that import request fail. The public API for bulk import can not be used for this
+   * test. Internal (non public API) RPCs and Zookeeper state is manipulated directly. This is the
+   * only way to interleave compactions with multiple, duplicate import RPC request.
+   */
+  @Test
+  public void testImportCompactionImport() throws Exception {
+    Connector c = getConnector();
+    String table = getUniqueNames(1)[0];
+
+    SortedMap<Key,Value> testData = createTestData();
+
+    FileSystem fs = getCluster().getFileSystem();
+    String testFile = createTestFile(testData, fs);
+
+    c.tableOperations().create(table);
+    String tableId = c.tableOperations().tableIdMap().get(table);
+
+    // Table has no splits, so this extent corresponds to the tables single tablet
+    KeyExtent extent = new KeyExtent(tableId.toString(), null, null);
+
+    // Set up site configuration because this test uses server side code that expects it.
+    setupSiteConfig();
+
+    long fateTxid = 99999999L;
+
+    AccumuloServerContext asCtx = new AccumuloServerContext(
+        new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
+    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, fateTxid);
+
+    VolumeManager vm = VolumeManagerImpl.get();
+
+    // move the file into a directory for the table and rename the file to something unique
+    String bulkDir = BulkImport.prepareBulkImport(asCtx, vm, testFile, tableId);
+
+    // determine the files new name and path
+    FileStatus status = fs.listStatus(new Path(bulkDir))[0];
+    Path bulkLoadPath = fs.makeQualified(status.getPath());
+
+    // Directly ask the tablet to load the file.
+    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+
+    assertEquals(ImmutableSet.of(bulkLoadPath), getFiles(c, extent));
+    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
+    assertEquals(testData, readTable(table, c));
+
+    // Compact the bulk imported file. Subsequent request to load the file should be ignored.
+    c.tableOperations().compact(table, new CompactionConfig().setWait(true));
+
+    Set<Path> tabletFiles = getFiles(c, extent);
+    assertFalse(tabletFiles.contains(bulkLoadPath));
+    assertEquals(1, tabletFiles.size());
+    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
+    assertEquals(testData, readTable(table, c));
+
+    // this request should be ignored by the tablet
+    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+
+    assertEquals(tabletFiles, getFiles(c, extent));
+    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
+    assertEquals(testData, readTable(table, c));
+
+    // this is done to ensure the tablet reads the load flags from the metadata table when it loads
+    c.tableOperations().offline(table, true);
+    c.tableOperations().online(table, true);
+
+    // this request should be ignored by the tablet
+    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+
+    assertEquals(tabletFiles, getFiles(c, extent));
+    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
+    assertEquals(testData, readTable(table, c));
+
+    // After this, all load request should fail.
+    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, fateTxid);
+
+    try {
+      // expect this to fail
+      assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+      fail();
+    } catch (TApplicationException tae) {
+
+    }
+
+    assertEquals(tabletFiles, getFiles(c, extent));
+    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
+    assertEquals(testData, readTable(table, c));
+  }
+
+  private SortedMap<Key,Value> createTestData() {
+    SortedMap<Key,Value> testData = new TreeMap<>();
+    testData.put(new Key("r001", "f002", "q009", 56), new Value("v001"));
+    testData.put(new Key("r001", "f002", "q019", 56), new Value("v002"));
+    testData.put(new Key("r002", "f002", "q009", 57), new Value("v003"));
+    testData.put(new Key("r002", "f002", "q019", 57), new Value("v004"));
+    return testData;
+  }
+
+  private String createTestFile(SortedMap<Key,Value> testData, FileSystem fs) throws IOException {
+    Path base = new Path(getCluster().getTemporaryPath(), "testBulk_ICI");
+
+    fs.delete(base, true);
+    fs.mkdirs(base);
+    Path files = new Path(base, "files");
+
+    try (RFileWriter writer = RFile.newWriter().to(new Path(files, "ici_01.rf").toString())
+        .withFileSystem(fs).build()) {
+      writer.append(testData.entrySet());
+    }
+
+    String filesStr = fs.makeQualified(files).toString();
+    return filesStr;
+  }
+
+  private SortedMap<Key,Value> readTable(String table, Connector connector)
+      throws TableNotFoundException {
+    Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
+
+    SortedMap<Key,Value> actual = new TreeMap<>();
+
+    for (Entry<Key,Value> entry : scanner) {
+      actual.put(entry.getKey(), entry.getValue());
+    }
+
+    return actual;
+  }
+
+  public Set<Path> getLoaded(Connector connector, KeyExtent extent) throws TableNotFoundException {
+    return getPaths(connector, extent, BulkFileColumnFamily.NAME);
+  }
+
+  public Set<Path> getFiles(Connector connector, KeyExtent extent) throws TableNotFoundException {
+    return getPaths(connector, extent, DataFileColumnFamily.NAME);
+  }
+
+  private Set<Path> getPaths(Connector connector, KeyExtent extent, Text fam)
+      throws TableNotFoundException {
+    HashSet<Path> files = new HashSet<>();
+
+    Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(extent.toMetadataRange());
+    scanner.fetchColumnFamily(fam);
+
+    for (Entry<Key,Value> entry : scanner) {
+      files.add(new Path(entry.getKey().getColumnQualifierData().toString()));
+    }
+
+    return files;
+  }
+
+  private List<KeyExtent> assignMapFiles(long txid, ClientContext context, KeyExtent extent,
+      String path, long size) throws Exception {
+
+    TabletLocator locator = TabletLocator.getLocator(context, extent.getTableId());
+
+    locator.invalidateCache(extent);
+
+    HostAndPort location = HostAndPort
+        .fromString(locator.locateTablet(context, new Text(""), false, true).tablet_location);
+
+    long timeInMillis = context.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+    TabletClientService.Iface client = ThriftUtil.getTServerClient(location, context, timeInMillis);
+    try {
+
+      Map<String,MapFileInfo> val = ImmutableMap.of(path, new MapFileInfo(size));
+      Map<KeyExtent,Map<String,MapFileInfo>> files = ImmutableMap.of(extent, val);
+
+      List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), context.rpcCreds(), txid,
+          Translator.translate(files, Translators.KET), false);
+
+      return Translator.translate(failures, Translators.TKET);
+    } finally {
+      ThriftUtil.returnClient((TServiceClient) client);
+    }
+
+  }
+
+  private void setupSiteConfig() throws AccumuloException, AccumuloSecurityException {
+    for (Entry<String,String> entry : getCluster().getSiteConfiguration()) {
+      SiteConfiguration.getInstance().set(entry.getKey(), entry.getValue());
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
index f51ae1b..c94a93f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -113,5 +113,4 @@ public class BulkIT extends AccumuloClusterHarness {
     vopts.rows = 1;
     VerifyIngest.verifyIngest(c, vopts, SOPTS);
   }
-
 }


Mime
View raw message