accumulo-commits mailing list archives

From ktur...@apache.org
Subject [accumulo] branch 2.0 updated: fix #1247 store fate txid in ~blip marker value (#1248)
Date Mon, 08 Jul 2019 17:30:32 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 67d83e4  fix #1247 store fate txid in ~blip marker value (#1248)
67d83e4 is described below

commit 67d83e449eae03a067b9612943fbaedc4ee30bb1
Author: Keith Turner <kturner@apache.org>
AuthorDate: Mon Jul 8 13:30:27 2019 -0400

    fix #1247 store fate txid in ~blip marker value (#1248)
    
    Before this change, the bulk import code was serializing and logging FATE
    transaction ids differently than other parts of the code.  When adding
    FATE transaction ids to blip markers, I decided it would be best if they
    were consistently persisted (in the metadata table) and logged.  That
    decision made this change much larger than simply adding something to
    the blip value, but I feel it's worth it.
    
    The following are the main changes:
    
     * Introduce new class FateTxId with methods for consistently formatting
       fate transaction ids.
     * Make bulk fate logging use FateTxId
     * Update ~blip and loaded family to use FateTxId.  Made the loaded
       family backwards compatible.
     * Update existing fate logging code to use FateTxId instead of
       String.format.
    
    Follow-on work to update all Repos to use FateTxId would be nice.  Chose
    not to do that in this change because it would make it harder to review.
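
    To illustrate the intended round trip, here is a minimal sketch (not part
    of the commit itself) that assumes only the FateTxId class added below:

        import org.apache.accumulo.fate.FateTxId;

        public class FateTxIdExample {
          public static void main(String[] args) {
            long tid = 0x2aL;
            // formatTid zero-pads to 16 hex digits inside the FTID[...] wrapper
            String fmt = FateTxId.formatTid(tid);             // "FTID[000000000000002a]"
            System.out.println(FateTxId.isFormatedTid(fmt));  // true
            System.out.println(FateTxId.fromString(fmt));     // 42
            System.out.println(FateTxId.isFormatedTid("2a")); // false: legacy bare hex
          }
        }
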
---
 .../java/org/apache/accumulo/fate/AgeOffStore.java |  4 +-
 .../main/java/org/apache/accumulo/fate/Fate.java   | 20 ++++----
 .../java/org/apache/accumulo/fate/FateTxId.java    | 59 ++++++++++++++++++++++
 .../java/org/apache/accumulo/fate/ZooStore.java    |  8 +--
 .../server/constraints/MetadataConstraints.java    |  3 +-
 .../server/iterators/MetadataBulkLoadFilter.java   |  3 +-
 .../accumulo/server/util/MasterMetadataUtil.java   |  5 +-
 .../accumulo/server/util/MetadataTableUtil.java    | 55 ++++++++------------
 .../master/tableOps/bulkVer1/BulkImport.java       | 13 +++--
 .../tableOps/bulkVer1/CleanUpBulkImport.java       |  3 +-
 .../master/tableOps/bulkVer1/CopyFailed.java       | 13 +++--
 .../master/tableOps/bulkVer1/LoadFiles.java        | 13 +++--
 .../master/tableOps/bulkVer2/BulkImportMove.java   | 20 +++++---
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  3 +-
 .../master/tableOps/bulkVer2/LoadFiles.java        | 11 ++--
 .../apache/accumulo/tserver/tablet/TabletData.java |  8 +--
 .../accumulo/shell/commands/FateCommand.java       | 13 ++++-
 .../accumulo/test/functional/BulkFailureIT.java    |  3 +-
 18 files changed, 166 insertions(+), 91 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
index 3718b60..3bd4ed9 100644
--- a/core/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/fate/AgeOffStore.java
@@ -95,7 +95,7 @@ public class AgeOffStore<T> implements TStore<T> {
             case FAILED:
             case SUCCESSFUL:
               store.delete(txid);
-              log.debug("Aged off FATE tx {}", String.format("%016x", txid));
+              log.debug("Aged off FATE tx {}", FateTxId.formatTid(txid));
               break;
             default:
               break;
@@ -105,7 +105,7 @@ public class AgeOffStore<T> implements TStore<T> {
           store.unreserve(txid, 0);
         }
       } catch (Exception e) {
-        log.warn("Failed to age off FATE tx " + String.format("%016x", txid), e);
+        log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
       }
     }
   }
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index a2a4f72..a5a0869 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -135,9 +135,9 @@ public class Fate<T> {
      */
     private void blockIfHadoopShutdown(long tid, Exception e) {
       if (isIOException(e) && ShutdownUtil.isShutdownInProgress()) {
-        String tidStr = String.format("%016x", tid);
-        log.info("Ignoring exception that was likely caused by Hadoop Shutdown hook. tid
: {} ",
-            tidStr, e);
+        String tidStr = FateTxId.formatTid(tid);
+        log.info("Ignoring exception that was likely caused by Hadoop Shutdown hook. {} ",
tidStr,
+            e);
 
         while (true) {
           // Nothing is going to work well at this point, so why even try. Just wait for the end.
@@ -147,8 +147,8 @@ public class Fate<T> {
     }
 
     private void transitionToFailed(long tid, Exception e) {
-      String tidStr = String.format("%016x", tid);
-      final String msg = "Failed to execute Repo, tid=" + tidStr;
+      String tidStr = FateTxId.formatTid(tid);
+      final String msg = "Failed to execute Repo, " + tidStr;
       // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor
       // as a warning. They're a normal, handled failure condition.
       if (e instanceof AcceptableException) {
@@ -158,7 +158,7 @@ public class Fate<T> {
       }
       store.setProperty(tid, EXCEPTION_PROP, e);
       store.setStatus(tid, TStatus.FAILED_IN_PROGRESS);
-      log.info("Updated status for Repo with tid={} to FAILED_IN_PROGRESS", tidStr);
+      log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", tidStr);
     }
 
     private void processFailed(long tid, Repo<T> op) {
@@ -189,7 +189,7 @@ public class Fate<T> {
       try {
         op.undo(tid, environment);
       } catch (Exception e) {
-        log.warn("Failed to undo Repo, tid=" + String.format("%016x", tid), e);
+        log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e);
       }
     }
 
@@ -273,7 +273,7 @@ public class Fate<T> {
         case FAILED_IN_PROGRESS:
         case IN_PROGRESS:
           throw new IllegalStateException(
-              "Can not delete in progress transaction " + String.format("%016x", tid));
+              "Can not delete in progress transaction " + FateTxId.formatTid(tid));
         case UNKNOWN:
           // nothing to do, it does not exist
           break;
@@ -288,7 +288,7 @@ public class Fate<T> {
     try {
       if (store.getStatus(tid) != TStatus.SUCCESSFUL)
         throw new IllegalStateException("Tried to get exception when transaction "
-            + String.format("%016x", tid) + " not in successful state");
+            + FateTxId.formatTid(tid) + " not in successful state");
       return (String) store.getProperty(tid, RETURN_PROP);
     } finally {
       store.unreserve(tid, 0);
@@ -301,7 +301,7 @@ public class Fate<T> {
     try {
       if (store.getStatus(tid) != TStatus.FAILED)
         throw new IllegalStateException("Tried to get exception when transaction "
-            + String.format("%016x", tid) + " not in failed state");
+            + FateTxId.formatTid(tid) + " not in failed state");
       return (Exception) store.getProperty(tid, EXCEPTION_PROP);
     } finally {
       store.unreserve(tid, 0);
diff --git a/core/src/main/java/org/apache/accumulo/fate/FateTxId.java b/core/src/main/java/org/apache/accumulo/fate/FateTxId.java
new file mode 100644
index 0000000..2b516d1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/fate/FateTxId.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.fate;
+
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+public class FateTxId {
+
+  private static final String PREFIX = "FTID[";
+  private static final String SUFFIX = "]";
+
+  private final static Pattern PATTERN =
+      Pattern.compile(Pattern.quote(PREFIX) + "[0-9a-fA-F]+" + Pattern.quote(SUFFIX));
+
+  private static String getHex(String fmtTid) {
+    return fmtTid.substring(PREFIX.length(), fmtTid.length() - SUFFIX.length());
+  }
+
+  /**
+   * @return true if string was created by {@link #formatTid(long)} and false otherwise.
+   */
+  public static boolean isFormatedTid(String fmtTid) {
+    return PATTERN.matcher(fmtTid).matches();
+  }
+
+  /**
+   * Reverses {@link #formatTid(long)}
+   */
+  public static long fromString(String fmtTid) {
+    Preconditions.checkArgument(fmtTid.startsWith(PREFIX) && fmtTid.endsWith(SUFFIX));
+    return Long.parseLong(getHex(fmtTid), 16);
+  }
+
+  /**
+   * Formats transaction ids in a consistent way that is useful for logging and persisting.
+   */
+  public static String formatTid(long tid) {
+    // do not change how this formats without considering implications for persistence
+    return String.format("%s%016x%s", PREFIX, tid, SUFFIX);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/fate/ZooStore.java
index e22bf8b..755da53 100644
--- a/core/src/main/java/org/apache/accumulo/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/fate/ZooStore.java
@@ -227,7 +227,7 @@ public class ZooStore<T> implements TStore<T> {
     synchronized (this) {
       if (!reserved.remove(tid))
         throw new IllegalStateException(
-            "Tried to unreserve id that was not reserved " + String.format("%016x", tid));
+            "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
 
       // do not want this unreserve to unesc wake up threads in reserve()... this leads to infinite
       // loop when tx is stuck in NEW...
@@ -246,7 +246,7 @@ public class ZooStore<T> implements TStore<T> {
     synchronized (this) {
       if (!reserved.remove(tid))
         throw new IllegalStateException(
-            "Tried to unreserve id that was not reserved " + String.format("%016x", tid));
+            "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
 
       if (deferTime > 0)
         defered.put(tid, System.currentTimeMillis() + deferTime);
@@ -260,7 +260,7 @@ public class ZooStore<T> implements TStore<T> {
     synchronized (this) {
       if (!reserved.contains(tid))
         throw new IllegalStateException(
-            "Tried to operate on unreserved transaction " + String.format("%016x", tid));
+            "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid));
     }
   }
 
@@ -341,7 +341,7 @@ public class ZooStore<T> implements TStore<T> {
       String txpath = getTXPath(tid);
       String top = findTop(txpath);
       if (top == null)
-        throw new IllegalStateException("Tried to pop when empty " + tid);
+        throw new IllegalStateException("Tried to pop when empty " + FateTxId.formatTid(tid));
       zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP);
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 4a1fdc1..4e9ad98 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.io.Text;
@@ -229,7 +230,7 @@ public class MetadataConstraints implements Constraint {
           }
 
           if (!isSplitMutation && !isLocationMutation) {
-            long tid = Long.parseLong(tidString);
+            long tid = MetadataTableUtil.getBulkLoadTid(new Value(tidString));
 
             try {
               if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator(context)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
index d46b2a7..38ebe85 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.slf4j.Logger;
@@ -50,7 +51,7 @@ public class MetadataBulkLoadFilter extends Filter {
   @Override
   public boolean accept(Key k, Value v) {
     if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
-      long txid = Long.parseLong(v.toString());
+      long txid = MetadataTableUtil.getBulkLoadTid(v);
 
       Status status = bulkTxStatusCache.get(txid);
       if (status == null) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 2027249..1b053da 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -88,9 +89,9 @@ public class MasterMetadataUtil {
     }
 
     for (Entry<Long,? extends Collection<FileRef>> entry : bulkLoadedFiles.entrySet()) {
-      Value tidBytes = new Value(Long.toString(entry.getKey()).getBytes());
+      Value tidVal = new Value(FateTxId.formatTid(entry.getKey()));
       for (FileRef ref : entry.getValue()) {
-        m.put(TabletsSection.BulkFileColumnFamily.NAME, ref.meta(), new Value(tidBytes));
+        m.put(TabletsSection.BulkFileColumnFamily.NAME, ref.meta(), tidVal);
       }
     }
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 73e3ecc..8576bf7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -21,7 +21,6 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,6 +74,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -182,12 +182,12 @@ public class MetadataTableUtil {
   public static void updateTabletDataFile(long tid, KeyExtent extent,
       Map<FileRef,DataFileValue> estSizes, String time, ServerContext context, ZooLock zooLock) {
     Mutation m = new Mutation(extent.getMetadataEntry());
-    byte[] tidBytes = Long.toString(tid).getBytes(UTF_8);
+    Value tidValue = new Value(FateTxId.formatTid(tid));
 
     for (Entry<FileRef,DataFileValue> entry : estSizes.entrySet()) {
       Text file = entry.getKey().meta();
       m.put(DataFileColumnFamily.NAME, file, new Value(entry.getValue().encode()));
-      m.put(TabletsSection.BulkFileColumnFamily.NAME, file, new Value(tidBytes));
+      m.put(TabletsSection.BulkFileColumnFamily.NAME, file, tidValue);
     }
     TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes(UTF_8)));
     update(context, zooLock, m, extent);
@@ -927,6 +927,18 @@ public class MetadataTableUtil {
     update(context, zooLock, m, extent);
   }
 
+  public static long getBulkLoadTid(Value v) {
+    String vs = v.toString();
+
+    if (FateTxId.isFormatedTid(vs)) {
+      return FateTxId.fromString(vs);
+    } else {
+      // a new serialization format was introduced in 2.0. This code supports deserializing the old
+      // format.
+      return Long.parseLong(vs);
+    }
+  }
+
   public static void removeBulkLoadEntries(AccumuloClient client, TableId tableId, long tid)
       throws Exception {
     try (
@@ -935,10 +947,11 @@ public class MetadataTableUtil {
         BatchWriter bw = client.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
       mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
       mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-      byte[] tidAsBytes = Long.toString(tid).getBytes(UTF_8);
+
       for (Entry<Key,Value> entry : mscanner) {
         log.trace("Looking at entry {} with tid {}", entry, tid);
-        if (Arrays.equals(entry.getValue().get(), tidAsBytes)) {
+        long entryTid = getBulkLoadTid(entry.getValue());
+        if (tid == entryTid) {
           log.trace("deleting entry {}", entry);
           Key key = entry.getKey();
           Mutation m = new Mutation(key.getRow());
@@ -949,27 +962,6 @@ public class MetadataTableUtil {
     }
   }
 
-  public static List<FileRef> getBulkFilesLoaded(ServerContext context, AccumuloClient client,
-      KeyExtent extent, long tid) {
-    List<FileRef> result = new ArrayList<>();
-    try (Scanner mscanner = new IsolatedScanner(client.createScanner(
-        extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) {
-      VolumeManager fs = context.getVolumeManager();
-      mscanner.setRange(extent.toMetadataRange());
-      mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-      for (Entry<Key,Value> entry : mscanner) {
-        if (Long.parseLong(entry.getValue().toString()) == tid) {
-          result.add(new FileRef(fs, entry.getKey()));
-        }
-      }
-
-      return result;
-    } catch (TableNotFoundException ex) {
-      // unlikely
-      throw new RuntimeException("Onos! teh metadata table has vanished!!");
-    }
-  }
-
   public static Map<Long,? extends Collection<FileRef>> getBulkFilesLoaded(ServerContext context,
       KeyExtent extent) {
     Text metadataRow = extent.getMetadataEntry();
@@ -981,21 +973,18 @@ public class MetadataTableUtil {
       scanner.setRange(new Range(metadataRow));
       scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
       for (Entry<Key,Value> entry : scanner) {
-        Long tid = Long.parseLong(entry.getValue().toString());
-        List<FileRef> lst = result.get(tid);
-        if (lst == null) {
-          result.put(tid, lst = new ArrayList<>());
-        }
+        Long tid = getBulkLoadTid(entry.getValue());
+        List<FileRef> lst = result.computeIfAbsent(tid, k -> new ArrayList<FileRef>());
         lst.add(new FileRef(fs, entry.getKey()));
       }
     }
     return result;
   }
 
-  public static void addBulkLoadInProgressFlag(ServerContext context, String path) {
+  public static void addBulkLoadInProgressFlag(ServerContext context, String path, long fateTxid) {
 
     Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path);
-    m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+    m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(FateTxId.formatTid(fateTxid)));
 
     // new KeyExtent is only added to force update to write to the metadata table, not the root
     // table
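
The getBulkLoadTid method added above is the backwards-compatibility shim for
the loaded family: values written before 2.0 are plain decimal longs, while
2.0 persists the FateTxId-formatted form. A minimal sketch of that rule (the
class and method names here are illustrative, not part of the commit):

    import org.apache.accumulo.fate.FateTxId;

    public class BulkLoadTidCompat {
      // Mirrors MetadataTableUtil.getBulkLoadTid: try the new FTID[...] form
      // first, then fall back to the pre-2.0 plain-long encoding.
      static long parse(String value) {
        if (FateTxId.isFormatedTid(value)) {
          return FateTxId.fromString(value); // e.g. "FTID[000000000000002a]"
        }
        return Long.parseLong(value);        // e.g. "42", written pre-2.0
      }

      public static void main(String[] args) {
        System.out.println(parse("42"));                     // 42 (old value)
        System.out.println(parse("FTID[000000000000002a]")); // 42 (new value)
      }
    }
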
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
index 92f88f5..c2eeede 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -108,7 +109,9 @@ public class BulkImport extends MasterRepo {
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug(" tid {} sourceDir {}", tid, sourceDir);
+    String fmtTid = FateTxId.formatTid(tid);
+
+    log.debug(" {} sourceDir {}", fmtTid, sourceDir);
 
     Utils.getReadLock(master, tableId, tid).lock();
 
@@ -139,8 +142,8 @@ public class BulkImport extends MasterRepo {
     master.updateBulkImportStatus(sourceDir, BulkImportState.MOVING);
     // move the files into the directory
     try {
-      String bulkDir = prepareBulkImport(master.getContext(), fs, sourceDir, tableId);
-      log.debug(" tid {} bulkDir {}", tid, bulkDir);
+      String bulkDir = prepareBulkImport(master.getContext(), fs, sourceDir, tableId, tid);
+      log.debug(" {} bulkDir {}", tid, bulkDir);
       return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
     } catch (IOException ex) {
       log.error("error preparing the bulk import directory", ex);
@@ -185,11 +188,11 @@ public class BulkImport extends MasterRepo {
 
   @VisibleForTesting
   public static String prepareBulkImport(ServerContext master, final VolumeManager fs, String dir,
-      TableId tableId) throws Exception {
+      TableId tableId, long tid) throws Exception {
     final Path bulkDir = createNewBulkDir(master, fs, dir, tableId);
 
     MetadataTableUtil.addBulkLoadInProgressFlag(master,
-        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
+        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
 
     Path dirPath = new Path(dir);
     FileStatus[] mapFiles = fs.listStatus(dirPath);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
index 4d93949..7855698 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CleanUpBulkImport.java
@@ -20,6 +20,7 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -63,7 +64,7 @@ public class CleanUpBulkImport extends MasterRepo {
     Utils.unreserveHdfsDirectory(master, source, tid);
     Utils.unreserveHdfsDirectory(master, error, tid);
     Utils.getReadLock(master, tableId, tid).unlock();
-    log.debug("completing bulkDir import transaction " + tid);
+    log.debug("completing bulkDir import transaction " + FateTxId.formatTid(tid));
     ZooArbitrator.cleanup(master.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
     master.removeBulkImportStatus(source);
     return null;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
index ddc94b0..39f510c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -44,6 +45,7 @@ import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
@@ -77,8 +79,8 @@ class CopyFailed extends MasterRepo {
         if (client != null && !client.isActive(tid))
           finished.add(server);
       } catch (TException ex) {
-        log.info(
-            "Ignoring error trying to check on tid " + tid + " from server " + server + ":
" + ex);
+        log.info("Ignoring error trying to check on tid " + FateTxId.formatTid(tid)
+            + " from server " + server + ": " + ex);
       }
     }
     if (finished.containsAll(running))
@@ -121,7 +123,7 @@ class CopyFailed extends MasterRepo {
       mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 
       for (Entry<Key,Value> entry : mscanner) {
-        if (Long.parseLong(entry.getValue().toString()) == tid) {
+        if (MetadataTableUtil.getBulkLoadTid(entry.getValue()) == tid) {
           FileRef loadedFile = new FileRef(fs, entry.getKey());
           String absPath = failures.remove(loadedFile);
           if (absPath != null) {
@@ -136,7 +138,7 @@ class CopyFailed extends MasterRepo {
       Path orig = new Path(failure);
       Path dest = new Path(error, orig.getName());
       fs.rename(orig, dest);
-      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
+      log.debug(FateTxId.formatTid(tid) + " renamed " + orig + " to " + dest + ": import failed");
     }
 
     if (loadedFailures.size() > 0) {
@@ -155,7 +157,8 @@ class CopyFailed extends MasterRepo {
 
         bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
         workIds.add(orig.getName());
-        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+        log.debug(
+            FateTxId.formatTid(tid) + " added to copyq: " + orig + " to " + dest + ": failed");
       }
 
       bifCopyQueue.waitUntilDone(workIds);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
index df6f5a6..437e35e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -107,7 +108,7 @@ class LoadFiles extends MasterRepo {
     for (FileStatus entry : fs.listStatus(new Path(bulk))) {
       files.add(entry);
     }
-    log.debug("tid " + tid + " importing " + files.size() + " files");
+    log.debug(FateTxId.formatTid(tid) + " importing " + files.size() + " files");
 
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
@@ -129,7 +130,8 @@ class LoadFiles extends MasterRepo {
       List<Future<Void>> results = new ArrayList<>();
 
       if (master.onlineTabletServers().size() == 0)
-        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid
+ ")");
+        log.warn("There are no tablet server to process bulk import, waiting (tid = "
+            + FateTxId.formatTid(tid) + ")");
 
       while (master.onlineTabletServers().size() == 0) {
         sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
@@ -178,7 +180,8 @@ class LoadFiles extends MasterRepo {
                 loaded.add(file);
               }
             } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+              log.error(
+                  "rpc failed server:" + server + ", tid:" + FateTxId.formatTid(tid) + "
" + ex);
             } finally {
               ThriftUtil.returnClient(client);
             }
@@ -191,8 +194,8 @@ class LoadFiles extends MasterRepo {
       }
       filesToLoad.removeAll(loaded);
       if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad,
10)
-            + " failed");
+        log.debug(FateTxId.formatTid(tid) + " attempt " + (attempt + 1) + " "
+            + sampleList(filesToLoad, 10) + " failed");
         sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
       }
     }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
index 83480fb..db8e621 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -73,7 +74,10 @@ class BulkImportMove extends MasterRepo {
   public Repo<Master> call(long tid, Master master) throws Exception {
     final Path bulkDir = new Path(bulkInfo.bulkDir);
     final Path sourceDir = new Path(bulkInfo.sourceDir);
-    log.debug(" tid {} sourceDir {}", tid, sourceDir);
+
+    String fmtTid = FateTxId.formatTid(tid);
+
+    log.debug("{} sourceDir {}", fmtTid, sourceDir);
 
     VolumeManager fs = master.getFileSystem();
 
@@ -84,7 +88,7 @@ class BulkImportMove extends MasterRepo {
     try {
       Map<String,String> oldToNewNameMap =
           BulkSerialize.readRenameMap(bulkDir.toString(), p -> fs.open(p));
-      moveFiles(String.format("%016x", tid), sourceDir, bulkDir, master, fs, oldToNewNameMap);
+      moveFiles(tid, sourceDir, bulkDir, master, fs, oldToNewNameMap);
 
       return new LoadFiles(bulkInfo);
     } catch (Exception ex) {
@@ -97,15 +101,17 @@ class BulkImportMove extends MasterRepo {
   /**
    * For every entry in renames, move the file from the key path to the value path
    */
-  private void moveFiles(String fmtTid, Path sourceDir, Path bulkDir, Master master,
+  private void moveFiles(long tid, Path sourceDir, Path bulkDir, Master master,
       final VolumeManager fs, Map<String,String> renames) throws Exception {
     MetadataTableUtil.addBulkLoadInProgressFlag(master.getContext(),
-        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
+        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
 
     int workerCount = master.getConfiguration().getCount(Property.MASTER_BULK_RENAME_THREADS);
     SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulkDir move");
     List<Future<Boolean>> results = new ArrayList<>();
 
+    String fmtTid = FateTxId.formatTid(tid);
+
     for (Map.Entry<String,String> renameEntry : renames.entrySet()) {
       results.add(workers.submit(() -> {
         final Path originalPath = new Path(sourceDir, renameEntry.getKey());
@@ -121,20 +127,20 @@ class BulkImportMove extends MasterRepo {
           }
 
           log.debug(
-              "Ingoring rename exception because destination already exists. tid: {} orig:
{} new: {}",
+              "Ingoring rename exception because destination already exists. {} orig: {}
new: {}",
               fmtTid, originalPath, newPath, e);
           success = true;
         }
 
         if (!success && fs.exists(newPath) && !fs.exists(originalPath)) {
           log.debug(
-              "Ingoring rename failure because destination already exists. tid: {} orig:
{} new: {}",
+              "Ingoring rename failure because destination already exists. {} orig: {} new:
{}",
               fmtTid, originalPath, newPath);
           success = true;
         }
 
         if (success && log.isTraceEnabled())
-          log.trace("tid {} moved {} to {}", fmtTid, originalPath, newPath);
+          log.trace("{} moved {} to {}", fmtTid, originalPath, newPath);
         return success;
       }));
     }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
index 56fb73c..c10dc9c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -67,7 +68,7 @@ public class CleanUpBulkImport extends MasterRepo {
       log.debug("Failed to delete renames and/or loadmap", ioe);
     }
 
-    log.debug("completing bulkDir import transaction " + tid);
+    log.debug("completing bulkDir import transaction " + FateTxId.formatTid(tid));
     if (info.tableState == TableState.ONLINE) {
       ZooArbitrator.cleanup(master.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
     }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index 72241cc..e824782 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -82,7 +83,8 @@ class LoadFiles extends MasterRepo {
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (master.onlineTabletServers().size() == 0) {
-      log.warn("There are no tablet server to process bulkDir import, waiting (tid = " +
tid + ")");
+      log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
+          + FateTxId.formatTid(tid) + ")");
       return 100;
     }
     VolumeManager fs = master.getFileSystem();
@@ -140,7 +142,7 @@ class LoadFiles extends MasterRepo {
       super.start(bulkDir, master, tid, setTime);
 
       timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-      fmtTid = String.format("%016x", tid);
+      fmtTid = FateTxId.formatTid(tid);
 
       loadMsgs = new MapCounter<>();
 
@@ -152,7 +154,7 @@ class LoadFiles extends MasterRepo {
         loadQueue.forEach((server, tabletFiles) -> {
 
           if (log.isTraceEnabled()) {
-            log.trace("tid {} asking {} to bulk import {} files for {} tablets", fmtTid,
server,
+            log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
                 tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
           }
 
@@ -162,8 +164,7 @@ class LoadFiles extends MasterRepo {
             client.loadFiles(TraceUtil.traceInfo(), master.getContext().rpcCreds(), tid,
                 bulkDir.toString(), tabletFiles, setTime);
           } catch (TException ex) {
-            log.debug("rpc failed server: " + server + ", tid:" + fmtTid + " " + ex.getMessage(),
-                ex);
+            log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(),
ex);
           } finally {
             ThriftUtil.returnClient(client);
           }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index f29409c..917b0b6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -112,12 +112,8 @@ public class TabletData {
       } else if (family.equals(LastLocationColumnFamily.NAME)) {
         lastLocation = new TServerInstance(value, key.getColumnQualifier());
       } else if (family.equals(BulkFileColumnFamily.NAME)) {
-        Long id = Long.decode(value.toString());
-        List<FileRef> lst = bulkImported.get(id);
-        if (lst == null) {
-          bulkImported.put(id, lst = new ArrayList<>());
-        }
-        lst.add(new FileRef(fs, key));
+        Long id = MetadataTableUtil.getBulkLoadTid(value);
+        bulkImported.computeIfAbsent(id, l -> new ArrayList<FileRef>()).add(new FileRef(fs, key));
       } else if (PREV_ROW_COLUMN.hasColumns(key)) {
         KeyExtent check = new KeyExtent(key.getRow(), value);
         if (!check.equals(extent)) {
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
index 00ac9df..35eddab 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/FateCommand.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.ReadOnlyRepo;
 import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
 import org.apache.accumulo.fate.Repo;
@@ -109,6 +110,14 @@ public class FateCommand extends Command {
   private Option statusOption;
   private Option disablePaginationOpt;
 
+  private long parseTxid(String s) {
+    if (FateTxId.isFormatedTid(s)) {
+      return FateTxId.fromString(s);
+    } else {
+      return Long.parseLong(s, 16);
+    }
+  }
+
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
       throws ParseException, KeeperException, InterruptedException, IOException {
@@ -158,7 +167,7 @@ public class FateCommand extends Command {
         filterTxid = new HashSet<>(args.length);
         for (int i = 1; i < args.length; i++) {
           try {
-            Long val = Long.parseLong(args[i], 16);
+            Long val = parseTxid(args[i]);
             filterTxid.add(val);
           } catch (NumberFormatException nfe) {
             // Failed to parse, will exit instead of displaying everything since the intention was
@@ -198,7 +207,7 @@ public class FateCommand extends Command {
       } else {
         txids = new ArrayList<>();
         for (int i = 1; i < args.length; i++) {
-          txids.add(Long.parseLong(args[i], 16));
+          txids.add(parseTxid(args[i]));
         }
       }
 
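
With parseTxid above, the shell now accepts fate transaction ids in either
form: the bare hex that pre-2.0 shells took, or the FTID[...] form that 2.0
prints in logs and metadata. A hedged sketch of the equivalence (parseTxid is
private to FateCommand, so the helper below is a stand-in):

    import org.apache.accumulo.fate.FateTxId;

    public class ShellTxidParse {
      // Stand-in for FateCommand.parseTxid above.
      static long parseTxid(String s) {
        return FateTxId.isFormatedTid(s) ? FateTxId.fromString(s) : Long.parseLong(s, 16);
      }

      public static void main(String[] args) {
        System.out.println(parseTxid("2a"));                     // 42, legacy bare hex
        System.out.println(parseTxid("FTID[000000000000002a]")); // 42, new format
      }
    }
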
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
index 9fa00f9..18d8182 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
@@ -126,7 +126,8 @@ public class BulkFailureIT extends AccumuloClusterHarness {
       VolumeManager vm = asCtx.getVolumeManager();
 
       // move the file into a directory for the table and rename the file to something unique
-      String bulkDir = BulkImport.prepareBulkImport(asCtx, vm, testFile, TableId.of(tableId));
+      String bulkDir =
+          BulkImport.prepareBulkImport(asCtx, vm, testFile, TableId.of(tableId), fateTxid);
 
       // determine the files new name and path
       FileStatus status = fs.listStatus(new Path(bulkDir))[0];

