Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 54D7518B94 for ; Thu, 30 Jul 2015 21:51:36 +0000 (UTC) Received: (qmail 57593 invoked by uid 500); 30 Jul 2015 21:51:36 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 57547 invoked by uid 500); 30 Jul 2015 21:51:36 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 57464 invoked by uid 99); 30 Jul 2015 21:51:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jul 2015 21:51:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E150CE7154; Thu, 30 Jul 2015 21:51:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Thu, 30 Jul 2015 21:51:36 -0000 Message-Id: <69d7d766626445fd8f56712ce9df2499@git.apache.org> In-Reply-To: <984ecbbf37144d56b1b236207e81b5d5@git.apache.org> References: <984ecbbf37144d56b1b236207e81b5d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java new file mode 100644 index 
0000000..fb702a2 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.master; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.thrift.MasterState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import 
org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.state.MergeStats; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.CurrentState; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class MergeStateIT extends ConfigurableMacBase { + + private static class MockCurrentState implements CurrentState { + + TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456); + MergeInfo mergeInfo; + + MockCurrentState(MergeInfo info) { + this.mergeInfo = info; + } + + @Override + public Set onlineTables() { + return Collections.singleton("t"); + } + + @Override + public Set onlineTabletServers() { + return Collections.singleton(someTServer); + } + + @Override + public Collection merges() { + return Collections.singleton(mergeInfo); + } + + @Override + public Collection migrations() { + return Collections.emptyList(); + } + + @Override + public MasterState getMasterState() { + return MasterState.NORMAL; + } + + @Override + public Set shutdownServers() { + return Collections.emptySet(); + } + } + + private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException { + BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + bw.addMutation(m); + bw.close(); + } + + @Test + public void test() throws Exception { + 
AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class); + Connector connector = getConnector(); + EasyMock.expect(context.getConnector()).andReturn(connector).anyTimes(); + EasyMock.replay(context); + connector.securityOperations().grantTablePermission(connector.whoami(), MetadataTable.NAME, TablePermission.WRITE); + BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + // Create a fake METADATA table with these splits + String splits[] = {"a", "e", "j", "o", "t", "z"}; + // create metadata for a table "t" with the splits above + Text tableId = new Text("t"); + Text pr = null; + for (String s : splits) { + Text split = new Text(s); + Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr)); + prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes())); + ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes())); + bw.addMutation(prevRow); + pr = split; + } + // Add the default tablet + Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr)); + defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes())); + bw.addMutation(defaultTablet); + bw.close(); + + // Read out the TabletLocationStates + MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE)); + + // Verify the tablet state: hosted, and count + MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state); + int count = 0; + for (TabletLocationState tss : metaDataStateStore) { + if (tss != null) + count++; + } + Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state + + // Create the hole + // Split the tablet at one end of the range + Mutation m = new KeyExtent(tableId, new Text("t"), new 
Text("p")).getPrevRowUpdateMutation(); + TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes())); + TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o"))); + update(connector, m); + + // do the state check + MergeStats stats = scan(state, metaDataStateStore); + MergeState newState = stats.nextMergeState(connector, state); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); + + // unassign the tablets + BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig()); + deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + deleter.setRanges(Collections.singletonList(new Range())); + deleter.delete(); + + // now we should be ready to merge but, we have inconsistent metadata + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); + + // finish the split + KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o")); + m = tablet.getPrevRowUpdateMutation(); + TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes())); + update(connector, m); + metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer))); + + // onos... 
there's a new tablet online + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state)); + + // chop it + m = tablet.getPrevRowUpdateMutation(); + ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes())); + update(connector, m); + + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); + + // take it offline + m = tablet.getPrevRowUpdateMutation(); + Collection> walogs = Collections.emptyList(); + metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null); + + // now we can split + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state)); + + } + + private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) { + MergeStats stats = new MergeStats(state.mergeInfo); + stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE); + for (TabletLocationState tss : metaDataStateStore) { + stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false); + } + return stats; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java new file mode 100644 index 0000000..5519013 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java @@ -0,0 +1,173 @@ +package org.apache.accumulo.test.replication; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.replication.FinishedWorkUpdater; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +public class FinishedWorkUpdaterIT extends ConfigurableMacBase { + + private Connector conn; + private FinishedWorkUpdater updater; + + @Before + public void configureUpdater() 
throws Exception { + conn = getConnector(); + updater = new FinishedWorkUpdater(conn); + } + + @Test + public void offlineReplicationTableFailsGracefully() { + updater.run(); + } + + @Test + public void recordsWithProgressUpdateBothTables() throws Exception { + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + ReplicationTable.setOnline(conn); + + String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build(); + ReplicationTarget target = new ReplicationTarget("peer", "table1", "1"); + + // Create a single work record for a file to some peer + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + Mutation m = new Mutation(file); + WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat)); + bw.addMutation(m); + bw.close(); + + updater.run(); + + Scanner s = ReplicationTable.getScanner(conn); + s.setRange(Range.exact(file)); + StatusSection.limit(s); + Entry entry = Iterables.getOnlyElement(s); + + Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME); + Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target.getSourceTableId()); + + // We should only rely on the correct begin attribute being returned + Status actual = Status.parseFrom(entry.getValue().get()); + Assert.assertEquals(stat.getBegin(), actual.getBegin()); + } + + @Test + public void chooseMinimumBeginOffset() throws Exception { + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + ReplicationTable.setOnline(conn); + + String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + // @formatter:off + Status 
stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(), + stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(), + stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(); + ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"), + target2 = new ReplicationTarget("peer2", "table2", "1"), + target3 = new ReplicationTarget("peer3", "table3", "1"); + // @formatter:on + + // Create a single work record for a file to some peer + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + Mutation m = new Mutation(file); + WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1)); + WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2)); + WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3)); + bw.addMutation(m); + bw.close(); + + updater.run(); + + Scanner s = ReplicationTable.getScanner(conn); + s.setRange(Range.exact(file)); + StatusSection.limit(s); + Entry entry = Iterables.getOnlyElement(s); + + Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME); + Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId()); + + // We should only rely on the correct begin attribute being returned + Status actual = Status.parseFrom(entry.getValue().get()); + Assert.assertEquals(1, actual.getBegin()); + } + + @Test + public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception { + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + ReplicationTable.setOnline(conn); + + String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + // @formatter:off + Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(), + stat2 = 
Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(), + stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(); + ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"), + target2 = new ReplicationTarget("peer2", "table2", "1"), + target3 = new ReplicationTarget("peer3", "table3", "1"); + // @formatter:on + + // Create a single work record for a file to some peer + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + Mutation m = new Mutation(file); + WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1)); + WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2)); + WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3)); + bw.addMutation(m); + bw.close(); + + updater.run(); + + Scanner s = ReplicationTable.getScanner(conn); + s.setRange(Range.exact(file)); + StatusSection.limit(s); + Entry entry = Iterables.getOnlyElement(s); + + Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME); + Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId()); + + // We should only rely on the correct begin attribute being returned + Status actual = Status.parseFrom(entry.getValue().get()); + Assert.assertEquals(1, actual.getBegin()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java new file mode 100644 index 0000000..df1f64f --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.replication.RemoveCompleteReplicationRecords; +import 
org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +public class RemoveCompleteReplicationRecordsIT extends ConfigurableMacBase { + + private MockRemoveCompleteReplicationRecords rcrr; + private Connector conn; + + private static class MockRemoveCompleteReplicationRecords extends RemoveCompleteReplicationRecords { + + public MockRemoveCompleteReplicationRecords(Connector conn) { + super(conn); + } + + @Override + public long removeCompleteRecords(Connector conn, BatchScanner bs, BatchWriter bw) { + return super.removeCompleteRecords(conn, bs, bw); + } + + } + + @Before + public void initialize() throws Exception { + conn = getConnector(); + rcrr = new MockRemoveCompleteReplicationRecords(conn); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + ReplicationTable.setOnline(conn); + } + + @Test + public void notYetReplicationRecordsIgnored() throws Exception { + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + int numRecords = 3; + for (int i = 0; i < numRecords; i++) { + String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(file); + StatusSection.add(m, new Text(Integer.toString(i)), StatusUtil.openWithUnknownLengthValue()); + bw.addMutation(m); + } + + bw.close(); + + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + + BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1); + bs.setRanges(Collections.singleton(new Range())); + IteratorSetting cfg = new IteratorSetting(50, 
WholeRowIterator.class); + bs.addScanIterator(cfg); + bw = EasyMock.createMock(BatchWriter.class); + + EasyMock.replay(bw); + + rcrr.removeCompleteRecords(conn, bs, bw); + bs.close(); + + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + } + + @Test + public void partiallyReplicatedRecordsIgnored() throws Exception { + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + int numRecords = 3; + Status.Builder builder = Status.newBuilder(); + builder.setClosed(false); + builder.setEnd(10000); + builder.setInfiniteEnd(false); + for (int i = 0; i < numRecords; i++) { + String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(file); + StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build())); + bw.addMutation(m); + } + + bw.close(); + + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + + BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1); + bs.setRanges(Collections.singleton(new Range())); + IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class); + bs.addScanIterator(cfg); + bw = EasyMock.createMock(BatchWriter.class); + + EasyMock.replay(bw); + + // We don't remove any records, so we can just pass in a fake BW for both + rcrr.removeCompleteRecords(conn, bs, bw); + bs.close(); + + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + } + + @Test + public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception { + BatchWriter replBw = ReplicationTable.getBatchWriter(conn); + int numRecords = 3; + + Status.Builder builder = Status.newBuilder(); + builder.setClosed(false); + builder.setEnd(10000); + builder.setInfiniteEnd(false); + + // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated + for (int i = 0; i < numRecords; i++) { + String file = 
"/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(file); + StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build())); + replBw.addMutation(m); + } + + // Add two records that we can delete + String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(fileToRemove); + StatusSection.add(m, new Text("5"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build())); + replBw.addMutation(m); + + numRecords++; + + fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + m = new Mutation(fileToRemove); + StatusSection.add(m, new Text("6"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build())); + replBw.addMutation(m); + + numRecords++; + + replBw.flush(); + + // Make sure that we have the expected number of records in both tables + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + + // We should not remove any records because they're missing closed status + BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1); + bs.setRanges(Collections.singleton(new Range())); + IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class); + bs.addScanIterator(cfg); + + try { + Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw)); + } finally { + bs.close(); + replBw.close(); + } + } + + @Test + public void replicatedClosedRowsAreRemoved() throws Exception { + BatchWriter replBw = ReplicationTable.getBatchWriter(conn); + int numRecords = 3; + + Status.Builder builder = Status.newBuilder(); + builder.setClosed(false); + builder.setEnd(10000); + builder.setInfiniteEnd(false); + + long time = System.currentTimeMillis(); + // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated + for (int i = 0; i < numRecords; i++) { + builder.setCreatedTime(time++); + String file = 
"/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(file); + Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()); + StatusSection.add(m, new Text(Integer.toString(i)), v); + replBw.addMutation(m); + m = OrderSection.createMutation(file, time); + OrderSection.add(m, new Text(Integer.toString(i)), v); + replBw.addMutation(m); + } + + Set filesToRemove = new HashSet<>(); + // We created two mutations for each file + numRecords *= 2; + int finalNumRecords = numRecords; + + // Add two records that we can delete + String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + filesToRemove.add(fileToRemove); + Mutation m = new Mutation(fileToRemove); + ReplicationTarget target = new ReplicationTarget("peer1", "5", "5"); + Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build()); + StatusSection.add(m, new Text("5"), value); + WorkSection.add(m, target.toText(), value); + replBw.addMutation(m); + + m = OrderSection.createMutation(fileToRemove, time); + OrderSection.add(m, new Text("5"), value); + replBw.addMutation(m); + time++; + + numRecords += 3; + + fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + filesToRemove.add(fileToRemove); + m = new Mutation(fileToRemove); + value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build()); + target = new ReplicationTarget("peer1", "6", "6"); + StatusSection.add(m, new Text("6"), value); + WorkSection.add(m, target.toText(), value); + replBw.addMutation(m); + + m = OrderSection.createMutation(fileToRemove, time); + OrderSection.add(m, new Text("6"), value); + replBw.addMutation(m); + time++; + + numRecords += 3; + + replBw.flush(); + + // Make sure that we have the expected number of records in both tables + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + + // We should remove the two fully completed 
records we inserted + BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1); + bs.setRanges(Collections.singleton(new Range())); + StatusSection.limit(bs); + WorkSection.limit(bs); + IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class); + bs.addScanIterator(cfg); + + try { + Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, bs, replBw)); + } finally { + bs.close(); + replBw.close(); + } + + int actualRecords = 0; + for (Entry entry : ReplicationTable.getScanner(conn)) { + Assert.assertFalse(filesToRemove.contains(entry.getKey().getRow().toString())); + actualRecords++; + } + + Assert.assertEquals(finalNumRecords, actualRecords); + } + + @Test + public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception { + BatchWriter replBw = ReplicationTable.getBatchWriter(conn); + int numRecords = 3; + + Status.Builder builder = Status.newBuilder(); + builder.setClosed(false); + builder.setEnd(10000); + builder.setInfiniteEnd(false); + + // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated + for (int i = 0; i < numRecords; i++) { + String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(file); + StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build())); + replBw.addMutation(m); + } + + // Add two records that we can delete + String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID(); + Mutation m = new Mutation(fileToRemove); + ReplicationTarget target = new ReplicationTarget("peer1", "5", "5"); + Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build()); + StatusSection.add(m, new Text("5"), value); + WorkSection.add(m, target.toText(), value); + target = new ReplicationTarget("peer2", "5", "5"); + WorkSection.add(m, target.toText(), value); + target = new ReplicationTarget("peer3", "5", "5"); + WorkSection.add(m, target.toText(), 
ProtobufUtil.toValue(builder.setClosed(false).build())); + replBw.addMutation(m); + + numRecords += 4; + + replBw.flush(); + + // Make sure that we have the expected number of records in both tables + Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn))); + + // We should remove the two fully completed records we inserted + BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1); + bs.setRanges(Collections.singleton(new Range())); + IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class); + bs.addScanIterator(cfg); + + try { + Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw)); + } finally { + bs.close(); + replBw.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java new file mode 100644 index 0000000..fa026d1 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.master.MasterClientServiceHandler; +import 
org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationOperationsImplIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplIT.class); + + private Instance inst; + private Connector conn; + + @Before + public void configureInstance() throws Exception { + conn = getConnector(); + inst = conn.getInstance(); + ReplicationTable.setOnline(conn); + conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + } + + /** + * Spoof out the Master so we can call the implementation without starting a full instance. 
+ */ + private ReplicationOperationsImpl getReplicationOperations() throws Exception { + Master master = EasyMock.createMock(Master.class); + EasyMock.expect(master.getConnector()).andReturn(conn).anyTimes(); + EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes(); + EasyMock.replay(master); + + final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) { + @Override + protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException { + try { + return conn.tableOperations().tableIdMap().get(tableName); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken(ROOT_PASSWORD)), getClientConfig()); + return new ReplicationOperationsImpl(context) { + @Override + protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set wals) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + try { + return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals); + } catch (TException e) { + throw new RuntimeException(e); + } + } + }; + } + + @Test + public void waitsUntilEntriesAreReplicated() throws Exception { + conn.tableOperations().create("foo"); + Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(file2); + StatusSection.add(m, tableId, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new 
BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat)); + + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat)); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + final ReplicationOperationsImpl roi = getReplicationOperations(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId); + bw.addMutation(m); + bw.flush(); + + Assert.assertFalse(done.get()); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.putDelete(ReplicationSection.COLF, tableId); + bw.addMutation(m); + bw.flush(); + bw.close(); + + // Removing metadata entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId); + bw.addMutation(m); + bw.flush(); + + Assert.assertFalse(done.get()); + + m = new Mutation(file2); + m.putDelete(StatusSection.NAME, tableId); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // After both metadata and replication + Assert.assertTrue("Drain never finished", done.get()); + Assert.assertFalse("Saw unexpectetd exception", exception.get()); + } + + @Test + 
public void unrelatedReplicationRecordsDontBlockDrain() throws Exception { + conn.tableOperations().create("foo"); + conn.tableOperations().create("bar"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(file2); + StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + file2); + m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat)); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + + final ReplicationOperationsImpl roi = getReplicationOperations(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId1); + bw.addMutation(m); + bw.flush(); + + // Removing metadata 
entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId1); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // After both metadata and replication + Assert.assertTrue("Drain never completed", done.get()); + Assert.assertFalse("Saw unexpected exception", exception.get()); + } + + @Test + public void inprogressReplicationRecordsBlockExecution() throws Exception { + conn.tableOperations().create("foo"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + bw.close(); + + LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + + m = new Mutation(logEntry.getRow()); + m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue()); + bw.addMutation(m); + + bw.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + final ReplicationOperationsImpl roi = getReplicationOperations(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + 
log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // With the records, we shouldn't be drained + Assert.assertFalse(done.get()); + + Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build(); + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus)); + bw.addMutation(m); + bw.flush(); + + // Removing metadata entries doesn't change anything + Assert.assertFalse(done.get()); + + // Remove the replication entries too + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus)); + bw.addMutation(m); + bw.flush(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // New records, but not fully replicated ones don't cause it to complete + Assert.assertFalse("Drain somehow finished", done.get()); + Assert.assertFalse("Saw unexpected exception", exception.get()); + } + + @Test + public void laterCreatedLogsDontBlockExecution() throws Exception { + conn.tableOperations().create("foo"); + + Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); + + String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); + + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + Mutation m = new Mutation(file1); + StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + bw.close(); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + 
bw.addMutation(m); + + bw.close(); + + log.info("Reading metadata first time"); + for (Entry e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + log.info("{}", e.getKey()); + } + + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean exception = new AtomicBoolean(false); + final ReplicationOperationsImpl roi = getReplicationOperations(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + roi.drain("foo"); + } catch (Exception e) { + log.error("Got error", e); + exception.set(true); + } + done.set(true); + } + }); + + t.start(); + + // We need to wait long enough for the table to read once + Thread.sleep(2000); + + // Write another file, but also delete the old files + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID()); + m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat)); + bw.addMutation(m); + m = new Mutation(ReplicationSection.getRowPrefix() + file1); + m.putDelete(ReplicationSection.COLF, tableId1); + bw.addMutation(m); + bw.close(); + + log.info("Reading metadata second time"); + for (Entry e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + log.info("{}", e.getKey()); + } + + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(file1); + m.putDelete(StatusSection.NAME, tableId1); + bw.addMutation(m); + bw.close(); + + try { + t.join(5000); + } catch (InterruptedException e) { + Assert.fail("ReplicationOperations.drain did not complete"); + } + + // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did) + Assert.assertTrue("Drain didn't finish", done.get()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java 
---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java new file mode 100644 index 0000000..5668a67 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.replication.SequentialWorkAssigner; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SequentialWorkAssignerIT extends ConfigurableMacBase { + + private Connector conn; + private MockSequentialWorkAssigner assigner; + + private static class MockSequentialWorkAssigner extends SequentialWorkAssigner { + + public MockSequentialWorkAssigner(Connector conn) { + super(null, conn); + } + + @Override + public void setConnector(Connector conn) { + super.setConnector(conn); + } + + @Override + public void setQueuedWork(Map> queuedWork) { + super.setQueuedWork(queuedWork); + } + + @Override + public void setWorkQueue(DistributedWorkQueue workQueue) { + super.setWorkQueue(workQueue); + } + + 
@Override + public void setMaxQueueSize(int maxQueueSize) { + super.setMaxQueueSize(maxQueueSize); + } + + @Override + public void createWork() { + super.createWork(); + } + + @Override + public void setZooCache(ZooCache zooCache) { + super.setZooCache(zooCache); + } + + @Override + public void cleanupFinishedWork() { + super.cleanupFinishedWork(); + } + + } + + @Before + public void init() throws Exception { + conn = getConnector(); + assigner = new MockSequentialWorkAssigner(conn); + // grant ourselves write to the replication table + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + ReplicationTable.setOnline(conn); + } + + @Test + public void createWorkForFilesInCorrectOrder() throws Exception { + ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); + Text serializedTarget = target.toText(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + // We want the name of file2 to sort before file1 + String filename1 = "z_file1", filename2 = "a_file1"; + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + + // File1 was closed before file2, however + Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build(); + Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build(); + + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + m = OrderSection.createMutation(file1, stat1.getCreatedTime()); + OrderSection.add(m, new 
Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = OrderSection.createMutation(file2, stat2.getCreatedTime()); + OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + Map> queuedWork = new HashMap<>(); + assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + // Make sure we expect the invocations in the correct order (accumulo is sorted) + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1); + expectLastCall().once(); + + // file2 is *not* queued because file1 must be replicated first + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + + Assert.assertEquals(1, queuedWork.size()); + Assert.assertTrue(queuedWork.containsKey("cluster1")); + Map cluster1Work = queuedWork.get("cluster1"); + Assert.assertEquals(1, cluster1Work.size()); + Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId())); + } + + @Test + public void workAcrossTablesHappensConcurrently() throws Exception { + ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"); + Text serializedTarget1 = target1.toText(); + + ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2"); + Text serializedTarget2 = target2.toText(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + // We want the name of file2 to sort before file1 + String filename1 = "z_file1", filename2 = "a_file1"; + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + + // File1 was closed before file2, 
however + Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build(); + Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build(); + + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + m = OrderSection.createMutation(file1, stat1.getCreatedTime()); + OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = OrderSection.createMutation(file2, stat2.getCreatedTime()); + OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + // peer name -> (source table id -> queue key of the work currently queued for that table) + Map<String,Map<String,String>> queuedWork = new HashMap<>(); + assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + // Make sure we expect the invocations in the correct order (accumulo is sorted) + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1); + expectLastCall().once(); + + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2); + expectLastCall().once(); + + // file2 is queued as well: it belongs to a different source table, so it does not have to wait for file1 + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + + Assert.assertEquals(1, queuedWork.size()); + Assert.assertTrue(queuedWork.containsKey("cluster1")); + + Map<String,String> cluster1Work = queuedWork.get("cluster1"); + Assert.assertEquals(2, cluster1Work.size()); + Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1),
cluster1Work.get(target1.getSourceTableId())); + + Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId())); + } + + @Test + public void workAcrossPeersHappensConcurrently() throws Exception { + ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"); + Text serializedTarget1 = target1.toText(); + + ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1"); + Text serializedTarget2 = target2.toText(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + // We want the name of file2 to sort before file1 + String filename1 = "z_file1", filename2 = "a_file1"; + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + + // File1 was closed before file2, however + Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build(); + Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build(); + + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + m = OrderSection.createMutation(file1, stat1.getCreatedTime()); + OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = OrderSection.createMutation(file2, stat2.getCreatedTime()); + OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + Map> queuedWork = new HashMap<>(); + 
assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + // Make sure we expect the invocations in the correct order (accumulo is sorted) + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1); + expectLastCall().once(); + + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2); + expectLastCall().once(); + + // file2 is queued as well: it targets a different peer (cluster2), so it does not have to wait for file1 + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + + Assert.assertEquals(2, queuedWork.size()); + Assert.assertTrue(queuedWork.containsKey("cluster1")); + + Map cluster1Work = queuedWork.get("cluster1"); + Assert.assertEquals(1, cluster1Work.size()); + Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); + + Map cluster2Work = queuedWork.get("cluster2"); + Assert.assertEquals(1, cluster2Work.size()); + Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId())); + } + + @Test + public void reprocessingOfCompletedWorkRemovesWork() throws Exception { + ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); + Text serializedTarget = target.toText(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + // We want the name of file2 to sort before file1 + String filename1 = "z_file1", filename2 = "a_file1"; + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + + // File1 was closed before file2, however + Status stat1 =
Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build(); + Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build(); + + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + m = OrderSection.createMutation(file1, stat1.getCreatedTime()); + OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + + m = OrderSection.createMutation(file2, stat2.getCreatedTime()); + OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + + // Treat filename1 as we have already submitted it for replication + Map> queuedWork = new HashMap<>(); + Map queuedWorkForCluster = new HashMap<>(); + queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target)); + queuedWork.put("cluster1", queuedWorkForCluster); + + assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + // Make sure we expect the invocations in the correct order (accumulo is sorted) + workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2); + expectLastCall().once(); + + // file2 is queued because we remove file1 because it's fully replicated + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + + Assert.assertEquals(1, queuedWork.size()); + Assert.assertTrue(queuedWork.containsKey("cluster1")); + Map cluster1Work = queuedWork.get("cluster1"); + Assert.assertEquals(1, cluster1Work.size()); + 
Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId())); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java new file mode 100644 index 0000000..cb34ed2 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.replication.StatusMaker; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.server.util.ReplicationTableUtil; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +public class StatusMakerIT extends ConfigurableMacBase { + + private Connector conn; + + @Before + public void setupInstance() throws Exception { + conn = getConnector(); + ReplicationTable.setOnline(conn); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + conn.securityOperations().grantTablePermission(conn.whoami(), 
ReplicationTable.NAME, TablePermission.READ); + } + + @Test + public void statusRecordsCreated() throws Exception { + String sourceTable = testName.getMethodName(); + conn.tableOperations().create(sourceTable); + ReplicationTableUtil.configureMetadataTable(conn, sourceTable); + + BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig()); + String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/"; + Set files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), + walPrefix + UUID.randomUUID()); + Map fileToTableId = new HashMap<>(); + + int index = 1; + long timeCreated = 0; + Map fileToTimeCreated = new HashMap<>(); + for (String file : files) { + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file); + m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated)); + fileToTimeCreated.put(file, timeCreated); + bw.addMutation(m); + fileToTableId.put(file, index); + index++; + timeCreated++; + } + + bw.close(); + + StatusMaker statusMaker = new StatusMaker(conn); + statusMaker.setSourceTableName(sourceTable); + + statusMaker.run(); + + Scanner s = ReplicationTable.getScanner(conn); + StatusSection.limit(s); + Text file = new Text(), tableId = new Text(); + for (Entry entry : s) { + StatusSection.getFile(entry.getKey(), file); + StatusSection.getTableId(entry.getKey(), tableId); + + Assert.assertTrue("Found unexpected file: " + file, files.contains(file.toString())); + Assert.assertEquals(fileToTableId.get(file.toString()), new Integer(tableId.toString())); + timeCreated = fileToTimeCreated.get(file.toString()); + Assert.assertNotNull(timeCreated); + Assert.assertEquals(StatusUtil.fileCreated(timeCreated), Status.parseFrom(entry.getValue().get())); + } + } + + @Test + public void openMessagesAreNotDeleted() throws Exception { + String sourceTable = testName.getMethodName(); + conn.tableOperations().create(sourceTable); 
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable); + + BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig()); + String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/"; + Set files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), + walPrefix + UUID.randomUUID()); + Map fileToTableId = new HashMap<>(); + + int index = 1; + long timeCreated = 0; + for (String file : files) { + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file); + m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated)); + bw.addMutation(m); + fileToTableId.put(file, index); + index++; + timeCreated++; + } + + bw.close(); + + StatusMaker statusMaker = new StatusMaker(conn); + statusMaker.setSourceTableName(sourceTable); + + statusMaker.run(); + + Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + s.fetchColumnFamily(ReplicationSection.COLF); + Assert.assertEquals(files.size(), Iterables.size(s)); + } + + @Test + public void closedMessagesAreDeleted() throws Exception { + String sourceTable = testName.getMethodName(); + conn.tableOperations().create(sourceTable); + ReplicationTableUtil.configureMetadataTable(conn, sourceTable); + + BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig()); + String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/"; + Set files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), + walPrefix + UUID.randomUUID()); + Map fileToTableId = new HashMap<>(); + + Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build(); + + int index = 1; + for (String file : files) { + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file); + 
m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(stat)); + bw.addMutation(m); + fileToTableId.put(file, index); + index++; + } + + bw.close(); + + StatusMaker statusMaker = new StatusMaker(conn); + statusMaker.setSourceTableName(sourceTable); + + statusMaker.run(); + + Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + s.fetchColumnFamily(ReplicationSection.COLF); + for (Entry e : s) { + System.out.println(e.getKey().toStringNoTruncate() + " " + e.getValue()); + } + s = conn.createScanner(sourceTable, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + s.fetchColumnFamily(ReplicationSection.COLF); + Assert.assertEquals(0, Iterables.size(s)); + + } + + @Test + public void closedMessagesCreateOrderRecords() throws Exception { + String sourceTable = testName.getMethodName(); + conn.tableOperations().create(sourceTable); + ReplicationTableUtil.configureMetadataTable(conn, sourceTable); + + BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig()); + String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/"; + List files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), + walPrefix + UUID.randomUUID()); + Map fileToTableId = new HashMap<>(); + + Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true); + + int index = 1; + long time = System.currentTimeMillis(); + for (String file : files) { + statBuilder.setCreatedTime(time++); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file); + m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build())); + bw.addMutation(m); + fileToTableId.put(file, index); + index++; + } + + bw.close(); + + StatusMaker statusMaker = new StatusMaker(conn); + statusMaker.setSourceTableName(sourceTable); + + statusMaker.run(); 
+ + Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + s.fetchColumnFamily(ReplicationSection.COLF); + Assert.assertEquals(0, Iterables.size(s)); + + s = ReplicationTable.getScanner(conn); + OrderSection.limit(s); + Iterator> iter = s.iterator(); + Assert.assertTrue("Found no order records in replication table", iter.hasNext()); + + Iterator expectedFiles = files.iterator(); + Text buff = new Text(); + while (expectedFiles.hasNext() && iter.hasNext()) { + String file = expectedFiles.next(); + Entry entry = iter.next(); + + Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff)); + OrderSection.getTableId(entry.getKey(), buff); + Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString())); + } + + Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext()); + Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java new file mode 100644 index 0000000..048fa94 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.master.replication.UnorderedWorkAssigner; +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + +public class UnorderedWorkAssignerIT extends ConfigurableMacBase { + + private Connector conn; + private 
MockUnorderedWorkAssigner assigner; + + private static class MockUnorderedWorkAssigner extends UnorderedWorkAssigner { + public MockUnorderedWorkAssigner(Connector conn) { + super(null, conn); + } + + @Override + protected void setQueuedWork(Set queuedWork) { + super.setQueuedWork(queuedWork); + } + + @Override + protected void setWorkQueue(DistributedWorkQueue workQueue) { + super.setWorkQueue(workQueue); + } + + @Override + protected boolean queueWork(Path path, ReplicationTarget target) { + return super.queueWork(path, target); + } + + @Override + protected void initializeQueuedWork() { + super.initializeQueuedWork(); + } + + @Override + protected Set getQueuedWork() { + return super.getQueuedWork(); + } + + @Override + protected void setConnector(Connector conn) { + super.setConnector(conn); + } + + @Override + protected void setMaxQueueSize(int maxQueueSize) { + super.setMaxQueueSize(maxQueueSize); + } + + @Override + protected void createWork() { + super.createWork(); + } + + @Override + protected void setZooCache(ZooCache zooCache) { + super.setZooCache(zooCache); + } + + @Override + protected void cleanupFinishedWork() { + super.cleanupFinishedWork(); + } + } + + @Before + public void init() throws Exception { + conn = getConnector(); + assigner = new MockUnorderedWorkAssigner(conn); + ReplicationTable.setOnline(conn); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE); + conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ); + } + + @Test + public void createWorkForFilesNeedingIt() throws Exception { + ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2"); + Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText(); + String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + 
target1.getRemoteIdentifier() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + + target2.getSourceTableId(); + + Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5l); + Status status1 = builder.build(); + builder.setCreatedTime(10l); + Status status2 = builder.build(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString(); + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(status1)); + bw.addMutation(m); + m = OrderSection.createMutation(file1, status1.getCreatedTime()); + OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(status2)); + bw.addMutation(m); + m = OrderSection.createMutation(file2, status2.getCreatedTime()); + OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + HashSet queuedWork = new HashSet<>(); + assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + // Make sure we expect the invocations in the order they were created + String key = filename1 + "|" + keyTarget1; + workQueue.addWork(key, file1); + expectLastCall().once(); + + key = filename2 + "|" + keyTarget2; + 
workQueue.addWork(key, file2); + expectLastCall().once(); + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + } + + @Test + public void doNotCreateWorkForFilesNotNeedingIt() throws Exception { + ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2"); + Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText(); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString(); + String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5)); + bw.addMutation(m); + + m = new Mutation(file2); + WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10)); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + HashSet queuedWork = new HashSet<>(); + assigner.setQueuedWork(queuedWork); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + } + + @Test + public void workNotReAdded() throws Exception { + Set queuedWork = new HashSet<>(); + + assigner.setQueuedWork(queuedWork); + + ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1"); + String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier() + + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId(); + + queuedWork.add("wal1|" + serializedTarget.toString()); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + String file1 = "/accumulo/wal/tserver+port/wal1"; 
+ Mutation m = new Mutation(file1); + WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue()); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + } +}