From: elserj@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Thu, 21 May 2015 04:04:09 -0000
Message-Id: <4fc07dcc30d34de09b659e9ac58cf4d6@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [3/3] accumulo git commit: Merge branch '1.7'

Merge branch '1.7'

Conflicts:
	test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cf9b9a4e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cf9b9a4e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cf9b9a4e

Branch: refs/heads/master
Commit: cf9b9a4ea6cfd03a053e64c7feca964ca9c026aa
Parents: 9cd8273 81fdad8
Author: Josh Elser <elserj@apache.org>
Authored: Thu May 21 00:03:56 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu May 21 00:03:56 2015 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cf9b9a4e/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 54b42f4,e2ee215..ef81f2c
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -132,32 -130,21 +133,33 @@@ public class ReplicationIT extends Conf
    }
  
    private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
 -    Multimap<String,String> logs = HashMultimap.create();
 +    // Map of server to tableId
 +    Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 -    scanner.fetchColumnFamily(LogColumnFamily.NAME);
 -    scanner.setRange(new Range());
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +    for (Entry<Key,Value> entry : scanner) {
 +      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
 +      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
 +      serverToTableID.put(key, new String(tableId, UTF_8));
 +    }
 +    // Map of logs to tableId
 +    Multimap<String,String> logs = HashMultimap.create();
 +    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
      for (Entry<Key,Value> entry : scanner) {
        if (Thread.interrupted()) {
 +        Thread.currentThread().interrupt();
          return logs;
        }
 -
 -      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 -
 -      for (String log : logEntry.logSet) {
 -        // Need to normalize the log file from LogEntry
 -        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
 +      Text path = new Text();
 +      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
 +      Text session = new Text();
 +      Text hostPort = new Text();
 +      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort, session);
 +      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
 +      for (String tableId : serverToTableID.get(server)) {
 +        logs.put(new Path(path.toString()).toString(), tableId);
        }
      }
      return logs;
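The merged getLogs() above replaces the old single scan over the log column
family with a two-pass join over the metadata table: the first scan maps each
tablet server (current location column family) to the table IDs it hosts, and
the second scan maps each WAL in the current logs section to its owning server
and, through that server, to those table IDs. A minimal, self-contained sketch
of that join, with the Accumulo scans and types reduced to plain strings and
only Guava's HashMultimap assumed (all names and values below are illustrative,
not taken from the test):

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;

    public class WalToTableJoin {
      public static void main(String[] args) {
        // Pass 1 (stand-in for the TabletsSection scan): which tables
        // does each tablet server currently host?
        Multimap<String,String> serverToTableID = HashMultimap.create();
        serverToTableID.put("host1:9997[1234abcd]", "2");
        serverToTableID.put("host1:9997[1234abcd]", "3");

        // Pass 2 (stand-in for the CurrentLogsSection scan): attribute
        // each WAL to every table hosted by the server that owns it.
        Multimap<String,String> logs = HashMultimap.create();
        String wal = "hdfs://namenode/accumulo/wal/host1+9997/some-uuid";
        String owner = "host1:9997[1234abcd]";
        for (String tableId : serverToTableID.get(owner)) {
          logs.put(wal, tableId);
        }

        // Prints e.g. {hdfs://.../some-uuid=[2, 3]}
        System.out.println(logs);
      }
    }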
@@@ -575,12 -638,12 +603,13 @@@
        // We should have *some* reference to each log that was seen in the metadata table
        // They might not yet all be closed though (might be newfile)
 -      Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 +      Assert.assertTrue("Metadata log distribution: " + logs + " replFiles " + replFiles, logs.keySet().containsAll(replFiles));
 +      Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
 +      final Configuration conf = new Configuration();
        for (String replFile : replFiles) {
          Path p = new Path(replFile);
 -        FileSystem fs = p.getFileSystem(new Configuration());
 +        FileSystem fs = p.getFileSystem(conf);
          Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
        }
      }
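The assertion change above relaxes an exact-equality check: the WALs referenced
in the metadata table no longer have to equal the set of replication files, they
only have to contain it, with a slack of at most one extra log (a tablet server
may have rolled onto a fresh WAL that has no replication record yet). A small
self-contained illustration of the relaxed invariant using plain JDK sets (the
values are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class RelaxedLogCheck {
      public static void main(String[] args) {
        Set<String> metadataLogs = new HashSet<>(Arrays.asList("wal-1", "wal-2", "wal-3"));
        Set<String> replFiles = new HashSet<>(Arrays.asList("wal-1", "wal-2"));

        // Old check: metadataLogs.equals(replFiles) -- fails as soon as a
        // new, not-yet-replicated WAL shows up in the metadata table.
        // New check: containment plus a slack of at most one log.
        if (!metadataLogs.containsAll(replFiles)) {
          throw new AssertionError("replication file without a metadata log");
        }
        if (metadataLogs.size() - replFiles.size() > 1) {
          throw new AssertionError("more than one unreplicated log");
        }
        System.out.println("relaxed invariant holds");
      }
    }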
@@@ -631,53 -694,59 +660,31 @@@
      conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
      conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 -    final AtomicBoolean keepRunning = new AtomicBoolean(true);
 -    final Set<String> metadataWals = new HashSet<>();
 -
 -    Thread t = new Thread(new Runnable() {
 -      @Override
 -      public void run() {
 -        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
 -        // when that happens
 -        while (keepRunning.get()) {
 -          try {
 -            metadataWals.addAll(getLogs(conn).keySet());
 -          } catch (Exception e) {
 -            log.error("Metadata table doesn't exist");
 -          }
 -        }
 -      }
 -
 -    });
 -
 -    t.start();
 -
      String table1 = "table1", table2 = "table2", table3 = "table3";
 -    try {
 -      conn.tableOperations().create(table1);
 -      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 -      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 -      conn.tableOperations().create(table2);
 -      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 -      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 -      conn.tableOperations().create(table3);
 -      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 -      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    conn.tableOperations().create(table2);
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    conn.tableOperations().create(table3);
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 -    writeSomeData(conn, table1, 200, 500);
 -      // Write some data to table1
 -      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
 -      for (int rows = 0; rows < 200; rows++) {
 -        Mutation m = new Mutation(Integer.toString(rows));
 -        for (int cols = 0; cols < 500; cols++) {
 -          String value = Integer.toString(cols);
 -          m.put(value, "", value);
 -        }
 -        bw.addMutation(m);
 -      }
++    writeSomeData(conn, table1, 200, 500);
 -    writeSomeData(conn, table2, 200, 500);
 -      bw.close();
 -
 -      // Write some data to table2
 -      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
 -      for (int rows = 0; rows < 200; rows++) {
 -        Mutation m = new Mutation(Integer.toString(rows));
 -        for (int cols = 0; cols < 500; cols++) {
 -          String value = Integer.toString(cols);
 -          m.put(value, "", value);
 -        }
 -        bw.addMutation(m);
 -      }
++    writeSomeData(conn, table2, 200, 500);
 -    writeSomeData(conn, table3, 200, 500);
 -      bw.close();
++    writeSomeData(conn, table3, 200, 500);
 -    // Flush everything to try to make the replication records
 -    for (String table : Arrays.asList(table1, table2, table3)) {
 -      conn.tableOperations().flush(table, null, null, true);
 -      // Write some data to table3
 -      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
 -      for (int rows = 0; rows < 200; rows++) {
 -        Mutation m = new Mutation(Integer.toString(rows));
 -        for (int cols = 0; cols < 500; cols++) {
 -          String value = Integer.toString(cols);
 -          m.put(value, "", value);
--        }
 -        bw.addMutation(m);
++    // Flush everything to try to make the replication records
++    for (String table : Arrays.asList(table1, table2, table3)) {
++      conn.tableOperations().flush(table, null, null, true);
 +    }
 -    } finally {
 -      keepRunning.set(false);
 -      t.join(5000);
 -      bw.close();
 -
 +    // Flush everything to try to make the replication records
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      conn.tableOperations().flush(table, null, null, true);
      }
  
      for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
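The hunks above drop the background watcher thread (and with it the keepRunning
flag and the try/finally teardown) and replace three copies of an inline
BatchWriter loop with calls to the test's writeSomeData helper. The helper's
actual body lives elsewhere in ReplicationIT; a sketch reconstructed from the
inline loops removed above (parameter names are assumptions) would read roughly:

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Mutation;

    // Writes rows x cols cells to the given table: the row is the row
    // index, the column family and value are the column index, and the
    // column qualifier is left empty, matching the removed inline code.
    private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
      BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
      for (int row = 0; row < rows; row++) {
        Mutation m = new Mutation(Integer.toString(row));
        for (int col = 0; col < cols; col++) {
          String value = Integer.toString(col);
          m.put(value, "", value);
        }
        bw.addMutation(m);
      }
      bw.close();
    }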