Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5FC571049B for ; Fri, 6 Sep 2013 01:48:37 +0000 (UTC) Received: (qmail 19519 invoked by uid 500); 6 Sep 2013 01:48:36 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 19174 invoked by uid 500); 6 Sep 2013 01:48:34 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 17743 invoked by uid 99); 6 Sep 2013 01:48:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Sep 2013 01:48:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8DB76901D8B; Fri, 6 Sep 2013 01:48:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 06 Sep 2013 01:49:20 -0000 Message-Id: <6967ff4ff58e4d64b4effdc39fb8db5e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [52/53] [abbrv] ACCUMULO-658 Move tests and resources to correct modules http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java new file mode 100644 index 0000000..1b95531 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilterTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.iterators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; +import org.apache.accumulo.tserver.iterators.MetadataBulkLoadFilter; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class MetadataBulkLoadFilterTest { + static class TestArbitrator implements Arbitrator { + @Override + public boolean transactionAlive(String type, long tid) throws Exception { + return tid == 5; + } + + @Override + public boolean transactionComplete(String type, long tid) throws Exception { + if (tid == 9) + throw new RuntimeException(); + return tid != 5 && tid != 7; + } + } + + static class TestMetadataBulkLoadFilter extends MetadataBulkLoadFilter { + @Override + protected Arbitrator getArbitrator() { + return new TestArbitrator(); + } + } + + private static void put(TreeMap tm, String row, ColumnFQ cfq, String val) { + Key k = new Key(new Text(row), cfq.getColumnFamily(), cfq.getColumnQualifier()); + tm.put(k, new Value(val.getBytes())); + } + + private static void put(TreeMap tm, String row, Text cf, String cq, String val) { + Key k = new Key(new Text(row), cf, new Text(cq)); + if (val == null) { + k.setDeleted(true); + tm.put(k, new Value("".getBytes())); + } else + tm.put(k, new Value(val.getBytes())); + } + + @Test + public void testBasic() throws IOException { + TreeMap tm1 = new TreeMap(); + TreeMap expected = new TreeMap(); + + // following should not be deleted by filter + put(tm1, "2;m", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, "/t1"); + put(tm1, "2;m", DataFileColumnFamily.NAME, "/t1/file1", "1,1"); + put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file1", "5"); + put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file3", "7"); + put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file4", "9"); + put(tm1, "2<", TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, "/t2"); + put(tm1, "2<", DataFileColumnFamily.NAME, "/t2/file2", "1,1"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file6", "5"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file7", "7"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file8", "9"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/fileC", null); + + expected.putAll(tm1); + + // the following should be deleted by filter + put(tm1, "2;m", TabletsSection.BulkFileColumnFamily.NAME, "/t1/file5", "8"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/file9", "8"); + put(tm1, "2<", TabletsSection.BulkFileColumnFamily.NAME, "/t2/fileA", "2"); + + TestMetadataBulkLoadFilter iter = new TestMetadataBulkLoadFilter(); + iter.init(new SortedMapIterator(tm1), new HashMap(), new IteratorEnvironment() { + + @Override + public SortedKeyValueIterator reserveMapFileReader(String mapFileName) throws IOException { + return null; + } + + @Override + public void registerSideChannel(SortedKeyValueIterator iter) {} + + @Override + public boolean isFullMajorCompaction() { + return false; + } + + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.majc; + } + + @Override + public AccumuloConfiguration getConfig() { + return null; + } + }); + + iter.seek(new Range(), new ArrayList(), false); + + TreeMap actual = new TreeMap(); + + while (iter.hasTop()) { + actual.put(iter.getTopKey(), iter.getTopValue()); + iter.next(); + } + + Assert.assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java new file mode 100644 index 0000000..f29fb27 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.ConnectorImpl; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.server.security.SystemCredentials.SystemToken; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + */ +public class SystemCredentialsTest { + + @BeforeClass + public static void setUp() throws IOException { + File testInstanceId = new File(new File(new File(new File("target"), "instanceTest"), "instance_id"), UUID.fromString( + "00000000-0000-0000-0000-000000000000").toString()); + if (!testInstanceId.exists()) { + testInstanceId.getParentFile().mkdirs(); + testInstanceId.createNewFile(); + } + } + + /** + * This is a test to ensure the string literal in {@link ConnectorImpl#ConnectorImpl(Instance, Credentials)} is kept up-to-date if we move the + * {@link SystemToken}
+ * This check will not be needed after ACCUMULO-1578 + */ + @Test + public void testSystemToken() { + assertEquals("org.apache.accumulo.server.security.SystemCredentials$SystemToken", SystemToken.class.getName()); + assertEquals(SystemCredentials.get().getToken().getClass(), SystemToken.class); + } + + @Test + public void testSystemCredentials() { + Credentials a = SystemCredentials.get(); + Credentials b = SystemCredentials.get(); + assertTrue(a == b); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java new file mode 100644 index 0000000..9700c8a --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.handler; + +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SystemPermission; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.ByteArraySet; +import org.apache.accumulo.server.security.handler.ZKSecurityTool; + +import junit.framework.TestCase; + +public class ZKAuthenticatorTest extends TestCase { + public void testPermissionIdConversions() { + for (SystemPermission s : SystemPermission.values()) + assertTrue(s.equals(SystemPermission.getPermissionById(s.getId()))); + + for (TablePermission s : TablePermission.values()) + assertTrue(s.equals(TablePermission.getPermissionById(s.getId()))); + } + + public void testAuthorizationConversion() { + ByteArraySet auths = new ByteArraySet(); + for (int i = 0; i < 300; i += 3) + auths.add(Integer.toString(i).getBytes()); + + Authorizations converted = new Authorizations(auths); + byte[] test = ZKSecurityTool.convertAuthorizations(converted); + Authorizations test2 = ZKSecurityTool.convertAuthorizations(test); + assertTrue(auths.size() == test2.size()); + for (byte[] s : auths) { + assertTrue(test2.contains(s)); + } + } + + public void testSystemConversion() { + Set perms = new TreeSet(); + for (SystemPermission s : SystemPermission.values()) + perms.add(s); + + Set converted = ZKSecurityTool.convertSystemPermissions(ZKSecurityTool.convertSystemPermissions(perms)); + assertTrue(perms.size() == converted.size()); + for (SystemPermission s : perms) + assertTrue(converted.contains(s)); + } + + public void testTableConversion() { + Set perms = new TreeSet(); + for (TablePermission s : TablePermission.values()) + perms.add(s); + + Set converted = ZKSecurityTool.convertTablePermissions(ZKSecurityTool.convertTablePermissions(perms)); + assertTrue(perms.size() == converted.size()); + for (TablePermission s : perms) + assertTrue(converted.contains(s)); + } + + public void testEncryption() { + byte[] rawPass = "myPassword".getBytes(); + byte[] storedBytes; + try { + storedBytes = ZKSecurityTool.createPass(rawPass); + assertTrue(ZKSecurityTool.checkPass(rawPass, storedBytes)); + } catch (AccumuloException e) { + e.printStackTrace(); + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/util/CloneTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/CloneTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/CloneTest.java new file mode 100644 index 0000000..e2d1ecb --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/CloneTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.HashSet; +import java.util.Map.Entry; + +import junit.framework.TestCase; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +public class CloneTest extends TestCase { + + public void testNoFiles() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + KeyExtent ke = new KeyExtent(new Text("0"), null, null); + Mutation mut = ke.getPrevRowUpdateMutation(); + + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(mut, new Value("M0".getBytes())); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value("/default_tablet".getBytes())); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(mut); + + bw1.close(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + // scan tables metadata entries and confirm the same + + } + + public void testFilesChange() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + KeyExtent ke = new KeyExtent(new Text("0"), null, null); + Mutation mut = ke.getPrevRowUpdateMutation(); + + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(mut, new Value("M0".getBytes())); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value("/default_tablet".getBytes())); + mut.put(DataFileColumnFamily.NAME.toString(), "/default_tablet/0_0.rf", "1,200"); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(mut); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + Mutation mut2 = new Mutation(ke.getMetadataEntry()); + mut2.putDelete(DataFileColumnFamily.NAME.toString(), "/default_tablet/0_0.rf"); + mut2.put(DataFileColumnFamily.NAME.toString(), "/default_tablet/1_0.rf", "2,300"); + + bw1.addMutation(mut2); + bw1.flush(); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(1, rc); + + rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new KeyExtent(new Text("1"), null, null).toMetadataRange()); + + HashSet files = new HashSet(); + + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) + files.add(entry.getKey().getColumnQualifier().toString()); + } + + assertEquals(1, files.size()); + assertTrue(files.contains("../0/default_tablet/1_0.rf")); + + } + + // test split where files of children are the same + public void testSplit1() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(createTablet("0", null, null, "/default_tablet", "/default_tablet/0_0.rf")); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + bw1.addMutation(createTablet("0", "m", null, "/default_tablet", "/default_tablet/0_0.rf")); + bw1.addMutation(createTablet("0", null, "m", "/t-1", "/default_tablet/0_0.rf")); + + bw1.flush(); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new KeyExtent(new Text("1"), null, null).toMetadataRange()); + + HashSet files = new HashSet(); + + int count = 0; + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + files.add(entry.getKey().getColumnQualifier().toString()); + count++; + } + } + + assertEquals(1, count); + assertEquals(1, files.size()); + assertTrue(files.contains("../0/default_tablet/0_0.rf")); + } + + // test split where files of children differ... like majc and split occurred + public void testSplit2() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(createTablet("0", null, null, "/default_tablet", "/default_tablet/0_0.rf")); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + bw1.addMutation(createTablet("0", "m", null, "/default_tablet", "/default_tablet/1_0.rf")); + Mutation mut3 = createTablet("0", null, "m", "/t-1", "/default_tablet/1_0.rf"); + mut3.putDelete(DataFileColumnFamily.NAME.toString(), "/default_tablet/0_0.rf"); + bw1.addMutation(mut3); + + bw1.flush(); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(1, rc); + + rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new KeyExtent(new Text("1"), null, null).toMetadataRange()); + + HashSet files = new HashSet(); + + int count = 0; + + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + files.add(entry.getKey().getColumnQualifier().toString()); + count++; + } + } + + assertEquals(1, files.size()); + assertEquals(2, count); + assertTrue(files.contains("../0/default_tablet/1_0.rf")); + } + + private static Mutation deleteTablet(String tid, String endRow, String prevRow, String dir, String file) throws Exception { + KeyExtent ke = new KeyExtent(new Text(tid), endRow == null ? null : new Text(endRow), prevRow == null ? null : new Text(prevRow)); + Mutation mut = new Mutation(ke.getMetadataEntry()); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.putDelete(mut); + TabletsSection.ServerColumnFamily.TIME_COLUMN.putDelete(mut); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.putDelete(mut); + mut.putDelete(DataFileColumnFamily.NAME.toString(), file); + + return mut; + } + + private static Mutation createTablet(String tid, String endRow, String prevRow, String dir, String file) throws Exception { + KeyExtent ke = new KeyExtent(new Text(tid), endRow == null ? null : new Text(endRow), prevRow == null ? null : new Text(prevRow)); + Mutation mut = ke.getPrevRowUpdateMutation(); + + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(mut, new Value("M0".getBytes())); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes())); + mut.put(DataFileColumnFamily.NAME.toString(), file, "10,200"); + + return mut; + } + + // test two tablets splitting into four + public void testSplit3() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(createTablet("0", "m", null, "/d1", "/d1/file1")); + bw1.addMutation(createTablet("0", null, "m", "/d2", "/d2/file2")); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + bw1.addMutation(createTablet("0", "f", null, "/d1", "/d1/file3")); + bw1.addMutation(createTablet("0", "m", "f", "/d3", "/d1/file1")); + bw1.addMutation(createTablet("0", "s", "m", "/d2", "/d2/file2")); + bw1.addMutation(createTablet("0", null, "s", "/d4", "/d2/file2")); + + bw1.flush(); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new KeyExtent(new Text("1"), null, null).toMetadataRange()); + + HashSet files = new HashSet(); + + int count = 0; + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + files.add(entry.getKey().getColumnQualifier().toString()); + count++; + } + } + + assertEquals(2, count); + assertEquals(2, files.size()); + assertTrue(files.contains("../0/d1/file1")); + assertTrue(files.contains("../0/d2/file2")); + } + + // test cloned marker + public void testClonedMarker() throws Exception { + + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(createTablet("0", "m", null, "/d1", "/d1/file1")); + bw1.addMutation(createTablet("0", null, "m", "/d2", "/d2/file2")); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + bw1.addMutation(deleteTablet("0", "m", null, "/d1", "/d1/file1")); + bw1.addMutation(deleteTablet("0", null, "m", "/d2", "/d2/file2")); + + bw1.flush(); + + bw1.addMutation(createTablet("0", "f", null, "/d1", "/d1/file3")); + bw1.addMutation(createTablet("0", "m", "f", "/d3", "/d1/file1")); + bw1.addMutation(createTablet("0", "s", "m", "/d2", "/d2/file3")); + bw1.addMutation(createTablet("0", null, "s", "/d4", "/d4/file3")); + + bw1.flush(); + + int rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(1, rc); + + bw1.addMutation(deleteTablet("0", "m", "f", "/d3", "/d1/file1")); + + bw1.flush(); + + bw1.addMutation(createTablet("0", "m", "f", "/d3", "/d1/file3")); + + bw1.flush(); + + rc = MetadataTableUtil.checkClone("0", "1", conn, bw2); + + assertEquals(0, rc); + + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new KeyExtent(new Text("1"), null, null).toMetadataRange()); + + HashSet files = new HashSet(); + + int count = 0; + for (Entry entry : scanner) { + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + files.add(entry.getKey().getColumnQualifier().toString()); + count++; + } + } + + assertEquals(3, count); + assertEquals(3, files.size()); + assertTrue(files.contains("../0/d1/file1")); + assertTrue(files.contains("../0/d2/file3")); + assertTrue(files.contains("../0/d4/file3")); + } + + // test two tablets splitting into four + public void testMerge() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw1.addMutation(createTablet("0", "m", null, "/d1", "/d1/file1")); + bw1.addMutation(createTablet("0", null, "m", "/d2", "/d2/file2")); + + bw1.flush(); + + BatchWriter bw2 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + MetadataTableUtil.initializeClone("0", "1", conn, bw2); + + bw1.addMutation(deleteTablet("0", "m", null, "/d1", "/d1/file1")); + Mutation mut = createTablet("0", null, null, "/d2", "/d2/file2"); + mut.put(DataFileColumnFamily.NAME.toString(), "/d1/file1", "10,200"); + bw1.addMutation(mut); + + bw1.flush(); + + try { + MetadataTableUtil.checkClone("0", "1", conn, bw2); + assertTrue(false); + } catch (TabletIterator.TabletDeletedException tde) {} + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/util/DefaultMapTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/DefaultMapTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/DefaultMapTest.java new file mode 100644 index 0000000..b68d412 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/DefaultMapTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.server.util.DefaultMap; +import org.junit.Test; + +public class DefaultMapTest { + + @Test + public void testDefaultMap() { + Integer value = new DefaultMap(0).get("test"); + assertNotNull(value); + assertEquals(new Integer(0), value); + value = new DefaultMap(1).get("test"); + assertNotNull(value); + assertEquals(new Integer(1), value); + + AtomicInteger canConstruct = new DefaultMap(new AtomicInteger(1)).get("test"); + assertNotNull(canConstruct); + assertEquals(new AtomicInteger(0).get(), canConstruct.get()); + + DefaultMap map = new DefaultMap(""); + assertEquals(map.get("foo"), ""); + map.put("foo", "bar"); + assertEquals(map.get("foo"), "bar"); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/util/TabletIteratorTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TabletIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TabletIteratorTest.java new file mode 100644 index 0000000..72ce334 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/TabletIteratorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.Map.Entry; + +import junit.framework.TestCase; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException; +import org.apache.hadoop.io.Text; + +public class TabletIteratorTest extends TestCase { + + class TestTabletIterator extends TabletIterator { + + private Connector conn; + + public TestTabletIterator(Connector conn) throws Exception { + super(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY), MetadataSchema.TabletsSection.getRange(), true, true); + this.conn = conn; + } + + @Override + protected void resetScanner() { + try { + Scanner ds = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Text tablet = new KeyExtent(new Text("0"), new Text("m"), null).getMetadataEntry(); + ds.setRange(new Range(tablet, true, tablet, true)); + + Mutation m = new Mutation(tablet); + + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + for (Entry entry : ds) { + Key k = entry.getKey(); + m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp()); + } + + bw.addMutation(m); + + bw.close(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + + super.resetScanner(); + } + + } + + // simulate a merge happening while iterating over tablets + public void testMerge() throws Exception { + MockInstance mi = new MockInstance(); + Connector conn = mi.getConnector("", new PasswordToken("")); + + KeyExtent ke1 = new KeyExtent(new Text("0"), new Text("m"), null); + Mutation mut1 = ke1.getPrevRowUpdateMutation(); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut1, new Value("/d1".getBytes())); + + KeyExtent ke2 = new KeyExtent(new Text("0"), null, null); + Mutation mut2 = ke2.getPrevRowUpdateMutation(); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut2, new Value("/d2".getBytes())); + + BatchWriter bw1 = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + bw1.addMutation(mut1); + bw1.addMutation(mut2); + bw1.close(); + + TestTabletIterator tabIter = new TestTabletIterator(conn); + + try { + while (tabIter.hasNext()) { + tabIter.next(); + } + assertTrue(false); + } catch (TabletDeletedException tde) {} + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/util/time/BaseRelativeTimeTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/time/BaseRelativeTimeTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/time/BaseRelativeTimeTest.java new file mode 100644 index 0000000..fdedd84 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/time/BaseRelativeTimeTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util.time; + +import static org.junit.Assert.*; + +import org.apache.accumulo.server.util.time.BaseRelativeTime; +import org.apache.accumulo.server.util.time.ProvidesTime; +import org.junit.Test; + +public class BaseRelativeTimeTest { + + static class BogusTime implements ProvidesTime { + public long value = 0; + + public long currentTime() { + return value; + } + } + + @Test + public void testMatchesTime() { + BogusTime bt = new BogusTime(); + BogusTime now = new BogusTime(); + now.value = bt.value = System.currentTimeMillis(); + + BaseRelativeTime brt = new BaseRelativeTime(now); + assertEquals(brt.currentTime(), now.value); + brt.updateTime(now.value); + assertEquals(brt.currentTime(), now.value); + } + + @Test + public void testFutureTime() { + BogusTime advice = new BogusTime(); + BogusTime local = new BogusTime(); + local.value = advice.value = System.currentTimeMillis(); + // Ten seconds into the future + advice.value += 10000; + + BaseRelativeTime brt = new BaseRelativeTime(local); + assertEquals(brt.currentTime(), local.value); + brt.updateTime(advice.value); + long once = brt.currentTime(); + assertTrue(once < advice.value); + assertTrue(once > local.value); + + for (int i = 0; i < 100; i++) { + brt.updateTime(advice.value); + } + long many = brt.currentTime(); + assertTrue(many > once); + assertTrue("after much advice, relative time is still closer to local time", (advice.value - many) < (once - local.value)); + } + + @Test + public void testPastTime() { + BogusTime advice = new BogusTime(); + BogusTime local = new BogusTime(); + local.value = advice.value = System.currentTimeMillis(); + // Ten seconds into the past + advice.value -= 10000; + + BaseRelativeTime brt = new BaseRelativeTime(local); + brt.updateTime(advice.value); + long once = brt.currentTime(); + assertTrue(once < local.value); + brt.updateTime(advice.value); + long twice = brt.currentTime(); + assertTrue("Time cannot go backwards", once <= twice); + brt.updateTime(advice.value - 10000); + assertTrue("Time cannot go backwards", once <= twice); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/resources/accumulo-site.xml ---------------------------------------------------------------------- diff --git a/server/base/src/test/resources/accumulo-site.xml b/server/base/src/test/resources/accumulo-site.xml new file mode 100644 index 0000000..2aa9fff --- /dev/null +++ b/server/base/src/test/resources/accumulo-site.xml @@ -0,0 +1,32 @@ + + + + + + + + instance.dfs.dir + ${project.build.directory}/instanceTest + + + + instance.secret + TEST_SYSTEM_SECRET + + + http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/server/base/src/test/resources/log4j.properties b/server/base/src/test/resources/log4j.properties new file mode 100644 index 0000000..3206832 --- /dev/null +++ b/server/base/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, CA +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%-8c{2}] %-5p: %m%n + +log4j.logger.org.apache.accumulo.server.util.TabletIterator=ERROR \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java b/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java deleted file mode 100644 index 59ab8c8..0000000 --- a/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.master; - -import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ThriftUtil; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.master.state.TServerInstance; -import org.apache.accumulo.server.security.SystemCredentials; -import org.apache.accumulo.server.util.Halt; -import org.apache.accumulo.server.util.time.SimpleTimer; -import org.apache.accumulo.server.zookeeper.ZooCache; -import org.apache.accumulo.server.zookeeper.ZooLock; -import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.accumulo.trace.instrument.Tracer; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransport; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NotEmptyException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; - -public class LiveTServerSet implements Watcher { - - public interface Listener { - void update(LiveTServerSet current, Set deleted, Set added); - } - - private static final Logger log = Logger.getLogger(LiveTServerSet.class); - - private final Listener cback; - private final Instance instance; - private final AccumuloConfiguration conf; - private ZooCache zooCache; - - public class TServerConnection { - private final InetSocketAddress address; - - public TServerConnection(InetSocketAddress addr) throws TException { - address = addr; - } - - private String lockString(ZooLock mlock) { - return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK); - } - - public void assignTablet(ZooLock lock, KeyExtent extent) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save); - } finally { - ThriftUtil.returnClient(client); - } - } - - public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException { - - if (usePooledConnection == true) - throw new UnsupportedOperationException(); - - TTransport transport = ThriftUtil.createTransport(address, conf); - - try { - TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); - return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance)); - } finally { - if (transport != null) - transport.close(); - } - } - - public void halt(ZooLock lock) throws TException, ThriftSecurityException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void fastHalt(ZooLock lock) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, - startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void chop(ZooLock lock, KeyExtent extent) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(), - ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength())); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void flushTablet(ZooLock lock, KeyExtent extent) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); - } finally { - ThriftUtil.returnClient(client); - } - } - - public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, - startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); - } finally { - ThriftUtil.returnClient(client); - } - } - - public boolean isActive(long tid) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - return client.isActive(Tracer.traceInfo(), tid); - } finally { - ThriftUtil.returnClient(client); - } - } - - } - - static class TServerInfo { - TServerConnection connection; - TServerInstance instance; - - TServerInfo(TServerInstance instance, TServerConnection connection) { - this.connection = connection; - this.instance = instance; - } - }; - - // The set of active tservers with locks, indexed by their name in zookeeper - private Map current = new HashMap(); - // as above, indexed by TServerInstance - private Map currentInstances = new HashMap(); - - // The set of entries in zookeeper without locks, and the first time each was noticed - private Map locklessServers = new HashMap(); - - public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { - this.cback = cback; - this.instance = instance; - this.conf = conf; - - } - - public synchronized ZooCache getZooCache() { - if (zooCache == null) - zooCache = new ZooCache(this); - return zooCache; - } - - public synchronized void startListeningForTabletServerChanges() { - scanServers(); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - scanServers(); - } - }, 0, 5000); - } - - public synchronized void scanServers() { - try { - final Set updates = new HashSet(); - final Set doomed = new HashSet(); - - final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; - - HashSet all = new HashSet(current.keySet()); - all.addAll(getZooCache().getChildren(path)); - - locklessServers.keySet().retainAll(all); - - for (String zPath : all) { - checkServer(updates, doomed, path, zPath); - } - - // log.debug("Current: " + current.keySet()); - if (!doomed.isEmpty() || !updates.isEmpty()) - this.cback.update(this, doomed, updates); - } catch (Exception ex) { - log.error(ex, ex); - } - } - - private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException { - try { - ZooReaderWriter.getInstance().delete(serverNode, -1); - } catch (NotEmptyException ex) { - // race condition: tserver created the lock after our last check; we'll see it at the next check - } catch (NoNodeException nne) { - // someone else deleted it - } - } - - private synchronized void checkServer(final Set updates, final Set doomed, final String path, final String zPath) - throws TException, InterruptedException, KeeperException { - - TServerInfo info = current.get(zPath); - - final String lockPath = path + "/" + zPath; - Stat stat = new Stat(); - byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); - - if (lockData == null) { - if (info != null) { - doomed.add(info.instance); - current.remove(zPath); - currentInstances.remove(info.instance); - } - - Long firstSeen = locklessServers.get(zPath); - if (firstSeen == null) { - locklessServers.put(zPath, System.currentTimeMillis()); - } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) { - deleteServerNode(path + "/" + zPath); - locklessServers.remove(zPath); - } - } else { - locklessServers.remove(zPath); - ServerServices services = new ServerServices(new String(lockData)); - InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT); - TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); - - if (info == null) { - updates.add(instance); - TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); - current.put(zPath, tServerInfo); - currentInstances.put(instance, tServerInfo); - } else if (!info.instance.equals(instance)) { - doomed.add(info.instance); - updates.add(instance); - TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); - current.put(zPath, tServerInfo); - currentInstances.put(info.instance, tServerInfo); - } - } - } - - @Override - public void process(WatchedEvent event) { - - // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared - // relevant nodes before code below reads from zoocache - - if (event.getPath() != null) { - if (event.getPath().endsWith(Constants.ZTSERVERS)) { - scanServers(); - } else if (event.getPath().contains(Constants.ZTSERVERS)) { - int pos = event.getPath().lastIndexOf('/'); - - // do only if ZTSERVER is parent - if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) { - - String server = event.getPath().substring(pos + 1); - - final Set updates = new HashSet(); - final Set doomed = new HashSet(); - - final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; - - try { - checkServer(updates, doomed, path, server); - if (!doomed.isEmpty() || !updates.isEmpty()) - this.cback.update(this, doomed, updates); - } catch (Exception ex) { - log.error(ex, ex); - } - } - } - } - } - - public synchronized TServerConnection getConnection(TServerInstance server) throws TException { - if (server == null) - return null; - TServerInfo tServerInfo = currentInstances.get(server); - if (tServerInfo == null) - return null; - return tServerInfo.connection; - } - - public synchronized Set getCurrentServers() { - return new HashSet(currentInstances.keySet()); - } - - public synchronized int size() { - return current.size(); - } - - public synchronized TServerInstance find(String tabletServer) { - InetSocketAddress addr = AddressUtil.parseAddress(tabletServer); - for (Entry entry : current.entrySet()) { - if (entry.getValue().instance.getLocation().equals(addr)) - return entry.getValue().instance; - } - return null; - } - - public synchronized void remove(TServerInstance server) { - String zPath = null; - for (Entry entry : current.entrySet()) { - if (entry.getValue().instance.equals(server)) { - zPath = entry.getKey(); - break; - } - } - if (zPath == null) - return; - current.remove(zPath); - currentInstances.remove(server); - - log.info("Removing zookeeper lock for " + server); - String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath; - try { - ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP); - } catch (Exception e) { - String msg = "error removing tablet server lock"; - log.fatal(msg, e); - Halt.halt(msg, -1); - } - getZooCache().clear(fullpath); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/server/pom.xml b/server/server/pom.xml index 7f3b421..30b1f97 100644 --- a/server/server/pom.xml +++ b/server/server/pom.xml @@ -25,142 +25,6 @@ accumulo-server Server - - - com.beust - jcommander - - - com.google.code.gson - gson - - - jline - jline - - - org.apache.accumulo - accumulo-core - - - org.apache.accumulo - accumulo-fate - - - org.apache.accumulo - accumulo-gc - - - org.apache.accumulo - accumulo-master - - - org.apache.accumulo - accumulo-monitor - - - org.apache.accumulo - accumulo-start - - - org.apache.accumulo - accumulo-trace - - - org.apache.accumulo - accumulo-tserver - - - org.apache.thrift - libthrift - - - commons-codec - commons-codec - provided - - - commons-collections - commons-collections - provided - - - commons-configuration - commons-configuration - provided - - - commons-io - commons-io - provided - - - commons-lang - commons-lang - provided - - - javax.servlet - servlet-api - provided - - - log4j - log4j - provided - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.zookeeper - zookeeper - provided - - - org.mortbay.jetty - jetty - provided - - - junit - junit - test - - - org.slf4j - slf4j-api - test - - - org.slf4j - slf4j-log4j12 - test - - - - - - true - src/test/resources - - - - - - org.apache.rat - apache-rat-plugin - - - src/main/resources/web/flot/**/*.js - - - - - - native http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java b/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java deleted file mode 100644 index 83a8a41..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.metanalysis; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.tserver.logger.LogEvents; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * A map reduce job that takes a set of walogs and filters out all non metadata table events. - */ -public class FilterMeta extends Configured implements Tool { - - public static class FilterMapper extends Mapper { - private Set tabletIds; - - @Override - protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { - tabletIds = new HashSet(); - } - - @Override - public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException { - if (key.event == LogEvents.OPEN) { - context.write(key, value); - } else if (key.event == LogEvents.DEFINE_TABLET && key.tablet.getTableId().toString().equals(MetadataTable.ID)) { - tabletIds.add(key.tid); - context.write(key, value); - } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.contains(key.tid)) { - context.write(key, value); - } - } - } - - @Override - public int run(String[] args) throws Exception { - - String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis(); - - Job job = new Job(getConf(), jobName); - job.setJarByClass(this.getClass()); - - Path paths[] = new Path[args.length - 1]; - for (int i = 0; i < paths.length; i++) { - paths[i] = new Path(args[i]); - } - - job.setInputFormatClass(LogFileInputFormat.class); - LogFileInputFormat.setInputPaths(job, paths); - - job.setOutputFormatClass(LogFileOutputFormat.class); - LogFileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); - - job.setMapperClass(FilterMapper.class); - - job.setNumReduceTasks(0); - - job.waitForCompletion(true); - return job.isSuccessful() ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(CachedConfiguration.getInstance(), new FilterMeta(), args); - System.exit(res); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java b/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java deleted file mode 100644 index f0a8268..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.metanalysis; - -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.accumulo.server.cli.ClientOpts; -import org.apache.hadoop.io.Text; - -import com.beust.jcommander.Parameter; - -/** - * Finds tablet creation events. - */ -public class FindTablet { - - static public class Opts extends ClientOpts { - @Parameter(names = {"-r", "--row"}, required = true, description = "find tablets that contain this row") - String row = null; - - @Parameter(names = "--tableId", required = true, description = "table id") - String tableId = null; - } - - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - opts.parseArgs(FindTablet.class.getName(), args); - - findContainingTablets(opts); - } - - private static void findContainingTablets(Opts opts) throws Exception { - Range range = new KeyExtent(new Text(opts.tableId), null, null).toMetadataRange(); - - Scanner scanner = opts.getConnector().createScanner("createEvents", opts.auths); - scanner.setRange(range); - - Text row = new Text(opts.row); - for (Entry entry : scanner) { - KeyExtent ke = new KeyExtent(entry.getKey().getRow(), new Value(TextUtil.getBytes(entry.getKey().getColumnFamily()))); - if (ke.contains(row)) { - System.out.println(entry.getKey().getColumnQualifier() + " " + ke + " " + entry.getValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java b/server/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java deleted file mode 100644 index e42731a..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.metanalysis; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.data.ColumnUpdate; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.server.cli.ClientOpts; -import org.apache.accumulo.tserver.logger.LogEvents; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; - -import com.beust.jcommander.Parameter; - -/** - * A map reduce job that takes write ahead logs containing mutations for the metadata table and indexes them into Accumulo tables for analysis. - * - */ - -public class IndexMeta extends Configured implements Tool { - - public static class IndexMapper extends Mapper { - private static final Text CREATE_EVENTS_TABLE = new Text("createEvents"); - private static final Text TABLET_EVENTS_TABLE = new Text("tabletEvents"); - private Map tabletIds = new HashMap(); - private String uuid = null; - - @Override - protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { - tabletIds = new HashMap(); - uuid = null; - } - - @Override - public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException { - if (key.event == LogEvents.OPEN) { - uuid = key.tserverSession; - } else if (key.event == LogEvents.DEFINE_TABLET) { - if (key.tablet.getTableId().toString().equals(MetadataTable.ID)) { - tabletIds.put(key.tid, new KeyExtent(key.tablet)); - } - } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.containsKey(key.tid)) { - for (Mutation m : value.mutations) { - index(context, m, uuid, tabletIds.get(key.tid)); - } - } - } - - void index(Context context, Mutation m, String logFile, KeyExtent metaTablet) throws IOException, InterruptedException { - List columnsUpdates = m.getUpdates(); - - Text prevRow = null; - long timestamp = 0; - - if (m.getRow().length > 0 && m.getRow()[0] == '~') { - return; - } - - for (ColumnUpdate cu : columnsUpdates) { - if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && !cu.isDeleted()) { - prevRow = new Text(cu.getValue()); - } - - timestamp = cu.getTimestamp(); - } - - byte[] serMut = WritableUtils.toByteArray(m); - - if (prevRow != null) { - Mutation createEvent = new Mutation(new Text(m.getRow())); - createEvent.put(prevRow, new Text(String.format("%020d", timestamp)), new Value(metaTablet.toString().getBytes())); - context.write(CREATE_EVENTS_TABLE, createEvent); - } - - Mutation tabletEvent = new Mutation(new Text(m.getRow())); - tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mut"), new Value(serMut)); - tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("mtab"), new Value(metaTablet.toString().getBytes())); - tabletEvent.put(new Text(String.format("%020d", timestamp)), new Text("log"), new Value(logFile.getBytes())); - context.write(TABLET_EVENTS_TABLE, tabletEvent); - } - } - - static class Opts extends ClientOpts { - @Parameter(description = " { ...}") - List logFiles = new ArrayList(); - } - - @Override - public int run(String[] args) throws Exception { - Opts opts = new Opts(); - opts.parseArgs(IndexMeta.class.getName(), args); - - String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis(); - - Job job = new Job(getConf(), jobName); - job.setJarByClass(this.getClass()); - - List logFiles = Arrays.asList(args).subList(4, args.length); - Path paths[] = new Path[logFiles.size()]; - int count = 0; - for (String logFile : logFiles) { - paths[count++] = new Path(logFile); - } - - job.setInputFormatClass(LogFileInputFormat.class); - LogFileInputFormat.setInputPaths(job, paths); - - job.setNumReduceTasks(0); - - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setZooKeeperInstance(job, opts.instance, opts.zookeepers); - AccumuloOutputFormat.setConnectorInfo(job, opts.principal, opts.getToken()); - AccumuloOutputFormat.setCreateTables(job, false); - - job.setMapperClass(IndexMapper.class); - - Connector conn = opts.getConnector(); - - try { - conn.tableOperations().create("createEvents"); - } catch (TableExistsException tee) { - Logger.getLogger(IndexMeta.class).warn("Table createEvents exists"); - } - - try { - conn.tableOperations().create("tabletEvents"); - } catch (TableExistsException tee) { - Logger.getLogger(IndexMeta.class).warn("Table tabletEvents exists"); - } - - job.waitForCompletion(true); - return job.isSuccessful() ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(CachedConfiguration.getInstance(), new IndexMeta(), args); - System.exit(res); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java b/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java deleted file mode 100644 index 0b206ba..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.accumulo.server.metanalysis; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -/** - * Input format for Accumulo write ahead logs - */ -public class LogFileInputFormat extends FileInputFormat { - - private static class LogFileRecordReader extends RecordReader { - - private FSDataInputStream fsdis; - private LogFileKey key; - private LogFileValue value; - private long length; - - @Override - public void close() throws IOException { - fsdis.close(); - } - - @Override - public LogFileKey getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public LogFileValue getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - float progress = (length - fsdis.getPos()) / (float) length; - if (progress < 0) - return 0; - return progress; - } - - @Override - public void initialize(InputSplit is, TaskAttemptContext context) throws IOException, InterruptedException { - FileSplit fileSplit = (FileSplit) is; - - Configuration conf = new Configuration(); - FileSystem fs = FileSystem.get(conf); - - key = new LogFileKey(); - value = new LogFileValue(); - - fsdis = fs.open(fileSplit.getPath()); - FileStatus status = fs.getFileStatus(fileSplit.getPath()); - length = status.getLen(); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (key == null) - return false; - - try { - key.readFields(fsdis); - value.readFields(fsdis); - return true; - } catch (EOFException ex) { - key = null; - value = null; - return false; - } - } - - } - - - @Override - public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { - return new LogFileRecordReader(); - } - - @Override - protected boolean isSplitable(JobContext context, Path filename) { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java b/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java deleted file mode 100644 index f8dcc9e..0000000 --- a/server/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.metanalysis; - -import java.io.IOException; - -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** - * Output format for Accumulo write ahead logs. - */ -public class LogFileOutputFormat extends FileOutputFormat { - - private static class LogFileRecordWriter extends RecordWriter { - - private FSDataOutputStream out; - - public LogFileRecordWriter(Path outputPath) throws IOException { - Configuration conf = new Configuration(); - FileSystem fs = FileSystem.get(conf); - - out = fs.create(outputPath); - } - - @Override - public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { - out.close(); - } - - @Override - public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException { - key.write(out); - val.write(out); - } - - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - Path outputPath = getDefaultWorkFile(context, ""); - return new LogFileRecordWriter(outputPath); - } - -}