From: shwethags@apache.org
To: commits@falcon.incubator.apache.org
Date: Thu, 10 Jul 2014 06:57:36 -0000
Message-Id: <75787cd2d86241979b967ae694619393@git.apache.org>
Subject: [7/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
new file mode 100644
index 0000000..bb8dfcc
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.falcon.oozie; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.Property; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.oozie.feed.FeedBundleBuilder; +import org.apache.falcon.oozie.process.ProcessBundleBuilder; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.service.FalconPathFilter; +import org.apache.falcon.service.SharedLibraryHostingService; +import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.Marshaller; +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * Base class for building oozie entities - workflow, coordinator and bundle. + * @param + */ +public abstract class OozieEntityBuilder { + public static final Logger LOG = LoggerFactory.getLogger(OozieEntityBuilder.class); + + public static final String METASTOREURIS = "hive.metastore.uris"; + public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; + public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled"; + + public static final String ENTITY_PATH = "ENTITY_PATH"; + public static final String ENTITY_NAME = "ENTITY_NAME"; + + private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("falcon"); + } + + @Override + public String getJarName(Path path) { + String name = path.getName(); + if (name.endsWith(".jar")) { + name = name.substring(0, name.indexOf(".jar")); + } + return name; + } + }; + + protected T entity; + protected final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(); + + public OozieEntityBuilder(T entity) { + this.entity = entity; + } + + public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException; + + protected String getStoragePath(Path path) { + if (path != null) { + return getStoragePath(path.toString()); + } + return null; + } + + protected String getStoragePath(String path) { + if (StringUtils.isNotEmpty(path)) { + if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) { + path = "${nameNode}" + path; + } + } + return path; + } + + public static OozieEntityBuilder get(Entity entity) { + switch(entity.getEntityType()) { + case FEED: + return new FeedBundleBuilder((Feed) entity); + + case PROCESS: + return new ProcessBundleBuilder((Process)entity); + + default: + } + throw new IllegalArgumentException("Unhandled 
type: " + entity.getEntityType()); + } + + protected void marshal(Cluster cluster, JAXBElement jaxbElement, JAXBContext jaxbContext, Path outPath) + throws FalconException { + try { + Marshaller marshaller = jaxbContext.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); + FileSystem fs = HadoopClientFactory.get().createFileSystem( + outPath.toUri(), ClusterHelper.getConfiguration(cluster)); + OutputStream out = fs.create(outPath); + try { + marshaller.marshal(jaxbElement, out); + } finally { + out.close(); + } + if (LOG.isDebugEnabled()) { + StringWriter writer = new StringWriter(); + marshaller.marshal(jaxbElement, writer); + LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName()); + LOG.debug(writer.getBuffer().toString()); + } + + LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath); + } catch (Exception e) { + throw new FalconException("Unable to marshall app object", e); + } + } + + protected boolean isTableStorageType(Cluster cluster) throws FalconException { + return entity.getEntityType() == EntityType.PROCESS + ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity); + } + + protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException { + Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster); + return Storage.TYPE.TABLE == storageType; + } + + protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException { + Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process); + return Storage.TYPE.TABLE == storageType; + } + + protected Properties getHiveCredentials(Cluster cluster) { + String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster); + if (metaStoreUrl == null) { + throw new IllegalStateException( + "Registry interface is not defined in cluster: " + cluster.getName()); + } + + Properties hiveCredentials = new Properties(); + hiveCredentials.put(METASTOREURIS, metaStoreUrl); + hiveCredentials.put("hive.metastore.execute.setugi", "true"); + hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat")); + hiveCredentials.put("hcat.metastore.uri", metaStoreUrl); + + if (isSecurityEnabled) { + String principal = ClusterHelper + .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL); + hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal); + hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true"); + hiveCredentials.put("hcat.metastore.principal", principal); + } + + return hiveCredentials; + } + + protected Configuration getHiveCredentialsAsConf(Cluster cluster) { + Properties hiveCredentials = getHiveCredentials(cluster); + + Configuration hiveConf = new Configuration(false); + for (Entry entry : hiveCredentials.entrySet()) { + hiveConf.set((String)entry.getKey(), (String)entry.getValue()); + } + + return hiveConf; + } + + protected Properties getEntityProperties(Entity myEntity) { + Properties properties = new Properties(); + switch (myEntity.getEntityType()) { + case CLUSTER: + org.apache.falcon.entity.v0.cluster.Properties clusterProps = ((Cluster) myEntity).getProperties(); + if (clusterProps != null) { + for (Property prop : clusterProps.getProperties()) { + properties.put(prop.getName(), prop.getValue()); + } + } + break; + + case FEED: + org.apache.falcon.entity.v0.feed.Properties feedProps = ((Feed) myEntity).getProperties(); + if (feedProps != null) { + for (org.apache.falcon.entity.v0.feed.Property prop : feedProps.getProperties()) { 
+ properties.put(prop.getName(), prop.getValue()); + } + } + break; + + case PROCESS: + org.apache.falcon.entity.v0.process.Properties processProps = ((Process) myEntity).getProperties(); + if (processProps != null) { + for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) { + properties.put(prop.getName(), prop.getValue()); + } + } + break; + + default: + throw new IllegalArgumentException("Unhandled entity type " + myEntity.getEntityType()); + } + return properties; + } + + protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) { + String prefix = "falcon_" + input.getName(); + + propagateCommonCatalogTableProperties(tableStorage, props, prefix); + + props.put(prefix + "_partition_filter_pig", + "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}"); + props.put(prefix + "_partition_filter_hive", + "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}"); + props.put(prefix + "_partition_filter_java", + "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}"); + props.put(prefix + "_datain_partitions_hive", + "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}"); + } + + protected void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage, Properties props) { + String prefix = "falcon_" + output.getName(); + + propagateCommonCatalogTableProperties(tableStorage, props, prefix); + + //pig and java actions require partition expression as "key1=val1, key2=val2" + props.put(prefix + "_partitions_pig", + "${coord:dataOutPartitions('" + output.getName() + "')}"); + props.put(prefix + "_partitions_java", + "${coord:dataOutPartitions('" + output.getName() + "')}"); + + //hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values) + //there is no direct EL expression in oozie + List partitions = new ArrayList(); + for (String key : tableStorage.getDatedPartitionKeys()) { + StringBuilder expr = new StringBuilder(); + expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key) + .append("')}"); + props.put(prefix + "_dated_partition_value_" + key, expr.toString()); + partitions.add(key + "='" + expr + "'"); + + } + props.put(prefix + "_partitions_hive", StringUtils.join(partitions, ",")); + } + + protected void propagateCommonCatalogTableProperties(CatalogStorage tableStorage, Properties props, String prefix) { + props.put(prefix + "_storage_type", tableStorage.getType().name()); + props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl()); + props.put(prefix + "_database", tableStorage.getDatabase()); + props.put(prefix + "_table", tableStorage.getTable()); + } + + protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException { + try { + SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"), + libPath, cluster, FALCON_JAR_FILTER); + } catch (IOException e) { + throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e); + } + } + + protected Properties getProperties(Path path, String name) { + if (path == null) { + return null; + } + + Properties prop = new Properties(); + prop.setProperty(OozieEntityBuilder.ENTITY_PATH, getStoragePath(path)); + prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name); + return prop; + } +} 
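For orientation while reading this refactoring: OozieEntityBuilder.get() above is the single entry point that picks a bundle builder by entity type (FEED -> FeedBundleBuilder, PROCESS -> ProcessBundleBuilder), and build() materializes the Oozie definitions under a staging path and returns the ENTITY_PATH/ENTITY_NAME properties for the generated app. A minimal caller sketch follows; it is not part of this patch, the entity/cluster lookups just mirror the ConfigurationStore usage seen later in the patch, and the feed name, cluster name and staging path are illustrative only:

    // Load a feed and the cluster to build for (names below are hypothetical).
    Feed feed = ConfigurationStore.get().get(EntityType.FEED, "example-feed");
    Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "example-cluster");
    Path buildPath = new Path("/projects/falcon/staging/example-feed");  // illustrative staging dir

    // Dispatches on entity type: FeedBundleBuilder for feeds, ProcessBundleBuilder for processes.
    OozieEntityBuilder builder = OozieEntityBuilder.get(feed);

    // Writes the bundle/coordinator/workflow definitions under buildPath (may throw FalconException)
    // and returns the properties describing the generated app.
    Properties bundleProps = builder.build(cluster, buildPath);
    String appPath = bundleProps.getProperty(OozieEntityBuilder.ENTITY_PATH);
    String appName = bundleProps.getProperty(OozieEntityBuilder.ENTITY_NAME);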
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java new file mode 100644 index 0000000..ac78297 --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie; + +import org.apache.commons.io.IOUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.oozie.feed.FeedReplicationWorkflowBuilder; +import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder; +import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder; +import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder; +import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder; +import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.CREDENTIAL; +import org.apache.falcon.oozie.workflow.CREDENTIALS; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.util.OozieUtils; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * Base class for building orchestration workflow in oozie. 
+ * @param + */ +public abstract class OozieOrchestrationWorkflowBuilder extends OozieEntityBuilder { + protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth"; + public static final Set FALCON_ACTIONS = new HashSet( + Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", })); + private final Tag lifecycle; + + public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) { + super(entity); + this.lifecycle = lifecycle; + } + + public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Tag lifecycle) { + switch(entity.getEntityType()) { + case FEED: + Feed feed = (Feed) entity; + switch (lifecycle) { + case RETENTION: + return new FeedRetentionWorkflowBuilder(feed); + + case REPLICATION: + return new FeedReplicationWorkflowBuilder(feed); + + default: + throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + + lifecycle); + } + + case PROCESS: + Process process = (Process) entity; + switch(process.getWorkflow().getEngine()) { + case PIG: + return new PigProcessWorkflowBuilder(process); + + case OOZIE: + return new OozieProcessWorkflowBuilder(process); + + case HIVE: + return new HiveProcessWorkflowBuilder(process); + + default: + break; + } + + default: + } + + throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle); + } + + protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException { + marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow), + OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml")); + } + + protected WORKFLOWAPP getWorkflow(String template) throws FalconException { + InputStream resourceAsStream = null; + try { + resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template); + Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller(); + @SuppressWarnings("unchecked") + JAXBElement jaxbElement = (JAXBElement) unmarshaller.unmarshal(resourceAsStream); + return jaxbElement.getValue(); + } catch (JAXBException e) { + throw new FalconException(e); + } finally { + IOUtils.closeQuietly(resourceAsStream); + } + } + + protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) + throws FalconException { + String libext = ClusterHelper.getLocation(cluster, "working") + "/libext"; + FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster)); + try { + addExtensionJars(fs, new Path(libext), wf); + addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf); + if (tag != null) { + addExtensionJars(fs, + new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), wf); + } + } catch(IOException e) { + throw new FalconException(e); + } + } + + private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException { + FileStatus[] libs = null; + try { + libs = fs.listStatus(path); + } catch(FileNotFoundException ignore) { + //Ok if the libext is not configured + } + + if (libs == null) { + return; + } + + for(FileStatus lib : libs) { + if (lib.isDir()) { + continue; + } + + for(Object obj: wf.getDecisionOrForkOrJoin()) { + if (!(obj instanceof ACTION)) { + continue; + } + ACTION action = (ACTION) obj; + List files = null; + if (action.getJava() != null) { + files = action.getJava().getFile(); + } else if (action.getPig() != null) { + files = 
action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+    // creates hive-site.xml configuration under the workflow's conf dir for the given cluster
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(workflowPath + "/conf");
+
+            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+        } catch (IOException e) {
+            throw new FalconException("Unable to create hive site", e);
+        }
+    }
+
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * This is only necessary if a table is involved and secure mode is enabled.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster cluster entity
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
+        CREDENTIALS credentials = workflowApp.getCredentials();
+        if (credentials == null) {
+            credentials = new CREDENTIALS();
+        }
+
+        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+        // add credential for workflow
+        workflowApp.setCredentials(credentials);
+    }
+
+    /**
+     * This is only necessary if a table is involved and secure mode is enabled.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster cluster entity
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
+                                          String credentialName, Set<String> actions) {
+        addHCatalogCredentials(workflowApp, cluster, credentialName);
+
+        // add credential to each action
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (actions.contains(actionName)) {
+                action.setCred(credentialName);
+            }
+        }
+    }
+
+    /**
+     * This is only necessary if a table is involved and secure mode is enabled.
+ * + * @param cluster cluster entity + * @param credentialName credential name + * @return CREDENTIALS object + */ + private CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) { + final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster); + + CREDENTIAL credential = new CREDENTIAL(); + credential.setName(credentialName); + credential.setType("hcat"); + + credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl)); + credential.getProperty().add(createProperty("hcat.metastore.principal", + ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL))); + + return credential; + } + + private CREDENTIAL.Property createProperty(String name, String value) { + CREDENTIAL.Property property = new CREDENTIAL.Property(); + property.setName(name); + property.setValue(value); + return property; + } + + protected void addOozieRetries(WORKFLOWAPP workflow) { + for (Object object : workflow.getDecisionOrForkOrJoin()) { + if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) { + continue; + } + org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object; + String actionName = action.getName(); + if (FALCON_ACTIONS.contains(actionName)) { + decorateWithOozieRetries(action); + } + } + } + + protected void decorateWithOozieRetries(ACTION action) { + Properties props = RuntimeProperties.get(); + action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3")); + action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java new file mode 100644 index 0000000..6917f4e --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.feed; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.oozie.OozieBundleBuilder; +import org.apache.falcon.oozie.OozieCoordinatorBuilder; +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Builds oozie bundle for the feed. 
+ */ +public class FeedBundleBuilder extends OozieBundleBuilder { + public FeedBundleBuilder(Feed entity) { + super(entity); + } + + @Override protected Path getLibPath(Cluster cluster, Path buildPath) { + return new Path(buildPath, "lib"); + } + + @Override protected List doBuild(Cluster cluster, Path buildPath) throws FalconException { + List props = new ArrayList(); + List evictionProps = + OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath); + if (evictionProps != null) { + props.addAll(evictionProps); + } + + List replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION).buildCoords(cluster, + buildPath); + if (replicationProps != null) { + props.addAll(replicationProps); + } + + if (!props.isEmpty()) { + copySharedLibs(cluster, getLibPath(cluster, buildPath)); + } + + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java new file mode 100644 index 0000000..3226cf2 --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.oozie.feed; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.messaging.EntityInstanceMessage.ARG; +import org.apache.falcon.oozie.OozieCoordinatorBuilder; +import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; +import org.apache.falcon.oozie.coordinator.COORDINATORAPP; +import org.apache.falcon.oozie.coordinator.SYNCDATASET; +import org.apache.falcon.oozie.coordinator.WORKFLOW; +import org.apache.falcon.oozie.coordinator.ACTION; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + * Builds oozie coordinator for feed replication, one per source-target cluster combination. 
+ */ +public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder { + private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml"; + private static final int THIRTY_MINUTES = 30 * 60 * 1000; + + private static final String PARALLEL = "parallel"; + private static final String TIMEOUT = "timeout"; + private static final String MR_MAX_MAPS = "maxMaps"; + private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB"; + + public FeedReplicationCoordinatorBuilder(Feed entity) { + super(entity, Tag.REPLICATION); + } + + @Override public List buildCoords(Cluster cluster, Path buildPath) throws FalconException { + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); + if (feedCluster.getType() == ClusterType.TARGET) { + List props = new ArrayList(); + OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION); + for (org.apache.falcon.entity.v0.feed.Cluster srcFeedCluster : entity.getClusters().getClusters()) { + + if (srcFeedCluster.getType() == ClusterType.SOURCE) { + Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, srcFeedCluster.getName()); + // workflow is serialized to a specific dir + Path coordPath = new Path(buildPath, Tag.REPLICATION.name() + "/" + srcCluster.getName()); + + // Different workflow for each source since hive credentials vary for each cluster + builder.build(cluster, coordPath); + + props.add(doBuild(srcCluster, cluster, coordPath)); + } + } + return props; + } + return null; + } + + private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException { + long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster); + Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis); + Date sourceEndDate = getEndDate(srcCluster); + + Date targetStartDate = getStartDate(trgCluster, replicationDelayInMillis); + Date targetEndDate = getEndDate(trgCluster); + + if (noOverlapExists(sourceStartDate, sourceEndDate, + targetStartDate, targetEndDate)) { + LOG.warn("Not creating replication coordinator, as the source cluster: {} and target cluster: {} do " + + "not have overlapping dates", srcCluster.getName(), trgCluster.getName()); + return null; + } + + COORDINATORAPP coord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE); + + String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()), + entity).toString(); + String start = sourceStartDate.after(targetStartDate) + ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate); + String end = sourceEndDate.before(targetEndDate) + ? 
SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate); + + initializeCoordAttributes(coord, coordName, start, end, replicationDelayInMillis); + setCoordControls(coord); + + final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, entity); + initializeInputDataSet(srcCluster, coord, sourceStorage); + + final Storage targetStorage = FeedHelper.createStorage(trgCluster, entity); + initializeOutputDataSet(trgCluster, coord, targetStorage); + + ACTION replicationWorkflowAction = getReplicationWorkflowAction( + srcCluster, trgCluster, buildPath, coordName, sourceStorage, targetStorage); + coord.setAction(replicationWorkflowAction); + + marshal(trgCluster, coord, buildPath); + return getProperties(buildPath, coordName); + } + + private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath, + String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException { + ACTION action = new ACTION(); + WORKFLOW workflow = new WORKFLOW(); + + workflow.setAppPath(getStoragePath(buildPath.toString())); + Properties props = createCoordDefaultConfiguration(trgCluster, wfName); + props.put("srcClusterName", srcCluster.getName()); + props.put("srcClusterColo", srcCluster.getColo()); + if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden + props.put(MR_MAX_MAPS, getDefaultMaxMaps()); + } + if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden + props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth()); + } + + // the storage type is uniform across source and target feeds for replication + props.put("falconFeedStorageType", sourceStorage.getType().name()); + + String instancePaths = ""; + if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) { + String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster); + instancePaths = pathsWithPartitions; + + propagateFileSystemCopyProperties(pathsWithPartitions, props); + } else if (sourceStorage.getType() == Storage.TYPE.TABLE) { + instancePaths = "${coord:dataIn('input')}"; + final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage; + propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource"); + final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage; + propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget"); + propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props); + setupHiveConfiguration(srcCluster, trgCluster, buildPath); + } + + propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props); + props.putAll(FeedHelper.getUserWorkflowProperties("replication")); + + workflow.setConfiguration(getConfig(props)); + action.setWorkflow(workflow); + + return action; + } + + private String getDefaultMaxMaps() { + return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5"); + } + + private String getDefaultMapBandwidth() { + return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400"); + } + + private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException { + String srcPart = FeedHelper.normalizePartitionExpression( + FeedHelper.getCluster(entity, srcCluster.getName()).getPartition()); + srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart); + + String targetPart = FeedHelper.normalizePartitionExpression( + FeedHelper.getCluster(entity, 
trgCluster.getName()).getPartition());
+        targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+        StringBuilder pathsWithPartitions = new StringBuilder();
+        pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    private void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
+        props.put("sourceRelativePaths", paths);
+
+        props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+        props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+    }
+
+    private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                                 Properties props, String prefix) {
+        props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+        props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+        props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+        props.put(prefix + "Database", tableStorage.getDatabase());
+        props.put(prefix + "Table", tableStorage.getTable());
+        props.put(prefix + "Partition", "${coord:dataInPartitions('input', 'hive-export')}");
+    }
+
+    private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+                                              Cluster trgCluster, CatalogStorage targetStorage, Properties props) {
+        // create staging dirs for export at source & set it as distcpSourcePaths
+        String sourceStagingPath =
+                FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
+                        NOMINAL_TIME_EL + "/" + trgCluster.getName());
+        props.put("distcpSourcePaths", sourceStagingPath);
+
+        // create staging dirs for import at target & set it as distcpTargetPaths
+        String targetStagingPath =
+                FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
+                        NOMINAL_TIME_EL + "/" + trgCluster.getName());
+        props.put("distcpTargetPaths", targetStagingPath);
+
+        props.put("sourceRelativePaths", IGNORE); // this will not be used for Table storage.
+ } + + private void propagateLateDataProperties(String instancePaths, String falconFeedStorageType, Properties props) { + // todo these pairs are the same but used in different context + // late data handler - should-record action + props.put("falconInputFeeds", entity.getName()); + props.put("falconInPaths", instancePaths); + + // storage type for each corresponding feed - in this case only one feed is involved + // needed to compute usage based on storage type in LateDataHandler + props.put("falconInputFeedStorageTypes", falconFeedStorageType); + + // falcon post processing + props.put(ARG.feedNames.getPropName(), entity.getName()); + props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}"); + } + + private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException { + Configuration conf = ClusterHelper.getConfiguration(trgCluster); + FileSystem fs = HadoopClientFactory.get().createFileSystem(conf); + + try { + // copy import export scripts to stagingDir + Path scriptPath = new Path(buildPath, "scripts"); + copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-export.hql"); + copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-import.hql"); + + // create hive conf to stagingDir + Path confPath = new Path(buildPath + "/conf"); + persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-"); + persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-"); + } catch (IOException e) { + throw new FalconException("Unable to create hive conf files", e); + } + } + + private void copyHiveScript(FileSystem fs, Path scriptPath, String localScriptPath, + String scriptName) throws IOException { + OutputStream out = null; + InputStream in = null; + try { + out = fs.create(new Path(scriptPath, scriptName)); + in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(localScriptPath + scriptName); + IOUtils.copy(in, out); + } finally { + IOUtils.closeQuietly(in); + IOUtils.closeQuietly(out); + } + } + + protected void persistHiveConfiguration(FileSystem fs, Path confPath, + Cluster cluster, String prefix) throws IOException { + Configuration hiveConf = getHiveCredentialsAsConf(cluster); + OutputStream out = null; + try { + out = fs.create(new Path(confPath, prefix + "hive-site.xml")); + hiveConf.writeXml(out); + } finally { + IOUtils.closeQuietly(out); + } + } + + private void initializeCoordAttributes(COORDINATORAPP coord, String coordName, String start, String end, + long delayInMillis) { + coord.setName(coordName); + coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}"); + + if (delayInMillis > 0) { + long delayInMins = -1 * delayInMillis / (1000 * 60); + String elExp = "${now(0," + delayInMins + ")}"; + + coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp); + coord.getOutputEvents().getDataOut().get(0).setInstance(elExp); + } + + coord.setStart(start); + coord.setEnd(end); + coord.setTimezone(entity.getTimezone().getID()); + } + + private void setCoordControls(COORDINATORAPP coord) throws FalconException { + long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class); + long timeoutInMillis = frequencyInMillis * 6; + if (timeoutInMillis < THIRTY_MINUTES) { + timeoutInMillis = THIRTY_MINUTES; + } + + Properties props = getEntityProperties(entity); + String timeout = props.getProperty(TIMEOUT); + if (timeout!=null) { + try{ + timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class); + } catch (Exception 
ignore) { + LOG.error("Unable to evaluate timeout:", ignore); + } + } + coord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60))); + coord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2)); + + String parallelProp = props.getProperty(PARALLEL); + int parallel = 1; + if (parallelProp != null) { + try { + parallel = Integer.parseInt(parallelProp); + } catch (NumberFormatException ignore) { + LOG.error("Unable to parse parallel:", ignore); + } + } + coord.getControls().setConcurrency(String.valueOf(parallel)); + } + + + private void initializeInputDataSet(Cluster cluster, COORDINATORAPP coord, Storage storage) throws FalconException { + SYNCDATASET inputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(0); + + String uriTemplate = storage.getUriTemplate(LocationType.DATA); + if (storage.getType() == Storage.TYPE.TABLE) { + uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!! + } + inputDataset.setUriTemplate(uriTemplate); + + setDatasetValues(inputDataset, cluster); + + if (entity.getAvailabilityFlag() == null) { + inputDataset.setDoneFlag(""); + } else { + inputDataset.setDoneFlag(entity.getAvailabilityFlag()); + } + } + + private void initializeOutputDataSet(Cluster cluster, COORDINATORAPP coord, + Storage storage) throws FalconException { + SYNCDATASET outputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(1); + + String uriTemplate = storage.getUriTemplate(LocationType.DATA); + if (storage.getType() == Storage.TYPE.TABLE) { + uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!! + } + outputDataset.setUriTemplate(uriTemplate); + + setDatasetValues(outputDataset, cluster); + } + + private void setDatasetValues(SYNCDATASET dataset, Cluster cluster) { + dataset.setInitialInstance(SchemaHelper.formatDateUTC( + FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart())); + dataset.setTimezone(entity.getTimezone().getID()); + dataset.setFrequency("${coord:" + entity.getFrequency().toString() + "}"); + } + + private long getReplicationDelayInMillis(Cluster srcCluster) throws FalconException { + Frequency replicationDelay = FeedHelper.getCluster(entity, srcCluster.getName()).getDelay(); + long delayInMillis=0; + if (replicationDelay != null) { + delayInMillis = ExpressionHelper.get().evaluate( + replicationDelay.toString(), Long.class); + } + + return delayInMillis; + } + + private Date getStartDate(Cluster cluster, long replicationDelayInMillis) { + Date startDate = FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart(); + return replicationDelayInMillis == 0 ? 
startDate : new Date(startDate.getTime() + replicationDelayInMillis); + } + + private Date getEndDate(Cluster cluster) { + return FeedHelper.getCluster(entity, cluster.getName()).getValidity().getEnd(); + } + + private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate, + Date targetStartDate, Date targetEndDate) { + return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java new file mode 100644 index 0000000..00fab99 --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.feed; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.hadoop.fs.Path; + +import java.util.Properties; + +/** + * Builds feed replication workflow, one per source-target cluster combination. 
+ */ +public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder { + private static final String REPLICATION_WF_TEMPLATE = "/workflow/replication-workflow.xml"; + private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth"; + private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth"; + + public FeedReplicationWorkflowBuilder(Feed entity) { + super(entity, Tag.REPLICATION); + } + + @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException { + WORKFLOWAPP workflow = getWorkflow(REPLICATION_WF_TEMPLATE); + Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName()); + String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(); + workflow.setName(wfName); + + addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION); + + addOozieRetries(workflow); + + if (isTableStorageType(cluster)) { + setupHiveCredentials(cluster, srcCluster, workflow); + } + + marshal(cluster, workflow, buildPath); + + return getProperties(buildPath, wfName); + } + + private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) { + if (isSecurityEnabled) { + // add hcatalog credentials for secure mode and add a reference to each action + addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME); + addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME); + } + + // hive-site.xml file is created later in coordinator initialization but + // actions are set to point to that here + + for (Object object : workflowApp.getDecisionOrForkOrJoin()) { + if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) { + continue; + } + + org.apache.falcon.oozie.workflow.ACTION action = + (org.apache.falcon.oozie.workflow.ACTION) object; + String actionName = action.getName(); + if ("recordsize".equals(actionName)) { + // add reference to hive-site conf to each action + action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml"); + + if (isSecurityEnabled) { // add a reference to credential in the action + action.setCred(SOURCE_HIVE_CREDENTIAL_NAME); + } + } else if ("table-export".equals(actionName)) { + if (isSecurityEnabled) { // add a reference to credential in the action + action.setCred(SOURCE_HIVE_CREDENTIAL_NAME); + } + } else if ("table-import".equals(actionName)) { + if (isSecurityEnabled) { // add a reference to credential in the action + action.setCred(TARGET_HIVE_CREDENTIAL_NAME); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java new file mode 100644 index 0000000..4393c94 --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.feed; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.v0.Frequency.TimeUnit; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.messaging.EntityInstanceMessage.ARG; +import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps; +import org.apache.falcon.oozie.OozieCoordinatorBuilder; +import org.apache.falcon.oozie.OozieEntityBuilder; +import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; +import org.apache.falcon.oozie.coordinator.ACTION; +import org.apache.falcon.oozie.coordinator.COORDINATORAPP; +import org.apache.falcon.oozie.coordinator.WORKFLOW; +import org.apache.hadoop.fs.Path; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + * Builds feed retention coordinator. + */ +public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder { + public FeedRetentionCoordinatorBuilder(Feed entity) { + super(entity, Tag.RETENTION); + } + + @Override public List buildCoords(Cluster cluster, Path buildPath) throws FalconException { + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); + + if (feedCluster.getValidity().getEnd().before(new Date())) { + LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future", + cluster.getName()); + return null; + } + + COORDINATORAPP coord = new COORDINATORAPP(); + String coordName = getEntityName(); + coord.setName(coordName); + coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd())); + coord.setStart(SchemaHelper.formatDateUTC(new Date())); + coord.setTimezone(entity.getTimezone().getID()); + TimeUnit timeUnit = entity.getFrequency().getTimeUnit(); + if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) { + coord.setFrequency("${coord:hours(6)}"); + } else { + coord.setFrequency("${coord:days(1)}"); + } + + Path coordPath = getBuildPath(buildPath); + Properties props = createCoordDefaultConfiguration(cluster, coordName); + props.put("timeZone", entity.getTimezone().getID()); + props.put("frequency", entity.getFrequency().getTimeUnit().name()); + + final Storage storage = FeedHelper.createStorage(cluster, entity); + props.put("falconFeedStorageType", storage.getType().name()); + + String feedDataPath = storage.getUriTemplate(); + props.put("feedDataPath", + feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX)); + + props.put("limit", feedCluster.getRetention().getLimit().toString()); + + props.put(ARG.operation.getPropName(), EntityOps.DELETE.name()); + props.put(ARG.feedNames.getPropName(), entity.getName()); + props.put(ARG.feedInstancePaths.getPropName(), IGNORE); + + props.put("falconInputFeeds", entity.getName()); + props.put("falconInPaths", IGNORE); + + 
props.putAll(FeedHelper.getUserWorkflowProperties("eviction")); + + WORKFLOW workflow = new WORKFLOW(); + Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, Tag.RETENTION).build(cluster, coordPath); + workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH))); + workflow.setConfiguration(getConfig(props)); + ACTION action = new ACTION(); + action.setWorkflow(workflow); + + coord.setAction(action); + + marshal(cluster, coord, coordPath); + + return Arrays.asList(getProperties(coordPath, coordName)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java new file mode 100644 index 0000000..4a7f96b --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.feed; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.hadoop.fs.Path; + +import java.util.Properties; + +/** + * Builds feed retention workflow. 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
new file mode 100644
index 0000000..4a7f96b
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds feed retention workflow.
+ */
+public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+    private static final String RETENTION_WF_TEMPLATE = "/workflow/retention-workflow.xml";
+
+    public FeedRetentionWorkflowBuilder(Feed entity) {
+        super(entity, Tag.DEFAULT);
+    }
+
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+        WORKFLOWAPP workflow = getWorkflow(RETENTION_WF_TEMPLATE);
+        String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
+        workflow.setName(wfName);
+        addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
+        addOozieRetries(workflow);
+
+        if (isTableStorageType(cluster)) {
+            setupHiveCredentials(cluster, buildPath, workflow);
+        }
+
+        marshal(cluster, workflow, buildPath);
+        return getProperties(buildPath, wfName);
+    }
+
+    private void setupHiveCredentials(Cluster cluster, Path wfPath,
+        WORKFLOWAPP workflowApp) throws FalconException {
+        if (isSecurityEnabled) {
+            // add hcatalog credentials for secure mode and add a reference to each action
+            addHCatalogCredentials(workflowApp, cluster, HIVE_CREDENTIAL_NAME);
+        }
+
+        // create hive-site.xml file so actions can use it in the classpath
+        createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
+
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+
+            org.apache.falcon.oozie.workflow.ACTION action =
+                (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if ("eviction".equals(actionName)) {
+                // add reference to hive-site conf to each action
+                action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+                if (isSecurityEnabled) {
+                    // add a reference to credential in the action
+                    action.setCred(HIVE_CREDENTIAL_NAME);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
new file mode 100644
index 0000000..79a1883
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.hive.CONFIGURATION.Property;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import java.util.List;
+
+/**
+ * Builds orchestration workflow for process where engine is hive.
+ */
+public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    public HiveProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (!action.getName().equals("user-hive-job")) {
+            return;
+        }
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(action);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+        hiveAction.setScript(getStoragePath(userWfPath));
+
+        addPrepareDeleteOutputPath(hiveAction);
+
+        final List<String> paramList = hiveAction.getParam();
+        addInputFeedsAsParams(paramList, cluster);
+        addOutputFeedsAsParams(paramList, cluster);
+
+        propagateEntityProperties(hiveAction);
+
+        // adds hive-site.xml in hive classpath
+        hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
+            buildPath));
+
+        OozieUtils.marshalHiveAction(action, actionJaxbElement);
+    }
+
+    private void propagateEntityProperties(org.apache.falcon.oozie.hive.ACTION hiveAction) {
+        CONFIGURATION conf = new CONFIGURATION();
+        super.propagateEntityProperties(conf, hiveAction.getParam());
+
+        List<Property> hiveConf = hiveAction.getConfiguration().getProperty();
+        for (CONFIGURATION.Property prop : conf.getProperty()) {
+            Property hiveProp = new Property();
+            hiveProp.setName(prop.getName());
+            hiveProp.setValue(prop.getValue());
+            hiveConf.add(hiveProp);
+        }
+    }
+
+    private void addPrepareDeleteOutputPath(org.apache.falcon.oozie.hive.ACTION hiveAction) throws FalconException {
+
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+}
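As a rough check of what decorateAction is meant to leave behind on the user-hive-job action (illustrative only: the decorated action would come from a workflow-builder test fixture, and TestNG's Assert is assumed, as in the project's existing tests):

    // userHiveJobAction: the workflow ACTION named "user-hive-job" after decoration.
    org.apache.falcon.oozie.hive.ACTION hiveAction =
        OozieUtils.unMarshalHiveAction(userHiveJobAction).getValue();
    // hive-site.xml is put on the hive action's classpath via job-xml.
    Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
    // the script is resolved to the staged user workflow path.
    Assert.assertNotNull(hiveAction.getScript());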
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
new file mode 100644
index 0000000..977d8c1
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builds oozie workflow for process where the engine is oozie.
+ */
+public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    public OozieProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (!action.getName().equals("user-oozie-workflow")) {
+            return;
+        }
+        action.getSubWorkflow().setAppPath(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster,
+            buildPath)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
new file mode 100644
index 0000000..29f601d
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Builds orchestration workflow for process where engine is pig.
+ */
+public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+
+    public PigProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (!action.getName().equals("user-pig-job")) {
+            return;
+        }
+
+        PIG pigAction = action.getPig();
+        Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+        pigAction.setScript(getStoragePath(userWfPath));
+
+        addPrepareDeleteOutputPath(pigAction);
+
+        final List<String> paramList = pigAction.getParam();
+        addInputFeedsAsParams(paramList, cluster);
+        addOutputFeedsAsParams(paramList, cluster);
+
+        propagateEntityProperties(pigAction.getConfiguration(), pigAction.getParam());
+
+        if (isTableStorageType(cluster)) { // adds hive-site.xml in pig classpath
+            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
+            buildPath));
+    }
+
+    private void addPrepareDeleteOutputPath(PIG pigAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        final PREPARE prepare = new PREPARE();
+        final List<DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            final DELETE delete = new DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            pigAction.setPrepare(prepare);
+        }
+    }
+
+}
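Taken together, the three decorators above split the per-engine handling out of the old monolithic workflow builder. The selection by engine is expected to sit in the OozieOrchestrationWorkflowBuilder factory; a rough sketch of that dispatch, assuming the process schema's workflow engine enum (the method name and exact signature here are illustrative, not the factory in this patch):

    // Hypothetical dispatch by engine; the concrete builders are the ones added above.
    public static ProcessExecutionWorkflowBuilder forEngine(Process process) {
        switch (process.getWorkflow().getEngine()) {
        case OOZIE:
            return new OozieProcessWorkflowBuilder(process);
        case PIG:
            return new PigProcessWorkflowBuilder(process);
        case HIVE:
            return new HiveProcessWorkflowBuilder(process);
        default:
            throw new IllegalArgumentException("Unhandled engine " + process.getWorkflow().getEngine());
        }
    }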
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
new file mode 100644
index 0000000..86cea93
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieBundleBuilder;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.CoordinatorJob.Timeunit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Builds oozie bundle for process - schedulable entity in oozie.
+ */
+public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
+
+    public ProcessBundleBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override protected Properties getAdditionalProperties(Cluster cluster) throws FalconException {
+        Properties properties = new Properties();
+        if (entity.getInputs() != null) {
+            for (Input in : entity.getInputs().getInputs()) {
+                if (in.isOptional()) {
+                    properties.putAll(getOptionalInputProperties(in, cluster.getName()));
+                }
+            }
+        }
+        return properties;
+    }
+
+    private Properties getOptionalInputProperties(Input in, String clusterName) throws FalconException {
+        Properties properties = new Properties();
+        Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+        String inName = in.getName();
+        properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
+        properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
+        properties.put(inName + ".timezone", feed.getTimezone().getID());
+        properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
+        properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
+        properties.put(inName + ".done-flag", "notused");
+
+        String locPath = FeedHelper.createStorage(clusterName, feed)
+            .getUriTemplate(LocationType.DATA).replace('$', '%');
+        properties.put(inName + ".uri-template", locPath);
+
+        properties.put(inName + ".start-instance", in.getStart());
+        properties.put(inName + ".end-instance", in.getEnd());
+        return properties;
+    }
+
+    private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
+        switch (tu) {
+        case days:
+            return Timeunit.DAY;
+
+        case hours:
+            return Timeunit.HOUR;
+
+        case minutes:
+            return Timeunit.MINUTE;
+
+        case months:
+            return Timeunit.MONTH;
+
+        default:
+            throw new IllegalArgumentException("Unhandled time unit " + tu);
+        }
+    }
+
+    @Override protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+        return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
+    }
+
+    @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+        copyUserWorkflow(cluster, buildPath);
+
+        return OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
+    }
+
+    private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+            //Copy user workflow and lib to staging dir
+            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+                new Path(buildPath, EntityUtil.PROCESS_USER_DIR));
+            if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+                    new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR)));
+            }
+
+            writeChecksums(fs, new Path(buildPath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+    }
+
+    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+        try {
+            FSDataOutputStream stream = fs.create(path);
+            try {
+                for (Map.Entry<String, String> entry : checksums.entrySet()) {
+                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+                }
+            } finally {
+                stream.close();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+    }
+}
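To make the optional-input handling above concrete: for an optional input named "click" backed by an hourly, UTC feed, getOptionalInputProperties would produce coordinator configuration roughly along these lines (every value below is invented for the example; the uri-template shows the '$' to '%' substitution applied to the feed's data location):

    // Illustrative expected output only, for a hypothetical optional input "click".
    Properties expected = new Properties();
    expected.put("click.frequency", "1");
    expected.put("click.freq_timeunit", "HOUR");
    expected.put("click.timezone", "UTC");
    expected.put("click.end_of_duration", "NONE");
    expected.put("click.initial-instance", "2014-01-01T00:00Z");
    expected.put("click.done-flag", "notused");
    expected.put("click.uri-template",
        "hdfs://nn-host:8020/projects/clicks/%{YEAR}-%{MONTH}-%{DAY}-%{HOUR}");
    expected.put("click.start-instance", "now(0,-60)");
    expected.put("click.end-instance", "now(0,0)");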