falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [34/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom
Date Tue, 01 Mar 2016 08:26:20 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
deleted file mode 100644
index 9b1e1f4..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-
-/**
- * Captures the workflow execution context.
- */
-public class WorkflowExecutionContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class);
-
-    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
-
-    public static final String OUTPUT_FEED_SEPARATOR = ",";
-    public static final String INPUT_FEED_SEPARATOR = "#";
-    public static final String CLUSTER_NAME_SEPARATOR = ",";
-
-    /**
-     * Workflow execution status.
-     */
-    public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED}
-
-    /**
-     * Workflow execution type.
-     */
-    public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
-
-    /**
-     * Entity operations supported.
-     */
-    public enum EntityOperations {
-        GENERATE, DELETE, REPLICATE, IMPORT, EXPORT
-    }
-
-    public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
-        WorkflowExecutionArgs.CLUSTER_NAME,
-        WorkflowExecutionArgs.ENTITY_NAME,
-        WorkflowExecutionArgs.ENTITY_TYPE,
-        WorkflowExecutionArgs.NOMINAL_TIME,
-        WorkflowExecutionArgs.OPERATION,
-
-        WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
-        WorkflowExecutionArgs.OUTPUT_FEED_PATHS,
-
-        WorkflowExecutionArgs.WORKFLOW_ID,
-        WorkflowExecutionArgs.WORKFLOW_USER,
-        WorkflowExecutionArgs.RUN_ID,
-        WorkflowExecutionArgs.STATUS,
-        WorkflowExecutionArgs.TIMESTAMP,
-        WorkflowExecutionArgs.LOG_DIR,
-    };
-
-    private final Map<WorkflowExecutionArgs, String> context;
-    private final long creationTime;
-    private Configuration actionJobConf;
-
-    public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
-        this.context = context;
-        creationTime = System.currentTimeMillis();
-    }
-
-    public String getValue(WorkflowExecutionArgs arg) {
-        return context.get(arg);
-    }
-
-    public void setValue(WorkflowExecutionArgs arg, String value) {
-        context.put(arg, value);
-    }
-
-    public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
-        return context.containsKey(arg) ? context.get(arg) : defaultValue;
-    }
-
-    public boolean containsKey(WorkflowExecutionArgs arg) {
-        return context.containsKey(arg);
-    }
-
-    public Set<Map.Entry<WorkflowExecutionArgs, String>> entrySet() {
-        return context.entrySet();
-    }
-
-    // helper methods
-    public boolean hasWorkflowSucceeded() {
-        return Status.SUCCEEDED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public boolean hasWorkflowFailed() {
-        return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public boolean isWorkflowKilledManually(){
-        try {
-            return WorkflowEngineFactory.getWorkflowEngine().
-                    isWorkflowKilledByUser(
-                            getValue(WorkflowExecutionArgs.CLUSTER_NAME),
-                            getValue(WorkflowExecutionArgs.WORKFLOW_ID));
-        } catch (Exception e) {
-            LOG.error("Got Error in getting error codes from actions: " + e);
-        }
-        return false;
-    }
-
-    public boolean hasWorkflowTimedOut() {
-        return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public boolean hasWorkflowBeenKilled() {
-        return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public String getContextFile() {
-        return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
-    }
-
-    public Status getWorkflowStatus() {
-        return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public String getLogDir() {
-        return getValue(WorkflowExecutionArgs.LOG_DIR);
-    }
-
-    public String getLogFile() {
-        return getValue(WorkflowExecutionArgs.LOG_FILE);
-    }
-
-    String getNominalTime() {
-        return getValue(WorkflowExecutionArgs.NOMINAL_TIME);
-    }
-
-    /**
-     * Returns nominal time as an ISO8601 formatted string.
-     * @return an ISO8601 formatted string
-     */
-    public String getNominalTimeAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT);
-    }
-
-    String getTimestamp() {
-        return getValue(WorkflowExecutionArgs.TIMESTAMP);
-    }
-
-    /**
-     * Returns timestamp as a long.
-     * @return Date as long (milliseconds since epoch) for the timestamp.
-     */
-    public long getTimeStampAsLong() {
-        String dateString = getTimestamp();
-        try {
-            DateFormat dateFormat = new SimpleDateFormat(INSTANCE_FORMAT.substring(0, dateString.length()));
-            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-            return dateFormat.parse(dateString).getTime();
-        } catch (java.text.ParseException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Returns timestamp as an ISO8601 formatted string.
-     * @return an ISO8601 formatted string
-     */
-    public String getTimeStampAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT);
-    }
-
-    public String getClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        return value.split(CLUSTER_NAME_SEPARATOR)[0];
-    }
-
-    public String getSrcClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
-        if (parts.length != 2) {
-            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
-        }
-
-        return parts[1];
-    }
-
-    public String getEntityName() {
-        return getValue(WorkflowExecutionArgs.ENTITY_NAME);
-    }
-
-    public String getEntityType() {
-        return getValue(WorkflowExecutionArgs.ENTITY_TYPE).toUpperCase();
-    }
-
-    public EntityOperations getOperation() {
-        if (getValue(WorkflowExecutionArgs.OPERATION) != null) {
-            return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION));
-        }
-        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION));
-    }
-
-    public String getOutputFeedNames() {
-        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_NAMES);
-    }
-
-    public String[] getOutputFeedNamesList() {
-        return getOutputFeedNames().split(OUTPUT_FEED_SEPARATOR);
-    }
-
-    public String getOutputFeedInstancePaths() {
-        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_PATHS);
-    }
-
-    public String[] getOutputFeedInstancePathsList() {
-        return getOutputFeedInstancePaths().split(OUTPUT_FEED_SEPARATOR);
-    }
-
-    public String getInputFeedNames() {
-        return getValue(WorkflowExecutionArgs.INPUT_FEED_NAMES);
-    }
-
-    public String[] getInputFeedNamesList() {
-        return getInputFeedNames().split(INPUT_FEED_SEPARATOR);
-    }
-
-    public String getInputFeedInstancePaths() {
-        return getValue(WorkflowExecutionArgs.INPUT_FEED_PATHS);
-    }
-
-    public String[] getInputFeedInstancePathsList() {
-        return getInputFeedInstancePaths().split(INPUT_FEED_SEPARATOR);
-    }
-
-    public String getWorkflowEngineUrl() {
-        return getValue(WorkflowExecutionArgs.WF_ENGINE_URL);
-    }
-
-    public String getUserWorkflowEngine() {
-        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE);
-    }
-
-    public String getUserWorkflowVersion() {
-        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_VERSION);
-    }
-
-    public String getWorkflowId() {
-        return getValue(WorkflowExecutionArgs.WORKFLOW_ID);
-    }
-
-    public String getWorkflowParentId() {
-        return getValue(WorkflowExecutionArgs.PARENT_ID);
-    }
-
-    public String getUserSubflowId() {
-        return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID);
-    }
-
-    public int getWorkflowRunId() {
-        return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID));
-    }
-
-    public String getWorkflowRunIdString() {
-        return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)));
-    }
-
-    public String getWorkflowUser() {
-        return getValue(WorkflowExecutionArgs.WORKFLOW_USER);
-    }
-
-    public long getExecutionCompletionTime() {
-
-        return creationTime;
-    }
-
-    public String getDatasourceName() { return getValue(WorkflowExecutionArgs.DATASOURCE_NAME); }
-
-    public long getWorkflowStartTime() {
-        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
-    }
-
-    public long getWorkflowEndTime() {
-        return Long.parseLong(getValue(WorkflowExecutionArgs.WF_END_TIME));
-    }
-
-
-    public Type getContextType() {
-        return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
-    }
-
-    public String getCounters() {
-        return getValue(WorkflowExecutionArgs.COUNTERS);
-    }
-
-    /**
-     * This method is invoked from within the workflow.
-     *
-     * @throws java.io.IOException
-     * @throws org.apache.falcon.FalconException
-     */
-    public void serialize() throws IOException, FalconException {
-        serialize(getContextFile());
-    }
-
-    /**
-     * This method is invoked from within the workflow.
-     *
-     * @param contextFile file to serialize the workflow execution metadata
-     * @throws org.apache.falcon.FalconException
-     */
-    public void serialize(String contextFile) throws FalconException {
-        LOG.info("Saving context to: [{}]", contextFile);
-        OutputStream out = null;
-        Path file = new Path(contextFile);
-        try {
-            FileSystem fs =
-                    actionJobConf == null ? HadoopClientFactory.get().createProxiedFileSystem(file.toUri())
-                                 : HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), actionJobConf);
-            out = fs.create(file);
-            out.write(JSONValue.toJSONString(context).getBytes());
-        } catch (IOException e) {
-            throw new FalconException("Error serializing context to: " + contextFile,  e);
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException ignore) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "WorkflowExecutionContext{" + context.toString() + "}";
-    }
-
-    @SuppressWarnings("unchecked")
-    public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
-        try {
-            Path lineageDataPath = new Path(contextFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    lineageDataPath.toUri());
-
-            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
-            return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in));
-        } catch (IOException e) {
-            throw new FalconException("Error opening context file: " + contextFile, e);
-        }
-    }
-
-    public static String getFilePath(String logDir, String entityName, String entityType,
-                                     EntityOperations operation) {
-        // needed by feed clean up
-        String parentSuffix = EntityType.PROCESS.name().equals(entityType)
-                || EntityOperations.REPLICATE == operation ? "" : "/context/";
-
-        // LOG_DIR is sufficiently unique
-        return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString();
-    }
-
-
-    public static Path getCounterFile(String logDir) {
-        return new Path(logDir, "counter.txt");
-    }
-
-    public static String readCounters(FileSystem fs, Path counterFile) throws IOException{
-        StringBuilder counterBuffer = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(counterFile)));
-        try {
-            String line;
-            while ((line = in.readLine()) != null) {
-                counterBuffer.append(line);
-                counterBuffer.append(",");
-            }
-        } catch (IOException e) {
-            throw e;
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-
-        String counterString = counterBuffer.toString();
-        if (StringUtils.isNotBlank(counterString) && counterString.length() > 0) {
-            return counterString.substring(0, counterString.length() - 1);
-        } else {
-            return null;
-        }
-    }
-
-    public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
-        return create(args, type, null);
-    }
-
-    public static WorkflowExecutionContext create(String[] args, Type type, Configuration conf) throws FalconException {
-        Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
-
-        try {
-            CommandLine cmd = getCommand(args);
-            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-                String optionValue = arg.getOptionValue(cmd);
-                if (StringUtils.isNotEmpty(optionValue)) {
-                    wfProperties.put(arg, optionValue);
-                }
-            }
-        } catch (ParseException e) {
-            throw new FalconException("Error parsing wf args", e);
-        }
-
-        WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties);
-        executionContext.actionJobConf = conf;
-        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
-        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
-                getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
-                        executionContext.getEntityType(), executionContext.getOperation()));
-        addCounterToWF(executionContext);
-
-        return executionContext;
-    }
-
-    private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException {
-        if (executionContext.hasWorkflowFailed()) {
-            LOG.info("Workflow Instance failed, counter will not be added: {}",
-                    executionContext.getWorkflowRunIdString());
-            return;
-        }
-
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                new Path(executionContext.getLogDir()).toUri());
-        Path counterFile = getCounterFile(executionContext.getLogDir());
-        try {
-            if (fs.exists(counterFile)) {
-                String counters = readCounters(fs, counterFile);
-                if (StringUtils.isNotBlank(counters)) {
-                    executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters);
-                }
-            }
-        } catch (IOException e) {
-            LOG.error("Error in accessing counter file :" + e);
-        } finally {
-            try {
-                if (fs.exists(counterFile)) {
-                    fs.delete(counterFile, false);
-                }
-            } catch (IOException e) {
-                LOG.error("Unable to delete counter file: {}", e);
-            }
-        }
-    }
-
-    private static CommandLine getCommand(String[] arguments) throws ParseException {
-        Options options = new Options();
-
-        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-            addOption(options, arg, arg.isRequired());
-        }
-
-        return new GnuParser().parse(options, arguments, false);
-    }
-
-    private static void addOption(Options options, WorkflowExecutionArgs arg, boolean isRequired) {
-        Option option = arg.getOption();
-        option.setRequired(isRequired);
-        options.addOption(option);
-    }
-
-    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) {
-        return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING);
-    }
-
-    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties, Type type) {
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
-        return new WorkflowExecutionContext(wfProperties);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
deleted file mode 100644
index 7bf14f2..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.falcon.FalconException;
-
-/**
- * A listener interface for workflow execution.
- */
-public interface WorkflowExecutionListener {
-
-    /**
-     * Invoked when a workflow succeeds.
-     * @param context
-     * @throws FalconException
-     */
-    void onSuccess(WorkflowExecutionContext context) throws FalconException;
-
-    /**
-     * Invoked when a workflow fails.
-     * @param context
-     * @throws FalconException
-     */
-    void onFailure(WorkflowExecutionContext context) throws FalconException;
-
-    /**
-     * Invoked on start of a workflow. Basically, when the workflow is RUNNING.
-     * @param context
-     * @throws FalconException
-     */
-    void onStart(WorkflowExecutionContext context) throws FalconException;
-
-    /**
-     * Invoked when a workflow is suspended.
-     * @param context
-     * @throws FalconException
-     */
-    void onSuspend(WorkflowExecutionContext context) throws FalconException;
-
-    /**
-     * Invoked when a workflow is in waiting state.
-     * @param context
-     * @throws FalconException
-     */
-    void onWait(WorkflowExecutionContext context) throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
deleted file mode 100644
index b692258..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * A workflow job end notification service.
- */
-public class WorkflowJobEndNotificationService implements FalconService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
-
-    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
-
-    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
-
-    // Maintain a cache of context built, so we don't have to query Oozie for every state change.
-    private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    // Mainly for test
-    Map<String, Properties> getContextMap() {
-        return contextMap;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        String listenerClassNames = StartupProperties.get().getProperty(
-                "workflow.execution.listeners");
-        if (StringUtils.isEmpty(listenerClassNames)) {
-            return;
-        }
-
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            WorkflowExecutionListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        listeners.clear();
-    }
-
-    public void registerListener(WorkflowExecutionListener listener) {
-        listeners.add(listener);
-    }
-
-    public void unregisterListener(WorkflowExecutionListener listener) {
-        listeners.remove(listener);
-    }
-
-    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
-        notifyWorkflowEnd(context);
-    }
-
-    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
-        notifyWorkflowEnd(context);
-    }
-
-    public void notifyStart(WorkflowExecutionContext context) throws FalconException {
-        // Start notifications can only be from Oozie JMS notifications
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-        LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onStart(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-    }
-
-    public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
-        // Suspend notifications can only be from Oozie JMS notifications
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onSuspend(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-
-        instrumentAlert(context);
-        contextMap.remove(context.getWorkflowId());
-    }
-
-    public void notifyWait(WorkflowExecutionContext context) throws FalconException {
-        // Wait notifications can only be from Oozie JMS notifications
-        LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onWait(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-    }
-
-    // The method retrieves the conf from the cache if it is in cache.
-    // Else, queries WF Engine to retrieve the conf of the workflow
-    private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
-        Properties wfProps = contextMap.get(context.getWorkflowId());
-        if (wfProps == null) {
-            Entity entity = null;
-            try {
-                entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
-            } catch (EntityNotRegisteredException e) {
-                // Entity no longer exists. No need to notify.
-                LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.",
-                        context.getEntityName(), context.getEntityType());
-                contextMap.remove(context.getWorkflowId());
-                return false;
-            }
-            for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-                try {
-                    InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
-                            .getJobDetails(cluster, context.getWorkflowId()).getInstances();
-                    if (instances != null && instances.length > 0) {
-                        wfProps = getWFProps(instances[0].getWfParams());
-                        // Required by RetryService. But, is not part of conf.
-                        wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
-                                Integer.toString(instances[0].getRunId()));
-                    }
-                } catch (FalconException e) {
-                    // Do Nothing. Move on to the next cluster.
-                    continue;
-                }
-                contextMap.put(context.getWorkflowId(), wfProps);
-            }
-        }
-
-        // No extra props to enhance the context with.
-        if (wfProps == null || wfProps.isEmpty()) {
-            return true;
-        }
-
-        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-            if (wfProps.containsKey(arg.getName())) {
-                context.setValue(arg, wfProps.getProperty(arg.getName()));
-            }
-        }
-        return true;
-    }
-
-    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
-        Properties props = new Properties();
-        for (InstancesResult.KeyValuePair kv : wfParams) {
-            props.put(kv.getKey(), kv.getValue());
-        }
-        return props;
-    }
-
-    // This method handles both success and failure notifications.
-    // A POST_PROCESSING context is dropped when the workflow engine reports that its own
-    // notification is enabled for the workflow; otherwise the context is first completed with
-    // start and end times. Any other context type is completed through updateContextFromWFConf,
-    // and nothing is sent when that returns false. Each listener is then called in turn and the
-    // entry for the workflow id is removed from contextMap.
-    private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
-        // Need to distinguish notification from post processing for backward compatibility
-        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
-            boolean engineNotifEnabled = false;
-            try {
-                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
-                        .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
-            } catch (FalconException e) {
-                // Pass the exception along so the reason for the failed check is not lost.
-                LOG.debug("Received error while checking if notification is enabled. "
-                        + "Hence, assuming notification is not enabled.", e);
-            }
-            // Ignore the message from post processing as there will be one more from Oozie.
-            if (engineNotifEnabled) {
-                LOG.info("Ignoring message from post processing as engine notification is enabled.");
-                return;
-            } else {
-                updateContextWithTime(context);
-            }
-        } else {
-            if (!updateContextFromWFConf(context)) {
-                return;
-            }
-        }
-
-        LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
-
-        // NOTE(review): instrumentAlert runs once per listener and is skipped for a listener whose
-        // callback throws - confirm that per-listener alerts are intended.
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                if (context.hasWorkflowSucceeded()) {
-                    listener.onSuccess(context);
-                    instrumentAlert(context);
-                } else {
-                    listener.onFailure(context);
-                    if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
-                        instrumentAlert(context);
-                    }
-                }
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-
-        contextMap.remove(context.getWorkflowId());
-    }
-
-    // In case of notifications coming from post processing, start and end time need to be populated.
-    // The times are read from the first instance of the workflow engine's job details; a time that
-    // is null there is replaced by the current time. When the details cannot be retrieved, or hold
-    // no instance, the context is left unchanged and the problem is logged.
-    private void updateContextWithTime(WorkflowExecutionContext context) {
-        try {
-            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
-                    .getJobDetails(context.getClusterName(), context.getWorkflowId());
-            // Indexing [0] on a missing or empty array would throw an unchecked exception that the
-            // FalconException handler below does not cover.
-            if (result == null || result.getInstances() == null || result.getInstances().length == 0) {
-                LOG.error("No job details found for {} on cluster {}",
-                        context.getWorkflowId(), context.getClusterName());
-                return;
-            }
-            Date startTime = result.getInstances()[0].startTime;
-            Date endTime = result.getInstances()[0].endTime;
-            Date now = new Date();
-            if (startTime == null) {
-                startTime = now;
-            }
-            if (endTime == null) {
-                endTime = now;
-            }
-            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
-            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
-        } catch (FalconException e) {
-            LOG.error("Unable to retrieve job details for {} on cluster {}",
-                    context.getWorkflowId(), context.getClusterName(), e);
-        }
-    }
-
-    // Emits a GenericAlert for a finished workflow instance. A workflow that failed or was killed
-    // is reported through instrumentFailedInstance; anything else through
-    // instrumentSucceededInstance.
-    private void instrumentAlert(WorkflowExecutionContext context) {
-        String clusterName = context.getClusterName();
-        String entityName = context.getEntityName();
-        String entityType = context.getEntityType();
-        String operation = context.getOperation().name();
-        String workflowId = context.getWorkflowId();
-        String workflowUser = context.getWorkflowUser();
-        String nominalTime = context.getNominalTimeAsISO8601();
-        String runId = String.valueOf(context.getWorkflowRunId());
-        Date now = new Date();
-        // Start and/or End time may not be set in case of workflow suspend
-        Date endTime;
-        if (context.getWorkflowEndTime() == 0) {
-            endTime = now;
-        } else {
-            endTime = new Date(context.getWorkflowEndTime());
-        }
-
-        Date startTime;
-        if (context.getWorkflowStartTime() == 0) {
-            startTime = now;
-        } else {
-            startTime = new Date(context.getWorkflowStartTime());
-        }
-        // When only the end time is known, "now" stands in for the start and the difference would
-        // be negative; clamp it to zero. The factor converts milliseconds to nanoseconds
-        // (presumably the unit GenericAlert expects - TODO confirm).
-        Long duration = Math.max(0L, endTime.getTime() - startTime.getTime()) * 1000000L;
-
-        // A killed workflow must not be reported as a succeeded instance.
-        if (context.hasWorkflowFailed() || context.hasWorkflowBeenKilled()) {
-            GenericAlert.instrumentFailedInstance(clusterName, entityType,
-                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                    SchemaHelper.formatDateUTC(startTime), "", "", duration);
-        } else {
-            GenericAlert.instrumentSucceededInstance(clusterName, entityType,
-                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                    SchemaHelper.formatDateUTC(startTime), duration);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
deleted file mode 100644
index 4d8402a..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.engine;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.LifeCycle;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesSummaryResult;
-
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Workflow engine should minimally support the
- * following operations.
- *
- * <p>Apart from listener registration, every operation is abstract and must be supplied by the
- * concrete engine. Every abstract operation except {@link #getName()} declares
- * {@link FalconException}.
- */
-public abstract class AbstractWorkflowEngine {
-
-    // Names "nameNode" and "jobTracker"; presumably configuration keys - their use is outside this class.
-    public static final String NAME_NODE = "nameNode";
-    public static final String JOB_TRACKER = "jobTracker";
-
-    // Listeners added through registerListener. Backed by a HashSet, so a listener is held at most once.
-    protected Set<WorkflowEngineActionListener> listeners = new HashSet<WorkflowEngineActionListener>();
-
-    /**
-     * Adds the listener to {@link #listeners}.
-     *
-     * @param listener listener to add
-     */
-    public void registerListener(WorkflowEngineActionListener listener) {
-        listeners.add(listener);
-    }
-
-    public abstract boolean isAlive(Cluster cluster) throws FalconException;
-
-    // Operations that take the entity itself.
-    public abstract void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties)
-        throws FalconException;
-
-    public abstract String suspend(Entity entity) throws FalconException;
-
-    public abstract String resume(Entity entity) throws FalconException;
-
-    public abstract String delete(Entity entity) throws FalconException;
-
-    public abstract String delete(Entity entity, String cluster) throws FalconException;
-
-    public abstract String reRun(String cluster, String wfId, Properties props, boolean isForced)
-        throws FalconException;
-
-    public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException;
-
-    public abstract boolean isActive(Entity entity) throws FalconException;
-
-    public abstract boolean isSuspended(Entity entity) throws FalconException;
-
-    public abstract boolean isCompleted(Entity entity) throws FalconException;
-
-    public abstract InstancesResult getRunningInstances(Entity entity,
-                                                        List<LifeCycle> lifeCycles) throws FalconException;
-
-    // Operations on instances selected by start/end dates and life cycles.
-    public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props,
-                                                  List<LifeCycle> lifeCycles) throws FalconException;
-
-    public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
-                                                   List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException;
-
-    public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props,
-                                                     List<LifeCycle> lifeCycles) throws FalconException;
-
-    public abstract InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props,
-                                                    List<LifeCycle> lifeCycles) throws FalconException;
-
-    public abstract InstancesResult getStatus(Entity entity, Date start, Date end,
-                                              List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException;
-
-    public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
-                                                      List<LifeCycle> lifeCycles) throws FalconException;
-
-    public abstract String update(Entity oldEntity, Entity newEntity,
-                                  String cluster, Boolean skipDryRun) throws FalconException;
-
-    public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException;
-
-    // Lookups keyed by cluster name and workflow job id.
-    public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
-
-    public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
-
-    public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
-
-    public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
-                                                      List<LifeCycle> lifeCycles) throws FalconException;
-
-    public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException;
-
-    public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException;
-
-
-    /**
-     * Returns the short name of the Workflow Engine.
-     * @return the short name of the workflow engine
-     */
-    public abstract String getName();
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java b/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
deleted file mode 100644
index 2a1cbd4..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.engine;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-
-/**
- * Listener that will be notified before and after
- * workflow life cycle operations are performed.
- *
- * <p>Callbacks come in before/after pairs for schedule, delete, suspend and resume. Each one
- * receives the entity and a cluster name and may throw {@link FalconException}; how the caller
- * reacts to such an exception is not defined by this interface.
- */
-public interface WorkflowEngineActionListener {
-
-    // Schedule callbacks.
-    void beforeSchedule(Entity entity, String cluster) throws FalconException;
-
-    void afterSchedule(Entity entity, String cluster) throws FalconException;
-
-    // Delete callbacks.
-    void beforeDelete(Entity entity, String cluster) throws FalconException;
-
-    void afterDelete(Entity entity, String cluster) throws FalconException;
-
-    // Suspend callbacks.
-    void beforeSuspend(Entity entity, String cluster) throws FalconException;
-
-    void afterSuspend(Entity entity, String cluster) throws FalconException;
-
-    // Resume callbacks.
-    void beforeResume(Entity entity, String cluster) throws FalconException;
-
-    void afterResume(Entity entity, String cluster) throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
deleted file mode 100644
index 3f07c3c..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Shell;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * Utility to read oozie action conf at oozie.action.conf.xml.
- */
-public final class OozieActionConfigurationHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OozieActionConfigurationHelper.class);
-
-    private OozieActionConfigurationHelper() {
-    }
-
-    /**
-     * Creates a {@link Configuration} for code running inside an Oozie action.
-     *
-     * <p>If the system property {@code oozie.action.conf.xml} names an existing file, that file
-     * is added as a resource. If the environment variable {@code HADOOP_TOKEN_FILE_LOCATION} is
-     * set, its value is published as {@code mapreduce.job.credentials.binary} and
-     * {@code tez.credentials.path}, both on the configuration and as system properties.
-     *
-     * @return the configuration that was built
-     * @throws IOException if the file system check or the configuration dump fails
-     */
-    public static Configuration createActionConf() throws IOException {
-        Configuration conf = new Configuration();
-        String actionConfFile = System.getProperty("oozie.action.conf.xml");
-        if (actionConfFile == null) {
-            // Concatenating a null property would yield the meaningless path "file:///null".
-            LOG.info("System property oozie.action.conf.xml is not set, no Oozie Action conf to add");
-        } else {
-            Path confPath = new Path("file:///" + actionConfFile);
-
-            final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath);
-            LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists);
-            if (actionConfExists) {
-                LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString());
-                conf.addResource(confPath);
-                dumpConf(conf, "oozie action conf ");
-            }
-        }
-
-        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-        if (tokenFile != null) {
-            if (Shell.WINDOWS) {
-                tokenFile = stripSurroundingQuotes(tokenFile);
-            }
-
-            conf.set("mapreduce.job.credentials.binary", tokenFile);
-            System.setProperty("mapreduce.job.credentials.binary", tokenFile);
-            conf.set("tez.credentials.path", tokenFile);
-            System.setProperty("tez.credentials.path", tokenFile);
-        }
-
-        conf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
-        conf.setBoolean("hive.exec.mode.local.auto", false);
-
-        return conf;
-    }
-
-    /**
-     * Logs the full configuration at info level, preceded by the given message.
-     *
-     * @param conf configuration to dump
-     * @param message text placed in front of the dump
-     * @throws IOException if dumping the configuration fails
-     */
-    public static void dumpConf(Configuration conf, String message) throws IOException {
-        StringWriter writer = new StringWriter();
-        Configuration.dumpConfiguration(conf, writer);
-        // The message is passed as an argument so that braces in it are not read as placeholders.
-        LOG.info("{} {}", message, writer);
-    }
-
-    // Removes one leading and one trailing double quote, each only if present. Uses
-    // startsWith/endsWith so that an empty value or a lone quote cannot index out of bounds.
-    private static String stripSurroundingQuotes(String value) {
-        String stripped = value;
-        if (stripped.startsWith("\"")) {
-            stripped = stripped.substring(1);
-        }
-        if (stripped.endsWith("\"")) {
-            stripped = stripped.substring(0, stripped.length() - 1);
-        }
-        return stripped;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
deleted file mode 100644
index 05f248e..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.util;
-
-/**
- * Holder for the Oozie constants that several modules share.
- */
-public final class OozieConstants {
-
-    private OozieConstants() {
-        // Only constants live here; the private constructor blocks instantiation from outside.
-    }
-
-    /**
-     * Value that denotes an oozie running in local.
-     */
-    public static final String LOCAL_OOZIE = "localoozie";
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
deleted file mode 100644
index 75c8267..0000000
--- a/common/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-    <!-- Console output on System.out; used by the root logger below. -->
-    <appender name="console" class="org.apache.log4j.ConsoleAppender">
-        <param name="Target" value="System.out"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
-        </layout>
-    </appender>
-
-    <!-- Application log under ${user.dir}/target/logs; used by the org.apache.falcon logger. -->
-    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/application.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
-        </layout>
-    </appender>
-
-    <!-- Audit log under ${user.dir}/target/logs; used by the AUDIT logger. -->
-    <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/audit.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %x %m%n"/>
-        </layout>
-    </appender>
-
-    <!-- Metric log under ${user.dir}/target/logs; used by the METRIC logger. -->
-    <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/target/logs/metric.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %m%n"/>
-        </layout>
-    </appender>
-
-    <!-- NOTE(review): no logger in this file references the ALERT appender, and unlike the other
-         file appenders its path depends on falcon.log.dir and falcon.app.type. Confirm whether an
-         ALERT logger is intended here. -->
-    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
-        <param name="Append" value="true"/>
-        <param name="Threshold" value="debug"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %m%n"/>
-        </layout>
-    </appender>
-
-    <!-- additivity="false": these events go to FILE only and are not passed on to the root logger. -->
-    <logger name="org.apache.falcon" additivity="false">
-        <level value="debug"/>
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <!-- Additivity is left at its default for AUDIT and METRIC, so their events are also passed
-         on to the root logger's console appender. -->
-    <logger name="AUDIT">
-        <level value="info"/>
-        <appender-ref ref="AUDIT"/>
-    </logger>
-
-    <logger name="METRIC">
-        <level value="info"/>
-        <appender-ref ref="METRIC"/>
-    </logger>
-
-    <root>
-        <priority value="info"/>
-        <appender-ref ref="console"/>
-    </root>
-
-</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
deleted file mode 100644
index 643559e..0000000
--- a/common/src/main/resources/runtime.properties
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-*.falcon.parentworkflow.retry.max=3
-*.falcon.parentworkflow.retry.interval.secs=1
-
-*.falcon.replication.workflow.maxmaps=5
-*.falcon.replication.workflow.mapbandwidth=100
-*.webservices.default.results.per.page=10
-
-# If true, do not run retention past feedCluster validity end time.
-# This will retain recent instances beyond feedCluster validity end time.
-*.falcon.retention.keep.instances.beyond.validity=true
-
-# Default configs to handle replication for late arriving feeds.
-*.feed.late.allowed=true
-*.feed.late.frequency=hours(3)
-*.feed.late.policy=exp-backoff
-
-# If true, Falcon skips oozie dryrun while scheduling entities.
-*.falcon.skip.dryrun=false
-
-######### Proxyuser Configuration Start #########
-
-#List of hosts the '#USER#' user is allowed to perform 'doAs' operations from. The '#USER#' must be replaced with the
-#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
-#comma separated hostnames
-
-*.falcon.service.ProxyUserService.proxyuser.#USER#.hosts=*
-
-#List of groups the '#USER#' user is allowed to perform 'doAs' operations for. The '#USER#' must be replaced with the
-#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
-#comma separated groups
-
-*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
-
-######### Proxyuser Configuration End #########
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
deleted file mode 100644
index 2497cce..0000000
--- a/common/src/main/resources/startup.properties
+++ /dev/null
@@ -1,306 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-######### Implementation classes #########
-## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
-
-*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
-*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
-*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
-*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
-*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
-*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
-*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
-*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
-
-##### Falcon Services #####
-*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
-                        org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
-                        org.apache.falcon.service.ProcessSubscriberService,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
-                        org.apache.falcon.service.LifecyclePolicyMap,\
-                        org.apache.falcon.entity.store.ConfigurationStore,\
-                        org.apache.falcon.rerun.service.RetryService,\
-                        org.apache.falcon.rerun.service.LateRunService,\
-                        org.apache.falcon.metadata.MetadataMappingService,\
-                        org.apache.falcon.service.LogCleanupService,\
-                        org.apache.falcon.service.GroupsService,\
-                        org.apache.falcon.service.ProxyUserService,\
-                        org.apache.falcon.adfservice.ADFProviderService
-## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
-#                        org.apache.falcon.notification.service.impl.JobCompletionService,\
-#                        org.apache.falcon.notification.service.impl.SchedulerService,\
-#                        org.apache.falcon.notification.service.impl.AlarmService,\
-#                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
-#                        org.apache.falcon.execution.FalconExecutionService,\
-#                        org.apache.falcon.state.store.service.FalconJPAService
-
-
-# List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
-# List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
-##### Falcon Configuration Store Change listeners #####
-*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
-                        org.apache.falcon.entity.ColoClusterRelation,\
-                        org.apache.falcon.group.FeedGroupMap,\
-                        org.apache.falcon.entity.store.FeedLocationStore,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
-                        org.apache.falcon.service.SharedLibraryHostingService
-## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
-#                       org.apache.falcon.state.store.jdbc.JdbcStateStore
-
-##### JMS MQ Broker Implementation class #####
-*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
-
-##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
-
-##### Workflow Job Execution Completion listeners #####
-*.workflow.execution.listeners=
-
-######### Implementation classes #########
-
-
-######### System startup parameters #########
-
-# Location of libraries that are shipped to Hadoop
-*.system.lib.location=${FALCON_HOME}/sharedlibs
-
-# Location to store user entity configurations
-
-#Configurations used in UTs
-debug.config.store.uri=file://${user.dir}/target/store
-#Location to store state of Feed SLA monitoring service
-debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
-debug.config.oozie.conf.uri=${user.dir}/target/oozie
-debug.system.lib.location=${system.lib.location}
-debug.broker.url=vm://localhost
-debug.retry.recorder.path=${user.dir}/target/retry
-debug.libext.feed.retention.paths=${falcon.libext}
-debug.libext.feed.replication.paths=${falcon.libext}
-debug.libext.process.paths=${falcon.libext}
-
-#Configurations used in ITs
-it.config.store.uri=file://${user.dir}/target/store
-it.config.oozie.conf.uri=${user.dir}/target/oozie
-it.system.lib.location=${system.lib.location}
-it.broker.url=tcp://localhost:61616
-it.retry.recorder.path=${user.dir}/target/retry
-it.libext.feed.retention.paths=${falcon.libext}
-it.libext.feed.replication.paths=${falcon.libext}
-it.libext.process.paths=${falcon.libext}
-it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
-
-*.falcon.cleanup.service.frequency=minutes(5)
-
-######### Properties for Feed SLA Monitoring #########
-# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
-*.feed.sla.serialization.frequency.millis=3600000
-
-# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in
-# a FIFO fashion.
-*.feed.sla.queue.size=288
-
-# Do not change unless really sure
-# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
-*.feed.sla.statusCheck.frequency.seconds=600
-
-# Do not change unless really sure
-# Time Duration (in milliseconds) in future for generating pending feed instances.
-# In every cycle pending feed instances are added for monitoring, till this time in future.
-# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
-*.feed.sla.lookAheadWindow.millis=900000
-
-
-######### Properties for configuring JMS provider - activemq #########
-# Default Active MQ url
-*.broker.url=tcp://localhost:61616
-
-# default time-to-live for a JMS message 3 days (time in minutes)
-*.broker.ttlInMins=4320
-*.entity.topic=FALCON.ENTITY.TOPIC
-*.max.retry.failure.count=1
-*.retry.recorder.path=${user.dir}/logs/retry
-
-######### Properties for configuring iMon client and metric #########
-*.internal.queue.size=1000
-
-
-######### Graph Database Properties #########
-# Graph implementation
-*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
-
-# Graph Storage
-*.falcon.graph.storage.directory=${user.dir}/target/graphdb
-*.falcon.graph.storage.backend=berkeleyje
-*.falcon.graph.serialize.path=${user.dir}/target/graphdb
-*.falcon.graph.preserve.history=false
-*.falcon.graph.transaction.retry.count=3
-*.falcon.graph.transaction.retry.delay=5
-
-# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
-# can use other reporters like ganglia also.
-# Refer to (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring) for finding the
-# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph."
-# *.falcon.graph.storage.enable-basic-metrics = true
-# Required; IP or hostname string
-# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1
-# Required; specify logging interval in milliseconds
-# *.falcon.graph.metrics.graphite.interval = 60000
-
-######### Authentication Properties #########
-
-# Authentication type must be specified: simple|kerberos
-*.falcon.authentication.type=simple
-
-##### Service Configuration
-
-# Indicates the Kerberos principal to be used in Falcon Service.
-*.falcon.service.authentication.kerberos.principal=
-
-# Location of the keytab file with the credentials for the Service principal.
-*.falcon.service.authentication.kerberos.keytab=
-
-# name node principal to talk to config store
-*.dfs.namenode.kerberos.principal=
-
-##### SPNEGO Configuration
-
-# Authentication type must be specified: simple|kerberos|<class>
-# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
-*.falcon.http.authentication.type=simple
-
-# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
-*.falcon.http.authentication.token.validity=36000
-
-# The signature secret for signing the authentication tokens.
-*.falcon.http.authentication.signature.secret=falcon
-
-# The domain to use for the HTTP cookie that stores the authentication token.
-*.falcon.http.authentication.cookie.domain=
-
-# Indicates if anonymous requests are allowed when using 'simple' authentication.
-*.falcon.http.authentication.simple.anonymous.allowed=false
-
-# Indicates the Kerberos principal to be used for HTTP endpoint.
-# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
-*.falcon.http.authentication.kerberos.principal=
-
-# Location of the keytab file with the credentials for the HTTP principal.
-*.falcon.http.authentication.kerberos.keytab=
-
-# The Kerberos name rules used to resolve Kerberos principal names; refer to Hadoop's KerberosName for more details.
-*.falcon.http.authentication.kerberos.name.rules=DEFAULT
-
-# Comma separated list of black listed users
-*.falcon.http.authentication.blacklisted.users=
-
-######### Authentication Properties #########
-
-
-######### Authorization Properties #########
-
-# Authorization Enabled flag: false (default)|true
-*.falcon.security.authorization.enabled=false
-
-# The name of the group of super-users
-*.falcon.security.authorization.superusergroup=falcon
-
-# Admin Users, comma separated users
-*.falcon.security.authorization.admin.users=falcon,ambari-qa
-
-# Admin Group Membership, comma separated groups
-*.falcon.security.authorization.admin.groups=falcon,staff
-
-# Authorization Provider Implementation Fully Qualified Class Name
-*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
-
-######### Authorization Properties #########
-
-######### ADF Configurations start #########
-
-# A String object that represents the namespace
-*.microsoft.windowsazure.services.servicebus.namespace=
-
-# Request and status queues on the namespace
-*.microsoft.windowsazure.services.servicebus.requestqueuename=
-*.microsoft.windowsazure.services.servicebus.statusqueuename=
-
-# A String object that contains the SAS key name
-*.microsoft.windowsazure.services.servicebus.sasKeyName=
-
-# A String object that contains the SAS key
-*.microsoft.windowsazure.services.servicebus.sasKey=
-
-# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
-# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
-*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=
-
-# Service bus polling frequency
-*.microsoft.windowsazure.services.servicebus.polling.frequency=
-
-# Super user
-*.microsoft.windowsazure.services.servicebus.superuser=
-
-######### ADF Configurations end ###########
-
-######### SMTP Properties ########
-
-# Setting SMTP hostname
-#*.falcon.email.smtp.host=localhost
-
-# Setting SMTP port number
-#*.falcon.email.smtp.port=25
-
-# Setting email from address
-#*.falcon.email.from.address=falcon@localhost
-
-# Setting email Auth
-#*.falcon.email.smtp.auth=false
-
-#Setting user name
-#*.falcon.email.smtp.user=""
-
-#Setting password
-#*.falcon.email.smtp.password=""
-
-# Setting monitoring plugin, if SMTP parameters are defined
-#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-#                     org.apache.falcon.plugin.EmailNotificationPlugin
-
-######### StateStore Properties #####
-#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
-#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-## Maximum number of active connections that can be allocated from this pool at the same time.
-#*.falcon.statestore.pool.max.active.conn=10
-#*.falcon.statestore.connection.properties=
-## Indicates the interval (in milliseconds) between eviction runs.
-#*.falcon.statestore.validate.db.connection.eviction.interval=300000
-## The number of objects to examine during each run of the idle object evictor thread.
-#*.falcon.statestore.validate.db.connection.eviction.num=10
-## Creates Falcon DB.
-## If set to true, it creates the DB schema if it does not exist. If the DB schema exists, it is a NOP.
-## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
deleted file mode 100644
index 86c32a1..0000000
--- a/common/src/main/resources/statestore.credentials
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
deleted file mode 100644
index 44e79b3..0000000
--- a/common/src/main/resources/statestore.properties
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-######### StateStore Properties #####
-#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-## Falcon currently supports derby, mysql and postgreSQL, change url based on DB.
-#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true
-
-## StateStore credentials file where username,password and other properties can be stored securely.
-## Set this credentials file's permission to 400 and make sure that only the user who starts Falcon has read permission.
-## Give Absolute path to credentials file along with file name or put in classpath with filename statestore.credentials.
-## Credentials file should be present either in given location or class path, otherwise falcon won't start.
-#*.falcon.statestore.credentials.file=
-
-#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-## Maximum number of active connections that can be allocated from this pool at the same time.
-#*.falcon.statestore.pool.max.active.conn=10
-## Any additional connection properties that need to be used, specified as comma separated key=value pairs.
-#*.falcon.statestore.connection.properties=
-## Indicates the interval (in milliseconds) between eviction runs.
-#*.falcon.statestore.validate.db.connection.eviction.interval=300000
-## The number of objects to examine during each run of the idle object evictor thread.
-#*.falcon.statestore.validate.db.connection.eviction.num=10
-## Creates Falcon DB.
-## If set to true, it creates the DB schema if it does not exist. If the DB schema exists, it is a NOP.
-## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
deleted file mode 100644
index 0df59b2..0000000
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Test for log cleanup service.
- */
-public class LogCleanupServiceTest extends AbstractTestBase {
-
-    private FileSystem fs;
-    private FileSystem tfs;
-    private EmbeddedCluster targetDfsCluster;
-
-    private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-01-00/000");
-    private final Path instanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-01-00/001");
-    private final Path instanceLogPath2 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-02-00/001");
-    private final Path instanceLogPath3 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample2" + "/logs/job-2010-01-01-01-00/000");
-    private final Path instanceLogPath4 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/latedata/2010-01-01-01-00");
-    private final Path instanceLogPath5 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-            + "sample3" + "/logs/job-2010-01-01-01-00/000");
-    private final Path feedInstanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/feed/"
-        + "impressionFeed" + "/logs/job-2010-01-01-01-00/testCluster/000");
-    private final Path feedInstanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/feed/"
-        + "impressionFeed2" + "/logs/job-2010-01-01-01-00/testCluster/000");
-
-
-    @AfterClass
-    public void tearDown() {
-        this.dfsCluster.shutdown();
-        this.targetDfsCluster.shutdown();
-    }
-
-    @Override
-    @BeforeClass
-    public void setup() throws Exception {
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", CurrentUser.getUser());
-        conf = dfsCluster.getConf();
-        fs = dfsCluster.getFileSystem();
-        fs.delete(new Path("/"), true);
-
-        storeEntity(EntityType.CLUSTER, "testCluster");
-        System.setProperty("test.build.data", "target/tdfs/data" + System.currentTimeMillis());
-        this.targetDfsCluster = EmbeddedCluster.newCluster("backupCluster");
-        conf = targetDfsCluster.getConf();
-
-        storeEntity(EntityType.CLUSTER, "backupCluster");
-        storeEntity(EntityType.FEED, "impressionFeed");
-        storeEntity(EntityType.FEED, "clicksFeed");
-        storeEntity(EntityType.FEED, "imp-click-join1");
-        storeEntity(EntityType.FEED, "imp-click-join2");
-        storeEntity(EntityType.PROCESS, "sample");
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "sample");
-        Process otherProcess = (Process) process.copy();
-        otherProcess.setName("sample2");
-        otherProcess.setFrequency(new Frequency("days(1)"));
-        Process noACLProcess = (Process) process.copy();
-        noACLProcess.setName("sample3");
-        noACLProcess.setACL(null);
-        ConfigurationStore.get().remove(EntityType.PROCESS,
-                otherProcess.getName());
-        ConfigurationStore.get().publish(EntityType.PROCESS, otherProcess);
-        ConfigurationStore.get().remove(EntityType.PROCESS,
-                noACLProcess.getName());
-        ConfigurationStore.get().publish(EntityType.PROCESS, noACLProcess);
-
-        fs.mkdirs(instanceLogPath);
-        fs.mkdirs(instanceLogPath1);
-        fs.mkdirs(instanceLogPath2);
-        fs.mkdirs(instanceLogPath3);
-        fs.mkdirs(instanceLogPath4);
-        fs.mkdirs(instanceLogPath5);
-
-        // fs.setTimes won't work on dirs
-        fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
-        fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
-
-        tfs = targetDfsCluster.getFileSystem();
-        tfs.delete(new Path("/"), true);
-        fs.mkdirs(feedInstanceLogPath);
-        fs.mkdirs(feedInstanceLogPath1);
-        tfs.mkdirs(feedInstanceLogPath);
-        tfs.mkdirs(feedInstanceLogPath1);
-        fs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
-        tfs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
-
-        // table feed staging dir setup
-        initializeStagingDirs();
-        Thread.sleep(1000);
-    }
-
-    private void initializeStagingDirs() throws Exception {
-        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
-        Feed tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
-        getStore().publish(EntityType.FEED, tableFeed);
-    }
-
-    @Test
-    public void testProcessLogs() throws IOException, FalconException, InterruptedException {
-
-        Assert.assertTrue(fs.exists(instanceLogPath));
-        Assert.assertTrue(fs.exists(instanceLogPath1));
-        Assert.assertTrue(fs.exists(instanceLogPath2));
-        Assert.assertTrue(fs.exists(instanceLogPath3));
-
-        AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
-        processCleanupHandler.cleanup();
-
-        Assert.assertFalse(fs.exists(instanceLogPath));
-        Assert.assertFalse(fs.exists(instanceLogPath1));
-        Assert.assertFalse(fs.exists(instanceLogPath2));
-        Assert.assertFalse(fs.exists(instanceLogPath5));
-        Assert.assertTrue(fs.exists(instanceLogPath3));
-    }
-
-    @Test
-    public void testFeedLogs() throws IOException, FalconException, InterruptedException {
-
-        Assert.assertTrue(fs.exists(feedInstanceLogPath));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath));
-        Assert.assertTrue(fs.exists(feedInstanceLogPath1));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-
-        AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
-        feedCleanupHandler.cleanup();
-
-        Assert.assertFalse(fs.exists(feedInstanceLogPath));
-        Assert.assertFalse(tfs.exists(feedInstanceLogPath));
-        Assert.assertTrue(fs.exists(feedInstanceLogPath1));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-    }
-}


Mime
View raw message