falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-238 Support updates at specific time. Contributed by Shwetha GS
Date Mon, 06 Jan 2014 08:16:11 GMT
Updated Branches:
  refs/heads/master 8472f129f -> 28f6a4ec6


FALCON-238 Support updates at specific time. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/28f6a4ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/28f6a4ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/28f6a4ec

Branch: refs/heads/master
Commit: 28f6a4ec68f95f8f16c75aae92aa68ac156455df
Parents: 8472f12
Author: Shwetha GS <shwethags@gmail.com>
Authored: Mon Jan 6 13:46:01 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Mon Jan 6 13:46:01 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +-
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 26 ++++++++-----
 .../falcon/resource/AbstractEntityManager.java  | 19 ++++++----
 .../proxy/SchedulableEntityManagerProxy.java    |  7 ++--
 .../src/main/java/org/apache/falcon/Debug.java  |  3 +-
 .../falcon/resource/ConfigSyncService.java      |  5 ++-
 .../falcon/resource/EntityManagerJerseyIT.java  | 39 ++++++++++++++------
 8 files changed, 69 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f6a6cff..6f90209 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,7 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
-
+    FALCON-238 Support updates at specific time. (Shwetha GS)
+   
   IMPROVEMENTS
     FALCON-64 Add example entity xmls in falcon package. (Shwetha GS)
     

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index b86a715..043d622 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -77,7 +77,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract InstancesResult getStatus(Entity entity, Date start, Date end) throws FalconException;
 
-    public abstract void update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException;
+    public abstract Date update(Entity oldEntity, Entity newEntity, String cluster, Date end) throws FalconException;
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index b0af401..3e28c45 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -788,18 +788,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
+    public Date update(Entity oldEntity, Entity newEntity, String cluster, Date reqEndTime) throws FalconException {
         boolean entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster);
         boolean wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity);
 
         if (!entityUpdated && !wfUpdated) {
             LOG.debug("Nothing to update for cluster " + cluster);
-            return;
+            return null;
         }
 
         Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
         Path stagingPath = EntityUtil.getLastCommittedStagingPath(clusterEntity, oldEntity);
-
+        Date effectiveEndTime = null;
         if (stagingPath != null) {  //update if entity is scheduled
             BundleJob bundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
             bundle = getBundleInfo(cluster, bundle.getId());
@@ -817,12 +817,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 LOG.info("Change operation is adequate! : " + cluster + ", bundle: " + bundle.getId());
                 updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
                         EntityUtil.getEndTime(newEntity, cluster));
-                return;
+                return newEndTime;
             }
 
             LOG.debug("Going to update ! : " + newEntity.toShortString() + "for cluster " + cluster + ", bundle: "
                     + bundle.getId());
-            updateInternal(oldEntity, newEntity, cluster, bundle, false);
+            effectiveEndTime = updateInternal(oldEntity, newEntity, cluster, bundle, false, reqEndTime);
             LOG.info("Entity update complete : " + newEntity.toShortString() + cluster + ", bundle: " + bundle.getId());
         }
 
@@ -854,12 +854,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 //            boolean processCreated = feedBundle.getCreatedTime().before(
 //                    affectedProcBundle.getCreatedTime());
 
-            updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle,
-                    false);
+            Date depEndTime =
+                    updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, false, reqEndTime);
+            if (effectiveEndTime == null || effectiveEndTime.after(depEndTime)) {
+                effectiveEndTime = depEndTime;
+            }
             LOG.info("Entity update complete : " + affectedEntity.toShortString() + cluster
                     + ", bundle: " + affectedProcBundle.getId());
         }
         LOG.info("Entity update and all dependent entities updated: " + oldEntity.toShortString());
+        return effectiveEndTime;
     }
 
     //Returns bundle whose app path is same as the staging path(argument)
