Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1413018882 for ; Thu, 2 Jul 2015 01:49:08 +0000 (UTC) Received: (qmail 66586 invoked by uid 500); 2 Jul 2015 01:49:07 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 66429 invoked by uid 500); 2 Jul 2015 01:49:07 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 66062 invoked by uid 99); 2 Jul 2015 01:49:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 01:49:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3F27E362F; Thu, 2 Jul 2015 01:49:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Date: Thu, 02 Jul 2015 01:49:10 -0000 Message-Id: In-Reply-To: <579eae0695674bed96320b6fec3f99e9@git.apache.org> References: <579eae0695674bed96320b6fec3f99e9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/25] hive git commit: HIVE-10165 Improve hive-hcatalog-streaming extensibility and support updates and deletes (Eliot West via gates) http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java 
new file mode 100644 index 0000000..703cef6 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming.mutate; + +import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.ABORTED; +import static org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState.COMMITTED; +import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.databaseBuilder; +import static org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.tableBuilder; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hive.hcatalog.streaming.TestStreaming; +import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory; +import 
org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Record; +import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils.TableBuilder; +import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClient; +import org.apache.hive.hcatalog.streaming.mutate.client.MutatorClientBuilder; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; +import org.apache.hive.hcatalog.streaming.mutate.client.Transaction; +import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinatorBuilder; +import org.apache.hive.hcatalog.streaming.mutate.worker.MutatorFactory; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * This test is based on {@link TestStreaming} and has a similar core set of tests to ensure that basic transactional + * behaviour is as expected in the {@link RecordMutator} line. This is complemented with a set of tests related to the + * use of update and delete operations. 
+ */ +public class TestMutations { + + private static final List EUROPE_FRANCE = Arrays.asList("Europe", "France"); + private static final List EUROPE_UK = Arrays.asList("Europe", "UK"); + private static final List ASIA_INDIA = Arrays.asList("Asia", "India"); + // id + private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 }; + private static final int RECORD_ID_COLUMN = 2; + + @Rule + public TemporaryFolder warehouseFolder = new TemporaryFolder(); + + private StreamingTestUtils testUtils = new StreamingTestUtils(); + private HiveConf conf; + private IMetaStoreClient metaStoreClient; + private String metaStoreUri; + private Database database; + private TableBuilder partitionedTableBuilder; + private TableBuilder unpartitionedTableBuilder; + private Factory assertionFactory; + + public TestMutations() throws Exception { + conf = testUtils.newHiveConf(metaStoreUri); + testUtils.prepareTransactionDatabase(conf); + metaStoreClient = testUtils.newMetaStoreClient(conf); + assertionFactory = new StreamingAssert.Factory(metaStoreClient, conf); + } + + @Before + public void setup() throws Exception { + database = databaseBuilder(warehouseFolder.getRoot()).name("testing").dropAndCreate(metaStoreClient); + + partitionedTableBuilder = tableBuilder(database) + .name("partitioned") + .addColumn("id", "int") + .addColumn("msg", "string") + .partitionKeys("continent", "country"); + + unpartitionedTableBuilder = tableBuilder(database) + .name("unpartitioned") + .addColumn("id", "int") + .addColumn("msg", "string"); + } + + @Test + public void testTransactionBatchEmptyCommitPartitioned() throws Exception { + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + transaction.begin(); + + 
transaction.commit(); + assertThat(transaction.getState(), is(COMMITTED)); + client.close(); + } + + @Test + public void testTransactionBatchEmptyCommitUnpartitioned() throws Exception { + Table table = unpartitionedTableBuilder.create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), false) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + transaction.begin(); + + transaction.commit(); + assertThat(transaction.getState(), is(COMMITTED)); + client.close(); + } + + @Test + public void testTransactionBatchEmptyAbortPartitioned() throws Exception { + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + coordinator.close(); + + transaction.abort(); + assertThat(transaction.getState(), is(ABORTED)); + client.close(); + } + + @Test + public void testTransactionBatchEmptyAbortUnartitioned() throws Exception { + Table table = unpartitionedTableBuilder.create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), false) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + 
MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + coordinator.close(); + + transaction.abort(); + assertThat(transaction.getState(), is(ABORTED)); + client.close(); + } + + @Test + public void testTransactionBatchCommitPartitioned() throws Exception { + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + BucketIdResolver bucketIdAppender = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord record = (MutableRecord) bucketIdAppender.attachBucketIdToRecord(new MutableRecord(1, + "Hello streaming")); + coordinator.insert(ASIA_INDIA, record); + coordinator.close(); + + transaction.commit(); + + StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); + streamingAssertions.assertMinTransactionId(1L); + streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertExpectedFileCount(1); + + List readRecords = streamingAssertions.readRecords(); + assertThat(readRecords.size(), is(1)); + assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); + 
assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + + assertThat(transaction.getState(), is(COMMITTED)); + client.close(); + } + + @Test + public void testMulti() throws Exception { + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1, + "Hello streaming")); + MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2, + "Hello streaming")); + MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3, + "Hello streaming")); + MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4, + "Bonjour streaming")); + + coordinator.insert(ASIA_INDIA, asiaIndiaRecord1); + coordinator.insert(EUROPE_UK, europeUkRecord1); + coordinator.insert(EUROPE_FRANCE, europeFranceRecord1); + coordinator.insert(EUROPE_FRANCE, europeFranceRecord2); + coordinator.close(); + + transaction.commit(); + + // ASIA_INDIA + StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); + 
streamingAssertions.assertMinTransactionId(1L); + streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertExpectedFileCount(1); + + List readRecords = streamingAssertions.readRecords(); + assertThat(readRecords.size(), is(1)); + assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + + // EUROPE_UK + streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); + streamingAssertions.assertMinTransactionId(1L); + streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertExpectedFileCount(1); + + readRecords = streamingAssertions.readRecords(); + assertThat(readRecords.size(), is(1)); + assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}")); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + + // EUROPE_FRANCE + streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); + streamingAssertions.assertMinTransactionId(1L); + streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertExpectedFileCount(1); + + readRecords = streamingAssertions.readRecords(); + assertThat(readRecords.size(), is(2)); + assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}")); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}")); + assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + + client.close(); + } + + @Test + public void testTransactionBatchCommitUnpartitioned() throws Exception { + Table table = unpartitionedTableBuilder.create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), false) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = 
client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord record = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1, + "Hello streaming")); + + coordinator.insert(Collections. emptyList(), record); + coordinator.close(); + + transaction.commit(); + + StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table); + streamingAssertions.assertMinTransactionId(1L); + streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertExpectedFileCount(1); + + List readRecords = streamingAssertions.readRecords(); + assertThat(readRecords.size(), is(1)); + assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + + assertThat(transaction.getState(), is(COMMITTED)); + client.close(); + } + + @Test + public void testTransactionBatchAbort() throws Exception { + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction transaction = client.newTransaction(); + + List destinations = client.getTables(); + + transaction.begin(); + + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + MutatorCoordinator coordinator = new MutatorCoordinatorBuilder() + 
.metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord record1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1, + "Hello streaming")); + MutableRecord record2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2, + "Welcome to streaming")); + + coordinator.insert(ASIA_INDIA, record1); + coordinator.insert(ASIA_INDIA, record2); + coordinator.close(); + + transaction.abort(); + + assertThat(transaction.getState(), is(ABORTED)); + + client.close(); + + StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); + streamingAssertions.assertNothingWritten(); + } + + @Test + public void testUpdatesAndDeletes() throws Exception { + // Set up some base data then stream some inserts/updates/deletes to a number of partitions + MutatorFactory mutatorFactory = new ReflectiveMutatorFactory(conf, MutableRecord.class, RECORD_ID_COLUMN, + BUCKET_COLUMN_INDEXES); + + // INSERT DATA + // + Table table = partitionedTableBuilder.addPartition(ASIA_INDIA).addPartition(EUROPE_FRANCE).create(metaStoreClient); + + MutatorClient client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction insertTransaction = client.newTransaction(); + + List destinations = client.getTables(); + + insertTransaction.begin(); + + MutatorCoordinator insertCoordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + BucketIdResolver bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord asiaIndiaRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(1, + 
"Namaste streaming 1")); + MutableRecord asiaIndiaRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(2, + "Namaste streaming 2")); + MutableRecord europeUkRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(3, + "Hello streaming 1")); + MutableRecord europeUkRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(4, + "Hello streaming 2")); + MutableRecord europeFranceRecord1 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(5, + "Bonjour streaming 1")); + MutableRecord europeFranceRecord2 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(6, + "Bonjour streaming 2")); + + insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord1); + insertCoordinator.insert(ASIA_INDIA, asiaIndiaRecord2); + insertCoordinator.insert(EUROPE_UK, europeUkRecord1); + insertCoordinator.insert(EUROPE_UK, europeUkRecord2); + insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord1); + insertCoordinator.insert(EUROPE_FRANCE, europeFranceRecord2); + insertCoordinator.close(); + + insertTransaction.commit(); + + assertThat(insertTransaction.getState(), is(COMMITTED)); + client.close(); + + // MUTATE DATA + // + client = new MutatorClientBuilder() + .addSinkTable(table.getDbName(), table.getTableName(), true) + .metaStoreUri(metaStoreUri) + .build(); + client.connect(); + + Transaction mutateTransaction = client.newTransaction(); + + destinations = client.getTables(); + + mutateTransaction.begin(); + + MutatorCoordinator mutateCoordinator = new MutatorCoordinatorBuilder() + .metaStoreUri(metaStoreUri) + .table(destinations.get(0)) + .mutatorFactory(mutatorFactory) + .build(); + + bucketIdResolver = mutatorFactory.newBucketIdResolver(destinations.get(0).getTotalBuckets()); + MutableRecord asiaIndiaRecord3 = (MutableRecord) bucketIdResolver.attachBucketIdToRecord(new MutableRecord(20, + "Namaste streaming 3")); + + mutateCoordinator.update(ASIA_INDIA, new 
MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L, + 0, 1L))); + mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3); + mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L))); + mutateCoordinator.delete(EUROPE_FRANCE, + new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L))); + mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier( + 1L, 0, 1L))); + mutateCoordinator.close(); + + mutateTransaction.commit(); + + assertThat(mutateTransaction.getState(), is(COMMITTED)); + + StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); + indiaAssertions.assertMinTransactionId(1L); + indiaAssertions.assertMaxTransactionId(2L); + List indiaRecords = indiaAssertions.readRecords(); + assertThat(indiaRecords.size(), is(3)); + assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}")); + assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}")); + assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}")); + assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L))); + + StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); + ukAssertions.assertMinTransactionId(1L); + ukAssertions.assertMaxTransactionId(2L); + List ukRecords = ukAssertions.readRecords(); + assertThat(ukRecords.size(), is(1)); + assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}")); + assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + + StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); + 
franceAssertions.assertMinTransactionId(1L); + franceAssertions.assertMaxTransactionId(2L); + List franceRecords = franceAssertions.readRecords(); + assertThat(franceRecords.size(), is(1)); + assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}")); + assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + + client.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java new file mode 100644 index 0000000..706697a --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java @@ -0,0 +1,66 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.io.File; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hive.hcatalog.streaming.mutate.StreamingTestUtils; +import org.junit.Test; + +public class TestAcidTableSerializer { + + @Test + public void testSerializeDeserialize() throws Exception { + Database database = StreamingTestUtils.databaseBuilder(new File("/tmp")).name("db_1").build(); + Table table = StreamingTestUtils + .tableBuilder(database) + .name("table_1") + .addColumn("one", "string") + .addColumn("two", "integer") + .partitionKeys("partition") + .addPartition("p1") + .buckets(10) + .build(); + + AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK); + 
acidTable.setTable(table); + acidTable.setTransactionId(42L); + + String encoded = AcidTableSerializer.encode(acidTable); + System.out.println(encoded); + AcidTable decoded = AcidTableSerializer.decode(encoded); + + assertThat(decoded.getDatabaseName(), is("db_1")); + assertThat(decoded.getTableName(), is("table_1")); + assertThat(decoded.createPartitions(), is(true)); + assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")); + assertThat(decoded.getTotalBuckets(), is(10)); + assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1")); + assertThat(decoded.getTransactionId(), is(42L)); + assertThat(decoded.getTableType(), is(TableType.SINK)); + assertThat(decoded.getTable(), is(table)); + } + + @Test + public void testSerializeDeserializeNoTableNoTransaction() throws Exception { + AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK); + + String encoded = AcidTableSerializer.encode(acidTable); + AcidTable decoded = AcidTableSerializer.decode(encoded); + + assertThat(decoded.getDatabaseName(), is("db_1")); + assertThat(decoded.getTableName(), is("table_1")); + assertThat(decoded.createPartitions(), is(true)); + assertThat(decoded.getOutputFormatName(), is(nullValue())); + assertThat(decoded.getTotalBuckets(), is(0)); + assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1")); + assertThat(decoded.getTransactionId(), is(0L)); + assertThat(decoded.getTableType(), is(TableType.SINK)); + assertThat(decoded.getTable(), is(nullValue())); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java new file mode 100644 
index 0000000..ca3f7b2 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java @@ -0,0 +1,176 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hive.hcatalog.streaming.TransactionBatch.TxnState; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestMutatorClient { + + private static final long TRANSACTION_ID = 42L; + private static final String TABLE_NAME_1 = "TABLE_1"; + private static final String TABLE_NAME_2 = "TABLE_2"; + private static final String DB_NAME = "DB_1"; + private static final String USER = "user"; + private static final AcidTable TABLE_1 = new AcidTable(DB_NAME, TABLE_NAME_1, true, TableType.SINK); + private static final AcidTable TABLE_2 = new AcidTable(DB_NAME, TABLE_NAME_2, true, TableType.SINK); + + @Mock + private IMetaStoreClient mockMetaStoreClient; + @Mock + private Lock mockLock; + @Mock + private Table mockTable1, mockTable2; + @Mock + private StorageDescriptor 
mockSd; + @Mock + private Map mockParameters; + @Mock + private HiveConf mockConfiguration; + @Mock + private LockFailureListener mockLockFailureListener; + + private MutatorClient client; + + @Before + public void configureMocks() throws Exception { + when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_1)).thenReturn(mockTable1); + when(mockTable1.getDbName()).thenReturn(DB_NAME); + when(mockTable1.getTableName()).thenReturn(TABLE_NAME_1); + when(mockTable1.getSd()).thenReturn(mockSd); + when(mockTable1.getParameters()).thenReturn(mockParameters); + when(mockMetaStoreClient.getTable(DB_NAME, TABLE_NAME_2)).thenReturn(mockTable2); + when(mockTable2.getDbName()).thenReturn(DB_NAME); + when(mockTable2.getTableName()).thenReturn(TABLE_NAME_2); + when(mockTable2.getSd()).thenReturn(mockSd); + when(mockTable2.getParameters()).thenReturn(mockParameters); + when(mockSd.getNumBuckets()).thenReturn(1, 2); + when(mockSd.getOutputFormat()).thenReturn(OrcOutputFormat.class.getName()); + when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString()); + + when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID); + + client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, + Collections.singletonList(TABLE_1)); + } + + @Test + public void testCheckValidTableConnect() throws Exception { + List inTables = new ArrayList<>(); + inTables.add(TABLE_1); + inTables.add(TABLE_2); + client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables); + + client.connect(); + List outTables = client.getTables(); + + assertThat(client.isConnected(), is(true)); + assertThat(outTables.size(), is(2)); + assertThat(outTables.get(0).getDatabaseName(), is(DB_NAME)); + assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1)); + assertThat(outTables.get(0).getTotalBuckets(), is(2)); + assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName())); + 
assertThat(outTables.get(0).getTransactionId(), is(0L)); + assertThat(outTables.get(0).getTable(), is(mockTable1)); + assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME)); + assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2)); + assertThat(outTables.get(1).getTotalBuckets(), is(2)); + assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName())); + assertThat(outTables.get(1).getTransactionId(), is(0L)); + assertThat(outTables.get(1).getTable(), is(mockTable2)); + } + + @Test + public void testCheckNonTransactionalTableConnect() throws Exception { + when(mockParameters.get("transactional")).thenReturn(Boolean.FALSE.toString()); + + try { + client.connect(); + fail(); + } catch (ConnectionException e) { + } + + assertThat(client.isConnected(), is(false)); + } + + @Test + public void testCheckUnBucketedTableConnect() throws Exception { + when(mockSd.getNumBuckets()).thenReturn(0); + + try { + client.connect(); + fail(); + } catch (ConnectionException e) { + } + + assertThat(client.isConnected(), is(false)); + } + + @Test + public void testMetaStoreFailsOnConnect() throws Exception { + when(mockMetaStoreClient.getTable(anyString(), anyString())).thenThrow(new TException()); + + try { + client.connect(); + fail(); + } catch (ConnectionException e) { + } + + assertThat(client.isConnected(), is(false)); + } + + @Test(expected = ConnectionException.class) + public void testGetDestinationsFailsIfNotConnected() throws Exception { + client.getTables(); + } + + @Test + public void testNewTransaction() throws Exception { + List inTables = new ArrayList<>(); + inTables.add(TABLE_1); + inTables.add(TABLE_2); + client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, inTables); + + client.connect(); + Transaction transaction = client.newTransaction(); + List outTables = client.getTables(); + + assertThat(client.isConnected(), is(true)); + + assertThat(transaction.getTransactionId(), 
is(TRANSACTION_ID)); + assertThat(transaction.getState(), is(TxnState.INACTIVE)); + assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID)); + assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID)); + } + + @Test + public void testCloseClosesClient() throws Exception { + client.close(); + assertThat(client.isConnected(), is(false)); + verify(mockMetaStoreClient).close(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java new file mode 100644 index 0000000..179207a --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestTransaction.java @@ -0,0 +1,95 @@ +package org.apache.hive.hcatalog.streaming.mutate.client; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; +import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestTransaction { + + private static final String USER = "user"; + private static final long TRANSACTION_ID = 10L; + + @Mock + private Lock mockLock; + @Mock + private IMetaStoreClient mockMetaStoreClient; + + private 
Transaction transaction; + + @Before + public void createTransaction() throws Exception { + when(mockLock.getUser()).thenReturn(USER); + when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID); + transaction = new Transaction(mockMetaStoreClient, mockLock); + } + + @Test + public void testInitialState() { + assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE)); + assertThat(transaction.getTransactionId(), is(TRANSACTION_ID)); + } + + @Test + public void testBegin() throws Exception { + transaction.begin(); + + verify(mockLock).acquire(TRANSACTION_ID); + assertThat(transaction.getState(), is(TransactionBatch.TxnState.OPEN)); + } + + @Test + public void testBeginLockFails() throws Exception { + doThrow(new LockException("")).when(mockLock).acquire(TRANSACTION_ID); + + try { + transaction.begin(); + } catch (TransactionException ignore) { + } + + assertThat(transaction.getState(), is(TransactionBatch.TxnState.INACTIVE)); + } + + @Test + public void testCommit() throws Exception { + transaction.commit(); + + verify(mockLock).release(); + verify(mockMetaStoreClient).commitTxn(TRANSACTION_ID); + assertThat(transaction.getState(), is(TransactionBatch.TxnState.COMMITTED)); + } + + @Test(expected = TransactionException.class) + public void testCommitLockFails() throws Exception { + doThrow(new LockException("")).when(mockLock).release(); + transaction.commit(); + } + + @Test + public void testAbort() throws Exception { + transaction.abort(); + + verify(mockLock).release(); + verify(mockMetaStoreClient).rollbackTxn(TRANSACTION_ID); + assertThat(transaction.getState(), is(TransactionBatch.TxnState.ABORTED)); + } + + @Test(expected = TransactionException.class) + public void testAbortLockFails() throws Exception { + doThrow(new LockException("")).when(mockLock).release(); + transaction.abort(); + } + +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java new file mode 100644 index 0000000..8e6d06e --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestHeartbeatTimerTask.java @@ -0,0 +1,100 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestHeartbeatTimerTask { + + private static final long TRANSACTION_ID = 10L; + private static final long LOCK_ID = 1L; + private static final List TABLES = createTable(); + + @Mock + private IMetaStoreClient mockMetaStoreClient; + @Mock + private LockFailureListener mockListener; + + private HeartbeatTimerTask task; + + @Before + public void create() throws Exception { + task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + } + + @Test + public void testRun() throws Exception { + task.run(); + + 
verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + } + + @Test + public void testRunNullTransactionId() throws Exception { + task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID); + + task.run(); + + verify(mockMetaStoreClient).heartbeat(0, LOCK_ID); + } + + @Test + public void testRunHeartbeatFailsNoSuchLockException() throws Exception { + NoSuchLockException exception = new NoSuchLockException(); + doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + + task.run(); + + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception); + } + + @Test + public void testRunHeartbeatFailsNoSuchTxnException() throws Exception { + NoSuchTxnException exception = new NoSuchTxnException(); + doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + + task.run(); + + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception); + } + + @Test + public void testRunHeartbeatFailsTxnAbortedException() throws Exception { + TxnAbortedException exception = new TxnAbortedException(); + doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + + task.run(); + + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Arrays.asList("DB.TABLE"), exception); + } + + @Test + public void testRunHeartbeatFailsTException() throws Exception { + TException exception = new TException(); + doThrow(exception).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + + task.run(); + } + + private static List
createTable() { + Table table = new Table(); + table.setDbName("DB"); + table.setTableName("TABLE"); + return Arrays.asList(table); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java new file mode 100644 index 0000000..ef1e80c --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java @@ -0,0 +1,283 @@ +package org.apache.hive.hcatalog.streaming.mutate.client.lock; + +import static org.apache.hadoop.hive.metastore.api.LockState.ABORT; +import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED; +import static org.apache.hadoop.hive.metastore.api.LockState.NOT_ACQUIRED; +import static org.apache.hadoop.hive.metastore.api.LockState.WAITING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.List; +import java.util.Timer; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import 
org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.google.common.collect.ImmutableList; + +@RunWith(MockitoJUnitRunner.class) +public class TestLock { + + private static final Table TABLE_1 = createTable("DB", "ONE"); + private static final Table TABLE_2 = createTable("DB", "TWO"); + private static final List
TABLES = ImmutableList.of(TABLE_1, TABLE_2); + private static final long LOCK_ID = 42; + private static final long TRANSACTION_ID = 109; + private static final String USER = "ewest"; + + @Mock + private IMetaStoreClient mockMetaStoreClient; + @Mock + private LockFailureListener mockListener; + @Mock + private LockResponse mockLockResponse; + @Mock + private HeartbeatFactory mockHeartbeatFactory; + @Mock + private Timer mockHeartbeat; + @Captor + private ArgumentCaptor requestCaptor; + + private Lock lock; + private HiveConf configuration = new HiveConf(); + + @Before + public void injectMocks() throws Exception { + when(mockMetaStoreClient.lock(any(LockRequest.class))).thenReturn(mockLockResponse); + when(mockLockResponse.getLockid()).thenReturn(LOCK_ID); + when(mockLockResponse.getState()).thenReturn(ACQUIRED); + when( + mockHeartbeatFactory.newInstance(any(IMetaStoreClient.class), any(LockFailureListener.class), any(Long.class), + any(Collection.class), anyLong(), anyInt())).thenReturn(mockHeartbeat); + + lock = new Lock(mockMetaStoreClient, mockHeartbeatFactory, configuration, mockListener, USER, TABLES, 3, 0); + } + + @Test + public void testAcquireReadLockWithNoIssues() throws Exception { + lock.acquire(); + assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); + assertNull(lock.getTransactionId()); + } + + @Test + public void testAcquireTxnLockWithNoIssues() throws Exception { + lock.acquire(TRANSACTION_ID); + assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); + assertEquals(Long.valueOf(TRANSACTION_ID), lock.getTransactionId()); + } + + @Test + public void testAcquireReadLockCheckHeartbeatCreated() throws Exception { + configuration.set("hive.txn.timeout", "100s"); + lock.acquire(); + + verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), any(Long.class), eq(TABLES), + eq(LOCK_ID), eq(75)); + } + + @Test + public void testAcquireTxnLockCheckHeartbeatCreated() throws Exception { + configuration.set("hive.txn.timeout", 
"100s"); + lock.acquire(TRANSACTION_ID); + + verify(mockHeartbeatFactory).newInstance(eq(mockMetaStoreClient), eq(mockListener), eq(TRANSACTION_ID), eq(TABLES), + eq(LOCK_ID), eq(75)); + } + + @Test + public void testAcquireLockCheckUser() throws Exception { + lock.acquire(); + verify(mockMetaStoreClient).lock(requestCaptor.capture()); + LockRequest actualRequest = requestCaptor.getValue(); + assertEquals(USER, actualRequest.getUser()); + } + + @Test + public void testAcquireReadLockCheckLocks() throws Exception { + lock.acquire(); + verify(mockMetaStoreClient).lock(requestCaptor.capture()); + + LockRequest request = requestCaptor.getValue(); + assertEquals(0, request.getTxnid()); + assertEquals(USER, request.getUser()); + assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname()); + + List components = request.getComponent(); + + assertEquals(2, components.size()); + + LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); + expected1.setTablename("ONE"); + assertTrue(components.contains(expected1)); + + LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); + expected2.setTablename("TWO"); + assertTrue(components.contains(expected2)); + } + + @Test + public void testAcquireTxnLockCheckLocks() throws Exception { + lock.acquire(TRANSACTION_ID); + verify(mockMetaStoreClient).lock(requestCaptor.capture()); + + LockRequest request = requestCaptor.getValue(); + assertEquals(TRANSACTION_ID, request.getTxnid()); + assertEquals(USER, request.getUser()); + assertEquals(InetAddress.getLocalHost().getHostName(), request.getHostname()); + + List components = request.getComponent(); + + System.out.println(components); + assertEquals(2, components.size()); + + LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); + expected1.setTablename("ONE"); + assertTrue(components.contains(expected1)); + + LockComponent expected2 = new 
LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); + expected2.setTablename("TWO"); + assertTrue(components.contains(expected2)); + } + + @Test(expected = LockException.class) + public void testAcquireLockNotAcquired() throws Exception { + when(mockLockResponse.getState()).thenReturn(NOT_ACQUIRED); + lock.acquire(); + } + + @Test(expected = LockException.class) + public void testAcquireLockAborted() throws Exception { + when(mockLockResponse.getState()).thenReturn(ABORT); + lock.acquire(); + } + + @Test(expected = LockException.class) + public void testAcquireLockWithWaitRetriesExceeded() throws Exception { + when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, WAITING); + lock.acquire(); + } + + @Test + public void testAcquireLockWithWaitRetries() throws Exception { + when(mockLockResponse.getState()).thenReturn(WAITING, WAITING, ACQUIRED); + lock.acquire(); + assertEquals(Long.valueOf(LOCK_ID), lock.getLockId()); + } + + @Test + public void testReleaseLock() throws Exception { + lock.acquire(); + lock.release(); + verify(mockMetaStoreClient).unlock(LOCK_ID); + } + + @Test + public void testReleaseLockNoLock() throws Exception { + lock.release(); + verifyNoMoreInteractions(mockMetaStoreClient); + } + + @Test + public void testReleaseLockCancelsHeartbeat() throws Exception { + lock.acquire(); + lock.release(); + verify(mockHeartbeat).cancel(); + } + + @Test + public void testReadHeartbeat() throws Exception { + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID); + task.run(); + verify(mockMetaStoreClient).heartbeat(0, LOCK_ID); + } + + @Test + public void testTxnHeartbeat() throws Exception { + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + task.run(); + verify(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + } + + @Test + public void testReadHeartbeatFailsNoSuchLockException() throws Exception { + Throwable t 
= new NoSuchLockException(); + doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, null, TABLES, LOCK_ID); + task.run(); + verify(mockListener).lockFailed(LOCK_ID, null, Lock.asStrings(TABLES), t); + } + + @Test + public void testTxnHeartbeatFailsNoSuchLockException() throws Exception { + Throwable t = new NoSuchLockException(); + doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + task.run(); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + } + + @Test + public void testHeartbeatFailsNoSuchTxnException() throws Exception { + Throwable t = new NoSuchTxnException(); + doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + task.run(); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + } + + @Test + public void testHeartbeatFailsTxnAbortedException() throws Exception { + Throwable t = new TxnAbortedException(); + doThrow(t).when(mockMetaStoreClient).heartbeat(TRANSACTION_ID, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + task.run(); + verify(mockListener).lockFailed(LOCK_ID, TRANSACTION_ID, Lock.asStrings(TABLES), t); + } + + @Test + public void testHeartbeatContinuesTException() throws Exception { + Throwable t = new TException(); + doThrow(t).when(mockMetaStoreClient).heartbeat(0, LOCK_ID); + HeartbeatTimerTask task = new HeartbeatTimerTask(mockMetaStoreClient, mockListener, TRANSACTION_ID, TABLES, LOCK_ID); + task.run(); + verifyZeroInteractions(mockListener); + } + + private static Table createTable(String databaseName, String 
tableName) { + Table table = new Table(); + table.setDbName(databaseName); + table.setTableName(tableName); + return table; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java new file mode 100644 index 0000000..f81373e --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java @@ -0,0 +1,38 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hive.hcatalog.streaming.mutate.MutableRecord; +import org.junit.Test; + +public class TestBucketIdResolverImpl { + + private static final int TOTAL_BUCKETS = 12; + private static final int RECORD_ID_COLUMN = 2; + // id - TODO: use a non-zero index to check for offset errors. 
+ private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 }; + + private BucketIdResolver capturingBucketIdResolver = new BucketIdResolverImpl( + ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, BUCKET_COLUMN_INDEXES); + + @Test + public void testAttachBucketIdToRecord() { + MutableRecord record = new MutableRecord(1, "hello"); + capturingBucketIdResolver.attachBucketIdToRecord(record); + assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L))); + assertThat(record.id, is(1)); + assertThat(record.msg.toString(), is("hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoBucketColumns() { + new BucketIdResolverImpl(ObjectInspectorFactory.getReflectionObjectInspector(MutableRecord.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA), RECORD_ID_COLUMN, TOTAL_BUCKETS, new int[0]); + + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java new file mode 100644 index 0000000..74fa59b --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestGroupingValidator.java @@ -0,0 +1,70 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; + +import org.junit.Test; + +public class TestGroupingValidator { + + private GroupingValidator validator = new GroupingValidator(); + + @Test + public void uniqueGroups() { + 
assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3)); + assertTrue(validator.isInSequence(Arrays.asList("b", "B"), 2)); + } + + @Test + public void sameGroup() { + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + } + + @Test + public void revisitedGroup() { + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3)); + assertFalse(validator.isInSequence(Arrays.asList("a", "A"), 1)); + } + + @Test + public void samePartitionDifferentBucket() { + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3)); + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 2)); + } + + @Test + public void sameBucketDifferentPartition() { + assertTrue(validator.isInSequence(Arrays.asList("a", "A"), 1)); + assertTrue(validator.isInSequence(Arrays.asList("c", "C"), 3)); + assertTrue(validator.isInSequence(Arrays.asList("b", "B"), 1)); + } + + @Test + public void uniqueGroupsNoPartition() { + assertTrue(validator.isInSequence(Collections. emptyList(), 1)); + assertTrue(validator.isInSequence(Collections. emptyList(), 3)); + assertTrue(validator.isInSequence(Collections. emptyList(), 2)); + } + + @Test + public void sameGroupNoPartition() { + assertTrue(validator.isInSequence(Collections. emptyList(), 1)); + assertTrue(validator.isInSequence(Collections. emptyList(), 1)); + assertTrue(validator.isInSequence(Collections. emptyList(), 1)); + } + + @Test + public void revisitedGroupNoPartition() { + assertTrue(validator.isInSequence(Collections. emptyList(), 1)); + assertTrue(validator.isInSequence(Collections. emptyList(), 3)); + assertFalse(validator.isInSequence(Collections. 
emptyList(), 1)); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java new file mode 100644 index 0000000..6e9ffa2 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java @@ -0,0 +1,234 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestMutatorCoordinator { + + private static final List UNPARTITIONED = Collections. 
emptyList(); + private static final List PARTITION_B = Arrays.asList("B"); + private static final List PARTITION_A = Arrays.asList("A"); + private static final long TRANSACTION_ID = 2L; + private static final int BUCKET_ID = 0; + private static final Path PATH_A = new Path("X"); + private static final Path PATH_B = new Path("B"); + private static final Object RECORD = "RECORD"; + private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, 0L); + private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, 1L); + private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, 0L); + private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L); + + @Mock + private IMetaStoreClient mockMetaStoreClient; + @Mock + private MutatorFactory mockMutatorFactory; + @Mock + private CreatePartitionHelper mockPartitionHelper; + @Mock + private GroupingValidator mockGroupingValidator; + @Mock + private SequenceValidator mockSequenceValidator; + @Mock + private AcidTable mockAcidTable; + @Mock + private RecordInspector mockRecordInspector; + @Mock + private BucketIdResolver mockBucketIdResolver; + @Mock + private Mutator mockMutator; + + private MutatorCoordinator coordinator; + + private HiveConf configuration = new HiveConf(); + + @Before + public void createCoordinator() throws Exception { + when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName()); + when(mockAcidTable.getTotalBuckets()).thenReturn(1); + when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID); + when(mockAcidTable.createPartitions()).thenReturn(true); + when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector); + when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver); + when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), anyLong(), any(Path.class), anyInt())).thenReturn( + 
mockMutator); + when(mockPartitionHelper.getPathForPartition(any(List.class))).thenReturn(PATH_A); + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_INSERT); + when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true); + when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true); + + coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper, + mockGroupingValidator, mockSequenceValidator, mockAcidTable, false); + } + + @Test + public void insert() throws Exception { + coordinator.insert(UNPARTITIONED, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutator).insert(RECORD); + } + + @Test + public void multipleInserts() throws Exception { + coordinator.insert(UNPARTITIONED, RECORD); + coordinator.insert(UNPARTITIONED, RECORD); + coordinator.insert(UNPARTITIONED, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutator, times(3)).insert(RECORD); + } + + @Test + public void insertPartitionChanges() throws Exception { + when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A); + when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B); + + coordinator.insert(PARTITION_A, RECORD); + coordinator.insert(PARTITION_B, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A); + verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), 
eq(PATH_B), eq(BUCKET_ID)); + verify(mockMutator, times(2)).insert(RECORD); + } + + @Test + public void bucketChanges() throws Exception { + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B1_R0); + + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 1); + + coordinator.update(UNPARTITIONED, RECORD); + coordinator.delete(UNPARTITIONED, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory) + .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1)); + verify(mockMutator).update(RECORD); + verify(mockMutator).delete(RECORD); + } + + @Test + public void partitionThenBucketChanges() throws Exception { + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_B0_R1, ROW__ID_B1_R0, + ROW__ID_INSERT); + + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0, 1, 0); + + when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A); + when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B); + + coordinator.update(PARTITION_A, RECORD); + coordinator.delete(PARTITION_B, RECORD); + coordinator.update(PARTITION_B, RECORD); + coordinator.insert(PARTITION_B, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A); + verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), + eq(BUCKET_ID)); + verify(mockMutatorFactory) + .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1)); + verify(mockMutator, times(2)).update(RECORD); + 
verify(mockMutator).delete(RECORD); + verify(mockMutator).insert(RECORD); + verify(mockSequenceValidator, times(4)).reset(); + } + + @Test(expected = RecordSequenceException.class) + public void outOfSequence() throws Exception { + when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false); + + coordinator.update(UNPARTITIONED, RECORD); + coordinator.delete(UNPARTITIONED, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutator).update(RECORD); + verify(mockMutator).delete(RECORD); + } + + @Test(expected = GroupRevisitedException.class) + public void revisitGroup() throws Exception { + when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false); + + coordinator.update(UNPARTITIONED, RECORD); + coordinator.delete(UNPARTITIONED, RECORD); + + verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutator).update(RECORD); + verify(mockMutator).delete(RECORD); + } + + @Test(expected = BucketIdException.class) + public void insertWithBadBucket() throws Exception { + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0); + + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1); + + coordinator.insert(UNPARTITIONED, RECORD); + } + + @Test(expected = BucketIdException.class) + public void updateWithBadBucket() throws Exception { + when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0); + + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1); + + coordinator.update(UNPARTITIONED, RECORD); + } + + @Test + public void deleteWithBadBucket() throws Exception { + 
when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0); + + when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(1); + + coordinator.delete(UNPARTITIONED, RECORD); + } + + @Test + public void closeNoRecords() throws Exception { + coordinator.close(); + + // No mutator created + verifyZeroInteractions(mockMutator); + } + + @Test + public void closeUsedCoordinator() throws Exception { + coordinator.insert(UNPARTITIONED, RECORD); + coordinator.close(); + + verify(mockMutator).close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java new file mode 100644 index 0000000..b29c763 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java @@ -0,0 +1,99 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import 
org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestMutatorImpl { + + private static final Object RECORD = new Object(); + private static final int RECORD_ID_COLUMN = 2; + private static final int BUCKET_ID = 0; + private static final Path PATH = new Path("X"); + private static final long TRANSACTION_ID = 1L; + + @Mock + private AcidOutputFormat mockOutputFormat; + @Mock + private ObjectInspector mockObjectInspector; + @Mock + private RecordUpdater mockRecordUpdater; + @Captor + private ArgumentCaptor<Options> captureOptions; + + private final HiveConf configuration = new HiveConf(); + + private Mutator mutator; + + @Before + public void injectMocks() throws IOException { + when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater); + mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID, + PATH, BUCKET_ID); + } + + @Test + public void testCreatesRecordReader() throws IOException { + verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture()); + Options options = captureOptions.getValue(); + assertThat(options.getBucket(), is(BUCKET_ID)); + assertThat(options.getConfiguration(), is((Configuration) configuration)); + assertThat(options.getInspector(), is(mockObjectInspector)); + assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN)); + assertThat(options.getMinimumTransactionId(), is(TRANSACTION_ID)); + assertThat(options.getMaximumTransactionId(), is(TRANSACTION_ID)); + } + + @Test + public void testInsertDelegates() throws IOException { + mutator.insert(RECORD); + verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD); + } + + @Test + public void testUpdateDelegates() throws IOException { + mutator.update(RECORD); + verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD); + } + + @Test + public void
testDeleteDelegates() throws IOException { + mutator.delete(RECORD); + verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD); + } + + @Test + public void testCloseDelegates() throws IOException { + mutator.close(); + verify(mockRecordUpdater).close(false); + } + + @Test + public void testFlushDoesNothing() throws IOException { + mutator.flush(); + verify(mockRecordUpdater, never()).flush(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java new file mode 100644 index 0000000..389ad33 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java @@ -0,0 +1,31 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hive.hcatalog.streaming.mutate.MutableRecord; +import org.junit.Test; + +public class TestRecordInspectorImpl { + + private static final int ROW_ID_COLUMN = 2; + + private RecordInspectorImpl inspector = new RecordInspectorImpl(ObjectInspectorFactory.getReflectionObjectInspector( + MutableRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA), ROW_ID_COLUMN); + + @Test + public void testExtractRecordIdentifier() { + RecordIdentifier recordIdentifier = new RecordIdentifier(10L, 4, 20L); + MutableRecord record = new 
MutableRecord(1, "hello", recordIdentifier); + assertThat(inspector.extractRecordIdentifier(record), is(recordIdentifier)); + } + + @Test(expected = IllegalArgumentException.class) + public void testNotAStructObjectInspector() { + new RecordInspectorImpl(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, 2); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/994d98c0/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java new file mode 100644 index 0000000..33f9606 --- /dev/null +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java @@ -0,0 +1,91 @@ +package org.apache.hive.hcatalog.streaming.mutate.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.junit.Test; + +public class TestSequenceValidator { + + private static final int BUCKET_ID = 1; + + private SequenceValidator validator = new SequenceValidator(); + + @Test + public void testSingleInSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + } + + @Test + public void testRowIdInSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); + } + + @Test + public void testTxIdInSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new 
RecordIdentifier(1L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true)); + } + + @Test + public void testMixedInSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 1)), is(true)); + } + + @Test + public void testNegativeTxId() { + assertThat(validator.isInSequence(new RecordIdentifier(-1L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + } + + @Test + public void testNegativeRowId() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, -1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + } + + @Test + public void testRowIdOutOfSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(false)); + } + + @Test + public void testReset() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); + // New partition for example + validator.reset(); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); + } + + @Test + public void testTxIdOutOfSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false)); + } + + 
@Test + public void testMixedOutOfSequence() { + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 5)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 6)), is(false)); + } + + @Test(expected = NullPointerException.class) + public void testNullRecordIdentifier() { + validator.isInSequence(null); + } + +}