Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 403E010B8A for ; Wed, 29 Apr 2015 01:03:39 +0000 (UTC) Received: (qmail 93028 invoked by uid 500); 29 Apr 2015 01:03:39 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 92909 invoked by uid 500); 29 Apr 2015 01:03:39 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 92665 invoked by uid 99); 29 Apr 2015 01:03:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 01:03:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BD97AE0A21; Wed, 29 Apr 2015 01:03:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Wed, 29 Apr 2015 01:03:42 -0000 Message-Id: <5aba5f04fbfb43be909ae3a0ad73097c@git.apache.org> In-Reply-To: <96c8bee686d64ccb9427552111fd91fd@git.apache.org> References: <96c8bee686d64ccb9427552111fd91fd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/9] accumulo git commit: ACCUMULO-3759 Fix Java 8 compiler warnings http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceInfo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceInfo.java new file mode 100644 index 0000000..ef2becd --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.Serializable; +import java.util.Map; + +class NamespaceInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + String namespaceName; + String namespaceId; + String user; + + public Map props; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadata.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadata.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadata.java new file mode 100644 index 0000000..da13ecc --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadata.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.io.Text; + +class PopulateMetadata extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + PopulateMetadata(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo call(long tid, Master environment) throws Exception { + KeyExtent extent = new KeyExtent(new Text(tableInfo.tableId), null, null); + MetadataTableUtil.addTablet(extent, tableInfo.dir, environment, tableInfo.timeType, environment.getMasterLock()); + + return new FinishCreateTable(tableInfo); + + } + + @Override + public void undo(long tid, Master environment) throws Exception { + MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment, environment.getMasterLock()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadataTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadataTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadataTable.java new file mode 100644 index 0000000..72832ba --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateMetadataTable.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Optional; + +class PopulateMetadataTable extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + PopulateMetadataTable(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + static Map readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception { + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt")), UTF_8)); + + try { + Map map = new HashMap(); + + String line = null; + while ((line = in.readLine()) != null) { + String sa[] = line.split(":", 2); + map.put(sa[0], sa[1]); + } + + return map; + } finally { + in.close(); + } + + } + + @Override + public Repo call(long tid, Master master) throws Exception { + + Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE); + + BatchWriter mbw = null; + ZipInputStream zis = null; + + try { + VolumeManager fs = master.getFileSystem(); + + mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + zis = new ZipInputStream(fs.open(path)); + + Map fileNameMappings = readMappingFile(fs, tableInfo); + + log.info("importDir is " + tableInfo.importDir); + + // This is a directory already prefixed with proper volume information e.g. hdfs://localhost:8020/path/to/accumulo/tables/... + final String bulkDir = tableInfo.importDir; + + final String[] tableDirs = ServerConstants.getTablesDirs(); + + ZipEntry zipEntry; + while ((zipEntry = zis.getNextEntry()) != null) { + if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) { + DataInputStream in = new DataInputStream(new BufferedInputStream(zis)); + + Key key = new Key(); + Value val = new Value(); + + Mutation m = null; + Text currentRow = null; + int dirCount = 0; + + while (true) { + key.readFields(in); + val.readFields(in); + + Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow(); + Text metadataRow = new KeyExtent(new Text(tableInfo.tableId), endRow, null).getMetadataEntry(); + + Text cq; + + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + String oldName = new Path(key.getColumnQualifier().toString()).getName(); + String newName = fileNameMappings.get(oldName); + + if (newName == null) { + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "File " + oldName + " does not exist in import dir"); + } + + cq = new Text(bulkDir + "/" + newName); + } else { + cq = key.getColumnQualifier(); + } + + if (m == null) { + // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator + String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8); + + // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX + String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir); + + m = new Mutation(metadataRow); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8))); + currentRow = metadataRow; + } + + if (!currentRow.equals(metadataRow)) { + mbw.addMutation(m); + + // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator + String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8); + + // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX + String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir); + + m = new Mutation(metadataRow); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8))); + } + + m.put(key.getColumnFamily(), cq, val); + + if (endRow == null && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + mbw.addMutation(m); + break; // its the last column in the last row + } + } + + break; + } + } + + return new MoveExportedFiles(tableInfo); + } catch (IOException ioe) { + log.warn("{}", ioe.getMessage(), ioe); + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Error reading " + path + " " + ioe.getMessage()); + } finally { + if (zis != null) { + try { + zis.close(); + } catch (IOException ioe) { + log.warn("Failed to close zip file ", ioe); + } + } + + if (mbw != null) { + mbw.close(); + } + } + } + + /** + * Given options for tables (across multiple volumes), construct an absolute path using the unique name within the chosen volume + * + * @return An absolute, unique path for the imported table + */ + protected String getClonedTabletDir(Master master, String[] tableDirs, String tabletDir) { + // We can try to spread out the tablet dirs across all volumes + String tableDir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), tableDirs); + + // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX + return tableDir + "/" + tableInfo.tableId + "/" + tabletDir; + } + + @Override + public void undo(long tid, Master environment) throws Exception { + MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment, environment.getMasterLock()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeper.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeper.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeper.java new file mode 100644 index 0000000..8ec8834 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.util.TablePropUtil; + +class PopulateZookeeper extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + PopulateZookeeper(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.CREATE); + } + + @Override + public Repo call(long tid, Master master) throws Exception { + // reserve the table name in zookeeper or fail + + Utils.tableNameLock.lock(); + try { + // write tableName & tableId to zookeeper + Instance instance = master.getInstance(); + + Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE); + + TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.namespaceId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE); + + for (Entry entry : tableInfo.props.entrySet()) + TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue()); + + Tables.clearCache(instance); + return new ChooseDir(tableInfo); + } finally { + Utils.tableNameLock.unlock(); + } + + } + + @Override + public void undo(long tid, Master master) throws Exception { + Instance instance = master.getInstance(); + TableManager.getInstance().removeTable(tableInfo.tableId); + Utils.unreserveTable(tableInfo.tableId, tid, true); + Tables.clearCache(instance); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeperWithNamespace.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeperWithNamespace.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeperWithNamespace.java new file mode 100644 index 0000000..bf101ae --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/PopulateZookeeperWithNamespace.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.util.NamespacePropUtil; + +class PopulateZookeeperWithNamespace extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private NamespaceInfo namespaceInfo; + + PopulateZookeeperWithNamespace(NamespaceInfo ti) { + this.namespaceInfo = ti; + } + + @Override + public long isReady(long id, Master environment) throws Exception { + return Utils.reserveNamespace(namespaceInfo.namespaceId, id, true, false, TableOperation.CREATE); + } + + @Override + public Repo call(long tid, Master master) throws Exception { + + Utils.tableNameLock.lock(); + try { + Instance instance = master.getInstance(); + + Utils.checkNamespaceDoesNotExist(instance, namespaceInfo.namespaceName, namespaceInfo.namespaceId, TableOperation.CREATE); + + TableManager.prepareNewNamespaceState(instance.getInstanceID(), namespaceInfo.namespaceId, namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE); + + for (Entry entry : namespaceInfo.props.entrySet()) + NamespacePropUtil.setNamespaceProperty(namespaceInfo.namespaceId, entry.getKey(), entry.getValue()); + + Tables.clearCache(instance); + + return new FinishCreateNamespace(namespaceInfo); + } finally { + Utils.tableNameLock.unlock(); + } + } + + @Override + public void undo(long tid, Master master) throws Exception { + TableManager.getInstance().removeNamespace(namespaceInfo.namespaceId); + Tables.clearCache(master.getInstance()); + Utils.unreserveNamespace(namespaceInfo.namespaceId, tid, true); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupNamespacePermissions.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupNamespacePermissions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupNamespacePermissions.java new file mode 100644 index 0000000..ace3935 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupNamespacePermissions.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.security.NamespacePermission; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.slf4j.LoggerFactory; + +class SetupNamespacePermissions extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private NamespaceInfo namespaceInfo; + + public SetupNamespacePermissions(NamespaceInfo ti) { + this.namespaceInfo = ti; + } + + @Override + public Repo call(long tid, Master env) throws Exception { + // give all namespace permissions to the creator + SecurityOperation security = AuditedSecurityOperation.getInstance(env); + for (NamespacePermission permission : NamespacePermission.values()) { + try { + security.grantNamespacePermission(env.rpcCreds(), namespaceInfo.user, namespaceInfo.namespaceId, permission); + } catch (ThriftSecurityException e) { + LoggerFactory.getLogger(FinishCreateNamespace.class).error("{}", e.getMessage(), e); + throw e; + } + } + + // setup permissions in zookeeper before table info in zookeeper + // this way concurrent users will not get a spurious permission denied + // error + return new PopulateZookeeperWithNamespace(namespaceInfo); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupPermissions.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupPermissions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupPermissions.java new file mode 100644 index 0000000..fd3b7da --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/SetupPermissions.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.slf4j.LoggerFactory; + +class SetupPermissions extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + public SetupPermissions(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public Repo call(long tid, Master env) throws Exception { + // give all table permissions to the creator + SecurityOperation security = AuditedSecurityOperation.getInstance(env); + if (!tableInfo.user.equals(env.getCredentials().getPrincipal())) { + for (TablePermission permission : TablePermission.values()) { + try { + security.grantTablePermission(env.rpcCreds(), tableInfo.user, tableInfo.tableId, permission, tableInfo.namespaceId); + } catch (ThriftSecurityException e) { + LoggerFactory.getLogger(FinishCreateTable.class).error("{}", e.getMessage(), e); + throw e; + } + } + } + + // setup permissions in zookeeper before table info in zookeeper + // this way concurrent users will not get a spurious permission denied + // error + return new PopulateZookeeper(tableInfo); + } + + @Override + public void undo(long tid, Master env) throws Exception { + AuditedSecurityOperation.getInstance(env).deleteTable(env.rpcCreds(), tableInfo.tableId, tableInfo.namespaceId); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java new file mode 100644 index 0000000..e2057d1 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.Serializable; +import java.util.Map; + +class TableInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + String tableName; + String tableId; + String namespaceId; + char timeType; + String user; + + public Map props; + + public String dir = null; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java index a9a923b..1d8b116 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java @@ -30,51 +30,6 @@ import org.apache.accumulo.server.master.state.MergeInfo.Operation; import org.apache.accumulo.server.master.state.MergeState; import org.apache.hadoop.io.Text; -/** - * Merge makes things hard. - * - * Typically, a client will read the list of tablets, and begin an operation on that tablet at the location listed in the metadata table. When a tablet splits, - * the information read from the metadata table doesn't match reality, so the operation fails, and must be retried. But the operation will take place either on - * the parent, or at a later time on the children. It won't take place on just half of the tablet. - * - * However, when a merge occurs, the operation may have succeeded on one section of the merged area, and not on the others, when the merge occurs. There is no - * way to retry the request at a later time on an unmodified tablet. - * - * The code below uses read-write lock to prevent some operations while a merge is taking place. Normal operations, like bulk imports, will grab the read lock - * and prevent merges (writes) while they run. Merge operations will lock out some operations while they run. - */ -class TableRangeOpWait extends MasterRepo { - - private static final long serialVersionUID = 1L; - private String tableId; - - public TableRangeOpWait(String tableId) { - this.tableId = tableId; - } - - @Override - public long isReady(long tid, Master env) throws Exception { - Text tableIdText = new Text(tableId); - if (!env.getMergeInfo(tableIdText).getState().equals(MergeState.NONE)) { - return 50; - } - return 0; - } - - @Override - public Repo call(long tid, Master master) throws Exception { - String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); - Text tableIdText = new Text(tableId); - MergeInfo mergeInfo = master.getMergeInfo(tableIdText); - log.info("removing merge information " + mergeInfo); - master.clearMergeState(tableIdText); - Utils.unreserveNamespace(namespaceId, tid, false); - Utils.unreserveTable(tableId, tid, true); - return null; - } - -} - public class TableRangeOp extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java new file mode 100644 index 0000000..bdab469 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.hadoop.io.Text; + +/** + * Merge makes things hard. + * + * Typically, a client will read the list of tablets, and begin an operation on that tablet at the location listed in the metadata table. When a tablet splits, + * the information read from the metadata table doesn't match reality, so the operation fails, and must be retried. But the operation will take place either on + * the parent, or at a later time on the children. It won't take place on just half of the tablet. + * + * However, when a merge occurs, the operation may have succeeded on one section of the merged area, and not on the others, when the merge occurs. There is no + * way to retry the request at a later time on an unmodified tablet. + * + * The code below uses read-write lock to prevent some operations while a merge is taking place. Normal operations, like bulk imports, will grab the read lock + * and prevent merges (writes) while they run. Merge operations will lock out some operations while they run. + */ +class TableRangeOpWait extends MasterRepo { + + private static final long serialVersionUID = 1L; + private String tableId; + + public TableRangeOpWait(String tableId) { + this.tableId = tableId; + } + + @Override + public long isReady(long tid, Master env) throws Exception { + Text tableIdText = new Text(tableId); + if (!env.getMergeInfo(tableIdText).getState().equals(MergeState.NONE)) { + return 50; + } + return 0; + } + + @Override + public Repo call(long tid, Master master) throws Exception { + String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); + Text tableIdText = new Text(tableId); + MergeInfo mergeInfo = master.getMergeInfo(tableIdText); + log.info("removing merge information " + mergeInfo); + master.clearMergeState(tableIdText); + Utils.unreserveNamespace(namespaceId, tid, false); + Utils.unreserveTable(tableId, tid, true); + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/WriteExportFiles.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/WriteExportFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/WriteExportFiles.java new file mode 100644 index 0000000..ca31d48 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/WriteExportFiles.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +class WriteExportFiles extends MasterRepo { + + private static final long serialVersionUID = 1L; + private final ExportInfo tableInfo; + + WriteExportFiles(ExportInfo tableInfo) { + this.tableInfo = tableInfo; + } + + private void checkOffline(Connector conn) throws Exception { + if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) { + Tables.clearCache(conn.getInstance()); + if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) { + throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, + "Table is not offline"); + } + } + } + + @Override + public long isReady(long tid, Master master) throws Exception { + + long reserved = Utils.reserveNamespace(tableInfo.namespaceID, tid, false, true, TableOperation.EXPORT) + + Utils.reserveTable(tableInfo.tableID, tid, false, true, TableOperation.EXPORT); + if (reserved > 0) + return reserved; + + Connector conn = master.getConnector(); + + checkOffline(conn); + + Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + metaScanner.setRange(new KeyExtent(new Text(tableInfo.tableID), null, null).toMetadataRange()); + + // scan for locations + metaScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + metaScanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); + + if (metaScanner.iterator().hasNext()) { + return 500; + } + + // use the same range to check for walogs that we used to check for hosted (or future hosted) tablets + // this is done as a separate scan after we check for locations, because walogs are okay only if there is no location + metaScanner.clearColumns(); + metaScanner.fetchColumnFamily(LogColumnFamily.NAME); + + if (metaScanner.iterator().hasNext()) { + throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, + "Write ahead logs found for table"); + } + + return 0; + } + + @Override + public Repo call(long tid, Master master) throws Exception { + try { + exportTable(master.getFileSystem(), master, tableInfo.tableName, tableInfo.tableID, tableInfo.exportDir); + } catch (IOException ioe) { + throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, + "Failed to create export files " + ioe.getMessage()); + } + Utils.unreserveNamespace(tableInfo.namespaceID, tid, false); + Utils.unreserveTable(tableInfo.tableID, tid, false); + Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid); + return null; + } + + @Override + public void undo(long tid, Master env) throws Exception { + Utils.unreserveNamespace(tableInfo.namespaceID, tid, false); + Utils.unreserveTable(tableInfo.tableID, tid, false); + } + + public static void exportTable(VolumeManager fs, AccumuloServerContext context, String tableName, String tableID, String exportDir) throws Exception { + + fs.mkdirs(new Path(exportDir)); + Path exportMetaFilePath = fs.getVolumeByPath(new Path(exportDir)).getFileSystem().makeQualified(new Path(exportDir, Constants.EXPORT_FILE)); + + FSDataOutputStream fileOut = fs.create(exportMetaFilePath, false); + ZipOutputStream zipOut = new ZipOutputStream(fileOut); + BufferedOutputStream bufOut = new BufferedOutputStream(zipOut); + DataOutputStream dataOut = new DataOutputStream(bufOut); + + try { + + zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_INFO_FILE)); + OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8); + osw.append(ExportTable.EXPORT_VERSION_PROP + ":" + ExportTable.VERSION + "\n"); + osw.append("srcInstanceName:" + context.getInstance().getInstanceName() + "\n"); + osw.append("srcInstanceID:" + context.getInstance().getInstanceID() + "\n"); + osw.append("srcZookeepers:" + context.getInstance().getZooKeepers() + "\n"); + osw.append("srcTableName:" + tableName + "\n"); + osw.append("srcTableID:" + tableID + "\n"); + osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n"); + osw.append("srcCodeVersion:" + Constants.VERSION + "\n"); + + osw.flush(); + dataOut.flush(); + + exportConfig(context, tableID, zipOut, dataOut); + dataOut.flush(); + + Map uniqueFiles = exportMetadata(fs, context, tableID, zipOut, dataOut); + + dataOut.close(); + dataOut = null; + + createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles); + + } finally { + if (dataOut != null) + dataOut.close(); + } + } + + private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map uniqueFiles) throws IOException { + BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false), UTF_8)); + + try { + for (String file : uniqueFiles.values()) { + distcpOut.append(file); + distcpOut.newLine(); + } + + distcpOut.append(exportMetaFilePath.toString()); + distcpOut.newLine(); + + distcpOut.close(); + distcpOut = null; + + } finally { + if (distcpOut != null) + distcpOut.close(); + } + } + + private static Map exportMetadata(VolumeManager fs, AccumuloServerContext context, String tableID, ZipOutputStream zipOut, + DataOutputStream dataOut) throws IOException, TableNotFoundException, AccumuloException, AccumuloSecurityException { + zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE)); + + Map uniqueFiles = new HashMap(); + + Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); + metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner); + TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner); + metaScanner.setRange(new KeyExtent(new Text(tableID), null, null).toMetadataRange()); + + for (Entry entry : metaScanner) { + entry.getKey().write(dataOut); + entry.getValue().write(dataOut); + + if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + String path = fs.getFullPath(entry.getKey()).toString(); + String tokens[] = path.split("/"); + if (tokens.length < 1) { + throw new RuntimeException("Illegal path " + path); + } + + String filename = tokens[tokens.length - 1]; + + String existingPath = uniqueFiles.get(filename); + if (existingPath == null) { + uniqueFiles.put(filename, path); + } else if (!existingPath.equals(path)) { + // make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier + throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table."); + } + + } + } + return uniqueFiles; + } + + private static void exportConfig(AccumuloServerContext context, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, IOException { + Connector conn = context.getConnector(); + + DefaultConfiguration defaultConfig = AccumuloConfiguration.getDefaultConfiguration(); + Map siteConfig = conn.instanceOperations().getSiteConfiguration(); + Map systemConfig = conn.instanceOperations().getSystemConfiguration(); + + TableConfiguration tableConfig = context.getServerConfigurationFactory().getTableConfiguration(tableID); + + OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8); + + // only put props that are different than defaults and higher level configurations + zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_TABLE_CONFIG_FILE)); + for (Entry prop : tableConfig) { + if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) { + Property key = Property.getPropertyByKey(prop.getKey()); + + if (key == null || !defaultConfig.get(key).equals(prop.getValue())) { + if (!prop.getValue().equals(siteConfig.get(prop.getKey())) && !prop.getValue().equals(systemConfig.get(prop.getKey()))) { + osw.append(prop.getKey() + "=" + prop.getValue() + "\n"); + } + } + } + } + + osw.flush(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 80feb47..2d3a0a1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -17,11 +17,8 @@ package org.apache.accumulo.tserver; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -50,7 +47,6 @@ import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.SortedMapIterator; import org.apache.accumulo.core.iterators.WrappingIterator; @@ -72,121 +68,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class MemKeyComparator implements Comparator, Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public int compare(Key k1, Key k2) { - int cmp = k1.compareTo(k2); - - if (cmp == 0) { - if (k1 instanceof MemKey) - if (k2 instanceof MemKey) - cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount; - else - cmp = 1; - else if (k2 instanceof MemKey) - cmp = -1; - } - - return cmp; - } -} - -class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator { - - private int kvCount; - - public PartialMutationSkippingIterator(SortedKeyValueIterator source, int maxKVCount) { - setSource(source); - this.kvCount = maxKVCount; - } - - @Override - protected void consume() throws IOException { - while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) - getSource().next(); - } - - @Override - public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); - } - - @Override - public void setInterruptFlag(AtomicBoolean flag) { - ((InterruptibleIterator) getSource()).setInterruptFlag(flag); - } - -} - -class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator { - private MemKey currKey = null; - private Value currVal = null; - - public MemKeyConversionIterator(SortedKeyValueIterator source) { - super(); - setSource(source); - } - - @Override - public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new MemKeyConversionIterator(getSource().deepCopy(env)); - } - - @Override - public Key getTopKey() { - return currKey; - } - - @Override - public Value getTopValue() { - return currVal; - } - - private void getTopKeyVal() { - Key k = super.getTopKey(); - Value v = super.getTopValue(); - if (k instanceof MemKey || k == null) { - currKey = (MemKey) k; - currVal = v; - return; - } - currVal = new Value(v); - int mc = MemValue.splitKVCount(currVal); - currKey = new MemKey(k, mc); - - } - - @Override - public void next() throws IOException { - super.next(); - if (hasTop()) - getTopKeyVal(); - } - - @Override - public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { - super.seek(range, columnFamilies, inclusive); - - if (hasTop()) - getTopKeyVal(); - - Key k = range.getStartKey(); - if (k instanceof MemKey && hasTop()) { - while (hasTop() && currKey.compareTo(k) < 0) - next(); - } - } - - @Override - public void setInterruptFlag(AtomicBoolean flag) { - ((InterruptibleIterator) getSource()).setInterruptFlag(flag); - } - -} - public class InMemoryMap { private SimpleMap map = null; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java new file mode 100644 index 0000000..6c8b0f3 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyComparator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.accumulo.core.data.Key; + +class MemKeyComparator implements Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(Key k1, Key k2) { + int cmp = k1.compareTo(k2); + + if (cmp == 0) { + if (k1 instanceof MemKey) + if (k2 instanceof MemKey) + cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount; + else + cmp = 1; + else if (k2 instanceof MemKey) + cmp = -1; + } + + return cmp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java new file mode 100644 index 0000000..891a0ba --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKeyConversionIterator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; + +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator { + private MemKey currKey = null; + private Value currVal = null; + + public MemKeyConversionIterator(SortedKeyValueIterator source) { + super(); + setSource(source); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new MemKeyConversionIterator(getSource().deepCopy(env)); + } + + @Override + public Key getTopKey() { + return currKey; + } + + @Override + public Value getTopValue() { + return currVal; + } + + private void getTopKeyVal() { + Key k = super.getTopKey(); + Value v = super.getTopValue(); + if (k instanceof MemKey || k == null) { + currKey = (MemKey) k; + currVal = v; + return; + } + currVal = new Value(v); + int mc = MemValue.splitKVCount(currVal); + currKey = new MemKey(k, mc); + + } + + @Override + public void next() throws IOException { + super.next(); + if (hasTop()) + getTopKeyVal(); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + + if (hasTop()) + getTopKeyVal(); + + Key k = range.getStartKey(); + if (k instanceof MemKey && hasTop()) { + while (hasTop() && currKey.compareTo(k) < 0) + next(); + } + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java new file mode 100644 index 0000000..8e2f113 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/PartialMutationSkippingIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; + +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator { + + private int kvCount; + + public PartialMutationSkippingIterator(SortedKeyValueIterator source, int maxKVCount) { + setSource(source); + this.kvCount = maxKVCount; + } + + @Override + protected void consume() throws IOException { + while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) + getSource().next(); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/EstimateInMemMapOverhead.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/EstimateInMemMapOverhead.java b/test/src/main/java/org/apache/accumulo/test/EstimateInMemMapOverhead.java index 668b9cc..fb3c8a0 100644 --- a/test/src/main/java/org/apache/accumulo/test/EstimateInMemMapOverhead.java +++ b/test/src/main/java/org/apache/accumulo/test/EstimateInMemMapOverhead.java @@ -16,323 +16,6 @@ */ package org.apache.accumulo.test; -import java.util.Collections; -import java.util.TreeMap; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.tserver.InMemoryMap; -import org.apache.hadoop.io.Text; - -abstract class MemoryUsageTest { - abstract void addEntry(int i); - - abstract int getEstimatedBytesPerEntry(); - - abstract void clear(); - - abstract int getNumPasses(); - - abstract String getName(); - - abstract void init(); - - public void run() { - System.gc(); - long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - int count = 0; - while (usedMem > 1024 * 1024 && count < 10) { - System.gc(); - usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - count++; - } - - init(); - - for (int i = 0; i < getNumPasses(); i++) { - addEntry(i); - } - - System.gc(); - - long memSize = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) - usedMem; - - double actualBytesPerEntry = memSize / (double) getNumPasses(); - double expectedBytesPerEntry = getEstimatedBytesPerEntry(); - double diff = actualBytesPerEntry - expectedBytesPerEntry; - double ratio = actualBytesPerEntry / expectedBytesPerEntry * 100; - - System.out.printf("%30s | %,10d | %6.2fGB | %6.2f | %6.2f | %6.2f | %6.2f%s%n", getName(), getNumPasses(), memSize / (1024 * 1024 * 1024.0), - actualBytesPerEntry, expectedBytesPerEntry, diff, ratio, "%"); - - clear(); - - } - -} - -class TextMemoryUsageTest extends MemoryUsageTest { - - private int keyLen; - private int colFamLen; - private int colQualLen; - private int dataLen; - private TreeMap map; - private int passes; - - TextMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int dataLen) { - this.keyLen = keyLen; - this.colFamLen = colFamLen; - this.colQualLen = colQualLen; - this.dataLen = dataLen; - this.passes = passes; - - } - - @Override - void init() { - map = new TreeMap(); - } - - @Override - public void addEntry(int i) { - Text key = new Text(String.format("%0" + keyLen + "d:%0" + colFamLen + "d:%0" + colQualLen + "d", i, 0, 0).getBytes()); - // - byte data[] = new byte[dataLen]; - for (int j = 0; j < data.length; j++) { - data[j] = (byte) (j % 10 + 65); - } - Value value = new Value(data); - - map.put(key, value); - - } - - @Override - public void clear() { - map.clear(); - map = null; - } - - @Override - public int getEstimatedBytesPerEntry() { - return keyLen + colFamLen + colQualLen + dataLen; - } - - @Override - int getNumPasses() { - return passes; - } - - @Override - String getName() { - return "Text " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; - } - -} - -class InMemoryMapMemoryUsageTest extends MemoryUsageTest { - - private int keyLen; - private int colFamLen; - private int colQualLen; - private int colVisLen; - private int dataLen; - - private InMemoryMap imm; - private Text key; - private Text colf; - private Text colq; - private ColumnVisibility colv; - private int passes; - - InMemoryMapMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int colVisLen, int dataLen) { - this.keyLen = keyLen; - this.colFamLen = colFamLen; - this.colQualLen = colQualLen; - this.dataLen = dataLen; - this.passes = passes; - this.colVisLen = colVisLen; - - } - - @Override - void init() { - imm = new InMemoryMap(false, "/tmp"); - key = new Text(); - - colf = new Text(String.format("%0" + colFamLen + "d", 0)); - colq = new Text(String.format("%0" + colQualLen + "d", 0)); - colv = new ColumnVisibility(String.format("%0" + colVisLen + "d", 0)); - } - - @Override - public void addEntry(int i) { - key.set(String.format("%0" + keyLen + "d", i)); - - Mutation m = new Mutation(key); - - byte data[] = new byte[dataLen]; - for (int j = 0; j < data.length; j++) { - data[j] = (byte) (j % 10 + 65); - } - Value idata = new Value(data); - - m.put(colf, colq, colv, idata); - - imm.mutate(Collections.singletonList(m)); - - } - - @Override - public int getEstimatedBytesPerEntry() { - return keyLen + colFamLen + colQualLen + dataLen + 4 + colVisLen; - } - - @Override - public void clear() { - imm = null; - key = null; - colf = null; - colq = null; - } - - @Override - int getNumPasses() { - return passes; - } - - @Override - String getName() { - return "IMM " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; - } -} - -class MutationMemoryUsageTest extends MemoryUsageTest { - - private int keyLen; - private int colFamLen; - private int colQualLen; - private int dataLen; - - private Mutation[] mutations; - private Text key; - private Text colf; - private Text colq; - private int passes; - - MutationMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int dataLen) { - this.keyLen = keyLen; - this.colFamLen = colFamLen; - this.colQualLen = colQualLen; - this.dataLen = dataLen; - this.passes = passes; - mutations = new Mutation[passes]; - - } - - @Override - void init() { - key = new Text(); - - colf = new Text(String.format("%0" + colFamLen + "d", 0)); - colq = new Text(String.format("%0" + colQualLen + "d", 0)); - - byte data[] = new byte[dataLen]; - for (int i = 0; i < data.length; i++) { - data[i] = (byte) (i % 10 + 65); - } - } - - @Override - public void addEntry(int i) { - key.set(String.format("%0" + keyLen + "d", i)); - - Mutation m = new Mutation(key); - - byte data[] = new byte[dataLen]; - for (int j = 0; j < data.length; j++) { - data[j] = (byte) (j % 10 + 65); - } - Value idata = new Value(data); - - m.put(colf, colq, idata); - - mutations[i] = m; - } - - @Override - public int getEstimatedBytesPerEntry() { - return keyLen + colFamLen + colQualLen + dataLen; - } - - @Override - public void clear() { - key = null; - colf = null; - colq = null; - mutations = null; - } - - @Override - int getNumPasses() { - return passes; - } - - @Override - String getName() { - return "Mutation " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; - } -} - -class IntObjectMemoryUsageTest extends MemoryUsageTest { - - private int passes; - private Object data[]; - - static class SimpleObject { - int d; - - SimpleObject(int d) { - this.d = d; - } - } - - IntObjectMemoryUsageTest(int numPasses) { - this.passes = numPasses; - } - - @Override - void init() { - data = new Object[passes]; - } - - @Override - void addEntry(int i) { - data[i] = new SimpleObject(i); - - } - - @Override - void clear() {} - - @Override - int getEstimatedBytesPerEntry() { - return 4; - } - - @Override - String getName() { - return "int obj"; - } - - @Override - int getNumPasses() { - return passes; - } - -} public class EstimateInMemMapOverhead { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java new file mode 100644 index 0000000..f325524 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.Collections; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.hadoop.io.Text; + +class InMemoryMapMemoryUsageTest extends MemoryUsageTest { + + private int keyLen; + private int colFamLen; + private int colQualLen; + private int colVisLen; + private int dataLen; + + private InMemoryMap imm; + private Text key; + private Text colf; + private Text colq; + private ColumnVisibility colv; + private int passes; + + InMemoryMapMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int colVisLen, int dataLen) { + this.keyLen = keyLen; + this.colFamLen = colFamLen; + this.colQualLen = colQualLen; + this.dataLen = dataLen; + this.passes = passes; + this.colVisLen = colVisLen; + + } + + @Override + void init() { + imm = new InMemoryMap(false, "/tmp"); + key = new Text(); + + colf = new Text(String.format("%0" + colFamLen + "d", 0)); + colq = new Text(String.format("%0" + colQualLen + "d", 0)); + colv = new ColumnVisibility(String.format("%0" + colVisLen + "d", 0)); + } + + @Override + public void addEntry(int i) { + key.set(String.format("%0" + keyLen + "d", i)); + + Mutation m = new Mutation(key); + + byte data[] = new byte[dataLen]; + for (int j = 0; j < data.length; j++) { + data[j] = (byte) (j % 10 + 65); + } + Value idata = new Value(data); + + m.put(colf, colq, colv, idata); + + imm.mutate(Collections.singletonList(m)); + + } + + @Override + public int getEstimatedBytesPerEntry() { + return keyLen + colFamLen + colQualLen + dataLen + 4 + colVisLen; + } + + @Override + public void clear() { + imm = null; + key = null; + colf = null; + colq = null; + } + + @Override + int getNumPasses() { + return passes; + } + + @Override + String getName() { + return "IMM " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/IntObjectMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/IntObjectMemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/IntObjectMemoryUsageTest.java new file mode 100644 index 0000000..d83421a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/IntObjectMemoryUsageTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +class IntObjectMemoryUsageTest extends MemoryUsageTest { + + private int passes; + private Object data[]; + + static class SimpleObject { + int d; + + SimpleObject(int d) { + this.d = d; + } + } + + IntObjectMemoryUsageTest(int numPasses) { + this.passes = numPasses; + } + + @Override + void init() { + data = new Object[passes]; + } + + @Override + void addEntry(int i) { + data[i] = new SimpleObject(i); + + } + + @Override + void clear() {} + + @Override + int getEstimatedBytesPerEntry() { + return 4; + } + + @Override + String getName() { + return "int obj"; + } + + @Override + int getNumPasses() { + return passes; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/MemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/MemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/MemoryUsageTest.java new file mode 100644 index 0000000..39e8d68 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/MemoryUsageTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +abstract class MemoryUsageTest { + abstract void addEntry(int i); + + abstract int getEstimatedBytesPerEntry(); + + abstract void clear(); + + abstract int getNumPasses(); + + abstract String getName(); + + abstract void init(); + + public void run() { + System.gc(); + long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + int count = 0; + while (usedMem > 1024 * 1024 && count < 10) { + System.gc(); + usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + count++; + } + + init(); + + for (int i = 0; i < getNumPasses(); i++) { + addEntry(i); + } + + System.gc(); + + long memSize = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) - usedMem; + + double actualBytesPerEntry = memSize / (double) getNumPasses(); + double expectedBytesPerEntry = getEstimatedBytesPerEntry(); + double diff = actualBytesPerEntry - expectedBytesPerEntry; + double ratio = actualBytesPerEntry / expectedBytesPerEntry * 100; + + System.out.printf("%30s | %,10d | %6.2fGB | %6.2f | %6.2f | %6.2f | %6.2f%s%n", getName(), getNumPasses(), memSize / (1024 * 1024 * 1024.0), + actualBytesPerEntry, expectedBytesPerEntry, diff, ratio, "%"); + + clear(); + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/MutationMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/MutationMemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/MutationMemoryUsageTest.java new file mode 100644 index 0000000..011fbfe --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/MutationMemoryUsageTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +class MutationMemoryUsageTest extends MemoryUsageTest { + + private int keyLen; + private int colFamLen; + private int colQualLen; + private int dataLen; + + private Mutation[] mutations; + private Text key; + private Text colf; + private Text colq; + private int passes; + + MutationMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int dataLen) { + this.keyLen = keyLen; + this.colFamLen = colFamLen; + this.colQualLen = colQualLen; + this.dataLen = dataLen; + this.passes = passes; + mutations = new Mutation[passes]; + + } + + @Override + void init() { + key = new Text(); + + colf = new Text(String.format("%0" + colFamLen + "d", 0)); + colq = new Text(String.format("%0" + colQualLen + "d", 0)); + + byte data[] = new byte[dataLen]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) (i % 10 + 65); + } + } + + @Override + public void addEntry(int i) { + key.set(String.format("%0" + keyLen + "d", i)); + + Mutation m = new Mutation(key); + + byte data[] = new byte[dataLen]; + for (int j = 0; j < data.length; j++) { + data[j] = (byte) (j % 10 + 65); + } + Value idata = new Value(data); + + m.put(colf, colq, idata); + + mutations[i] = m; + } + + @Override + public int getEstimatedBytesPerEntry() { + return keyLen + colFamLen + colQualLen + dataLen; + } + + @Override + public void clear() { + key = null; + colf = null; + colq = null; + mutations = null; + } + + @Override + int getNumPasses() { + return passes; + } + + @Override + String getName() { + return "Mutation " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/TextMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/TextMemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/TextMemoryUsageTest.java new file mode 100644 index 0000000..14b8184 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/TextMemoryUsageTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.TreeMap; + +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +class TextMemoryUsageTest extends MemoryUsageTest { + + private int keyLen; + private int colFamLen; + private int colQualLen; + private int dataLen; + private TreeMap map; + private int passes; + + TextMemoryUsageTest(int passes, int keyLen, int colFamLen, int colQualLen, int dataLen) { + this.keyLen = keyLen; + this.colFamLen = colFamLen; + this.colQualLen = colQualLen; + this.dataLen = dataLen; + this.passes = passes; + + } + + @Override + void init() { + map = new TreeMap(); + } + + @Override + public void addEntry(int i) { + Text key = new Text(String.format("%0" + keyLen + "d:%0" + colFamLen + "d:%0" + colQualLen + "d", i, 0, 0).getBytes()); + // + byte data[] = new byte[dataLen]; + for (int j = 0; j < data.length; j++) { + data[j] = (byte) (j % 10 + 65); + } + Value value = new Value(data); + + map.put(key, value); + + } + + @Override + public void clear() { + map.clear(); + map = null; + } + + @Override + public int getEstimatedBytesPerEntry() { + return keyLen + colFamLen + colQualLen + dataLen; + } + + @Override + int getNumPasses() { + return passes; + } + + @Override + String getName() { + return "Text " + keyLen + " " + colFamLen + " " + colQualLen + " " + dataLen; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java b/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java new file mode 100644 index 0000000..f53a6a6 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/continuous/HistData.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.continuous; + +import java.io.Serializable; +import java.util.Objects; + +class HistData implements Comparable>, Serializable { + private static final long serialVersionUID = 1L; + + T bin; + long count; + + HistData(T bin) { + this.bin = bin; + count = 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(bin) + Objects.hashCode(count); + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + return obj == this || (obj != null && obj instanceof HistData && 0 == compareTo((HistData) obj)); + } + + @SuppressWarnings("unchecked") + @Override + public int compareTo(HistData o) { + return ((Comparable) bin).compareTo(o.bin); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java index dd17f3d..8dd3c9d 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/Histogram.java @@ -29,39 +29,9 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.TreeSet; -class HistData implements Comparable>, Serializable { - private static final long serialVersionUID = 1L; - - T bin; - long count; - - HistData(T bin) { - this.bin = bin; - count = 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(bin) + Objects.hashCode(count); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object obj) { - return obj == this || (obj != null && obj instanceof HistData && 0 == compareTo((HistData) obj)); - } - - @SuppressWarnings("unchecked") - @Override - public int compareTo(HistData o) { - return ((Comparable) bin).compareTo(o.bin); - } -} - public class Histogram implements Serializable { private static final long serialVersionUID = 1L;