@@ -944,8 +948,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private void updateInternal(Entity oldEntity, Entity newEntity, String cluster, BundleJob oldBundle,
-                                boolean alreadyCreated) throws FalconException {
+    private Date updateInternal(Entity oldEntity, Entity newEntity, String cluster, BundleJob oldBundle,
+                                boolean alreadyCreated, Date reqEndTime) throws FalconException {
         OozieWorkflowBuilder<Entity> builder =
                 (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
 
@@ -961,6 +965,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             // new entity is not scheduled yet, create new bundle
             LOG.info("New bundle hasn't been created yet. So will create one");
             endTime = offsetTime(now(), 3);
+            if (reqEndTime != null && reqEndTime.after(endTime)) {
+                endTime = reqEndTime;
+            }
             Date newStartTime = builder.getNextStartTime(newEntity, cluster, endTime);
             newBundle =
                     getBundleInfo(cluster, scheduleForUpdate(newEntity, cluster, newStartTime, oldBundle.getUser()));
@@ -983,6 +990,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
         commitStagingPath(cluster, newBundle.getAppPath());
+        return endTime;
     }
 
     private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index d6f9df8..7333110 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -31,10 +31,7 @@ import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.store.EntityAlreadyExistsException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityGraph;
-import org.apache.falcon.entity.v0.EntityIntegrityChecker;
-import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.*;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.APIResult.Status;
 import org.apache.falcon.security.CurrentUser;
@@ -44,6 +41,7 @@ import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Logger;
+import org.datanucleus.util.StringUtils;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
@@ -219,7 +217,8 @@ public abstract class AbstractEntityManager {
 
     // Parallel update can get very clumsy if two feeds are updated which
     // are referred by a single process. Sequencing them.
-    public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo) {
+    public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo,
+                                         String endTime) {
         checkColo(colo);
         try {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
@@ -231,6 +230,8 @@ public abstract class AbstractEntityManager {
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
 
+            List<String> endTimes = new ArrayList<String>();
+            Date reqEndTime = StringUtils.isEmpty(endTime) ? null : EntityUtil.parseDateUTC(endTime);
             //Update in workflow engine
             if (!DeploymentUtil.isPrism()) {
                 Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
@@ -239,7 +240,10 @@ public abstract class AbstractEntityManager {
                 oldClusters.removeAll(newClusters); //deleted clusters
 
                 for (String cluster : newClusters) {
-                    getWorkflowEngine().update(oldEntity, newEntity, cluster);
+                    Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, reqEndTime);
+                    if (effectiveEndTime != null) {
+                        endTimes.add("(" + cluster + ", " + SchemaHelper.formatDateUTC(effectiveEndTime) + ")");
+                    }
                 }
                 for (String cluster : oldClusters) {
                     getWorkflowEngine().delete(oldEntity, cluster);
@@ -248,7 +252,8 @@ public abstract class AbstractEntityManager {
 
             configStore.update(entityType, newEntity);
 
-            return new APIResult(APIResult.Status.SUCCEEDED, entityName + " updated successfully");
+            return new APIResult(APIResult.Status.SUCCEEDED, entityName + " updated successfully"
+                    + (endTimes.isEmpty() ? "" : " with endTime " + endTimes));
         } catch (Throwable e) {
             LOG.error("Updation failed", e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 8ba2a97..bbb1120 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -187,7 +187,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult update(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
             @Dimension("entityName") @PathParam("entity") final String entityName,
-            @Dimension("colo") @QueryParam("colo") String ignore) {
+            @Dimension("colo") @QueryParam("colo") String ignore,
+            @Dimension("end") @DefaultValue("") @QueryParam("end") final String end) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         final Set<String> oldColos = getApplicableColos(type, entityName);
@@ -222,7 +223,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
                 @Override
                 protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo);
+                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo, end);
                 }
             }.execute());
         }
@@ -242,7 +243,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         }
 
         if (!embeddedMode) {
-            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo));
+            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, end));
         }
 
         return consolidateResult(results);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/webapp/src/main/java/org/apache/falcon/Debug.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/Debug.java b/webapp/src/main/java/org/apache/falcon/Debug.java
