Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 850A310DD6 for ; Fri, 6 Sep 2013 18:22:57 +0000 (UTC) Received: (qmail 62586 invoked by uid 500); 6 Sep 2013 18:22:50 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 62516 invoked by uid 500); 6 Sep 2013 18:22:50 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 60286 invoked by uid 99); 6 Sep 2013 18:22:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Sep 2013 18:22:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F1615902A93; Fri, 6 Sep 2013 18:22:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 06 Sep 2013 18:23:14 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [46/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java index 765b83a..98bed54 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java @@ -25,9 +25,9 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.master.Master; import org.apache.accumulo.master.EventCoordinator.Listener; -import org.apache.accumulo.master.LiveTServerSet.TServerConnection; -import org.apache.accumulo.master.state.TServerInstance; import org.apache.accumulo.master.tableOps.MasterRepo; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java index 20e4291..1580f70 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java +++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java @@ -39,15 +39,15 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.master.state.Assignment; -import org.apache.accumulo.master.state.CurrentState; -import org.apache.accumulo.master.state.MergeInfo; -import org.apache.accumulo.master.state.MergeState; import org.apache.accumulo.master.state.MergeStats; -import org.apache.accumulo.master.state.MetaDataStateStore; -import org.apache.accumulo.master.state.TServerInstance; -import org.apache.accumulo.master.state.TabletLocationState; -import org.apache.accumulo.master.state.TabletState; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.CurrentState; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java index 7f4b306..7a7dfa2 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/ChaoticLoadBalancerTest.java @@ -36,8 +36,8 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.master.balancer.ChaoticLoadBalancer; -import org.apache.accumulo.master.state.TServerInstance; import org.apache.accumulo.master.state.TabletMigration; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java index d005436..a9abb06 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/DefaultLoadBalancerTest.java @@ -36,8 +36,8 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.master.balancer.DefaultLoadBalancer; -import org.apache.accumulo.master.state.TServerInstance; import org.apache.accumulo.master.state.TabletMigration; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java b/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java index 653e1aa..749a0aa 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/balancer/TableLoadBalancerTest.java @@ -35,8 +35,8 @@ import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.master.state.TServerInstance; import org.apache.accumulo.master.state.TabletMigration; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java index 85ee5ec..d7fc619 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java @@ -17,8 +17,8 @@ package org.apache.accumulo.master.state; import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.master.state.MergeInfo; -import org.apache.accumulo.master.state.MergeState; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -34,9 +34,9 @@ public class MergeInfoTest { in.reset(buffer.getData(), 0, buffer.getLength()); MergeInfo info2 = new MergeInfo(); info2.readFields(in); - Assert.assertEquals(info.extent, info2.extent); - Assert.assertEquals(info.state, info2.state); - Assert.assertEquals(info.operation, info2.operation); + Assert.assertEquals(info.getExtent(), info2.getExtent()); + Assert.assertEquals(info.getState(), info2.getState()); + Assert.assertEquals(info.getOperation(), info2.getOperation()); return info2; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java index 158e948..0fe86e5 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@ -30,13 +30,13 @@ import java.util.List; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.master.state.Assignment; -import org.apache.accumulo.master.state.DistributedStore; -import org.apache.accumulo.master.state.DistributedStoreException; -import org.apache.accumulo.master.state.TServerInstance; -import org.apache.accumulo.master.state.TabletLocationState; -import org.apache.accumulo.master.state.ZooTabletStateStore; -import org.apache.accumulo.master.state.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.DistributedStore; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java index 4581d96..17c6a80 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java @@ -25,9 +25,9 @@ import javax.servlet.http.HttpServletResponse; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.master.state.DeadServerList; import org.apache.accumulo.monitor.LogService; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.DeadServerList; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java index 48b1feb..abe27b6 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TServersServlet.java @@ -38,7 +38,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.Duration; import org.apache.accumulo.core.util.ThriftUtil; -import org.apache.accumulo.master.state.TabletServerState; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.util.Table; import org.apache.accumulo.monitor.util.TableRow; @@ -50,6 +49,7 @@ import org.apache.accumulo.monitor.util.celltypes.PercentageType; import org.apache.accumulo.monitor.util.celltypes.ProgressChartType; import org.apache.accumulo.monitor.util.celltypes.TServerLinkType; import org.apache.accumulo.monitor.util.celltypes.TableLinkType; +import org.apache.accumulo.server.master.state.TabletServerState; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.util.ActionStatsUpdator; import org.apache.accumulo.server.util.TableInfoUtil; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java index 4efedd8..428880e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/TablesServlet.java @@ -35,8 +35,6 @@ import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.master.state.MetaDataTableScanner; -import org.apache.accumulo.master.state.TabletLocationState; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.util.Table; import org.apache.accumulo.monitor.util.TableRow; @@ -46,6 +44,8 @@ import org.apache.accumulo.monitor.util.celltypes.NumberType; import org.apache.accumulo.monitor.util.celltypes.TableLinkType; import org.apache.accumulo.monitor.util.celltypes.TableStateType; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TabletLocationState; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.TableInfoUtil; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java index e9c4bd3..0cbf957 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/XMLServlet.java @@ -30,10 +30,10 @@ import org.apache.accumulo.core.master.thrift.Compacting; import org.apache.accumulo.core.master.thrift.DeadServer; import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.master.state.TabletServerState; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.util.celltypes.TServerLinkType; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.TabletServerState; import org.apache.accumulo.server.util.TableInfoUtil; public class XMLServlet extends BasicServlet { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java deleted file mode 100644 index 8c797cd..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/util/Admin.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.util; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.InstanceOperations; -import org.apache.accumulo.core.client.impl.ClientExec; -import org.apache.accumulo.core.client.impl.MasterClient; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.server.cli.ClientOpts; -import org.apache.accumulo.server.security.SystemCredentials; -import org.apache.accumulo.trace.instrument.Tracer; -import org.apache.log4j.Logger; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; - -public class Admin { - private static final Logger log = Logger.getLogger(Admin.class); - - static class AdminOpts extends ClientOpts { - @Parameter(names = {"-f", "--force"}, description = "force the given server to stop by removing its lock") - boolean force = false; - } - - @Parameters(commandDescription = "stop the tablet server on the given hosts") - static class StopCommand { - @Parameter(description = " { ... }") - List args = new ArrayList(); - } - - @Parameters(commandDescription = "Ping tablet servers. If no arguments, pings all.") - static class PingCommand { - @Parameter(description = "{ ... }") - List args = new ArrayList(); - } - - @Parameters(commandDescription = "print tablets that are offline in online tables") - static class CheckTabletsCommand { - @Parameter(names = "--fixFiles", description = "Remove dangling file pointers") - boolean fixFiles = false; - - @Parameter(names = {"-t", "--table"}, description = "Table to check, if not set checks all tables") - String table = null; - } - - @Parameters(commandDescription = "stop the master") - static class StopMasterCommand {} - - @Parameters(commandDescription = "stop all the servers") - static class StopAllCommand {} - - @Parameters(commandDescription = "list Accumulo instances in zookeeper") - static class ListInstancesCommand { - @Parameter(names = "--print-errors", description = "display errors while listing instances") - boolean printErrors = false; - @Parameter(names = "--print-all", description = "print information for all instances, not just those with names") - boolean printAll = false; - } - - public static void main(String[] args) { - boolean everything; - - AdminOpts opts = new AdminOpts(); - JCommander cl = new JCommander(opts); - cl.setProgramName(Admin.class.getName()); - - CheckTabletsCommand checkTabletsCommand = new CheckTabletsCommand(); - cl.addCommand("checkTablets", checkTabletsCommand); - - ListInstancesCommand listIntancesOpts = new ListInstancesCommand(); - cl.addCommand("listInstances", listIntancesOpts); - - PingCommand pingCommand = new PingCommand(); - cl.addCommand("ping", pingCommand); - - StopCommand stopOpts = new StopCommand(); - cl.addCommand("stop", stopOpts); - StopAllCommand stopAllOpts = new StopAllCommand(); - cl.addCommand("stopAll", stopAllOpts); - StopMasterCommand stopMasterOpts = new StopMasterCommand(); - cl.addCommand("stopMaster", stopMasterOpts); - cl.parse(args); - - if (opts.help || cl.getParsedCommand() == null) { - cl.usage(); - return; - } - Instance instance = opts.getInstance(); - - try { - String principal; - AuthenticationToken token; - if (opts.getToken() == null) { - principal = SystemCredentials.get().getPrincipal(); - token = SystemCredentials.get().getToken(); - } else { - principal = opts.principal; - token = opts.getToken(); - } - - int rc = 0; - - if (cl.getParsedCommand().equals("listInstances")) { - ListInstances.listInstances(instance.getZooKeepers(), listIntancesOpts.printAll, listIntancesOpts.printErrors); - } else if (cl.getParsedCommand().equals("ping")) { - if (ping(instance, principal, token, pingCommand.args) != 0) - rc = 4; - } else if (cl.getParsedCommand().equals("checkTablets")) { - System.out.println("\n*** Looking for offline tablets ***\n"); - if (FindOfflineTablets.findOffline(instance, new Credentials(principal, token), checkTabletsCommand.table) != 0) - rc = 5; - System.out.println("\n*** Looking for missing files ***\n"); - if (checkTabletsCommand.table == null) { - if (RemoveEntriesForMissingFiles.checkAllTables(instance, principal, token, checkTabletsCommand.fixFiles) != 0) - rc = 6; - } else { - if (RemoveEntriesForMissingFiles.checkTable(instance, principal, token, checkTabletsCommand.table, checkTabletsCommand.fixFiles) != 0) - rc = 6; - } - - }else if (cl.getParsedCommand().equals("stop")) { - stopTabletServer(instance, new Credentials(principal, token), stopOpts.args, opts.force); - } else { - everything = cl.getParsedCommand().equals("stopAll"); - - if (everything) - flushAll(instance, principal, token); - - stopServer(instance, new Credentials(principal, token), everything); - } - - if (rc != 0) - System.exit(rc); - } catch (AccumuloException e) { - log.error(e, e); - System.exit(1); - } catch (AccumuloSecurityException e) { - log.error(e, e); - System.exit(2); - } catch (Exception e) { - log.error(e, e); - System.exit(3); - } - } - - private static int ping(Instance instance, String principal, AuthenticationToken token, List args) throws AccumuloException, - AccumuloSecurityException { - - InstanceOperations io = instance.getConnector(principal, token).instanceOperations(); - - if (args.size() == 0) { - args = io.getTabletServers(); - } - - int unreachable = 0; - - for (String tserver : args) { - try { - io.ping(tserver); - System.out.println(tserver + " OK"); - } catch (AccumuloException ae) { - System.out.println(tserver + " FAILED (" + ae.getMessage() + ")"); - unreachable++; - } - } - - System.out.printf("\n%d of %d tablet servers unreachable\n\n", unreachable, args.size()); - return unreachable; - } - - /** - * flushing during shutdown is a performance optimization, its not required. The method will make an attempt to initiate flushes of all tables and give up if - * it takes too long. - * - */ - private static void flushAll(final Instance instance, final String principal, final AuthenticationToken token) throws AccumuloException, - AccumuloSecurityException { - - final AtomicInteger flushesStarted = new AtomicInteger(0); - - Runnable flushTask = new Runnable() { - - @Override - public void run() { - try { - Connector conn = instance.getConnector(principal, token); - Set tables = conn.tableOperations().tableIdMap().keySet(); - for (String table : tables) { - if (table.equals(MetadataTable.NAME)) - continue; - try { - conn.tableOperations().flush(table, null, null, false); - flushesStarted.incrementAndGet(); - } catch (TableNotFoundException e) {} - } - } catch (Exception e) { - log.warn("Failed to intiate flush " + e.getMessage()); - } - } - }; - - Thread flusher = new Thread(flushTask); - flusher.setDaemon(true); - flusher.start(); - - long start = System.currentTimeMillis(); - try { - flusher.join(3000); - } catch (InterruptedException e) {} - - while (flusher.isAlive() && System.currentTimeMillis() - start < 15000) { - int flushCount = flushesStarted.get(); - try { - flusher.join(1000); - } catch (InterruptedException e) {} - - if (flushCount == flushesStarted.get()) { - // no progress was made while waiting for join... maybe its stuck, stop waiting on it - break; - } - } - } - - private static void stopServer(final Instance instance, final Credentials credentials, final boolean tabletServersToo) throws AccumuloException, - AccumuloSecurityException { - MasterClient.execute(instance, new ClientExec() { - @Override - public void execute(MasterClientService.Client client) throws Exception { - client.shutdown(Tracer.traceInfo(), credentials.toThrift(instance), tabletServersToo); - } - }); - } - - private static void stopTabletServer(final Instance instance, final Credentials creds, List servers, final boolean force) throws AccumuloException, - AccumuloSecurityException { - for (String server : servers) { - InetSocketAddress address = AddressUtil.parseAddress(server); - final String finalServer = org.apache.accumulo.core.util.AddressUtil.toString(address); - log.info("Stopping server " + finalServer); - MasterClient.execute(instance, new ClientExec() { - @Override - public void execute(MasterClientService.Client client) throws Exception { - client.shutdownTabletServer(Tracer.traceInfo(), creds.toThrift(instance), finalServer, force); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java deleted file mode 100644 index c543881..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; - -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.trace.TraceFileSystem; -import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; - -/** - * Copy failed bulk imports. - */ -public class BulkFailedCopyProcessor implements Processor { - - private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class); - - @Override - public Processor newProcessor() { - return new BulkFailedCopyProcessor(); - } - - @Override - public void process(String workID, byte[] data) { - - String paths[] = new String(data).split(","); - - Path orig = new Path(paths[0]); - Path dest = new Path(paths[1]); - Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); - - try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration())); - - FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance()); - fs.rename(tmp, dest); - log.debug("copied " + orig + " to " + dest); - } catch (IOException ex) { - try { - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration())); - - fs.create(dest).close(); - log.warn(" marked " + dest + " failed", ex); - } catch (IOException e) { - log.error("Unable to create failure flag file " + dest, e); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java deleted file mode 100644 index f5f21e4..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionQueue.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -@SuppressWarnings({"rawtypes", "unchecked"}) -public class CompactionQueue extends AbstractQueue implements BlockingQueue { - - private List task = new LinkedList(); - - @Override - public synchronized Runnable poll() { - if (task.size() == 0) - return null; - - Comparable min = Collections.min(task); - task.remove(min); - return (Runnable) min; - } - - @Override - public synchronized Runnable peek() { - if (task.size() == 0) - return null; - - Comparable min = Collections.min(task); - return (Runnable) min; - } - - @Override - public synchronized boolean offer(Runnable e) { - task.add((Comparable) e); - notify(); - return true; - } - - @Override - public synchronized void put(Runnable e) throws InterruptedException { - task.add((Comparable) e); - notify(); - } - - @Override - public synchronized boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException { - task.add((Comparable) e); - notify(); - return true; - } - - @Override - public synchronized Runnable take() throws InterruptedException { - while (task.size() == 0) { - wait(); - } - - return poll(); - } - - @Override - public synchronized Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { - if (task.size() == 0) { - wait(unit.toMillis(timeout)); - } - - if (task.size() == 0) - return null; - - return poll(); - } - - @Override - public synchronized int remainingCapacity() { - return Integer.MAX_VALUE; - } - - @Override - public synchronized int drainTo(Collection c) { - return drainTo(c, task.size()); - } - - @Override - public synchronized int drainTo(Collection c, int maxElements) { - Collections.sort(task); - - int num = Math.min(task.size(), maxElements); - - Iterator iter = task.iterator(); - for (int i = 0; i < num; i++) { - c.add((Runnable) iter.next()); - iter.remove(); - } - - return num; - } - - @Override - public synchronized Iterator iterator() { - Collections.sort(task); - - final Iterator iter = task.iterator(); - - return new Iterator() { - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public Runnable next() { - return (Runnable) iter.next(); - } - - @Override - public void remove() { - iter.remove(); - } - }; - } - - @Override - public synchronized int size() { - return task.size(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java deleted file mode 100644 index 086c5cd..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/CompactionStats.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -public class CompactionStats { - private long entriesRead; - private long entriesWritten; - private long fileSize; - - CompactionStats(long er, long ew) { - this.setEntriesRead(er); - this.setEntriesWritten(ew); - } - - public CompactionStats() {} - - private void setEntriesRead(long entriesRead) { - this.entriesRead = entriesRead; - } - - public long getEntriesRead() { - return entriesRead; - } - - private void setEntriesWritten(long entriesWritten) { - this.entriesWritten = entriesWritten; - } - - public long getEntriesWritten() { - return entriesWritten; - } - - public void add(CompactionStats mcs) { - this.entriesRead += mcs.entriesRead; - this.entriesWritten += mcs.entriesWritten; - } - - public void setFileSize(long fileSize) { - this.fileSize = fileSize; - } - - public long getFileSize() { - return this.fileSize; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java deleted file mode 100644 index 480fbaa..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.thrift.IterInfo; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.IteratorUtil; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; -import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; -import org.apache.accumulo.core.iterators.system.DeletingIterator; -import org.apache.accumulo.core.iterators.system.MultiIterator; -import org.apache.accumulo.core.iterators.system.TimeSettingIterator; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; -import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; -import org.apache.accumulo.core.tabletserver.thrift.CompactionType; -import org.apache.accumulo.core.util.LocalityGroupUtil; -import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.problems.ProblemReport; -import org.apache.accumulo.server.problems.ProblemReportingIterator; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.problems.ProblemType; -import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; -import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; - - -public class Compactor implements Callable { - - public class CountingIterator extends WrappingIterator { - - private long count; - - public CountingIterator deepCopy(IteratorEnvironment env) { - return new CountingIterator(this, env); - } - - private CountingIterator(CountingIterator other, IteratorEnvironment env) { - setSource(other.getSource().deepCopy(env)); - count = 0; - } - - public CountingIterator(SortedKeyValueIterator source) { - this.setSource(source); - count = 0; - } - - @Override - public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - public void next() throws IOException { - super.next(); - count++; - if (count % 1024 == 0) { - entriesRead.addAndGet(1024); - } - } - - public long getCount() { - return count; - } - } - - private static final Logger log = Logger.getLogger(Compactor.class); - - static class CompactionCanceledException extends Exception { - private static final long serialVersionUID = 1L; - } - - static interface CompactionEnv { - boolean isCompactionEnabled(); - - IteratorScope getIteratorScope(); - } - - private Map filesToCompact; - private InMemoryMap imm; - private FileRef outputFile; - private boolean propogateDeletes; - private TableConfiguration acuTableConf; - private CompactionEnv env; - private Configuration conf; - private VolumeManager fs; - protected KeyExtent extent; - private List iterators; - - // things to report - private String currentLocalityGroup = ""; - private long startTime; - - private MajorCompactionReason reason; - protected MinorCompactionReason mincReason; - - private AtomicLong entriesRead = new AtomicLong(0); - private AtomicLong entriesWritten = new AtomicLong(0); - private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); - - private synchronized void setLocalityGroup(String name) { - this.currentLocalityGroup = name; - } - - private void clearStats() { - entriesRead.set(0); - entriesWritten.set(0); - } - - protected static Set runningCompactions = Collections.synchronizedSet(new HashSet()); - - public static class CompactionInfo { - - private Compactor compactor; - private String localityGroup; - private long entriesRead; - private long entriesWritten; - - CompactionInfo(Compactor compactor) { - this.localityGroup = compactor.currentLocalityGroup; - this.entriesRead = compactor.entriesRead.get(); - this.entriesWritten = compactor.entriesWritten.get(); - this.compactor = compactor; - } - - public ActiveCompaction toThrift() { - - CompactionType type; - - if (compactor.imm != null) - if (compactor.filesToCompact.size() > 0) - type = CompactionType.MERGE; - else - type = CompactionType.MINOR; - else if (!compactor.propogateDeletes) - type = CompactionType.FULL; - else - type = CompactionType.MAJOR; - - CompactionReason reason; - - if (compactor.imm != null) - switch(compactor.mincReason){ - case USER: - reason = CompactionReason.USER; - break; - case CLOSE: - reason = CompactionReason.CLOSE; - break; - case SYSTEM: - default: - reason = CompactionReason.SYSTEM; - break; - } - else - switch (compactor.reason) { - case USER: - reason = CompactionReason.USER; - break; - case CHOP: - reason = CompactionReason.CHOP; - break; - case IDLE: - reason = CompactionReason.IDLE; - break; - case NORMAL: - default: - reason = CompactionReason.SYSTEM; - break; - } - - List iiList = new ArrayList(); - Map> iterOptions = new HashMap>(); - - for (IteratorSetting iterSetting : compactor.iterators) { - iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName())); - iterOptions.put(iterSetting.getName(), iterSetting.getOptions()); - } - List filesToCompact = new ArrayList(); - for (FileRef ref : compactor.filesToCompact.keySet()) - filesToCompact.add(ref.toString()); - return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions); - } - } - - public static List getRunningCompactions() { - ArrayList compactions = new ArrayList(); - - synchronized (runningCompactions) { - for (Compactor compactor : runningCompactions) { - compactions.add(new CompactionInfo(compactor)); - } - } - - return compactions; - } - - Compactor(Configuration conf, VolumeManager fs, Map files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List iterators, MajorCompactionReason reason) { - this.extent = extent; - this.conf = conf; - this.fs = fs; - this.filesToCompact = files; - this.imm = imm; - this.outputFile = outputFile; - this.propogateDeletes = propogateDeletes; - this.acuTableConf = acuTableConf; - this.env = env; - this.iterators = iterators; - this.reason = reason; - - startTime = System.currentTimeMillis(); - } - - Compactor(Configuration conf, VolumeManager fs, Map files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) { - this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList(), null); - } - - public VolumeManager getFileSystem() { - return fs; - } - - KeyExtent getExtent() { - return extent; - } - - String getOutputFile() { - return outputFile.toString(); - } - - @Override - public CompactionStats call() throws IOException, CompactionCanceledException { - - FileSKVWriter mfw = null; - - CompactionStats majCStats = new CompactionStats(); - - boolean remove = runningCompactions.add(this); - - clearStats(); - - String oldThreadName = Thread.currentThread().getName(); - String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile; - Thread.currentThread().setName(newThreadName); - try { - FileOperations fileFactory = FileOperations.getInstance(); - FileSystem ns = this.fs.getFileSystemByPath(outputFile.path()); - mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf); - - Map> lGroups; - try { - lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf); - } catch (LocalityGroupConfigurationError e) { - throw new IOException(e); - } - - long t1 = System.currentTimeMillis(); - - HashSet allColumnFamilies = new HashSet(); - - if (mfw.supportsLocalityGroups()) { - for (Entry> entry : lGroups.entrySet()) { - setLocalityGroup(entry.getKey()); - compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats); - allColumnFamilies.addAll(entry.getValue()); - } - } - - setLocalityGroup(""); - compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats); - - long t2 = System.currentTimeMillis(); - - FileSKVWriter mfwTmp = mfw; - mfw = null; // set this to null so we do not try to close it again in finally if the close fails - mfwTmp.close(); // if the close fails it will cause the compaction to fail - - // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close() - try { - FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf); - openReader.close(); - } catch (IOException ex) { - log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex); - throw ex; - } - - log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(), - majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0)); - - majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf)); - return majCStats; - } catch (IOException e) { - log.error(e, e); - throw e; - } catch (RuntimeException e) { - log.error(e, e); - throw e; - } finally { - Thread.currentThread().setName(oldThreadName); - if (remove) - runningCompactions.remove(this); - - try { - if (mfw != null) { - // compaction must not have finished successfully, so close its output file - try { - mfw.close(); - } finally { - if (!fs.deleteRecursively(outputFile.path())) - if (fs.exists(outputFile.path())) - log.error("Unable to delete " + outputFile); - } - } - } catch (IOException e) { - log.warn(e, e); - } - } - } - - private List> openMapDataFiles(String lgName, ArrayList readers) throws IOException { - - List> iters = new ArrayList>(filesToCompact.size()); - - for (FileRef mapFile : filesToCompact.keySet()) { - try { - - FileOperations fileFactory = FileOperations.getInstance(); - FileSystem fs = this.fs.getFileSystemByPath(mapFile.path()); - FileSKVIterator reader; - - reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf); - - readers.add(reader); - - SortedKeyValueIterator iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader); - - if (filesToCompact.get(mapFile).isTimeSet()) { - iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime()); - } - - iters.add(iter); - - } catch (Throwable e) { - - ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e)); - - log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e); - // failed to open some map file... close the ones that were opened - for (FileSKVIterator reader : readers) { - try { - reader.close(); - } catch (Throwable e2) { - log.warn("Failed to close map file", e2); - } - } - - readers.clear(); - - if (e instanceof IOException) - throw (IOException) e; - throw new IOException("Failed to open map data files", e); - } - } - - return iters; - } - - private void compactLocalityGroup(String lgName, Set columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats) - throws IOException, CompactionCanceledException { - ArrayList readers = new ArrayList(filesToCompact.size()); - Span span = Trace.start("compact"); - try { - long entriesCompacted = 0; - List> iters = openMapDataFiles(lgName, readers); - - if (imm != null) { - iters.add(imm.compactionIterator()); - } - - CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange())); - DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes); - ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - - - // if(env.getIteratorScope() ) - - TabletIteratorEnvironment iterEnv; - if (env.getIteratorScope() == IteratorScope.majc) - iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf); - else if (env.getIteratorScope() == IteratorScope.minc) - iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf); - else - throw new IllegalArgumentException(); - - SortedKeyValueIterator itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf, - iterators, iterEnv)); - - itr.seek(extent.toDataRange(), columnFamilies, inclusive); - - if (!inclusive) { - mfw.startDefaultLocalityGroup(); - } else { - mfw.startNewLocalityGroup(lgName, columnFamilies); - } - - Span write = Trace.start("write"); - try { - while (itr.hasTop() && env.isCompactionEnabled()) { - mfw.append(itr.getTopKey(), itr.getTopValue()); - itr.next(); - entriesCompacted++; - - if (entriesCompacted % 1024 == 0) { - // Periodically update stats, do not want to do this too often since its volatile - entriesWritten.addAndGet(1024); - } - } - - if (itr.hasTop() && !env.isCompactionEnabled()) { - // cancel major compaction operation - try { - try { - mfw.close(); - } catch (IOException e) { - log.error(e, e); - } - fs.deleteRecursively(outputFile.path()); - } catch (Exception e) { - log.warn("Failed to delete Canceled compaction output file " + outputFile, e); - } - throw new CompactionCanceledException(); - } - - } finally { - CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted); - majCStats.add(lgMajcStats); - write.stop(); - } - - } finally { - // close sequence files opened - for (FileSKVIterator reader : readers) { - try { - reader.close(); - } catch (Throwable e) { - log.warn("Failed to close map file", e); - } - } - span.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java deleted file mode 100644 index 7258a82..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.tserver.data.ServerConditionalMutation; -import org.apache.hadoop.io.WritableComparator; - -/** - * - */ -public class ConditionalMutationSet { - - static interface DeferFilter { - void defer(List scml, List okMutations, List deferred); - } - - static class DuplicateFitler implements DeferFilter { - public void defer(List scml, List okMutations, List deferred) { - okMutations.add(scml.get(0)); - for (int i = 1; i < scml.size(); i++) { - if (Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) { - deferred.add(scml.get(i)); - } else { - okMutations.add(scml.get(i)); - } - } - } - } - - static void defer(Map> updates, Map> deferredMutations, DeferFilter filter) { - for (Entry> entry : updates.entrySet()) { - List scml = entry.getValue(); - List okMutations = new ArrayList(scml.size()); - List deferred = new ArrayList(); - filter.defer(scml, okMutations, deferred); - - if (deferred.size() > 0) { - scml.clear(); - scml.addAll(okMutations); - List l = deferredMutations.get(entry.getKey()); - if (l == null) { - l = deferred; - deferredMutations.put(entry.getKey(), l); - } else { - l.addAll(deferred); - } - - } - } - } - - static void deferDuplicatesRows(Map> updates, Map> deferred) { - defer(updates, deferred, new DuplicateFitler()); - } - - static void sortConditionalMutations(Map> updates) { - for (Entry> entry : updates.entrySet()) { - // TODO check if its already in sorted order? - // TODO maybe the potential benefit of sorting is not worth the cost - Collections.sort(entry.getValue(), new Comparator() { - @Override - public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) { - return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java deleted file mode 100644 index 65fa538..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/EndOfTableException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -public class EndOfTableException extends Exception { - - /** - * - */ - private static final long serialVersionUID = 1L; - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java deleted file mode 100644 index 7349d51..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java +++ /dev/null @@ -1,561 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.Semaphore; - -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.blockfile.cache.BlockCache; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.system.InterruptibleIterator; -import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; -import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; -import org.apache.accumulo.core.iterators.system.TimeSettingIterator; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.problems.ProblemReport; -import org.apache.accumulo.server.problems.ProblemReportingIterator; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.problems.ProblemType; -import org.apache.accumulo.server.util.time.SimpleTimer; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -public class FileManager { - - private static final Logger log = Logger.getLogger(FileManager.class); - - int maxOpen; - - private static class OpenReader implements Comparable { - long releaseTime; - FileSKVIterator reader; - String fileName; - - public OpenReader(String fileName, FileSKVIterator reader) { - this.fileName = fileName; - this.reader = reader; - this.releaseTime = System.currentTimeMillis(); - } - - @Override - public int compareTo(OpenReader o) { - if (releaseTime < o.releaseTime) { - return -1; - } else if (releaseTime > o.releaseTime) { - return 1; - } else { - return 0; - } - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof OpenReader) { - return compareTo((OpenReader) obj) == 0; - } - return false; - } - - @Override - public int hashCode() { - return fileName.hashCode(); - } - } - - private Map> openFiles; - private HashMap reservedReaders; - - private Semaphore filePermits; - - private VolumeManager fs; - - // the data cache and index cache are allocated in - // TabletResourceManager and passed through the file opener to - // CachableBlockFile which can handle the caches being - // null if unallocated - private BlockCache dataCache = null; - private BlockCache indexCache = null; - - private long maxIdleTime; - - private final ServerConfiguration conf; - - private class IdleFileCloser implements Runnable { - - @Override - public void run() { - - long curTime = System.currentTimeMillis(); - - ArrayList filesToClose = new ArrayList(); - - // determine which files to close in a sync block, and then close the - // files outside of the sync block - synchronized (FileManager.this) { - Iterator>> iter = openFiles.entrySet().iterator(); - while (iter.hasNext()) { - Entry> entry = iter.next(); - List ofl = entry.getValue(); - - for (Iterator oflIter = ofl.iterator(); oflIter.hasNext();) { - OpenReader openReader = oflIter.next(); - - if (curTime - openReader.releaseTime > maxIdleTime) { - - filesToClose.add(openReader.reader); - oflIter.remove(); - } - } - - if (ofl.size() == 0) { - iter.remove(); - } - } - } - - closeReaders(filesToClose); - - } - - } - - /** - * - * @param dataCache - * : underlying file can and should be able to handle a null cache - * @param indexCache - * : underlying file can and should be able to handle a null cache - */ - FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) { - - if (maxOpen <= 0) - throw new IllegalArgumentException("maxOpen <= 0"); - this.conf = conf; - this.dataCache = dataCache; - this.indexCache = indexCache; - - this.filePermits = new Semaphore(maxOpen, true); - this.maxOpen = maxOpen; - this.fs = fs; - - this.openFiles = new HashMap>(); - this.reservedReaders = new HashMap(); - - this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE); - SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2); - - } - - private static int countReaders(Map> files) { - int count = 0; - - for (List list : files.values()) { - count += list.size(); - } - - return count; - } - - private List takeLRUOpenFiles(int numToTake) { - - ArrayList openReaders = new ArrayList(); - - for (Entry> entry : openFiles.entrySet()) { - openReaders.addAll(entry.getValue()); - } - - Collections.sort(openReaders); - - ArrayList ret = new ArrayList(); - - for (int i = 0; i < numToTake; i++) { - OpenReader or = openReaders.get(i); - - List ofl = openFiles.get(or.fileName); - if (!ofl.remove(or)) { - throw new RuntimeException("Failed to remove open reader that should have been there"); - } - - if (ofl.size() == 0) { - openFiles.remove(or.fileName); - } - - ret.add(or.reader); - } - - return ret; - } - - private static List getFileList(String file, Map> files) { - List ofl = files.get(file); - if (ofl == null) { - ofl = new ArrayList(); - files.put(file, ofl); - } - - return ofl; - } - - private void closeReaders(List filesToClose) { - for (FileSKVIterator reader : filesToClose) { - try { - reader.close(); - } catch (Exception e) { - log.error("Failed to close file " + e.getMessage(), e); - } - } - } - - private List takeOpenFiles(Collection files, List reservedFiles, Map readersReserved) { - List filesToOpen = new LinkedList(files); - for (Iterator iterator = filesToOpen.iterator(); iterator.hasNext();) { - String file = iterator.next(); - - List ofl = openFiles.get(file); - if (ofl != null && ofl.size() > 0) { - OpenReader openReader = ofl.remove(ofl.size() - 1); - reservedFiles.add(openReader.reader); - readersReserved.put(openReader.reader, file); - if (ofl.size() == 0) { - openFiles.remove(file); - } - iterator.remove(); - } - - } - return filesToOpen; - } - - private synchronized String getReservedReadeFilename(FileSKVIterator reader) { - return reservedReaders.get(reader); - } - - private List reserveReaders(Text table, Collection files, boolean continueOnFailure) throws IOException { - - if (files.size() >= maxOpen) { - throw new IllegalArgumentException("requested files exceeds max open"); - } - - if (files.size() == 0) { - return Collections.emptyList(); - } - - List filesToOpen = null; - List filesToClose = Collections.emptyList(); - List reservedFiles = new ArrayList(); - Map readersReserved = new HashMap(); - - filePermits.acquireUninterruptibly(files.size()); - - // now that the we are past the semaphore, we have the authority - // to open files.size() files - - // determine what work needs to be done in sync block - // but do the work of opening and closing files outside - // a synch block - synchronized (this) { - - filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved); - - int numOpen = countReaders(openFiles); - - if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) { - filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen); - } - } - - // close files before opening files to ensure we stay under resource - // limitations - closeReaders(filesToClose); - - // open any files that need to be opened - for (String file : filesToOpen) { - try { - Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file); - FileSystem ns = fs.getFileSystemByPath(path); - //log.debug("Opening "+file + " path " + path); - FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()), - dataCache, indexCache); - reservedFiles.add(reader); - readersReserved.put(reader, file); - } catch (Exception e) { - - ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e)); - - if (continueOnFailure) { - // release the permit for the file that failed to open - filePermits.release(1); - log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing..."); - } else { - // close whatever files were opened - closeReaders(reservedFiles); - - filePermits.release(files.size()); - - log.error("Failed to open file " + file + " " + e.getMessage()); - throw new IOException("Failed to open " + file, e); - } - } - } - - synchronized (this) { - // update set of reserved readers - reservedReaders.putAll(readersReserved); - } - - return reservedFiles; - } - - private void releaseReaders(List readers, boolean sawIOException) { - // put files in openFiles - - synchronized (this) { - - // check that readers were actually reserved ... want to make sure a thread does - // not try to release readers they never reserved - if (!reservedReaders.keySet().containsAll(readers)) { - throw new IllegalArgumentException("Asked to release readers that were never reserved "); - } - - for (FileSKVIterator reader : readers) { - try { - reader.closeDeepCopies(); - } catch (IOException e) { - log.warn(e, e); - sawIOException = true; - } - } - - for (FileSKVIterator reader : readers) { - String fileName = reservedReaders.remove(reader); - if (!sawIOException) - getFileList(fileName, openFiles).add(new OpenReader(fileName, reader)); - } - } - - if (sawIOException) - closeReaders(readers); - - // decrement the semaphore - filePermits.release(readers.size()); - - } - - static class FileDataSource implements DataSource { - - private SortedKeyValueIterator iter; - private ArrayList deepCopies; - private boolean current = true; - private IteratorEnvironment env; - private String file; - - FileDataSource(String file, SortedKeyValueIterator iter) { - this.file = file; - this.iter = iter; - this.deepCopies = new ArrayList(); - } - - public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator deepCopy, ArrayList deepCopies) { - this.iter = deepCopy; - this.env = env; - this.deepCopies = deepCopies; - deepCopies.add(this); - } - - @Override - public boolean isCurrent() { - return current; - } - - @Override - public DataSource getNewDataSource() { - current = true; - return this; - } - - @Override - public DataSource getDeepCopyDataSource(IteratorEnvironment env) { - return new FileDataSource(env, iter.deepCopy(env), deepCopies); - } - - @Override - public SortedKeyValueIterator iterator() throws IOException { - return iter; - } - - void unsetIterator() { - current = false; - iter = null; - for (FileDataSource fds : deepCopies) { - fds.current = false; - fds.iter = null; - } - } - - void setIterator(SortedKeyValueIterator iter) { - current = false; - this.iter = iter; - for (FileDataSource fds : deepCopies) { - fds.current = false; - fds.iter = iter.deepCopy(fds.env); - } - } - - } - - public class ScanFileManager { - - private ArrayList dataSources; - private ArrayList tabletReservedReaders; - private KeyExtent tablet; - private boolean continueOnFailure; - - ScanFileManager(KeyExtent tablet) { - tabletReservedReaders = new ArrayList(); - dataSources = new ArrayList(); - this.tablet = tablet; - - continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE); - - if (tablet.isMeta()) { - continueOnFailure = false; - } - } - - private List openFileRefs(Collection files) throws TooManyFilesException, IOException { - List strings = new ArrayList(files.size()); - for (FileRef ref : files) - strings.add(ref.path().toString()); - return openFiles(strings); - } - - private List openFiles(Collection files) throws TooManyFilesException, IOException { - // one tablet can not open more than maxOpen files, otherwise it could get stuck - // forever waiting on itself to release files - - if (tabletReservedReaders.size() + files.size() >= maxOpen) { - throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size() - + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet); - } - - List newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure); - - tabletReservedReaders.addAll(newlyReservedReaders); - return newlyReservedReaders; - } - - synchronized List openFiles(Map files, boolean detachable) throws IOException { - - List newlyReservedReaders = openFileRefs(files.keySet()); - - ArrayList iters = new ArrayList(); - - for (FileSKVIterator reader : newlyReservedReaders) { - String filename = getReservedReadeFilename(reader); - InterruptibleIterator iter; - if (detachable) { - FileDataSource fds = new FileDataSource(filename, reader); - dataSources.add(fds); - SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds); - iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi); - } else { - iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader); - } - DataFileValue value = files.get(new FileRef(filename)); - if (value.isTimeSet()) { - iter = new TimeSettingIterator(iter, value.getTime()); - } - - iters.add(iter); - } - - return iters; - } - - synchronized void detach() { - - releaseReaders(tabletReservedReaders, false); - tabletReservedReaders.clear(); - - for (FileDataSource fds : dataSources) - fds.unsetIterator(); - } - - synchronized void reattach() throws IOException { - if (tabletReservedReaders.size() != 0) - throw new IllegalStateException(); - - Collection files = new ArrayList(); - for (FileDataSource fds : dataSources) - files.add(fds.file); - - List newlyReservedReaders = openFiles(files); - Map> map = new HashMap>(); - for (FileSKVIterator reader : newlyReservedReaders) { - String fileName = getReservedReadeFilename(reader); - List list = map.get(fileName); - if (list == null) { - list = new LinkedList(); - map.put(fileName, list); - } - - list.add(reader); - } - - for (FileDataSource fds : dataSources) { - FileSKVIterator reader = map.get(fds.file).remove(0); - fds.setIterator(reader); - } - } - - synchronized void releaseOpenFiles(boolean sawIOException) { - releaseReaders(tabletReservedReaders, sawIOException); - tabletReservedReaders.clear(); - dataSources.clear(); - } - - synchronized int getNumOpenFiles() { - return tabletReservedReaders.size(); - } - } - - public ScanFileManager newScanFileManager(KeyExtent tablet) { - return new ScanFileManager(tablet); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java deleted file mode 100644 index a14db23..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/HoldTimeoutException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -public class HoldTimeoutException extends RuntimeException { - private static final long serialVersionUID = 1L; - - public HoldTimeoutException(String why) { - super(why); - } -}