Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DF4C4200BC3 for ; Fri, 18 Nov 2016 16:38:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DD976160B16; Fri, 18 Nov 2016 15:38:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 99D6C160B03 for ; Fri, 18 Nov 2016 16:38:42 +0100 (CET) Received: (qmail 89721 invoked by uid 500); 18 Nov 2016 15:38:41 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 89706 invoked by uid 99); 18 Nov 2016 15:38:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 15:38:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7959DE0230; Fri, 18 Nov 2016 15:38:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mohits@apache.org To: commits@hive.apache.org Message-Id: <6684c0f8c3bd4a96b01b8706bbcf80bb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-15232: Add notification events for functions and indexes (Mohit Sabharwal, reviewed by Chaoyu Tang) Date: Fri, 18 Nov 2016 15:38:41 +0000 (UTC) archived-at: Fri, 18 Nov 2016 15:38:45 -0000 Repository: hive Updated Branches: refs/heads/master cebd251b9 -> 62d802b87 HIVE-15232: Add notification events for functions and indexes (Mohit Sabharwal, reviewed by Chaoyu Tang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: 
http://git-wip-us.apache.org/repos/asf/hive/commit/62d802b8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62d802b8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62d802b8 Branch: refs/heads/master Commit: 62d802b871f9654041bb8551b9622b9f5c75e856 Parents: cebd251 Author: Mohit Sabharwal Authored: Fri Nov 18 10:29:37 2016 -0500 Committer: Mohit Sabharwal Committed: Fri Nov 18 10:29:37 2016 -0500 ---------------------------------------------------------------------- .../hive/hcatalog/common/HCatConstants.java | 5 + .../listener/DbNotificationListener.java | 77 ++++- .../hcatalog/messaging/AlterIndexMessage.java | 30 ++ .../messaging/CreateFunctionMessage.java | 30 ++ .../hcatalog/messaging/CreateIndexMessage.java | 30 ++ .../hcatalog/messaging/DropFunctionMessage.java | 30 ++ .../hcatalog/messaging/DropIndexMessage.java | 30 ++ .../hcatalog/messaging/HCatEventMessage.java | 10 +- .../hcatalog/messaging/MessageDeserializer.java | 39 ++- .../hive/hcatalog/messaging/MessageFactory.java | 38 +++ .../messaging/json/JSONAlterIndexMessage.java | 89 ++++++ .../json/JSONCreateFunctionMessage.java | 81 ++++++ .../messaging/json/JSONCreateIndexMessage.java | 82 ++++++ .../messaging/json/JSONDropFunctionMessage.java | 81 ++++++ .../messaging/json/JSONDropIndexMessage.java | 82 ++++++ .../messaging/json/JSONMessageDeserializer.java | 56 +++- .../messaging/json/JSONMessageFactory.java | 62 +++- .../listener/DummyRawStoreFailEvent.java | 30 +- .../listener/TestDbNotificationListener.java | 286 ++++++++++++++++++- .../hadoop/hive/metastore/HiveMetaStore.java | 51 ++-- .../hive/metastore/MetaStoreEventListener.java | 16 ++ .../metastore/events/CreateFunctionEvent.java | 39 +++ .../metastore/events/DropFunctionEvent.java | 39 +++ .../hadoop/hive/metastore/DummyListener.java | 12 + 24 files changed, 1282 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index 72930eb..3998407 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -162,6 +162,11 @@ public final class HCatConstants { public static final String HCAT_DROP_TABLE_EVENT = "DROP_TABLE"; public static final String HCAT_CREATE_DATABASE_EVENT = "CREATE_DATABASE"; public static final String HCAT_DROP_DATABASE_EVENT = "DROP_DATABASE"; + public static final String HCAT_CREATE_FUNCTION_EVENT = "CREATE_FUNCTION"; + public static final String HCAT_DROP_FUNCTION_EVENT = "DROP_FUNCTION"; + public static final String HCAT_CREATE_INDEX_EVENT = "CREATE_INDEX"; + public static final String HCAT_DROP_INDEX_EVENT = "DROP_INDEX"; + public static final String HCAT_ALTER_INDEX_EVENT = "ALTER_INDEX"; public static final String HCAT_INSERT_EVENT = "INSERT"; public static final String HCAT_MESSAGE_VERSION = "HCAT_MESSAGE_VERSION"; public static final String HCAT_MESSAGE_FORMAT = "HCAT_MESSAGE_FORMAT"; http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 0b3d891..ea7520d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java 
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -17,8 +17,13 @@ */ package org.apache.hive.hcatalog.listener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; +import org.apache.hadoop.hive.metastore.events.DropIndexEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; @@ -42,6 +47,8 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -221,6 +228,72 @@ public class DbNotificationListener extends MetaStoreEventListener { enqueue(event); } + /** + * @param fnEvent function event + * @throws MetaException + */ + public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException { + Function fn = fnEvent.getFunction(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_CREATE_FUNCTION_EVENT, + msgFactory.buildCreateFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + enqueue(event); + } + + /** + * @param fnEvent function event + * @throws MetaException + */ + public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { + Function fn = fnEvent.getFunction(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_DROP_FUNCTION_EVENT, + 
msgFactory.buildDropFunctionMessage(fn).toString()); + event.setDbName(fn.getDbName()); + enqueue(event); + } + + /** + * @param indexEvent index event + * @throws MetaException + */ + public void onAddIndex (AddIndexEvent indexEvent) throws MetaException { + Index index = indexEvent.getIndex(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_CREATE_INDEX_EVENT, + msgFactory.buildCreateIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + enqueue(event); + } + + /** + * @param indexEvent index event + * @throws MetaException + */ + public void onDropIndex (DropIndexEvent indexEvent) throws MetaException { + Index index = indexEvent.getIndex(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_DROP_INDEX_EVENT, + msgFactory.buildDropIndexMessage(index).toString()); + event.setDbName(index.getDbName()); + enqueue(event); + } + + /** + * @param indexEvent index event + * @throws MetaException + */ + public void onAlterIndex (AlterIndexEvent indexEvent) throws MetaException { + Index before = indexEvent.getOldIndex(); + Index after = indexEvent.getNewIndex(); + NotificationEvent event = new NotificationEvent(0, now(), + HCatConstants.HCAT_ALTER_INDEX_EVENT, + msgFactory.buildAlterIndexMessage(before, after).toString()); + event.setDbName(before.getDbName()); + enqueue(event); + } + @Override public void onInsert(InsertEvent insertEvent) throws MetaException { NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT, http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java new 
file mode 100644 index 0000000..4841dce --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterIndexMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when an Index is altered in HCatalog. 
+ */ +public abstract class AlterIndexMessage extends HCatEventMessage { + + protected AlterIndexMessage() { + super(EventType.ALTER_INDEX); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java new file mode 100644 index 0000000..753c165 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateFunctionMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when a Function is created in HCatalog. 
+ */ +public abstract class CreateFunctionMessage extends HCatEventMessage { + + protected CreateFunctionMessage() { + super(EventType.CREATE_FUNCTION); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java new file mode 100644 index 0000000..192f6de --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateIndexMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when an Index is created in HCatalog. 
+ */ +public abstract class CreateIndexMessage extends HCatEventMessage { + + protected CreateIndexMessage() { + super(EventType.CREATE_INDEX); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java new file mode 100644 index 0000000..19d4d5b --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropFunctionMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when a Function is dropped in HCatalog. 
+ */ +public abstract class DropFunctionMessage extends HCatEventMessage { + + protected DropFunctionMessage() { + super(EventType.DROP_FUNCTION); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java new file mode 100644 index 0000000..46b7394 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/DropIndexMessage.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging; + +/** + * HCat message sent when an Index is dropped in HCatalog. 
+ */ +public abstract class DropIndexMessage extends HCatEventMessage { + + protected DropIndexMessage() { + super(EventType.DROP_INDEX); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java index 538fa68..dca95c7 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/HCatEventMessage.java @@ -40,7 +40,12 @@ public abstract class HCatEventMessage { DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT), ALTER_TABLE(HCatConstants.HCAT_ALTER_TABLE_EVENT), ALTER_PARTITION(HCatConstants.HCAT_ALTER_PARTITION_EVENT), - INSERT(HCatConstants.HCAT_INSERT_EVENT); + INSERT(HCatConstants.HCAT_INSERT_EVENT), + CREATE_FUNCTION(HCatConstants.HCAT_CREATE_FUNCTION_EVENT), + DROP_FUNCTION(HCatConstants.HCAT_DROP_FUNCTION_EVENT), + CREATE_INDEX(HCatConstants.HCAT_CREATE_INDEX_EVENT), + DROP_INDEX(HCatConstants.HCAT_DROP_INDEX_EVENT), + ALTER_INDEX(HCatConstants.HCAT_ALTER_INDEX_EVENT); private String typeString; @@ -85,7 +90,7 @@ public abstract class HCatEventMessage { * Getter for the timestamp associated with the operation. * @return Timestamp (Long - seconds since epoch). */ - public abstract Long getTimestamp(); + public abstract Long getTimestamp(); /** * Class invariant. Checked after construction or deserialization. 
@@ -97,7 +102,6 @@ public abstract class HCatEventMessage { throw new IllegalStateException("Event-type unset."); if (getDB() == null) throw new IllegalArgumentException("DB-name unset."); - return this; } } http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java index 8ea3998..e18780f 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageDeserializer.java @@ -19,6 +19,9 @@ package org.apache.hive.hcatalog.messaging; +import org.apache.hive.hcatalog.messaging.json.JSONCreateFunctionMessage; +import org.apache.hive.hcatalog.messaging.json.JSONDropFunctionMessage; + /** * Interface for converting HCat events from String-form back to HCatEventMessage instances. 
*/ @@ -46,9 +49,18 @@ public abstract class MessageDeserializer { return getAlterPartitionMessage(messageBody); case DROP_PARTITION: return getDropPartitionMessage(messageBody); + case CREATE_FUNCTION: + return getCreateFunctionMessage(messageBody); + case DROP_FUNCTION: + return getDropFunctionMessage(messageBody); + case CREATE_INDEX: + return getCreateIndexMessage(messageBody); + case DROP_INDEX: + return getDropIndexMessage(messageBody); + case ALTER_INDEX: + return getAlterIndexMessage(messageBody); case INSERT: return getInsertMessage(messageBody); - default: throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); } @@ -99,6 +111,31 @@ public abstract class MessageDeserializer { public abstract DropPartitionMessage getDropPartitionMessage(String messageBody); /** + * Method to de-serialize CreateFunctionMessage instance. + */ + public abstract CreateFunctionMessage getCreateFunctionMessage(String messageBody); + + /** + * Method to de-serialize DropFunctionMessage instance. + */ + public abstract DropFunctionMessage getDropFunctionMessage(String messageBody); + + /** + * Method to de-serialize CreateIndexMessage instance. + */ + public abstract CreateIndexMessage getCreateIndexMessage(String messageBody); + + /** + * Method to de-serialize DropIndexMessage instance. + */ + public abstract DropIndexMessage getDropIndexMessage(String messageBody); + + /** + * Method to de-serialize AlterIndexMessage instance. 
+ */ + public abstract AlterIndexMessage getAlterIndexMessage(String messageBody); + + /** * Method to deserialize InsertMessage * @param messageBody the message in serialized form * @return message in object form http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 0710dd0..44574fe 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -22,6 +22,8 @@ package org.apache.hive.hcatalog.messaging; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.util.ReflectionUtils; @@ -163,6 +165,42 @@ public abstract class MessageFactory { public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Iterator partitions); /** + * Factory method for CreateFunctionMessage. + * @param fn The Function being added. + * @return CreateFunctionMessage instance. + */ + public abstract CreateFunctionMessage buildCreateFunctionMessage(Function fn); + + /** + * Factory method for DropFunctionMessage. + * @param fn The Function being dropped. + * @return DropFunctionMessage instance. + */ + public abstract DropFunctionMessage buildDropFunctionMessage(Function fn); + + /** + * Factory method for CreateIndexMessage. 
+ * @param idx The Index being added. + * @return CreateIndexMessage instance. + */ + public abstract CreateIndexMessage buildCreateIndexMessage(Index idx); + + /** + * Factory method for DropIndexMessage. + * @param idx The Index being dropped. + * @return DropIndexMessage instance. + */ + public abstract DropIndexMessage buildDropIndexMessage(Index idx); + + /** + * Factory method for AlterIndexMessage. + * @param before The index before the alter + * @param after The index after the alter + * @return AlterIndexMessage + */ + public abstract AlterIndexMessage buildAlterIndexMessage(Index before, Index after); + + /** * Factory method for building insert message * @param db Name of the database the insert occurred in * @param table Name of the table the insert occurred in http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java new file mode 100644 index 0000000..25b0987 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterIndexMessage.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hive.hcatalog.messaging.AlterIndexMessage; +import org.apache.thrift.TException; + +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of AlterIndexMessage. + */ +public class JSONAlterIndexMessage extends AlterIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, beforeIndexObjJson, afterIndexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONAlterIndexMessage() {} + + public JSONAlterIndexMessage(String server, String servicePrincipal, Index before, Index after, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = after.getDbName(); + this.timestamp = timestamp; + try { + this.beforeIndexObjJson = JSONMessageFactory.createIndexObjJson(before); + this.afterIndexObjJson = JSONMessageFactory.createIndexObjJson(after); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getBeforeIndexObjJson() { + return beforeIndexObjJson; + } + + public String getAfterIndexObjJson() { + return afterIndexObjJson; + } + + 
@Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java new file mode 100644 index 0000000..fb883fc --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hive.hcatalog.messaging.CreateFunctionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateFunctionMessage. + */ +public class JSONCreateFunctionMessage extends CreateFunctionMessage { + + @JsonProperty + String server, servicePrincipal, db, functionObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. + */ + public JSONCreateFunctionMessage() {} + + public JSONCreateFunctionMessage(String server, String servicePrincipal, Function fn, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = fn.getDbName(); + this.timestamp = timestamp; + try { + this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Function object", ex); + } + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getFunctionObjJson() { + return functionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java ---------------------------------------------------------------------- diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java new file mode 100644 index 0000000..8d83149 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateIndexMessage.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hive.hcatalog.messaging.CreateIndexMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of CreateIndexMessage. + */ +public class JSONCreateIndexMessage extends CreateIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, indexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. 
+ */ + public JSONCreateIndexMessage() {} + + public JSONCreateIndexMessage(String server, String servicePrincipal, Index index, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = index.getDbName(); + try { + this.indexObjJson = JSONMessageFactory.createIndexObjJson(index); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getIndexObjJson() { + return indexObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java new file mode 100644 index 0000000..334e36f --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hive.hcatalog.messaging.DropFunctionMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of DropFunctionMessage. + */ +public class JSONDropFunctionMessage extends DropFunctionMessage { + + @JsonProperty + String server, servicePrincipal, db, functionObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. 
+ */ + public JSONDropFunctionMessage() {} + + public JSONDropFunctionMessage(String server, String servicePrincipal, Function fn, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = fn.getDbName(); + this.timestamp = timestamp; + try { + this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Function object", ex); + } + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getFunctionObjJson() { + return functionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java new file mode 100644 index 0000000..bacaa1d --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropIndexMessage.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.messaging.json; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hive.hcatalog.messaging.DropIndexMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON Implementation of DropIndexMessage. + */ +public class JSONDropIndexMessage extends DropIndexMessage { + + @JsonProperty + String server, servicePrincipal, db, indexObjJson; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, required for Jackson. 
+ */ + public JSONDropIndexMessage() {} + + public JSONDropIndexMessage(String server, String servicePrincipal, Index index, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = index.getDbName(); + try { + this.indexObjJson = JSONMessageFactory.createIndexObjJson(index); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Index object", ex); + } + + this.timestamp = timestamp; + checkValid(); + } + + @Override + public String getDB() { return db; } + + @Override + public String getServer() { return server; } + + @Override + public String getServicePrincipal() { return servicePrincipal; } + + @Override + public Long getTimestamp() { return timestamp; } + + public String getIndexObjJson() { + return indexObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java index 834fdde..bd45d09 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageDeserializer.java @@ -19,13 +19,17 @@ package org.apache.hive.hcatalog.messaging.json; -import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; 
+import org.apache.hive.hcatalog.messaging.AlterIndexMessage; import org.apache.hive.hcatalog.messaging.AlterPartitionMessage; import org.apache.hive.hcatalog.messaging.AlterTableMessage; import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hive.hcatalog.messaging.CreateFunctionMessage; +import org.apache.hive.hcatalog.messaging.CreateIndexMessage; import org.apache.hive.hcatalog.messaging.CreateTableMessage; import org.apache.hive.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hive.hcatalog.messaging.DropFunctionMessage; +import org.apache.hive.hcatalog.messaging.DropIndexMessage; import org.apache.hive.hcatalog.messaging.DropPartitionMessage; import org.apache.hive.hcatalog.messaging.DropTableMessage; import org.apache.hive.hcatalog.messaging.InsertMessage; @@ -125,6 +129,56 @@ public class JSONMessageDeserializer extends MessageDeserializer { } @Override + public CreateFunctionMessage getCreateFunctionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateFunctionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateFunctionMessage.", exception); + } + } + + @Override + public DropFunctionMessage getDropFunctionMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropFunctionMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropDatabaseMessage.", exception); + } + } + + @Override + public CreateIndexMessage getCreateIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONCreateIndexMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONCreateIndexMessage.", exception); + } + } + + @Override + public DropIndexMessage getDropIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONDropIndexMessage.class); + } + catch 
(Exception exception) { + throw new IllegalArgumentException("Could not construct JSONDropIndexMessage.", exception); + } + } + + @Override + public AlterIndexMessage getAlterIndexMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAlterIndexMessage.class); + } + catch (Exception exception) { + throw new IllegalArgumentException("Could not construct JSONAlterIndexMessage.", exception); + } + } + + @Override public InsertMessage getInsertMessage(String messageBody) { try { return mapper.readValue(messageBody, JSONInsertMessage.class); http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 6b74b54..251084f 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -19,12 +19,17 @@ package org.apache.hive.hcatalog.messaging.json; -import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hive.hcatalog.messaging.AlterIndexMessage; +import org.apache.hive.hcatalog.messaging.CreateFunctionMessage; +import org.apache.hive.hcatalog.messaging.CreateIndexMessage; +import org.apache.hive.hcatalog.messaging.DropFunctionMessage; +import org.apache.hive.hcatalog.messaging.DropIndexMessage; import org.apache.hadoop.hive.metastore.api.Database; +import 
org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hive.hcatalog.messaging.AddPartitionMessage; @@ -38,9 +43,13 @@ import org.apache.hive.hcatalog.messaging.DropTableMessage; import org.apache.hive.hcatalog.messaging.InsertMessage; import org.apache.hive.hcatalog.messaging.MessageDeserializer; import org.apache.hive.hcatalog.messaging.MessageFactory; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -54,7 +63,6 @@ public class JSONMessageFactory extends MessageFactory { private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName()); - private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); @Override @@ -121,6 +129,36 @@ public class JSONMessageFactory extends MessageFactory { } @Override + public CreateFunctionMessage buildCreateFunctionMessage(Function fn) { + return new JSONCreateFunctionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, fn, + now()); + } + + @Override + public DropFunctionMessage buildDropFunctionMessage(Function fn) { + return new JSONDropFunctionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, fn, + now()); + } + + @Override + public CreateIndexMessage buildCreateIndexMessage(Index idx) { + return new JSONCreateIndexMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, idx, + now()); + } + + @Override + public DropIndexMessage buildDropIndexMessage(Index idx) { + return new JSONDropIndexMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, idx, + now()); + } + + @Override + public AlterIndexMessage buildAlterIndexMessage(Index before, Index after) { + return new JSONAlterIndexMessage(HCAT_SERVER_URL, 
HCAT_SERVICE_PRINCIPAL, before, after, + now()); + } + + @Override public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, List files) { return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, partKeyVals, @@ -140,11 +178,21 @@ public class JSONMessageFactory extends MessageFactory { } private static List> getPartitionKeyValues(final Table table, Iterator iterator) { - return Lists.newArrayList(Iterators.transform(iterator, new Function>() { + return Lists.newArrayList(Iterators.transform(iterator, new com.google.common.base.Function>() { @Override public Map apply(@Nullable Partition partition) { return getPartitionKeyValues(table, partition); } })); } -} + + static String createFunctionObjJson(Function functionObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(functionObj, "UTF-8"); + } + + static String createIndexObjJson(Index indexObj) throws TException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + return serializer.toString(indexObj, "UTF-8"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 4a7801b..5282a5a 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -300,7 +300,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public boolean addIndex(Index index) throws InvalidObjectException, 
MetaException { - return objectStore.addIndex(index); + if (shouldEventSucceed) { + return objectStore.addIndex(index); + } else { + throw new RuntimeException("Event failed."); + } } @Override @@ -312,7 +316,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public boolean dropIndex(String dbName, String origTableName, String indexName) throws MetaException { - return objectStore.dropIndex(dbName, origTableName, indexName); + if (shouldEventSucceed) { + return objectStore.dropIndex(dbName, origTableName, indexName); + } else { + throw new RuntimeException("Event failed."); + } } @Override @@ -330,7 +338,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public void alterIndex(String dbName, String baseTblName, String name, Index newIndex) throws InvalidObjectException, MetaException { - objectStore.alterIndex(dbName, baseTblName, name, newIndex); + if (shouldEventSucceed) { + objectStore.alterIndex(dbName, baseTblName, name, newIndex); + } else { + throw new RuntimeException("Event failed."); + } } @Override @@ -751,7 +763,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { @Override public void createFunction(Function func) throws InvalidObjectException, MetaException { - objectStore.createFunction(func); + if (shouldEventSucceed) { + objectStore.createFunction(func); + } else { + throw new RuntimeException("Event failed."); + } } @Override @@ -764,7 +780,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { public void dropFunction(String dbName, String funcName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - objectStore.dropFunction(dbName, funcName); + if (shouldEventSucceed) { + objectStore.dropFunction(dbName, funcName); + } else { + throw new RuntimeException("Event failed."); + } } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 1cd32d5..4f97cf4 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -23,8 +23,20 @@ import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.fail; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.ResourceType; +import org.apache.hadoop.hive.metastore.api.ResourceUri; +import org.apache.htrace.fasterxml.jackson.core.JsonFactory; +import org.apache.htrace.fasterxml.jackson.core.JsonParser; +import org.apache.htrace.fasterxml.jackson.databind.JsonNode; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TJSONProtocol; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -46,6 +58,8 @@ import org.apache.hive.hcatalog.common.HCatConstants; import org.junit.Before; import 
org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.util.ArrayList; @@ -55,7 +69,6 @@ import java.util.List; import java.util.Map; public class TestDbNotificationListener { - private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName()); private static final int EVENTS_TTL = 30; private static final int CLEANUP_SLEEP_TIME = 10; @@ -402,7 +415,7 @@ public class TestDbNotificationListener { partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable", startTime, startTime, sd, emptyParameters); - msClient.add_partition(partition); + msClient.add_partition(partition); DummyRawStoreFailEvent.setEventSucceed(false); try { msClient.dropPartition("default", "dropparttable", Arrays.asList("tomorrow"), false); @@ -415,6 +428,244 @@ public class TestDbNotificationListener { } @Test + public void createFunction() throws Exception { + String funcName = "createFunction"; + String dbName = "default"; + String ownerName = "me"; + String funcClass = "o.a.h.h.myfunc"; + String funcResource = "file:/tmp/somewhere"; + Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER, + startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, + funcResource))); + msClient.createFunction(func); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 1, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + Function funcObj = getFunctionObj(getJsonTree(event)); + assertEquals(dbName, funcObj.getDbName()); + assertEquals(funcName, funcObj.getFunctionName()); + assertEquals(funcClass, 
funcObj.getClassName()); + assertEquals(ownerName, funcObj.getOwnerName()); + assertEquals(FunctionType.JAVA, funcObj.getFunctionType()); + assertEquals(1, funcObj.getResourceUrisSize()); + assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType()); + assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri()); + + DummyRawStoreFailEvent.setEventSucceed(false); + func = new Function("createFunction2", dbName, "o.a.h.h.myfunc2", "me", PrincipalType.USER, + startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, + "file:/tmp/somewhere2"))); + try { + msClient.createFunction(func); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + } + + @Test + public void dropFunction() throws Exception { + String funcName = "dropfunctiontest"; + String dbName = "default"; + String ownerName = "me"; + String funcClass = "o.a.h.h.dropFunctionTest"; + String funcResource = "file:/tmp/somewhere"; + Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER, + startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, + funcResource))); + msClient.createFunction(func); + msClient.dropFunction(dbName, funcName); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(2, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(1); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + Function funcObj = getFunctionObj(getJsonTree(event)); + assertEquals(dbName, funcObj.getDbName()); + assertEquals(funcName, funcObj.getFunctionName()); + assertEquals(funcClass, funcObj.getClassName()); + assertEquals(ownerName, funcObj.getOwnerName()); + 
assertEquals(FunctionType.JAVA, funcObj.getFunctionType()); + assertEquals(1, funcObj.getResourceUrisSize()); + assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType()); + assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri()); + + func = new Function("dropfunctiontest2", dbName, "o.a.h.h.dropFunctionTest2", "me", + PrincipalType.USER, startTime, FunctionType.JAVA, Arrays.asList( + new ResourceUri(ResourceType.JAR, "file:/tmp/somewhere2"))); + msClient.createFunction(func); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.dropFunction(dbName, "dropfunctiontest2"); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); + } + + @Test + public void createIndex() throws Exception { + String indexName = "createIndex"; + String dbName = "default"; + String tableName = "createIndexTable"; + String indexTableName = tableName + "__" + indexName + "__"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createTable(table); + Index index = new Index(indexName, null, "default", tableName, startTime, startTime, + indexTableName, sd, emptyParameters, false); + Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createIndex(index, indexTable); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, 
null); + assertEquals(3, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(2); + assertEquals(firstEventId + 3, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + Index indexObj = getIndexObj(getJsonTree(event)); + assertEquals(dbName, indexObj.getDbName()); + assertEquals(indexName, indexObj.getIndexName()); + assertEquals(tableName, indexObj.getOrigTableName()); + assertEquals(indexTableName, indexObj.getIndexTableName()); + + DummyRawStoreFailEvent.setEventSucceed(false); + index = new Index("createIndexTable2", null, "default", tableName, startTime, startTime, + "createIndexTable2__createIndexTable2__", sd, emptyParameters, false); + Table indexTable2 = new Table("createIndexTable2__createIndexTable2__", dbName, "me", + startTime, startTime, 0, sd, null, emptyParameters, null, null, null); + try { + msClient.createIndex(index, indexTable2); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); + } + + @Test + public void dropIndex() throws Exception { + String indexName = "dropIndex"; + String dbName = "default"; + String tableName = "dropIndexTable"; + String indexTableName = tableName + "__" + indexName + "__"; + int startTime = (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createTable(table); + Index 
index = new Index(indexName, null, "default", tableName, startTime, startTime, + indexTableName, sd, emptyParameters, false); + Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createIndex(index, indexTable); + msClient.dropIndex(dbName, tableName, indexName, true); // drops index and indexTable + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(4, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(3); + assertEquals(firstEventId + 4, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + Index indexObj = getIndexObj(getJsonTree(event)); + assertEquals(dbName, indexObj.getDbName()); + assertEquals(indexName.toLowerCase(), indexObj.getIndexName()); + assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName()); + assertEquals(indexTableName.toLowerCase(), indexObj.getIndexTableName()); + + index = new Index("dropIndexTable2", null, "default", tableName, startTime, startTime, + "dropIndexTable__dropIndexTable2__", sd, emptyParameters, false); + Table indexTable2 = new Table("dropIndexTable__dropIndexTable2__", dbName, "me", startTime, + startTime, 0, sd, null, emptyParameters, null, null, null); + msClient.createIndex(index, indexTable2); + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.dropIndex(dbName, tableName, "dropIndex2", true); // drops index and indexTable + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(6, rsp.getEventsSize()); + } + + @Test + public void alterIndex() throws Exception { + String indexName = "alterIndex"; + String dbName = "default"; + String tableName = "alterIndexTable"; + String indexTableName = tableName + "__" + indexName + "__"; + int startTime 
= (int)(System.currentTimeMillis() / 1000); + List cols = new ArrayList(); + cols.add(new FieldSchema("col1", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map params = new HashMap(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createTable(table); + Index oldIndex = new Index(indexName, null, "default", tableName, startTime, startTime, + indexTableName, sd, emptyParameters, false); + Table oldIndexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, + emptyParameters, null, null, null); + msClient.createIndex(oldIndex, oldIndexTable); // creates index and index table + Index newIndex = new Index(indexName, null, "default", tableName, startTime, startTime + 1, + indexTableName, sd, emptyParameters, false); + msClient.alter_index(dbName, tableName, indexName, newIndex); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(4, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(3); + assertEquals(firstEventId + 4, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType()); + assertEquals(dbName, event.getDbName()); + Index indexObj = getIndexObj(getJsonTree(event), "afterIndexObjJson"); + assertEquals(dbName, indexObj.getDbName()); + assertEquals(indexName, indexObj.getIndexName()); + assertEquals(tableName, indexObj.getOrigTableName()); + assertEquals(indexTableName, indexObj.getIndexTableName()); + assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime()); + + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.alter_index(dbName, 
tableName, indexName, newIndex); + } catch (Exception ex) { + // expected + } + + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(4, rsp.getEventsSize()); + } + + @Test public void insertTable() throws Exception { List cols = new ArrayList(); cols.add(new FieldSchema("col1", "int", "nocomment")); @@ -700,7 +951,6 @@ public class TestDbNotificationListener { assertEquals(firstEventId + 24, event.getEventId()); assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); - } @Test @@ -721,4 +971,30 @@ public class TestDbNotificationListener { LOG.info("second trigger done"); assertEquals(0, rsp2.getEventsSize()); } + + private ObjectNode getJsonTree(NotificationEvent event) throws Exception { + JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonParser, ObjectNode.class); + } + + private Function getFunctionObj(JsonNode jsonTree) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Function funcObj = new Function(); + String tableJson = jsonTree.get("functionObjJson").asText(); + deSerializer.deserialize(funcObj, tableJson, "UTF-8"); + return funcObj; + } + + private Index getIndexObj(JsonNode jsonTree) throws Exception { + return getIndexObj(jsonTree, "indexObjJson"); + } + + private Index getIndexObj(JsonNode jsonTree, String indexObjKey) throws Exception { + TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); + Index indexObj = new Index(); + String tableJson = jsonTree.get(indexObjKey).asText(); + deSerializer.deserialize(indexObj, tableJson, "UTF-8"); + return indexObj; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff 
--git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c0ef25e..48bebb2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -176,8 +176,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropIndexEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; @@ -3909,7 +3911,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { + " idx=" + index_name + " newidx=" + newIndex.getIndexName()); newIndex.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System .currentTimeMillis() / 1000)); - boolean success = false; Exception ex = null; Index oldIndex = null; @@ -3917,9 +3918,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { try { ms.openTransaction(); oldIndex = get_index_by_name(dbname, base_table_name, index_name); - firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this)); - ms.alterIndex(dbname, base_table_name, index_name, newIndex); if (transactionalListeners.size() > 0) { AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this); @@ -4585,16 +4584,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { private Index add_index_core(final RawStore ms, final Index index, final Table 
indexTable) throws InvalidObjectException, AlreadyExistsException, MetaException { - boolean success = false, indexTableCreated = false; - String[] qualified = MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName()); - try { ms.openTransaction(); firePreEvent(new PreAddIndexEvent(index, this)); - Index old_index = null; try { old_index = get_index_by_name(index.getDbName(), index @@ -4628,7 +4623,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { index.setCreateTime((int) time); index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); - if (ms.addIndex(index)) { if (transactionalListeners.size() > 0) { AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this); @@ -4687,21 +4681,16 @@ public class HiveMetaStore extends ThriftHiveMetastore { final String dbName, final String tblName, final String indexName, final boolean deleteData) throws NoSuchObjectException, MetaException, TException, IOException, InvalidObjectException, InvalidInputException { - boolean success = false; Index index = null; Path tblPath = null; List partPaths = null; try { ms.openTransaction(); - // drop the underlying index table index = get_index_by_name(dbName, tblName, indexName); // throws exception if not exists - firePreEvent(new PreDropIndexEvent(index, this)); - ms.dropIndex(dbName, tblName, indexName); - String idxTblName = index.getIndexTableName(); if (idxTblName != null) { String[] qualified = MetaStoreUtils.getQualifiedName(index.getDbName(), idxTblName); @@ -4722,7 +4711,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Drop the partitions and get a list of partition locations which need to be deleted partPaths = dropPartitionsAndGetLocations(ms, qualified[0], qualified[1], tblPath, tbl.getPartitionKeys(), deleteData); - if (!ms.dropTable(qualified[0], qualified[1])) { throw new MetaException("Unable to drop underlying data table " + qualified[0] + "." 
+ qualified[1] + " for index " + indexName); @@ -6181,31 +6169,43 @@ public class HiveMetaStore extends ThriftHiveMetastore { InvalidObjectException, MetaException, NoSuchObjectException, TException { validateFunctionInfo(func); - boolean success = false; RawStore ms = getMS(); try { ms.openTransaction(); - Database db = ms.getDatabase(func.getDbName()); if (db == null) { throw new NoSuchObjectException("The database " + func.getDbName() + " does not exist"); } + Function existingFunc = ms.getFunction(func.getDbName(), func.getFunctionName()); if (existingFunc != null) { throw new AlreadyExistsException( "Function " + func.getFunctionName() + " already exists"); } - // set create time long time = System.currentTimeMillis() / 1000; func.setCreateTime((int) time); ms.createFunction(func); + if (transactionalListeners.size() > 0) { + CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onCreateFunction(createFunctionEvent); + } + } + success = ms.commitTransaction(); } finally { if (!success) { ms.rollbackTransaction(); } + + if (listeners.size() > 0) { + CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this); + for (MetaStoreEventListener listener : listeners) { + listener.onCreateFunction(createFunctionEvent); + } + } } } @@ -6216,20 +6216,33 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean success = false; Function func = null; RawStore ms = getMS(); - try { ms.openTransaction(); - func = ms.getFunction(dbName, funcName); if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } + ms.dropFunction(dbName, funcName); + if (transactionalListeners.size() > 0) { + DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + 
transactionalListener.onDropFunction(dropFunctionEvent); + } + } + success = ms.commitTransaction(); } finally { if (!success) { ms.rollbackTransaction(); } + + if (listeners.size() > 0) { + DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this); + for (MetaStoreEventListener listener : listeners) { + listener.onDropFunction(dropFunctionEvent); + } + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index 5e46ae1..b0defb5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -28,8 +28,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropIndexEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; @@ -145,6 +147,20 @@ public abstract class MetaStoreEventListener implements Configurable { } /** + * @param fnEvent function event + * @throws MetaException + */ + public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException { + } + + /** + * @param fnEvent 
function event + * @throws MetaException + */ + public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { + } + + /** * This will be called when an insert is executed that does not cause a partition to be added. * If an insert causes a partition to be added it will cause {@link #onAddPartition} to be * called instead. http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java new file mode 100644 index 0000000..717ede2 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/CreateFunctionEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.Function; + +public class CreateFunctionEvent extends ListenerEvent { + + private final Function function; + + public CreateFunctionEvent (Function function, boolean status, HMSHandler handler) { + super (status, handler); + this.function = function; + } + + /** + * @return the function + */ + public Function getFunction () { + return function; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java new file mode 100644 index 0000000..7190aae --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropFunctionEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.Function; + +public class DropFunctionEvent extends ListenerEvent { + + private final Function function; + + public DropFunctionEvent(Function function, boolean status, HMSHandler handler) { + super(status, handler); + this.function = function; + } + + /** + * @return the function + */ + public Function getFunction() { + return function; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/62d802b8/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java index a3b16d0..182e724 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyListener.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropIndexEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; @@ -126,6 +128,16 @@ public class DummyListener extends MetaStoreEventListener{ addEvent(indexEvent); } + @Override + public void onCreateFunction (CreateFunctionEvent 
fnEvent) throws MetaException { + addEvent(fnEvent); + } + + @Override + public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { + addEvent(fnEvent); + } + private void addEvent(ListenerEvent event) { notifyList.add(event); }