index 81d51f5..c831dc3 100644
--- a/webapp/src/main/java/org/apache/falcon/Debug.java
+++ b/webapp/src/main/java/org/apache/falcon/Debug.java
@@ -34,6 +34,7 @@ import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import java.io.ByteArrayInputStream;
+import java.util.Date;
 
 /**
  * A driver for debugging purposes.
@@ -95,7 +96,7 @@ public final class Debug {
         StartupProperties.get().setProperty("current.colo", "ua1");
         OozieWorkflowEngine engine = new OozieWorkflowEngine();
         ConfigurationStore.get().initiateUpdate(newEntity);
-        engine.update(obj, newEntity, newEntity.getClusters().getClusters().get(0).getName());
+        engine.update(obj, newEntity, newEntity.getClusters().getClusters().get(0).getName(), new Date());
         engine.delete(newEntity);
         System.exit(0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index 9722116..86e0431 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -64,7 +64,8 @@ public class ConfigSyncService extends AbstractEntityManager {
     public APIResult update(@Context HttpServletRequest request,
                             @Dimension("entityType") @PathParam("type") String type,
                             @Dimension("entityName") @PathParam("entity") String entityName,
-                            @Dimension("colo") @QueryParam("colo") String colo) {
-        return super.update(request, type, entityName, colo);
+                            @Dimension("colo") @QueryParam("colo") String colo,
+                            @Dimension("end") @DefaultValue("") @QueryParam("end") String reqEndTime) {
+        return super.update(request, type, entityName, colo, reqEndTime);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/28f6a4ec/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 60fd320..aa059bd 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -20,12 +20,7 @@ package org.apache.falcon.resource;
 import java.io.*;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.*;
 import java.util.regex.Pattern;
 
 import javax.servlet.ServletInputStream;
@@ -33,8 +28,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.xml.bind.JAXBException;
 
+import com.sun.jersey.api.client.WebResource;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
@@ -47,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
@@ -117,11 +115,19 @@ public class EntityManagerJerseyIT {
     }
 
     private void update(TestContext context, Entity entity) throws Exception {
+        update(context, entity, null);
+    }
+
+    private void update(TestContext context, Entity entity, Date endTime) throws Exception {
         File tmpFile = context.getTempFile();
         entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
-        ClientResponse response = context.service.path("api/entities/update/"
-                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName())
-                .header("Remote-User", TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+        WebResource resource = context.service.path("api/entities/update/"
+                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
+        if (endTime != null) {
+            resource = resource.queryParam("end", SchemaHelper.formatDateUTC(endTime));
+        }
+        ClientResponse response =
+                resource.header("Remote-User", TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
                 .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
         context.assertSuccessful(response);
     }
@@ -276,13 +282,16 @@ public class EntityManagerJerseyIT {
         input.setEnd("today(20,20)");
         process.getInputs().getInputs().add(input);
 
+        Date endTime = getEndTime();
         updateEndtime(process);
-        update(context, process);
+        update(context, process, endTime);
 
         //Assert that update creates new bundle and old coord is running
         bundles = context.getBundles();
         Assert.assertEquals(bundles.size(), 2);
-        Assert.assertEquals(ozClient.getCoordJobInfo(coordId).getStatus(), Status.RUNNING);
+        CoordinatorJob coord = ozClient.getCoordJobInfo(coordId);
+        Assert.assertEquals(coord.getStatus(), Status.RUNNING);
+        Assert.assertEquals(coord.getEndTime(), endTime);
     }
 
     @Test
@@ -701,4 +710,12 @@ public class EntityManagerJerseyIT {
             Assert.assertNotNull(entityElement.status); // status is null
         }
     }
+
+    public Date getEndTime() {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.HOUR, 1);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        return cal.getTime();
+    }
 }


Mime
View raw message