Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E9D6810696 for ; Fri, 1 Nov 2013 00:55:42 +0000 (UTC) Received: (qmail 66505 invoked by uid 500); 1 Nov 2013 00:55:42 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 66356 invoked by uid 500); 1 Nov 2013 00:55:42 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 66023 invoked by uid 99); 1 Nov 2013 00:55:41 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Nov 2013 00:55:41 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 83A286EFC; Fri, 1 Nov 2013 00:55:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 01 Nov 2013 00:55:55 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java b/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java deleted file mode 100644 index 9492bd7..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/TabletGroupWatcher.java +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master; - -import static java.lang.Math.min; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.util.Daemon; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager.FileType; -import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.master.Master.TabletGoalState; -import org.apache.accumulo.server.master.state.Assignment; -import org.apache.accumulo.server.master.state.DistributedStoreException; -import org.apache.accumulo.server.master.state.MergeInfo; -import org.apache.accumulo.server.master.state.MergeState; -import org.apache.accumulo.server.master.state.MergeStats; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TableCounts; -import org.apache.accumulo.server.master.state.TableStats; -import org.apache.accumulo.server.master.state.TabletLocationState; -import org.apache.accumulo.server.master.state.TabletState; -import org.apache.accumulo.server.master.state.TabletStateStore; -import org.apache.accumulo.server.master.state.tables.TableManager; -import org.apache.accumulo.server.security.SystemCredentials; -import org.apache.accumulo.server.tabletserver.TabletTime; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.hadoop.io.Text; -import org.apache.thrift.TException; - -class TabletGroupWatcher extends Daemon { - - private final Master master; - final TabletStateStore store; - final TabletGroupWatcher dependentWatcher; - - final TableStats stats = new TableStats(); - - TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) { - this.master = master; - this.store = store; - this.dependentWatcher = dependentWatcher; - } - - Map getStats() { - return stats.getLast(); - } - - TableCounts getStats(Text tableId) { - return stats.getLast(tableId); - } - - @Override - public void run() { - - Thread.currentThread().setName("Watching " + store.name()); - int[] oldCounts = new int[TabletState.values().length]; - EventCoordinator.Listener eventListener = this.master.nextEvent.getListener(); - - while (this.master.stillMaster()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - UtilWaitThread.sleep(100); - - int totalUnloaded = 0; - int unloaded = 0; - try { - Map mergeStatsCache = new HashMap(); - - // Get the current status for the current list of tservers - SortedMap currentTServers = new TreeMap(); - for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) { - currentTServers.put(entry, this.master.tserverStatus.get(entry)); - } - - if (currentTServers.size() == 0) { - eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); - continue; - } - - // Don't move tablets to servers that are shutting down - SortedMap destinations = new TreeMap(currentTServers); - destinations.keySet().removeAll(this.master.serversToShutdown); - - List assignments = new ArrayList(); - List assigned = new ArrayList(); - List assignedToDeadServers = new ArrayList(); - Map unassigned = new HashMap(); - - int[] counts = new int[TabletState.values().length]; - stats.begin(); - // Walk through the tablets in our store, and work tablets - // towards their goal - for (TabletLocationState tls : store) { - if (tls == null) { - continue; - } - // ignore entries for tables that do not exist in zookeeper - if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null) - continue; - - // Don't overwhelm the tablet servers with work - if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); - assignments.clear(); - assigned.clear(); - assignedToDeadServers.clear(); - unassigned.clear(); - unloaded = 0; - eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); - } - Text tableId = tls.extent.getTableId(); - MergeStats mergeStats = mergeStatsCache.get(tableId); - if (mergeStats == null) { - mergeStatsCache.put(tableId, mergeStats = new MergeStats(this.master.getMergeInfo(tls.extent))); - } - TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo()); - TServerInstance server = tls.getServer(); - TabletState state = tls.getState(currentTServers.keySet()); - stats.update(tableId, state); - mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); - sendChopRequest(mergeStats.getMergeInfo(), state, tls); - sendSplitRequest(mergeStats.getMergeInfo(), state, tls); - - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { - goal = TabletGoalState.HOSTED; - } - - // if we are shutting down all the tabletservers, we have to do it in order - if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) { - if (this.master.serversToShutdown.equals(currentTServers.keySet())) { - if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) { - goal = TabletGoalState.HOSTED; - } - } - } - - if (goal == TabletGoalState.HOSTED) { - if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) { - if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs)) - continue; - } - switch (state) { - case HOSTED: - if (server.equals(this.master.migrations.get(tls.extent))) - this.master.migrations.remove(tls.extent); - break; - case ASSIGNED_TO_DEAD_SERVER: - assignedToDeadServers.add(tls); - if (server.equals(this.master.migrations.get(tls.extent))) - this.master.migrations.remove(tls.extent); - // log.info("Current servers " + currentTServers.keySet()); - break; - case UNASSIGNED: - // maybe it's a finishing migration - TServerInstance dest = this.master.migrations.get(tls.extent); - if (dest != null) { - // if destination is still good, assign it - if (destinations.keySet().contains(dest)) { - assignments.add(new Assignment(tls.extent, dest)); - } else { - // get rid of this migration - this.master.migrations.remove(tls.extent); - unassigned.put(tls.extent, server); - } - } else { - unassigned.put(tls.extent, server); - } - break; - case ASSIGNED: - // Send another reminder - assigned.add(new Assignment(tls.extent, tls.future)); - break; - } - } else { - switch (state) { - case UNASSIGNED: - break; - case ASSIGNED_TO_DEAD_SERVER: - assignedToDeadServers.add(tls); - // log.info("Current servers " + currentTServers.keySet()); - break; - case HOSTED: - TServerConnection conn = this.master.tserverSet.getConnection(server); - if (conn != null) { - conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED); - unloaded++; - totalUnloaded++; - } else { - Master.log.warn("Could not connect to server " + server); - } - break; - case ASSIGNED: - break; - } - } - counts[state.ordinal()]++; - } - - flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); - - // provide stats after flushing changes to avoid race conditions w/ delete table - stats.end(); - - // Report changes - for (TabletState state : TabletState.values()) { - int i = state.ordinal(); - if (counts[i] > 0 && counts[i] != oldCounts[i]) { - this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i], state.name()); - } - } - Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(), stats.getScanTime() / 1000.)); - oldCounts = counts; - if (totalUnloaded > 0) { - this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded); - } - - updateMergeState(mergeStatsCache); - - Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); - eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); - } catch (Exception ex) { - Master.log.error("Error processing table state for store " + store.name(), ex); - UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS); - } - } - } - - private int assignedOrHosted() { - int result = 0; - for (TableCounts counts : stats.getLast().values()) { - result += counts.assigned() + counts.hosted(); - } - return result; - } - - private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) { - // Already split? - if (!info.getState().equals(MergeState.SPLITTING)) - return; - // Merges don't split - if (!info.isDelete()) - return; - // Online and ready to split? - if (!state.equals(TabletState.HOSTED)) - return; - // Does this extent cover the end points of the delete? - KeyExtent range = info.getExtent(); - if (tls.extent.overlaps(range)) { - for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) { - if (splitPoint == null) - continue; - if (!tls.extent.contains(splitPoint)) - continue; - if (splitPoint.equals(tls.extent.getEndRow())) - continue; - if (splitPoint.equals(tls.extent.getPrevEndRow())) - continue; - try { - TServerConnection conn; - conn = this.master.tserverSet.getConnection(tls.current); - if (conn != null) { - Master.log.info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint); - conn.splitTablet(this.master.masterLock, tls.extent, splitPoint); - } else { - Master.log.warn("Not connected to server " + tls.current); - } - } catch (NotServingTabletException e) { - Master.log.debug("Error asking tablet server to split a tablet: " + e); - } catch (Exception e) { - Master.log.warn("Error asking tablet server to split a tablet: " + e); - } - } - } - } - - private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) { - // Don't bother if we're in the wrong state - if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED)) - return; - // Tablet must be online - if (!state.equals(TabletState.HOSTED)) - return; - // Tablet isn't already chopped - if (tls.chopped) - return; - // Tablet ranges intersect - if (info.needsToBeChopped(tls.extent)) { - TServerConnection conn; - try { - conn = this.master.tserverSet.getConnection(tls.current); - if (conn != null) { - Master.log.info("Asking " + tls.current + " to chop " + tls.extent); - conn.chop(this.master.masterLock, tls.extent); - } else { - Master.log.warn("Could not connect to server " + tls.current); - } - } catch (TException e) { - Master.log.warn("Communications error asking tablet server to chop a tablet"); - } - } - } - - private void updateMergeState(Map mergeStatsCache) { - for (MergeStats stats : mergeStatsCache.values()) { - try { - MergeState update = stats.nextMergeState(this.master.getConnector(), this.master); - // when next state is MERGING, its important to persist this before - // starting the merge... the verification check that is done before - // moving into the merging state could fail if merge starts but does - // not finish - if (update == MergeState.COMPLETE) - update = MergeState.NONE; - if (update != stats.getMergeInfo().getState()) { - this.master.setMergeState(stats.getMergeInfo(), update); - } - - if (update == MergeState.MERGING) { - try { - if (stats.getMergeInfo().isDelete()) { - deleteTablets(stats.getMergeInfo()); - } else { - mergeMetadataRecords(stats.getMergeInfo()); - } - this.master.setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE); - } catch (Exception ex) { - Master.log.error("Unable merge metadata table records", ex); - } - } - } catch (Exception ex) { - Master.log.error("Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex); - } - } - } - - private void deleteTablets(MergeInfo info) throws AccumuloException { - KeyExtent extent = info.getExtent(); - String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME; - Master.log.debug("Deleting tablets for " + extent); - char timeType = '\0'; - KeyExtent followingTablet = null; - if (extent.getEndRow() != null) { - Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW); - followingTablet = getHighTablet(new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow())); - Master.log.debug("Found following tablet " + followingTablet); - } - try { - Connector conn = this.master.getConnector(); - Text start = extent.getPrevEndRow(); - if (start == null) { - start = new Text(); - } - Master.log.debug("Making file deletion entries for " + extent); - Range deleteRange = new Range(KeyExtent.getMetadataEntry(extent.getTableId(), start), false, KeyExtent.getMetadataEntry(extent.getTableId(), - extent.getEndRow()), true); - Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(deleteRange); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); - Set datafiles = new TreeSet(); - for (Entry entry : scanner) { - Key key = entry.getKey(); - if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { - datafiles.add(new FileRef(this.master.fs, key)); - if (datafiles.size() > 1000) { - MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); - datafiles.clear(); - } - } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { - timeType = entry.getValue().toString().charAt(0); - } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { - throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!"); - } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - datafiles.add(new FileRef(entry.getValue().toString(), this.master.fs.getFullPath(FileType.TABLE, entry.getValue().toString()))); - if (datafiles.size() > 1000) { - MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); - datafiles.clear(); - } - } - } - MetadataTableUtil.addDeleteEntries(extent, datafiles, SystemCredentials.get()); - BatchWriter bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); - try { - deleteTablets(info, deleteRange, bw, conn); - } finally { - bw.close(); - } - - if (followingTablet != null) { - Master.log.debug("Updating prevRow of " + followingTablet + " to " + extent.getPrevEndRow()); - bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); - try { - Mutation m = new Mutation(followingTablet.getMetadataEntry()); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(extent.getPrevEndRow())); - ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); - bw.addMutation(m); - bw.flush(); - } finally { - bw.close(); - } - } else { - // Recreate the default tablet to hold the end of the table - Master.log.debug("Recreating the last tablet to point to " + extent.getPrevEndRow()); - String tdir = master.getFileSystem().choose(ServerConstants.getTablesDirs()) + "/" + extent.getTableId() + Constants.DEFAULT_TABLET_LOCATION; - MetadataTableUtil.addTablet(new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()), tdir, - SystemCredentials.get(), timeType, this.master.masterLock); - } - } catch (Exception ex) { - throw new AccumuloException(ex); - } - } - - private void mergeMetadataRecords(MergeInfo info) throws AccumuloException { - KeyExtent range = info.getExtent(); - Master.log.debug("Merging metadata for " + range); - KeyExtent stop = getHighTablet(range); - Master.log.debug("Highest tablet is " + stop); - Value firstPrevRowValue = null; - Text stopRow = stop.getMetadataEntry(); - Text start = range.getPrevEndRow(); - if (start == null) { - start = new Text(); - } - Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false); - String targetSystemTable = MetadataTable.NAME; - if (range.isMeta()) { - targetSystemTable = RootTable.NAME; - } - - BatchWriter bw = null; - try { - long fileCount = 0; - Connector conn = this.master.getConnector(); - // Make file entries in highest tablet - bw = conn.createBatchWriter(targetSystemTable, new BatchWriterConfig()); - Scanner scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(scanRange); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - Mutation m = new Mutation(stopRow); - String maxLogicalTime = null; - for (Entry entry : scanner) { - Key key = entry.getKey(); - Value value = entry.getValue(); - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - m.put(key.getColumnFamily(), key.getColumnQualifier(), value); - fileCount++; - } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) { - Master.log.debug("prevRow entry for lowest tablet is " + value); - firstPrevRowValue = new Value(value); - } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { - maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString()); - } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - bw.addMutation(MetadataTableUtil.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString())); - } - } - - // read the logical time from the last tablet in the merge range, it is not included in - // the loop above - scanner = conn.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(new Range(stopRow)); - TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); - for (Entry entry : scanner) { - if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { - maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString()); - } - } - - if (maxLogicalTime != null) - TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes())); - - if (!m.getUpdates().isEmpty()) { - bw.addMutation(m); - } - - bw.flush(); - - Master.log.debug("Moved " + fileCount + " files to " + stop); - - if (firstPrevRowValue == null) { - Master.log.debug("tablet already merged"); - return; - } - - stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue)); - Mutation updatePrevRow = stop.getPrevRowUpdateMutation(); - Master.log.debug("Setting the prevRow for last tablet: " + stop); - bw.addMutation(updatePrevRow); - bw.flush(); - - deleteTablets(info, scanRange, bw, conn); - - // Clean-up the last chopped marker - m = new Mutation(stopRow); - ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); - bw.addMutation(m); - bw.flush(); - - } catch (Exception ex) { - throw new AccumuloException(ex); - } finally { - if (bw != null) - try { - bw.close(); - } catch (Exception ex) { - throw new AccumuloException(ex); - } - } - } - - private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException { - Scanner scanner; - Mutation m; - // Delete everything in the other tablets - // group all deletes into tablet into one mutation, this makes tablets - // either disappear entirely or not all.. this is important for the case - // where the process terminates in the loop below... - scanner = conn.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); - Master.log.debug("Deleting range " + scanRange); - scanner.setRange(scanRange); - RowIterator rowIter = new RowIterator(scanner); - while (rowIter.hasNext()) { - Iterator> row = rowIter.next(); - m = null; - while (row.hasNext()) { - Entry entry = row.next(); - Key key = entry.getKey(); - - if (m == null) - m = new Mutation(key.getRow()); - - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); - Master.log.debug("deleting entry " + key); - } - bw.addMutation(m); - } - - bw.flush(); - } - - private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { - try { - Connector conn = this.master.getConnector(); - Scanner scanner = conn.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null); - scanner.setRange(new Range(start.getMetadataEntry(), null)); - Iterator> iterator = scanner.iterator(); - if (!iterator.hasNext()) { - throw new AccumuloException("No last tablet for a merge " + range); - } - Entry entry = iterator.next(); - KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue())); - if (highTablet.getTableId() != range.getTableId()) { - throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); - } - return highTablet; - } catch (Exception ex) { - throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex); - } - } - - private void flushChanges(SortedMap currentTServers, List assignments, List assigned, - List assignedToDeadServers, Map unassigned) throws DistributedStoreException, TException { - if (!assignedToDeadServers.isEmpty()) { - int maxServersToShow = min(assignedToDeadServers.size(), 100); - Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); - store.unassign(assignedToDeadServers); - this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); - } - - if (!currentTServers.isEmpty()) { - Map assignedOut = new HashMap(); - this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut); - for (Entry assignment : assignedOut.entrySet()) { - if (unassigned.containsKey(assignment.getKey())) { - if (assignment.getValue() != null) { - Master.log.debug(store.name() + " assigning tablet " + assignment); - assignments.add(new Assignment(assignment.getKey(), assignment.getValue())); - } - } else { - Master.log.warn(store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey()); - } - } - if (!unassigned.isEmpty() && assignedOut.isEmpty()) - Master.log.warn("Load balancer failed to assign any tablets"); - } - - if (assignments.size() > 0) { - Master.log.info(String.format("Assigning %d tablets", assignments.size())); - store.setFutureLocations(assignments); - } - assignments.addAll(assigned); - for (Assignment a : assignments) { - TServerConnection conn = this.master.tserverSet.getConnection(a.server); - if (conn != null) { - conn.assignTablet(this.master.masterLock, a.tablet); - } else { - Master.log.warn("Could not connect to server " + a.server); - } - master.assignedTablet(a.tablet); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java deleted file mode 100644 index ec3371c..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.balancer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.SortedMap; - -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TabletMigration; -import org.apache.thrift.TException; - -/** - * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not - * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer. - */ -public class ChaoticLoadBalancer extends TabletBalancer { - Random r = new Random(); - - @Override - public void getAssignments(SortedMap current, Map unassigned, - Map assignments) { - long total = assignments.size() + unassigned.size(); - long avg = (long) Math.ceil(((double) total) / current.size()); - Map toAssign = new HashMap(); - List tServerArray = new ArrayList(); - for (Entry e : current.entrySet()) { - long numTablets = 0; - for (TableInfo ti : e.getValue().getTableMap().values()) { - numTablets += ti.tablets; - } - if (numTablets < avg) { - tServerArray.add(e.getKey()); - toAssign.put(e.getKey(), avg - numTablets); - } - } - - for (KeyExtent ke : unassigned.keySet()) { - int index = r.nextInt(tServerArray.size()); - TServerInstance dest = tServerArray.get(index); - assignments.put(ke, dest); - long remaining = toAssign.get(dest).longValue() - 1; - if (remaining == 0) { - tServerArray.remove(index); - toAssign.remove(dest); - } else { - toAssign.put(dest, remaining); - } - } - } - - /** - * Will balance randomly, maintaining distribution - */ - @Override - public long balance(SortedMap current, Set migrations, List migrationsOut) { - Map numTablets = new HashMap(); - List underCapacityTServer = new ArrayList(); - - if (!migrations.isEmpty()) - return 100; - - boolean moveMetadata = r.nextInt(4) == 0; - long totalTablets = 0; - for (Entry e : current.entrySet()) { - long tabletCount = 0; - for (TableInfo ti : e.getValue().getTableMap().values()) { - tabletCount += ti.tablets; - } - numTablets.put(e.getKey(), tabletCount); - underCapacityTServer.add(e.getKey()); - totalTablets += tabletCount; - } - // totalTablets is fuzzy due to asynchronicity of the stats - // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios - long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2); - - for (Entry e : current.entrySet()) { - for (String table : e.getValue().getTableMap().keySet()) { - if (!moveMetadata && MetadataTable.NAME.equals(table)) - continue; - try { - for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) { - KeyExtent ke = new KeyExtent(ts.extent); - int index = r.nextInt(underCapacityTServer.size()); - TServerInstance dest = underCapacityTServer.get(index); - if (dest.equals(e.getKey())) - continue; - migrationsOut.add(new TabletMigration(ke, e.getKey(), dest)); - if (numTablets.put(dest, numTablets.get(dest) + 1) > avg) - underCapacityTServer.remove(index); - if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey())) - underCapacityTServer.add(e.getKey()); - - // We can get some craziness with only 1 tserver, so lets make sure there's always an option! - if (underCapacityTServer.isEmpty()) - underCapacityTServer.addAll(numTablets.keySet()); - } - } catch (ThriftSecurityException e1) { - // Shouldn't happen, but carry on if it does - e1.printStackTrace(); - } catch (TException e1) { - // Shouldn't happen, but carry on if it does - e1.printStackTrace(); - } - } - } - - return 100; - } - - @Override - public void init(ServerConfiguration conf) { - super.init(conf); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java deleted file mode 100644 index 9b88d74..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.balancer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; - -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TabletMigration; -import org.apache.log4j.Logger; - -public class DefaultLoadBalancer extends TabletBalancer { - - private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class); - - Iterator assignments; - // if tableToBalance is set, then only balance the given table - String tableToBalance = null; - - public DefaultLoadBalancer() { - - } - - public DefaultLoadBalancer(String table) { - tableToBalance = table; - } - - List randomize(Set locations) { - List result = new ArrayList(locations); - Collections.shuffle(result); - return result; - } - - public TServerInstance getAssignment(SortedMap locations, KeyExtent extent, TServerInstance last) { - if (locations.size() == 0) - return null; - - if (last != null) { - // Maintain locality - TServerInstance simple = new TServerInstance(last.getLocation(), ""); - Iterator find = locations.tailMap(simple).keySet().iterator(); - if (find.hasNext()) { - TServerInstance current = find.next(); - if (current.host().equals(last.host())) - return current; - } - } - - // The strategy here is to walk through the locations and hand them back, one at a time - // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list. - if (assignments == null || !assignments.hasNext()) - assignments = randomize(locations.keySet()).iterator(); - TServerInstance result = assignments.next(); - if (!locations.containsKey(result)) { - assignments = null; - return randomize(locations.keySet()).iterator().next(); - } - return result; - } - - static class ServerCounts implements Comparable { - public final TServerInstance server; - public final int count; - public final TabletServerStatus status; - - ServerCounts(int count, TServerInstance server, TabletServerStatus status) { - this.count = count; - this.server = server; - this.status = status; - } - - public int compareTo(ServerCounts obj) { - int result = count - obj.count; - if (result == 0) - return server.compareTo(obj.server); - return result; - } - } - - public boolean getMigrations(Map current, List result) { - boolean moreBalancingNeeded = false; - try { - // no moves possible - if (current.size() < 2) { - return false; - } - - // Sort by total number of online tablets, per server - int total = 0; - ArrayList totals = new ArrayList(); - for (Entry entry : current.entrySet()) { - int serverTotal = 0; - if (entry.getValue() != null && entry.getValue().tableMap != null) { - for (Entry e : entry.getValue().tableMap.entrySet()) { - /** - * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename - */ - if (tableToBalance == null || tableToBalance.equals(e.getKey())) - serverTotal += e.getValue().onlineTablets; - } - } - totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue())); - total += serverTotal; - } - - // order from low to high - Collections.sort(totals); - Collections.reverse(totals); - int even = total / totals.size(); - int numServersOverEven = total % totals.size(); - - // Move tablets from the servers with too many to the servers with - // the fewest but only nominate tablets to move once. This allows us - // to fill new servers with tablets from a mostly balanced server - // very quickly. However, it may take several balancing passes to move - // tablets from one hugely overloaded server to many slightly - // under-loaded servers. - int end = totals.size() - 1; - int movedAlready = 0; - for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) { - ServerCounts tooMany = totals.get(tooManyIndex); - int goal = even; - if (tooManyIndex < numServersOverEven) { - goal++; - } - int needToUnload = tooMany.count - goal; - ServerCounts tooLittle = totals.get(end); - int needToLoad = goal - tooLittle.count - movedAlready; - if (needToUnload < 1 && needToLoad < 1) { - break; - } - if (needToUnload >= needToLoad) { - result.addAll(move(tooMany, tooLittle, needToLoad)); - end--; - movedAlready = 0; - } else { - result.addAll(move(tooMany, tooLittle, needToUnload)); - movedAlready += needToUnload; - } - if (needToUnload > needToLoad) - moreBalancingNeeded = true; - } - - } finally { - log.debug("balance ended with " + result.size() + " migrations"); - } - return moreBalancingNeeded; - } - - static class TableDiff { - int diff; - String table; - - public TableDiff(int diff, String table) { - this.diff = diff; - this.table = table; - } - }; - - /** - * Select a tablet based on differences between table loads; if the loads are even, use the busiest table - */ - List move(ServerCounts tooMuch, ServerCounts tooLittle, int count) { - - List result = new ArrayList(); - if (count == 0) - return result; - - Map> onlineTablets = new HashMap>(); - // Copy counts so we can update them as we propose migrations - Map tooMuchMap = tabletCountsPerTable(tooMuch.status); - Map tooLittleMap = tabletCountsPerTable(tooLittle.status); - - for (int i = 0; i < count; i++) { - String table; - Integer tooLittleCount; - if (tableToBalance == null) { - // find a table to migrate - // look for an uneven table count - int biggestDifference = 0; - String biggestDifferenceTable = null; - for (Entry tableEntry : tooMuchMap.entrySet()) { - String tableID = tableEntry.getKey(); - if (tooLittleMap.get(tableID) == null) - tooLittleMap.put(tableID, 0); - int diff = tableEntry.getValue() - tooLittleMap.get(tableID); - if (diff > biggestDifference) { - biggestDifference = diff; - biggestDifferenceTable = tableID; - } - } - if (biggestDifference < 2) { - table = busiest(tooMuch.status.tableMap); - } else { - table = biggestDifferenceTable; - } - } else { - // just balance the given table - table = tableToBalance; - } - Map onlineTabletsForTable = onlineTablets.get(table); - try { - if (onlineTabletsForTable == null) { - onlineTabletsForTable = new HashMap(); - for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table)) - onlineTabletsForTable.put(new KeyExtent(stat.extent), stat); - onlineTablets.put(table, onlineTabletsForTable); - } - } catch (Exception ex) { - log.error("Unable to select a tablet to move", ex); - return result; - } - KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable); - onlineTabletsForTable.remove(extent); - if (extent == null) - return result; - tooMuchMap.put(table, tooMuchMap.get(table) - 1); - /** - * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here - * we check to see if in fact that is the case and if so set the value to 0. - */ - tooLittleCount = tooLittleMap.get(table); - if (tooLittleCount == null) { - tooLittleCount = 0; - } - tooLittleMap.put(table, tooLittleCount + 1); - - result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server)); - } - return result; - } - - static Map tabletCountsPerTable(TabletServerStatus status) { - Map result = new HashMap(); - if (status != null && status.tableMap != null) { - Map tableMap = status.tableMap; - for (Entry entry : tableMap.entrySet()) { - result.put(entry.getKey(), entry.getValue().onlineTablets); - } - } - return result; - } - - static KeyExtent selectTablet(TServerInstance tserver, Map extents) { - if (extents.size() == 0) - return null; - KeyExtent mostRecentlySplit = null; - long splitTime = 0; - for (Entry entry : extents.entrySet()) - if (entry.getValue().splitCreationTime >= splitTime) { - splitTime = entry.getValue().splitCreationTime; - mostRecentlySplit = entry.getKey(); - } - return mostRecentlySplit; - } - - // define what it means for a tablet to be busy - private static String busiest(Map tables) { - String result = null; - double busiest = Double.NEGATIVE_INFINITY; - for (Entry entry : tables.entrySet()) { - TableInfo info = entry.getValue(); - double busy = info.ingestRate + info.queryRate; - if (busy > busiest) { - busiest = busy; - result = entry.getKey(); - } - } - return result; - } - - @Override - public void getAssignments(SortedMap current, Map unassigned, - Map assignments) { - for (Entry entry : unassigned.entrySet()) { - assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue())); - } - } - - @Override - public long balance(SortedMap current, Set migrations, List migrationsOut) { - // do we have any servers? - if (current.size() > 0) { - // Don't migrate if we have migrations in progress - if (migrations.size() == 0) { - if (getMigrations(current, migrationsOut)) - return 1 * 1000; - } - } - return 5 * 1000; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java deleted file mode 100644 index 3e0a2bf..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.balancer; - -import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TabletMigration; -import org.apache.accumulo.server.security.SystemCredentials; -import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; -import org.apache.log4j.Logger; - -public class TableLoadBalancer extends TabletBalancer { - - private static final Logger log = Logger.getLogger(TableLoadBalancer.class); - - Map perTableBalancers = new HashMap(); - - private TabletBalancer constructNewBalancerForTable(String clazzName, String table) throws Exception { - Class clazz = AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class); - Constructor constructor = clazz.getConstructor(String.class); - return constructor.newInstance(table); - } - - protected String getLoadBalancerClassNameForTable(String table) { - return configuration.getTableConfiguration(table).get(Property.TABLE_LOAD_BALANCER); - } - - protected TabletBalancer getBalancerForTable(String table) { - TabletBalancer balancer = perTableBalancers.get(table); - - String clazzName = getLoadBalancerClassNameForTable(table); - - if (clazzName == null) - clazzName = DefaultLoadBalancer.class.getName(); - if (balancer != null) { - if (clazzName.equals(balancer.getClass().getName()) == false) { - // the balancer class for this table does not match the class specified in the configuration - try { - // attempt to construct a balancer with the specified class - TabletBalancer newBalancer = constructNewBalancerForTable(clazzName, table); - if (newBalancer != null) { - balancer = newBalancer; - perTableBalancers.put(table, balancer); - balancer.init(configuration); - } - } catch (Exception e) { - log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e); - } - } - } - if (balancer == null) { - try { - balancer = constructNewBalancerForTable(clazzName, table); - log.info("Loaded class " + clazzName + " for table " + table); - } catch (Exception e) { - log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e); - } - - if (balancer == null) { - log.info("Using balancer " + DefaultLoadBalancer.class.getName() + " for table " + table); - balancer = new DefaultLoadBalancer(table); - } - perTableBalancers.put(table, balancer); - balancer.init(configuration); - } - return balancer; - } - - @Override - public void getAssignments(SortedMap current, Map unassigned, - Map assignments) { - // separate the unassigned into tables - Map> groupedUnassigned = new HashMap>(); - for (Entry e : unassigned.entrySet()) { - Map tableUnassigned = groupedUnassigned.get(e.getKey().getTableId().toString()); - if (tableUnassigned == null) { - tableUnassigned = new HashMap(); - groupedUnassigned.put(e.getKey().getTableId().toString(), tableUnassigned); - } - tableUnassigned.put(e.getKey(), e.getValue()); - } - for (Entry> e : groupedUnassigned.entrySet()) { - Map newAssignments = new HashMap(); - getBalancerForTable(e.getKey()).getAssignments(current, e.getValue(), newAssignments); - assignments.putAll(newAssignments); - } - } - - private TableOperations tops = null; - - protected TableOperations getTableOperations() { - if (tops == null) - try { - tops = configuration.getInstance().getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()).tableOperations(); - } catch (AccumuloException e) { - log.error("Unable to access table operations from within table balancer", e); - } catch (AccumuloSecurityException e) { - log.error("Unable to access table operations from within table balancer", e); - } - return tops; - } - - @Override - public long balance(SortedMap current, Set migrations, List migrationsOut) { - long minBalanceTime = 5 * 1000; - // Iterate over the tables and balance each of them - TableOperations t = getTableOperations(); - if (t == null) - return minBalanceTime; - for (String s : t.tableIdMap().values()) { - ArrayList newMigrations = new ArrayList(); - long tableBalanceTime = getBalancerForTable(s).balance(current, migrations, newMigrations); - if (tableBalanceTime < minBalanceTime) - minBalanceTime = tableBalanceTime; - migrationsOut.addAll(newMigrations); - } - return minBalanceTime; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java deleted file mode 100644 index fd76ce2..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.balancer; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; - -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.core.util.ThriftUtil; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TabletMigration; -import org.apache.accumulo.server.security.SystemCredentials; -import org.apache.accumulo.trace.instrument.Tracer; -import org.apache.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; - -public abstract class TabletBalancer { - - private static final Logger log = Logger.getLogger(TabletBalancer.class); - - protected ServerConfiguration configuration; - - /** - * Initialize the TabletBalancer. This gives the balancer the opportunity to read the configuration. - */ - public void init(ServerConfiguration conf) { - configuration = conf; - } - - /** - * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned. - * - * @param current - * The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet - * server has not yet responded to a recent request for status. - * @param unassigned - * A map from unassigned tablet to the last known tablet server. Read-only. - * @param assignments - * A map from tablet to assigned server. Write-only. - */ - abstract public void getAssignments(SortedMap current, Map unassigned, - Map assignments); - - /** - * Ask the balancer if any migrations are necessary. - * - * @param current - * The current table-summary state of all the online tablet servers. Read-only. - * @param migrations - * the current set of migrations. Read-only. - * @param migrationsOut - * new migrations to perform; should not contain tablets in the current set of migrations. Write-only. - * @return the time, in milliseconds, to wait before re-balancing. - * - * This method will not be called when there are unassigned tablets. - */ - public abstract long balance(SortedMap current, Set migrations, List migrationsOut); - - /** - * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets - * to move. - * - * @param tserver - * The tablet server to ask. - * @param tableId - * The table id - * @return a list of tablet statistics - * @throws ThriftSecurityException - * tablet server disapproves of your internal System password. - * @throws TException - * any other problem - */ - public List getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException { - log.debug("Scanning tablet server " + tserver + " for table " + tableId); - Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration()); - try { - List onlineTabletsForTable = client.getTabletStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(configuration.getInstance()), - tableId); - return onlineTabletsForTable; - } catch (TTransportException e) { - log.error("Unable to connect to " + tserver + ": " + e); - } finally { - ThriftUtil.returnClient(client); - } - return null; - } - - /** - * Utility to ensure that the migrations from balance() are consistent: - *
    - *
  • Tablet objects are not null - *
  • Source and destination tablet servers are not null and current - *
- * - * @param current - * @param migrations - * @return A list of TabletMigration object that passed sanity checks. - */ - public static List checkMigrationSanity(Set current, List migrations) { - List result = new ArrayList(migrations.size()); - for (TabletMigration m : migrations) { - if (m.tablet == null) { - log.warn("Balancer gave back a null tablet " + m); - continue; - } - if (m.newServer == null) { - log.warn("Balancer did not set the destination " + m); - continue; - } - if (m.oldServer == null) { - log.warn("Balancer did not set the source " + m); - continue; - } - if (!current.contains(m.oldServer)) { - log.warn("Balancer wants to move a tablet from a server that is not current: " + m); - continue; - } - if (!current.contains(m.newServer)) { - log.warn("Balancer wants to move a tablet to a server that is not current: " + m); - continue; - } - result.add(m); - } - return result; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java deleted file mode 100644 index 3eda844..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.recovery; - -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.server.master.Master; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.log4j.Logger; - -public class HadoopLogCloser implements LogCloser { - - private static Logger log = Logger.getLogger(HadoopLogCloser.class); - - @Override - public long close(Master master, VolumeManager fs, Path source) throws IOException { - FileSystem ns = fs.getFileSystemByPath(source); - if (ns instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem) ns; - try { - if (!dfs.recoverLease(source)) { - log.info("Waiting for file to be closed " + source.toString()); - return master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_LEASE_RECOVERY_WAITING_PERIOD); - } - log.info("Recovered lease on " + source.toString()); - } catch (FileNotFoundException ex) { - throw ex; - } catch (Exception ex) { - log.warn("Error recovering lease on " + source.toString(), ex); - ns.append(source).close(); - log.info("Recovered lease on " + source.toString() + " using append"); - } - } else if (ns instanceof LocalFileSystem) { - // ignore - } else { - throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName()); - } - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java deleted file mode 100644 index 42497ff..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.recovery; - -import java.io.IOException; - -import org.apache.accumulo.server.master.Master; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.Path; - -public interface LogCloser { - public long close(Master master, VolumeManager fs, Path path) throws IOException; -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java deleted file mode 100644 index a5bb0c7..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.recovery; - -import java.io.IOException; - -import org.apache.accumulo.server.master.Master; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.log4j.Logger; - -public class MapRLogCloser implements LogCloser { - - private static Logger log = Logger.getLogger(MapRLogCloser.class); - - @Override - public long close(Master m, VolumeManager fs, Path path) throws IOException { - log.info("Recovering file " + path.toString() + " by changing permission to readonly"); - FileSystem ns = fs.getFileSystemByPath(path); - FsPermission roPerm = new FsPermission((short) 0444); - try { - ns.setPermission(path, roPerm); - return 0; - } catch (IOException ex) { - log.error("error recovering lease ", ex); - // lets do this again - return 1000; - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java b/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java deleted file mode 100644 index 5ce7a66..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.recovery; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.util.NamingThreadFactory; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.VolumeManager.FileType; -import org.apache.accumulo.server.master.Master; -import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; -import org.apache.accumulo.server.zookeeper.ZooCache; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; - -public class RecoveryManager { - - private static Logger log = Logger.getLogger(RecoveryManager.class); - - private Map recoveryDelay = new HashMap(); - private Set closeTasksQueued = new HashSet(); - private Set sortsQueued = new HashSet(); - private ScheduledExecutorService executor; - private Master master; - private ZooCache zooCache; - - public RecoveryManager(Master master) { - this.master = master; - executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter ")); - zooCache = new ZooCache(); - try { - List workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued(); - sortsQueued.addAll(workIDs); - } catch (Exception e) { - log.warn(e, e); - } - } - - private class LogSortTask implements Runnable { - private String source; - private String destination; - private String sortId; - private LogCloser closer; - - public LogSortTask(LogCloser closer, String source, String destination, String sortId) { - this.closer = closer; - this.source = source; - this.destination = destination; - this.sortId = sortId; - } - - @Override - public void run() { - boolean rescheduled = false; - try { - - long time = closer.close(master, master.getFileSystem(), new Path(source)); - - if (time > 0) { - executor.schedule(this, time, TimeUnit.MILLISECONDS); - rescheduled = true; - } else { - initiateSort(sortId, source, destination); - } - } catch (FileNotFoundException e) { - log.debug("Unable to initate log sort for " + source + ": " + e); - } catch (Exception e) { - log.warn("Failed to initiate log sort " + source, e); - } finally { - if (!rescheduled) { - synchronized (RecoveryManager.this) { - closeTasksQueued.remove(sortId); - } - } - } - } - - } - - private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException { - String work = source + "|" + destination; - new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes()); - - synchronized (this) { - sortsQueued.add(sortId); - } - - final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId; - log.info("Created zookeeper entry " + path + " with data " + work); - } - - public boolean recoverLogs(KeyExtent extent, Collection> walogs) throws IOException { - boolean recoveryNeeded = false; - ; - for (Collection logs : walogs) { - for (String walog : logs) { - String hostFilename[] = walog.split("/", 2); - String host = hostFilename[0]; - String filename = hostFilename[1]; - String parts[] = filename.split("/"); - String sortId = parts[parts.length - 1]; - String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId; - filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString(); - log.debug("Recovering " + filename + " to " + dest); - - boolean sortQueued; - synchronized (this) { - sortQueued = sortsQueued.contains(sortId); - } - - if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) { - synchronized (this) { - sortsQueued.remove(sortId); - } - } - - if (master.getFileSystem().exists(new Path(dest, "finished"))) { - synchronized (this) { - closeTasksQueued.remove(sortId); - recoveryDelay.remove(sortId); - sortsQueued.remove(sortId); - } - continue; - } - - recoveryNeeded = true; - synchronized (this) { - if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { - AccumuloConfiguration aconf = master.getConfiguration().getConfiguration(); - LogCloser closer = Property.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(sortId); - if (delay == null) { - delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY); - } else { - delay = Math.min(2 * delay, 1000 * 60 * 5l); - } - - log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference"); - - executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); - closeTasksQueued.add(sortId); - recoveryDelay.put(sortId, delay); - } - } - } - } - return recoveryNeeded; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java b/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java deleted file mode 100644 index 40b7a93..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/state/Assignment.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.state; - -import org.apache.accumulo.core.data.KeyExtent; - -public class Assignment { - public KeyExtent tablet; - public TServerInstance server; - - public Assignment(KeyExtent tablet, TServerInstance server) { - this.tablet = tablet; - this.server = server; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java deleted file mode 100644 index f4d98bf..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.state; - -import java.util.Collection; -import java.util.Set; - -public interface CurrentState { - - Set onlineTables(); - - Set onlineTabletServers(); - - Collection merges(); - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java b/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java deleted file mode 100644 index b2ea7d6..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.state; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.accumulo.core.master.thrift.DeadServer; -import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.log4j.Logger; -import org.apache.zookeeper.data.Stat; - -public class DeadServerList { - private static final Logger log = Logger.getLogger(DeadServerList.class); - private final String path; - - public DeadServerList(String path) { - this.path = path; - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - try { - zoo.mkdirs(path); - } catch (Exception ex) { - log.error("Unable to make parent directories of " + path, ex); - } - } - - public List getList() { - List result = new ArrayList(); - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - try { - List children = zoo.getChildren(path); - if (children != null) { - for (String child : children) { - Stat stat = new Stat(); - byte[] data = zoo.getData(path + "/" + child, stat); - DeadServer server = new DeadServer(child, stat.getMtime(), new String(data)); - result.add(server); - } - } - } catch (Exception ex) { - log.error(ex, ex); - } - return result; - } - - public void delete(String server) { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - try { - zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP); - } catch (Exception ex) { - log.error(ex, ex); - } - } - - public void post(String server, String cause) { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - try { - zoo.putPersistentData(path + "/" + server, cause.getBytes(), NodeExistsPolicy.SKIP); - } catch (Exception ex) { - log.error(ex, ex); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java deleted file mode 100644 index ad658df..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.state; - -import java.util.List; - -/* - * An abstract version of ZooKeeper that we can write tests against. - */ -public interface DistributedStore { - - public List getChildren(String path) throws DistributedStoreException; - - public byte[] get(String path) throws DistributedStoreException; - - public void put(String path, byte[] bs) throws DistributedStoreException; - - public void remove(String path) throws DistributedStoreException; - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java deleted file mode 100644 index 3d3a725..0000000 --- a/server/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.master.state; - -public class DistributedStoreException extends Exception { - - private static final long serialVersionUID = 1L; - - public DistributedStoreException(String why) { - super(why); - } - - public DistributedStoreException(Exception cause) { - super(cause); - } - - public DistributedStoreException(String why, Exception cause) { - super(why, cause); - } -}