Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E32DF103ED for ; Fri, 30 Aug 2013 05:42:13 +0000 (UTC) Received: (qmail 77334 invoked by uid 500); 30 Aug 2013 05:42:11 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 77277 invoked by uid 500); 30 Aug 2013 05:42:08 -0000 Mailing-List: contact commits-help@falcon.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.incubator.apache.org Delivered-To: mailing list commits@falcon.incubator.apache.org Received: (qmail 77222 invoked by uid 99); 30 Aug 2013 05:41:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Aug 2013 05:41:59 +0000 X-ASF-Spam-Status: No, hits=-2002.2 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 30 Aug 2013 05:41:56 +0000 Received: (qmail 77170 invoked by uid 99); 30 Aug 2013 05:41:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Aug 2013 05:41:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EABC189E467; Fri, 30 Aug 2013 05:41:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sriksun@apache.org To: commits@falcon.incubator.apache.org Date: Fri, 30 Aug 2013 05:41:33 -0000 Message-Id: <0bb4bcaa995840528507cfbd76e71484@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: FALCON-86 Hive table integration with cluster entity. Contributed by Venkatesh Seetharam X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 70b8a8e11 -> edf03d4a8 FALCON-86 Hive table integration with cluster entity. Contributed by Venkatesh Seetharam Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/8b09e1b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/8b09e1b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/8b09e1b2 Branch: refs/heads/master Commit: 8b09e1b272113e9b769a20f09b6c0c4e2ec7c18a Parents: 70b8a8e Author: srikanth.sundarrajan Authored: Fri Aug 30 10:52:05 2013 +0530 Committer: srikanth.sundarrajan Committed: Fri Aug 30 10:52:05 2013 +0530 ---------------------------------------------------------------------- .../falcon/catalog/AbstractCatalogService.java | 37 ++++++++++++++ .../falcon/catalog/CatalogServiceFactory.java | 49 ++++++++++++++++++ .../falcon/catalog/HiveCatalogService.java | 46 +++++++++++++++++ .../entity/parser/ClusterEntityParser.java | 31 ++++++++++- common/src/main/resources/startup.properties | 2 + .../entity/parser/ClusterEntityParserTest.java | 54 ++++++++++++-------- .../resources/config/cluster/cluster-0.1.xml | 3 +- .../config/cluster/cluster-no-registry.xml | 42 +++++++++++++++ src/conf/startup.properties | 2 + .../resource/ClusterEntityValidationIT.java | 2 + webapp/src/test/resources/cluster-template.xml | 6 +-- 11 files changed, 246 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java new file mode 100644 index 0000000..4086611 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.catalog; + +import org.apache.falcon.FalconException; + +/** + * Interface definition for a catalog registry service + * such as Hive or HCatalog. + */ +public abstract class AbstractCatalogService { + + /** + * This method checks if the catalog service is alive. + * + * @param catalogBaseUrl url for the catalog service + * @return if the service was reachable + * @throws FalconException exception + */ + public abstract boolean isAlive(String catalogBaseUrl) throws FalconException; +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java new file mode 100644 index 0000000..c8a0fa0 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.catalog; + +import org.apache.falcon.FalconException; +import org.apache.falcon.util.ReflectionUtils; +import org.apache.falcon.util.StartupProperties; + +/** + * Factory for providing appropriate catalog service + * implementation to the falcon service. + */ +@SuppressWarnings("unchecked") +public final class CatalogServiceFactory { + + public static final String CATALOG_SERVICE = "catalog.service.impl"; + + private CatalogServiceFactory() { + } + + public static boolean isEnabled() { + return StartupProperties.get().containsKey(CATALOG_SERVICE); + } + + public static AbstractCatalogService getCatalogService() throws FalconException { + if (!isEnabled()) { + throw new FalconException( + "Catalog integration is not enabled in falcon. Implementation is missing: " + CATALOG_SERVICE); + } + + return ReflectionUtils.getInstance(CATALOG_SERVICE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java new file mode 100644 index 0000000..e6f7fe2 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.catalog; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.falcon.FalconException; +import org.apache.log4j.Logger; + +import javax.ws.rs.core.MediaType; + +/** + * An implementation of CatalogService that uses Hive Meta Store (HCatalog) + * as the backing Catalog registry. + */ +public class HiveCatalogService extends AbstractCatalogService { + + private static final Logger LOG = Logger.getLogger(HiveCatalogService.class); + + @Override + public boolean isAlive(String catalogBaseUrl) throws FalconException { + LOG.info("Checking if the service is alive for: " + catalogBaseUrl); + + Client client = Client.create(); + WebResource service = client.resource(catalogBaseUrl); + ClientResponse response = service.path("status").accept(MediaType.APPLICATION_JSON).head(); + return response.getStatus() == 200; + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java index b4e4a95..631ec60 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java @@ -23,11 +23,13 @@ import java.io.IOException; import javax.jms.ConnectionFactory; import org.apache.falcon.FalconException; +import org.apache.falcon.catalog.CatalogServiceFactory; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; @@ -56,6 +58,9 @@ public class ClusterEntityParser extends EntityParser { validateScheme(cluster, Interfacetype.WRITE); validateScheme(cluster, Interfacetype.WORKFLOW); validateScheme(cluster, Interfacetype.MESSAGING); + if (ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) { + validateScheme(cluster, Interfacetype.REGISTRY); + } // No interface validations in prism or other falcon servers. // Only the falcon server for which the cluster belongs to should validate interfaces @@ -69,8 +74,7 @@ public class ClusterEntityParser extends EntityParser { validateExecuteInterface(cluster); validateWorkflowInterface(cluster); validateMessagingInterface(cluster); - - // Interfacetype.REGISTRY is not validated as its not used + validateRegistryInterface(cluster); } private void validateScheme(Cluster cluster, Interfacetype interfacetype) @@ -153,4 +157,27 @@ public class ClusterEntityParser extends EntityParser { + " for: " + implementation, e); } } + + private void validateRegistryInterface(Cluster cluster) throws ValidationException { + final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); + if (catalogInterface == null) { + LOG.info("Catalog service is not enabled for cluster: " + cluster.getName()); + return; + } + + if (!CatalogServiceFactory.isEnabled()) { + throw new ValidationException("Catalog registry implementation is not defined: catalog.service.impl"); + } + + final String catalogUrl = catalogInterface.getEndpoint(); + LOG.info("Validating catalog registry interface: " + catalogUrl); + + try { + if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl)) { + throw new ValidationException("Unable to reach Catalog server:" + catalogUrl); + } + } catch (FalconException e) { + throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 1b4e470..80d46f7 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -27,6 +27,8 @@ *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager *.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager +*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService + *.application.services=org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.rerun.service.RetryService,\ http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java index 20d14e8..dffa691 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java @@ -18,15 +18,6 @@ package org.apache.falcon.entity.parser; -import static org.testng.AssertJUnit.assertEquals; - -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; - -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; - import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; @@ -35,11 +26,16 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Interface; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.util.StartupProperties; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import javax.xml.bind.JAXBException; +import java.io.IOException; +import java.io.InputStream; + /** * Test for validating cluster entity parsing. */ @@ -56,31 +52,45 @@ public class ClusterEntityParserTest extends AbstractTestBase { ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(conf.get("fs.default.name")); Assert.assertNotNull(cluster); - assertEquals(cluster.getName(), "testCluster"); + Assert.assertEquals(cluster.getName(), "testCluster"); Interface execute = ClusterHelper.getInterface(cluster, Interfacetype.EXECUTE); - assertEquals(execute.getEndpoint(), "localhost:8021"); - assertEquals(execute.getVersion(), "0.20.2"); + Assert.assertEquals(execute.getEndpoint(), "localhost:8021"); + Assert.assertEquals(execute.getVersion(), "0.20.2"); Interface readonly = ClusterHelper.getInterface(cluster, Interfacetype.READONLY); - assertEquals(readonly.getEndpoint(), "hftp://localhost:50010"); - assertEquals(readonly.getVersion(), "0.20.2"); + Assert.assertEquals(readonly.getEndpoint(), "hftp://localhost:50010"); + Assert.assertEquals(readonly.getVersion(), "0.20.2"); Interface write = ClusterHelper.getInterface(cluster, Interfacetype.WRITE); //assertEquals(write.getEndpoint(), conf.get("fs.default.name")); - assertEquals(write.getVersion(), "0.20.2"); + Assert.assertEquals(write.getVersion(), "0.20.2"); Interface workflow = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW); - assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/"); - assertEquals(workflow.getVersion(), "3.1"); + Assert.assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/"); + Assert.assertEquals(workflow.getVersion(), "3.1"); + + Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); + Assert.assertEquals(catalog.getEndpoint(), "http://localhost:48080/templeton/v1"); + Assert.assertEquals(catalog.getVersion(), "0.11.0"); + + Assert.assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging"); + } + + @Test + public void testParseClusterWithoutRegistry() throws IOException, FalconException, JAXBException { + + InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml"); + + Cluster cluster = parser.parse(stream); - assertEquals(ClusterHelper.getLocation(cluster, "staging"), "/projects/falcon/staging"); + Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); + Assert.assertNull(catalog); - StringWriter stringWriter = new StringWriter(); - Marshaller marshaller = EntityType.CLUSTER.getMarshaller(); - marshaller.marshal(cluster, stringWriter); - System.out.println(stringWriter.toString()); + StartupProperties.get().setProperty("catalog.service.impl", ""); + catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); + Assert.assertNull(catalog); } /** http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/test/resources/config/cluster/cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/cluster/cluster-0.1.xml b/common/src/test/resources/config/cluster/cluster-0.1.xml index fd6e06e..658711d 100644 --- a/common/src/test/resources/config/cluster/cluster-0.1.xml +++ b/common/src/test/resources/config/cluster/cluster-0.1.xml @@ -29,7 +29,8 @@ version="3.1"/> - + http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/common/src/test/resources/config/cluster/cluster-no-registry.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/cluster/cluster-no-registry.xml b/common/src/test/resources/config/cluster/cluster-no-registry.xml new file mode 100644 index 0000000..85dfe32 --- /dev/null +++ b/common/src/test/resources/config/cluster/cluster-no-registry.xml @@ -0,0 +1,42 @@ + + + + + consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 6b8617e..dca38b4 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -31,6 +31,8 @@ *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager *.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager +*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService + *.application.services=org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.rerun.service.RetryService,\ http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java index 8f2b6e4..b96c994 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java @@ -77,6 +77,8 @@ public class ClusterEntityValidationIT { {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"}, {Interfacetype.MESSAGING, "messaging-interface:9999"}, {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"}, + {Interfacetype.REGISTRY, "catalog-interface:9999"}, + {Interfacetype.REGISTRY, "http://catalog-interface:9999"}, }; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/8b09e1b2/webapp/src/test/resources/cluster-template.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/cluster-template.xml b/webapp/src/test/resources/cluster-template.xml index 11aaedf..fbc46c5 100644 --- a/webapp/src/test/resources/cluster-template.xml +++ b/webapp/src/test/resources/cluster-template.xml @@ -17,8 +17,7 @@ limitations under the License. --> - + @@ -29,7 +28,8 @@ version="3.1"/> - +