From notifications-return-46823-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu Dec 20 01:35:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C92C4180670 for ; Thu, 20 Dec 2018 01:35:20 +0100 (CET) Received: (qmail 30479 invoked by uid 500); 20 Dec 2018 00:35:20 -0000 Mailing-List: contact notifications-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@apache.org Delivered-To: mailing list notifications@accumulo.apache.org Received: (qmail 30456 invoked by uid 99); 20 Dec 2018 00:35:19 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2018 00:35:19 +0000 From: GitBox To: notifications@accumulo.apache.org Subject: [GitHub] milleruntime commented on a change in pull request #837: fixes #800 avoid importing files for completed bulk transactions Message-ID: <154526611941.20244.813742749449400607.gitbox@gitbox.apache.org> Date: Thu, 20 Dec 2018 00:35:19 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit milleruntime commented on a change in pull request #837: fixes #800 avoid importing files for completed bulk transactions URL: https://github.com/apache/accumulo/pull/837#discussion_r243124001 ########## File path: test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.impl.Translator; +import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.data.thrift.TKeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.master.tableOps.BulkImport; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TServiceClient; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class BulkFailureIT extends AccumuloClusterHarness { + + /** + * This test verifies two things. First it ensures that after a bulk imported file is compacted + * that import request are ignored. Second it ensures that after the bulk import transaction is + * canceled that import request fail. The public API for bulk import can not be used for this + * test. Internal (non public API) RPCs and Zookeeper state is manipulated directly. This is the + * only way to interleave compactions with multiple, duplicate import RPC request. + */ + @Test + public void testImportCompactionImport() throws Exception { + Connector c = getConnector(); + String table = getUniqueNames(1)[0]; + + SortedMap testData = createTestData(); + + FileSystem fs = getCluster().getFileSystem(); + String testFile = createTestFile(testData, fs); + + c.tableOperations().create(table); + String tableId = c.tableOperations().tableIdMap().get(table); + + // Table has no splits, so this extent corresponds to the tables single tablet + KeyExtent extent = new KeyExtent(tableId.toString(), null, null); + + // Set up site configuration because this test uses server side code that expects it. + setupSiteConfig(); + + long fateTxid = 99999999L; + + AccumuloServerContext asCtx = new AccumuloServerContext( + new ServerConfigurationFactory(HdfsZooInstance.getInstance())); + ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, fateTxid); + + VolumeManager vm = VolumeManagerImpl.get(); + + // move the file into a directory for the table and rename the file to something unique + String bulkDir = BulkImport.prepareBulkImport(asCtx, vm, testFile, tableId); + + // determine the files new name and path + FileStatus status = fs.listStatus(new Path(bulkDir))[0]; + Path bulkLoadPath = fs.makeQualified(status.getPath()); + + // Directly ask the tablet to load the file. + assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen()); + + assertEquals(ImmutableSet.of(bulkLoadPath), getFiles(c, extent)); + assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent)); + assertEquals(testData, readTable(table, c)); + + // Compact the bulk imported file. Subsequent request to load the file should be ignored. + c.tableOperations().compact(table, new CompactionConfig().setWait(true)); + + Set tabletFiles = getFiles(c, extent); + assertFalse(tabletFiles.contains(bulkLoadPath)); + assertEquals(1, tabletFiles.size()); + assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent)); + assertEquals(testData, readTable(table, c)); + + // this request should be ignored by the tablet + assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen()); + + assertEquals(tabletFiles, getFiles(c, extent)); + assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent)); + assertEquals(testData, readTable(table, c)); + + // this is done to ensure the tablet reads the load flags from the metadata table when it loads + c.tableOperations().offline(table, true); + c.tableOperations().online(table, true); + + // this request should be ignored by the tablet + assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen()); + + assertEquals(tabletFiles, getFiles(c, extent)); + assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent)); + assertEquals(testData, readTable(table, c)); + + // After this, all load request should fail. + ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, fateTxid); + + try { + // expect this to fail + assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen()); + fail(); + } catch (TApplicationException tae) { Review comment: Interesting. I thought the ExpectedException worked just like a try/catch, but reported exceptions that didn't get thrown at the end. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services