accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-3735 allow users to see the bulk imports in progress
Date Mon, 24 Aug 2015 23:28:08 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 5e1c0d1db -> 784bd05a0


http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 588e3e0..c306e85 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -50,6 +50,8 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
+import org.apache.accumulo.core.master.thrift.BulkImportStatus;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.NamespacePermission;
 import org.apache.accumulo.core.security.SystemPermission;
@@ -61,6 +63,7 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.TableDiskUsage;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
@@ -75,6 +78,7 @@ public class ClientServiceHandler implements ClientService.Iface {
   private final Instance instance;
   private final VolumeManager fs;
   private final SecurityOperation security;
+  private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
   public ClientServiceHandler(AccumuloServerContext context, TransactionWatcher transactionWatcher,
VolumeManager fs) {
     this.context = context;
@@ -294,11 +298,17 @@ public class ClientServiceHandler implements ClientService.Iface {
     try {
       if (!security.canPerformSystemActions(credentials))
         throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      bulkImportStatus.updateBulkImportStatus(files, BulkImportState.INITIAL);
       log.debug("Got request to bulk import files to table(" + tableId + "): " + files);
       return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<String>>()
{
         @Override
         public List<String> call() throws Exception {
-          return BulkImporter.bulkLoad(context, tid, tableId, files, errorDir, setTime);
+          bulkImportStatus.updateBulkImportStatus(files, BulkImportState.PROCESSING);
+          try {
+            return BulkImporter.bulkLoad(context, tid, tableId, files, errorDir, setTime);
+          } finally {
+            bulkImportStatus.removeBulkImportStatus(files);
+          }
         }
       });
     } catch (AccumuloSecurityException e) {
@@ -452,4 +462,8 @@ public class ClientServiceHandler implements ClientService.Iface {
     AccumuloConfiguration config = context.getServerConfigurationFactory().getNamespaceConfiguration(namespaceId);
     return conf(credentials, config);
   }
+
+  public List<BulkImportStatus> getBulkLoadStatus() {
+    return bulkImportStatus.getBulkLoadStatus();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/base/src/main/java/org/apache/accumulo/server/util/ServerBulkImportStatus.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ServerBulkImportStatus.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ServerBulkImportStatus.java
new file mode 100644
index 0000000..21815f8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ServerBulkImportStatus.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.accumulo.core.master.thrift.BulkImportState;
+import org.apache.accumulo.core.master.thrift.BulkImportStatus;
+
+// A little class to hold bulk import status information in the Master
+// and two places in the tablet server.
+public class ServerBulkImportStatus {
+  private final ConcurrentMap<String,BulkImportStatus> status = new ConcurrentHashMap<>();
+
+  public List<BulkImportStatus> getBulkLoadStatus() {
+    return new ArrayList<>(status.values());
+  }
+
+  public void updateBulkImportStatus(List<String> files, BulkImportState state) {
+    for (String file : files) {
+      BulkImportStatus initial = new BulkImportStatus(System.currentTimeMillis(), file, state);
+      status.putIfAbsent(file, initial);
+      initial = status.get(file);
+      if (initial != null) {
+        initial.state = state;
+      }
+    }
+  }
+
+  public void removeBulkImportStatus(List<String> files) {
+    status.keySet().removeAll(files);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index d50ce16..cda9548 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.FateOperation;
 import org.apache.accumulo.core.master.thrift.FateService;
 import org.apache.accumulo.core.security.thrift.TCredentials;
@@ -363,6 +364,7 @@ class FateServiceHandler implements FateService.Iface {
         if (!canBulkImport)
           throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
 
+        master.updateBulkImportStatus(dir, BulkImportState.INITIAL);
         master.fate.seedTransaction(opid, new TraceRepo<Master>(new BulkImport(tableId,
dir, failDir, setTime)), autoCleanup);
         break;
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 6f8bc55..ff4705e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.IOException;
@@ -55,6 +56,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
@@ -135,6 +137,7 @@ import org.apache.accumulo.server.tables.TableObserver;
 import org.apache.accumulo.server.util.DefaultMap;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.TableInfoUtil;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -158,7 +161,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 /**
  * The Master is responsible for assigning and balancing tablets to tablet servers.
@@ -210,6 +212,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
   Fate<Master> fate;
 
   volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new
TreeMap<TServerInstance,TabletServerStatus>());
+  final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
   @Override
   public synchronized MasterState getMasterState() {
@@ -1564,6 +1567,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
     }
     DeadServerList obit = new DeadServerList(ZooUtil.getRoot(getInstance()) + Constants.ZDEADTSERVERS);
     result.deadTabletServers = obit.getList();
+    result.bulkImports = bulkImportStatus.getBulkLoadStatus();
     return result;
   }
 
@@ -1598,4 +1602,12 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
       }
     }
   }
+
+  public void updateBulkImportStatus(String directory, BulkImportState state) {
+    bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state);
+  }
+
+  public void removeBulkImportStatus(String directory) {
+    bulkImportStatus.removeBulkImportStatus(Collections.singletonList(directory));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 5313964..4b53096 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.master.tableOps;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,6 +34,7 @@ import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
@@ -46,8 +49,6 @@ import org.apache.hadoop.io.MapFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 /*
  * Bulk import makes requests of tablet servers, and those requests can take a
  * long time. Our communications to the tablet server may fail, so we won't know
@@ -131,7 +132,7 @@ public class BulkImport extends MasterRepo {
           errorDir + " is not empty");
 
     ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
-
+    master.updateBulkImportStatus(sourceDir, BulkImportState.MOVING);
     // move the files into the directory
     try {
       String bulkDir = prepareBulkImport(master, fs, sourceDir, tableId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java
index 5ca325f..fef327d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.master.tableOps;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.util.MetadataTableUtil;
@@ -46,6 +47,7 @@ class CleanUpBulkImport extends MasterRepo {
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
+    master.updateBulkImportStatus(source, BulkImportState.CLEANUP);
     log.debug("removing the bulk processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
     MetadataTableUtil.removeBulkLoadInProgressFlag(master, "/" + bulkDir.getParent().getName()
+ "/" + bulkDir.getName());
@@ -59,6 +61,7 @@ class CleanUpBulkImport extends MasterRepo {
     Utils.getReadLock(tableId, tid).unlock();
     log.debug("completing bulk import transaction " + tid);
     ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    master.removeBulkImportStatus(source);
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
index 5f5b298..e57a678 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.security.Authorizations;
@@ -86,7 +87,7 @@ class CopyFailed extends MasterRepo {
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     // This needs to execute after the arbiter is stopped
-
+    master.updateBulkImportStatus(source, BulkImportState.COPY_FILES);
     VolumeManager fs = master.getFileSystem();
 
     if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 0c0aad6..48cbaa5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master.tableOps;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.BufferedWriter;
@@ -40,6 +41,7 @@ import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
@@ -53,8 +55,6 @@ import org.apache.htrace.wrappers.TraceExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 class LoadFiles extends MasterRepo {
 
   private static final long serialVersionUID = 1L;
@@ -95,6 +95,7 @@ class LoadFiles extends MasterRepo {
 
   @Override
   public Repo<Master> call(final long tid, final Master master) throws Exception {
+    master.updateBulkImportStatus(source, BulkImportState.LOADING);
     ExecutorService executor = getThreadPool(master);
     final AccumuloConfiguration conf = master.getConfiguration();
     VolumeManager fs = master.getFileSystem();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index a8c5d79..d704b94 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.monitor.servlets.BulkImportServlet;
 import org.apache.accumulo.monitor.servlets.DefaultServlet;
 import org.apache.accumulo.monitor.servlets.GcStatusServlet;
 import org.apache.accumulo.monitor.servlets.JSONServlet;
@@ -473,6 +474,7 @@ public class Monitor {
     server.addServlet(JSONServlet.class, "/json");
     server.addServlet(VisServlet.class, "/vis");
     server.addServlet(ScanServlet.class, "/scans");
+    server.addServlet(BulkImportServlet.class, "/bulkImports");
     server.addServlet(Summary.class, "/trace/summary");
     server.addServlet(ListType.class, "/trace/listType");
     server.addServlet(ShowTrace.class, "/trace/show");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
index fac18cd..fc329b8 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
@@ -168,6 +168,7 @@ abstract public class BasicServlet extends HttpServlet {
     sb.append("<a href='/master'>Master&nbsp;Server</a><br />\n");
     sb.append("<a href='/tservers'>Tablet&nbsp;Servers</a><br />\n");
     sb.append("<a href='/scans'>Active&nbsp;Scans</a><br />\n");
+    sb.append("<a href='/bulkImports'>Bulk&nbsp;Imports</a><br />\n");
     sb.append("<a href='/vis'>Server Activity</a><br />\n");
     sb.append("<a href='/gc'>Garbage&nbsp;Collector</a><br />\n");
     sb.append("<a href='/tables'>Tables</a><br />\n");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BulkImportServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BulkImportServlet.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BulkImportServlet.java
new file mode 100644
index 0000000..4be67c6
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BulkImportServlet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.servlets;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.accumulo.core.master.thrift.BulkImportStatus;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.monitor.util.Table;
+import org.apache.accumulo.monitor.util.TableRow;
+import org.apache.accumulo.monitor.util.celltypes.BulkImportStateType;
+import org.apache.accumulo.monitor.util.celltypes.DurationType;
+import org.apache.accumulo.monitor.util.celltypes.PreciseNumberType;
+import org.apache.accumulo.monitor.util.celltypes.TServerLinkType;
+
+public class BulkImportServlet extends BasicServlet {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  protected String getTitle(HttpServletRequest req) {
+    return "Bulk Imports";
+  }
+  
+  static private long duration(long start) {
+    return (System.currentTimeMillis() - start) / 1000L;
+  }
+
+  @Override
+  protected void pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder
sb) throws IOException {
+	  Table table = new Table("masterBulkImportStatus", "Bulk&nbsp;Import&nbsp;Status");
+    table.addSortableColumn("Directory");
+	  table.addSortableColumn("Age", new DurationType(0l, 5 * 60 * 1000l), "The age the import.");
+	  table.addSortableColumn("State", new BulkImportStateType(), "The current state of the
bulk import");
+	  for (BulkImportStatus bulk : Monitor.getMmi().bulkImports) {
+	    TableRow row = table.prepareRow();
+	    row.add(bulk.filename);
+	    row.add(duration(bulk.startTime));
+	    row.add(bulk.state);
+	    table.addRow(row);
+	  }
+	  table.generate(req, sb);
+	      
+    table = new Table("bulkImportStatus", "TabletServer&nbsp;Bulk&nbsp;Import&nbsp;Status");
+    table.addSortableColumn("Server", new TServerLinkType(), null);
+    table.addSortableColumn("#", new PreciseNumberType(0, 20, 0, 100), "Number of imports
presently running");
+    table.addSortableColumn("Oldest&nbsp;Age", new DurationType(0l, 5 * 60 * 1000l),
"The age of the oldest import running on this server.");
+    for (TabletServerStatus tserverInfo : Monitor.getMmi().getTServerInfo()) {
+      TableRow row = table.prepareRow();
+      row.add(tserverInfo);
+      List<BulkImportStatus> stats = tserverInfo.bulkImports;
+      if (stats != null) {
+        row.add(stats.size());
+        long oldest = Long.MAX_VALUE;
+        for (BulkImportStatus bulk : stats) {
+        	oldest = Math.min(oldest, bulk.startTime);
+        }
+        if (oldest != Long.MAX_VALUE) {
+        	row.add(duration(oldest));
+        } else {
+        	row.add(0L);
+        }
+      } else {
+    	row.add(0);
+    	row.add(0L);
+      }
+      table.addRow(row);
+    }
+    table.generate(req, sb);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/celltypes/BulkImportStateType.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/celltypes/BulkImportStateType.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/celltypes/BulkImportStateType.java
new file mode 100644
index 0000000..6b4ec52
--- /dev/null
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/celltypes/BulkImportStateType.java
@@ -0,0 +1,29 @@
+package org.apache.accumulo.monitor.util.celltypes;
+
+import org.apache.accumulo.core.master.thrift.BulkImportState;
+
+public class BulkImportStateType extends CellType<BulkImportState> {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public String alignment() {
+    return "left";
+  }
+
+  @Override
+  public String format(Object obj) {
+    BulkImportState state = (BulkImportState)obj;
+    return state.name();
+  }
+
+  @Override
+  public int compare(BulkImportState o1, BulkImportState o2) {
+    if (o1 == null && o2 == null)
+      return 0;
+    else if (o1 == null)
+      return -1;
+    return o1.compareTo(o2);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a39fbae..ac5d948 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
@@ -101,6 +102,7 @@ import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -188,6 +190,7 @@ import org.apache.accumulo.server.util.FileSystemMonitor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MasterMetadataUtil;
 import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.ServerBulkImportStatus;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -250,7 +253,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 public class TabletServer extends AccumuloServerContext implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
@@ -396,6 +398,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
 
   private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
   private final ReentrantLock recoveryLock = new ReentrantLock(true);
+  private ThriftClientHandler clientHandler;
+  private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
 
   private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface
{
 
@@ -2263,6 +2267,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
 
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
+    clientHandler = new ThriftClientHandler();
     Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler());
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
@@ -2907,6 +2912,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
     result.logSorts = logSorter.getLogSorts();
     result.flushs = flushCounter.get();
     result.syncs = syncCounter.get();
+    result.bulkImports = new ArrayList<>();
+    result.bulkImports.addAll(clientHandler.getBulkLoadStatus());
+    result.bulkImports.addAll(bulkImportStatus.getBulkLoadStatus());
     return result;
   }
 
@@ -3084,4 +3092,12 @@ public class TabletServer extends AccumuloServerContext implements
Runnable {
     walMarker.closeWal(getTabletSession(), currentLog.getPath());
   }
 
+  public void updateBulkImportState(List<String> files, BulkImportState state) {
+    bulkImportStatus.updateBulkImportStatus(files, state);
+  }
+
+  public void removeBulkImportState(List<String> files) {
+    bulkImportStatus.removeBulkImportStatus(files);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index ad3fb47..b8c260d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.ByteArrayInputStream;
@@ -73,6 +74,7 @@ import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.TabletLoadState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -149,7 +151,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 /**
  *
@@ -2161,9 +2162,11 @@ public class Tablet implements TabletCommitter {
 
   public void importMapFiles(long tid, Map<FileRef,MapFileInfo> fileMap, boolean setTime)
throws IOException {
     Map<FileRef,DataFileValue> entries = new HashMap<FileRef,DataFileValue>(fileMap.size());
+    List<String> files = new ArrayList<>();
 
     for (Entry<FileRef,MapFileInfo> entry : fileMap.entrySet()) {
       entries.put(entry.getKey(), new DataFileValue(entry.getValue().estimatedSize, 0l));
+      files.add(entry.getKey().path().toString());
     }
 
     // Clients timeout and will think that this operation failed.
@@ -2198,7 +2201,7 @@ public class Tablet implements TabletCommitter {
 
       writesInProgress++;
     }
-
+    tabletServer.updateBulkImportState(files, BulkImportState.LOADING);
     try {
       getDatafileManager().importMapFiles(tid, entries, setTime);
       lastMapFileImportTime = System.currentTimeMillis();
@@ -2227,6 +2230,7 @@ public class Tablet implements TabletCommitter {
         } catch (Exception ex) {
           log.info(ex.toString(), ex);
         }
+        tabletServer.removeBulkImportState(files);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
new file mode 100644
index 0000000..76e2903
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
@@ -0,0 +1,129 @@
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BulkImportMonitoringIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.useMiniDFS(true);
+    cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
+  }
+
+  @Test
+  public void test() throws Exception {
+	getCluster().getClusterControl().start(ServerType.MONITOR);
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1");
+    // splits to slow down bulk import
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 0xf; i++) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    MasterMonitorInfo stats = getCluster().getMasterMonitorInfo();
+    assertEquals(1, stats.tServerInfo.size());
+    assertEquals(0, stats.bulkImports.size());
+    assertEquals(0, stats.tServerInfo.get(0).bulkImports.size());
+
+    log.info("Creating lots of bulk import files");
+    final FileSystem fs = getCluster().getFileSystem();
+    final Path basePath = getCluster().getTemporaryPath();
+    CachedConfiguration.setInstance(fs.getConf());
+
+    final Path base = new Path(basePath, "testBulkLoad" + tableName);
+    fs.delete(base, true);
+    fs.mkdirs(base);
+
+    ExecutorService es = Executors.newFixedThreadPool(5);
+    List<Future<Pair<String,String>>> futures = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      final int which = i;
+      futures.add(es.submit(new Callable<Pair<String,String>>() {
+        @Override
+        public Pair<String,String> call() throws Exception {
+          Path bulkFailures = new Path(base, "failures" + which);
+          Path files = new Path(base, "files" + which);
+          fs.mkdirs(bulkFailures);
+          fs.mkdirs(files);
+          for (int i = 0; i < 10; i++) {
+            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString()
+ "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+                AccumuloConfiguration.getDefaultConfiguration());
+            writer.startDefaultLocalityGroup();
+            for (int j = 0x100; j < 0xfff; j += 3) {
+              writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
+            }
+            writer.close();
+          }
+          return new Pair<String,String>(files.toString(), bulkFailures.toString());
+        }
+      }));
+    }
+    List<Pair<String,String>> dirs = new ArrayList<>();
+    for (Future<Pair<String,String>> f : futures) {
+      dirs.add(f.get());
+    }
+    log.info("Importing");
+    long now = System.currentTimeMillis();
+    List<Future<Object>> errs = new ArrayList<>();
+    for (Pair<String,String> entry : dirs) {
+      final String dir = entry.getFirst();
+      final String err = entry.getSecond();
+      errs.add(es.submit(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          c.tableOperations().importDirectory(tableName, dir, err, false);
+          return null;
+        }
+      }));
+    }
+    es.shutdown();
+    while (!es.isTerminated() && stats.bulkImports.size() + stats.tServerInfo.get(0).bulkImports.size()
== 0) {
+      es.awaitTermination(10, TimeUnit.MILLISECONDS);
+      stats = getCluster().getMasterMonitorInfo();
+    }
+    log.info(stats.bulkImports.toString());
+    assertTrue(stats.bulkImports.size() > 0);
+    // look for exception
+    for (Future<Object> err : errs) {
+        err.get();
+    }
+    es.awaitTermination(2, TimeUnit.MINUTES);
+    assertTrue(es.isTerminated());
+    log.info(String.format("Completed in %.2f seconds", (System.currentTimeMillis() - now)
/ 1000.));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
index e0a0832..259a19e 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
@@ -55,7 +55,7 @@ import org.junit.Test;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.gson.Gson;
 
-// ACCUMULO-3949
+// ACCUMULO-3949, ACCUMULO-3953
 public class GetFileInfoBulkIT extends ConfigurableMacBase {
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/784bd05a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
index 5c2cbf3..0d0449e 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
@@ -21,6 +21,7 @@ import java.util.Date;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.master.thrift.BulkImportStatus;
 import org.apache.accumulo.core.master.thrift.DeadServer;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
@@ -67,6 +68,12 @@ public class GetMasterStats {
       out(2, "Last report: %s", new SimpleDateFormat().format(new Date(dead.lastStatus)));
       out(2, "Cause: %s", dead.status);
     }
+    out(0, "Bulk imports: %s", stats.bulkImports.size());
+    for (BulkImportStatus bulk : stats.bulkImports) {
+      out(1, "Import directory: %s", bulk.filename);
+      out(2, "Bulk state %s", bulk.state);
+      out(2, "Bulk start %s", bulk.startTime);
+    }
     if (stats.tableMap != null && stats.tableMap.size() > 0) {
       out(0, "Tables");
       for (Entry<String,TableInfo> entry : stats.tableMap.entrySet()) {
@@ -117,6 +124,13 @@ public class GetMasterStats {
           out(3, "Progress: %.2f%%", sort.progress * 100);
           out(3, "Time running: %s", sort.runtime / 1000.);
         }
+        out(3, "Bulk imports: %s", stats.bulkImports.size());
+        for (BulkImportStatus bulk : stats.bulkImports) {
+          out(4, "Import file: %s", bulk.filename);
+          out(5, "Bulk state %s", bulk.state);
+          out(5, "Bulk start %s", bulk.startTime);
+        }
+
       }
     }
   }


Mime
View raw message