accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [4/9] git commit: ACCUMULO-3249 New IT which ensures that closed status msgs aren't written for wals with no records for the tablet
Date Thu, 23 Oct 2014 18:58:21 GMT
ACCUMULO-3249 New IT which ensures that closed status msgs aren't written for wals with no
records for the tablet


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6512bfd5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6512bfd5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6512bfd5

Branch: refs/heads/master
Commit: 6512bfd5ef9d8c3bbb314ddcdc8d3af4bcd6f9dd
Parents: 6cbc586
Author: Josh Elser <elserj@apache.org>
Authored: Wed Oct 22 19:32:24 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Oct 23 11:18:59 2014 -0400

----------------------------------------------------------------------
 .../UnusedWalDoesntCloseReplicationStatus.java  | 223 +++++++++++++++++++
 1 file changed, 223 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6512bfd5/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
new file mode 100644
index 0000000..5590f7b
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatus.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * IT for ACCUMULO-3249: verifies that when a WAL is referenced by a tablet's
+ * log column but contains no mutations for that tablet's table, log recovery
+ * does not write a "closed" replication Status record for that WAL.
+ */
+public class UnusedWalDoesntCloseReplicationStatus extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    // One tserver so the tablet (and the WAL reference injected below) is
+    // recovered by a single, predictable server.
+    cfg.setNumTservers(1);
+  }
+
+  @Test
+  public void test() throws Exception {
+    File accumuloDir = this.getCluster().getConfig().getAccumuloDir();
+    final Connector conn = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    // Need write access to the metadata table to inject the fake WAL entries.
+    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    conn.tableOperations().create(tableName);
+
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    // NOTE(review): Integer.valueOf(tableId) would NPE here before the
+    // assertNotNull below ever runs if the table ID lookup failed; the null
+    // check should precede the parse.
+    final int numericTableId = Integer.valueOf(tableId);
+    // A table ID no real table owns; all WAL mutations will target it, so the
+    // WAL holds no data for the real table under test.
+    final int fakeTableId = numericTableId + 1;
+
+    Assert.assertNotNull("Did not find table ID", tableId);
+
+    // Enable replication on the real table, targeting peer "cluster1".
+    conn.tableOperations().setProperty(tableName, Property.TABLE_REPLICATION.getKey(), "true");
+    // NOTE(review): the next two lines were wrapped by the mail archiver; in
+    // the committed source this is one statement concatenating
+    // TABLE_REPLICATION_TARGET.getKey() with "cluster1".
+    conn.tableOperations().setProperty(tableName, Property.TABLE_REPLICATION_TARGET.getKey()
+ "cluster1", "1");
+    // just sleep
+    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
+
+    // Build a WAL file by hand on the local filesystem, under a directory
+    // named like a tserver's WAL dir.
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    // NOTE(review): wrapped by the mail archiver; one statement in source.
+    File tserverWalDir = new File(accumuloDir, ServerConstants.WAL_DIR + Path.SEPARATOR +
"faketserver+port");
+    File tserverWal = new File(tserverWalDir, UUID.randomUUID().toString());
+    fs.mkdirs(new Path(tserverWalDir.getAbsolutePath()));
+
+    // Make a fake WAL with no data in it for our real table
+    FSDataOutputStream out = fs.create(new Path(tserverWal.getAbsolutePath()));
+
+    // File header then crypto module name, matching the on-disk layout that
+    // DfsLogger produces.
+    out.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(StandardCharsets.UTF_8));
+
+    DataOutputStream dos = new DataOutputStream(out);
+    dos.writeUTF("NullCryptoModule");
+
+    // Fake a single update WAL that has a mutation for another table
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    // OPEN record identifying this (fake) tserver session.
+    key.event = OPEN;
+    key.tserverSession = tserverWal.getAbsolutePath();
+    key.filename = tserverWal.getAbsolutePath();
+    // NOTE(review): the OPEN record is written to the raw stream (out) while
+    // every later record goes through dos; since dos wraps out, the bytes
+    // land in the same file either way.
+    key.write(out);
+    value.write(out);
+
+    // DEFINE_TABLET for the *fake* table -- none of the WAL's content
+    // belongs to the real table.
+    key.event = LogEvents.DEFINE_TABLET;
+    key.tablet = new KeyExtent(new Text(Integer.toString(fakeTableId)), null, null);
+    key.seq = 1l;
+    key.tid = 1;
+
+    key.write(dos);
+    value.write(dos);
+
+    // One mutation for the fake table's tablet.
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.filename = tserverWal.getAbsolutePath();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    // Compaction start/finish pair recording that the fake table's data was
+    // flushed out of this WAL.
+    key.event = LogEvents.COMPACTION_START;
+    key.filename = accumuloDir.getAbsolutePath() + "/tables/" + fakeTableId + "/t-000001/A000001.rf";
+    value.mutations = Collections.emptyList();
+
+    key.write(dos);
+    value.write(dos);
+
+    key.event = LogEvents.COMPACTION_FINISH;
+    value.mutations = Collections.emptyList();
+
+    key.write(dos);
+    value.write(dos);
+
+    dos.close();
+
+    // Write one real record to the table under test so there is data to
+    // verify after recovery.
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("m");
+    m.put("m", "m", "M");
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("State of metadata table after inserting a record");
+
+    // NOTE(review): these two dump loops use System.out while every later
+    // dump uses log.info; presumably leftover debugging output.
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      System.out.println(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      System.out.println(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+    }
+
+    log.info("Offline'ing table");
+
+    // Take the tablet offline so its metadata row can be edited safely.
+    conn.tableOperations().offline(tableName, true);
+
+    // Add our fake WAL to the log column for this table
+    String walUri = tserverWal.toURI().toString();
+    KeyExtent extent = new KeyExtent(new Text(tableId), null, null);
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(extent.getMetadataEntry());
+    // NOTE(review): wrapped by the mail archiver; one statement in source.
+    m.put(MetadataSchema.TabletsSection.LogColumnFamily.NAME, new Text("localhost:12345/"
+ walUri),
+        new Value((walUri + "|1").getBytes(StandardCharsets.UTF_8)));
+    bw.addMutation(m);
+
+    // Add a replication entry for our fake WAL
+    m = new Mutation(MetadataSchema.ReplicationSection.getRowPrefix() + new Path(walUri).toString());
+    m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId), new Value(StatusUtil.fileCreated(System.currentTimeMillis()).toByteArray()));
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("State of metadata after injecting WAL manually");
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+    }
+
+    // Bringing the table back online triggers log recovery on the fake WAL.
+    log.info("Bringing table online");
+    conn.tableOperations().online(tableName, true);
+
+    // The one record written earlier must survive recovery.
+    Assert.assertEquals(1, Iterables.size(conn.createScanner(tableName, Authorizations.EMPTY)));
+
+    log.info("Table has performed recovery, state of metadata:");
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    for (Entry<Key,Value> entry : s) {
+      log.info(entry.getKey().toStringNoTruncate() + " " + entry.getValue());
+    }
+
+    // Core assertion for ACCUMULO-3249: no replication Status record may be
+    // marked closed, since this WAL held no data for the real table.
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      Status status = Status.parseFrom(entry.getValue().get());
+      log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(status));
+      Assert.assertFalse("Status record was closed and it should not be", status.getClosed());
+    }
+  }
+}


Mime
View raw message