falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject falcon git commit: FALCON-944 Parallel update APIs create 2 coords. Contributed by Suhas Vasu
Date Tue, 17 Feb 2015 11:35:49 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 8f32de0f3 -> 1685541f7


FALCON-944 Parallel update APIs create 2 coords. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1685541f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1685541f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1685541f

Branch: refs/heads/master
Commit: 1685541f705b85cf348582f17a088db9d6c91ce0
Parents: 8f32de0
Author: Suhas Vasu <suhas.v@inmobi.com>
Authored: Tue Feb 17 17:05:34 2015 +0530
Committer: Suhas Vasu <suhas.v@inmobi.com>
Committed: Tue Feb 17 17:05:34 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/falcon/entity/lock/MemoryLocks.java  | 79 ++++++++++++++++++++
 .../falcon/entity/lock/MemoryLocksTest.java     | 65 ++++++++++++++++
 .../falcon/resource/AbstractEntityManager.java  | 48 +++++++++++-
 .../AbstractSchedulableEntityManager.java       | 23 +++++-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  1 +
 .../falcon/resource/EntityManagerJerseyIT.java  | 74 ++++++++++++++++++
 7 files changed, 287 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57061d3..0123fae 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -67,6 +67,8 @@ Trunk (Unreleased)
    Seetharam)
 
   OPTIMIZATIONS
+   FALCON-944 Parallel update APIs create 2 coords (Suhas Vasu)
+
    FALCON-943 process update copying user lib is very slow. (Shwetha G S)
 
    FALCON-419 Update deprecated HCatalog API to use Hive Metastore API.

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
new file mode 100644
index 0000000..a91b5d2
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.entity.lock;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory, per-JVM locking used to keep schedule/update commands for the same entity from running
+ * concurrently.
+ *
+ * <p>Locks are non-blocking and non-reentrant: {@link #acquireLock(Entity)} either takes a free lock
+ * or immediately reports failure. Lock state lives in a {@link ConcurrentHashMap}, so the methods are
+ * safe to call from multiple threads. Ownership is not tracked; {@link #releaseLock(Entity)} frees a
+ * lock regardless of which thread obtained it.</p>
+ */
+public final class MemoryLocks {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryLocks.class);
+
+    private static final MemoryLocks INSTANCE = new MemoryLocks();
+
+    /**
+     * Keys ("TYPE.name") of the entities whose lock is currently held. Only the presence of a key
+     * matters; the mapped value is always {@link Boolean#TRUE}.
+     */
+    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<String, Boolean>();
+
+    private MemoryLocks() {
+    }
+
+    /**
+     * Returns the shared instance.
+     *
+     * @return the singleton {@code MemoryLocks}.
+     */
+    public static MemoryLocks getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Try to obtain the lock for an entity without blocking.
+     *
+     * <p>Locks are keyed by entity type and name, so entities of different types that share a name
+     * do not contend with each other.</p>
+     *
+     * @param entity entity object.
+     * @return {@code true} if the lock was obtained, {@code false} if it is already held.
+     */
+    public boolean acquireLock(Entity entity) {
+        String lockKey = getLockKey(entity);
+
+        // putIfAbsent is atomic: for a free key exactly one concurrent caller gets null back.
+        boolean lockObtained = locks.putIfAbsent(lockKey, Boolean.TRUE) == null;
+        if (lockObtained) {
+            LOG.info("Lock obtained for schedule/update of {} by {}",
+                    entity.toShortString(), Thread.currentThread().getName());
+        }
+        return lockObtained;
+    }
+
+    /**
+     * Release the lock for an entity. Releasing a lock that is not held is a no-op (logged as a warning).
+     *
+     * @param entity entity object.
+     */
+    public void releaseLock(Entity entity) {
+        String lockKey = getLockKey(entity);
+
+        if (locks.remove(lockKey) != null) {
+            LOG.info("Successfully released lock for {} by {}",
+                    entity.toShortString(), Thread.currentThread().getName());
+        } else {
+            LOG.warn("No lock was held for {} when {} tried to release it",
+                    entity.toShortString(), Thread.currentThread().getName());
+        }
+    }
+
+    /** Builds the map key; including the type keeps same-named entities of different types apart. */
+    private String getLockKey(Entity entity) {
+        return entity.getEntityType().toString() + "." + entity.getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java b/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java
new file mode 100644
index 0000000..5050b73
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.lock;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.*;
+import org.apache.falcon.entity.v0.process.Process;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the in-memory locking used to serialize schedule/update of entities.
+ *
+ * <p>{@link MemoryLocks} is a shared singleton, so every test releases the locks it takes in a
+ * {@code finally} block; a leaked lock would make later tests fail spuriously.</p>
+ */
+public class MemoryLocksTest {
+    private static final String FEED_XML = "/config/feed/feed-0.1.xml";
+    private static final String PROCESS_XML = "/config/process/process-0.1.xml";
+
+    @Test
+    public void testSuccessfulMemoryLockAcquisition() throws Exception {
+        MemoryLocks memoryLocks = MemoryLocks.getInstance();
+        Entity feed = loadFeed();
+        try {
+            Assert.assertTrue(memoryLocks.acquireLock(feed));
+        } finally {
+            memoryLocks.releaseLock(feed);
+        }
+    }
+
+    @Test
+    public void testUnsuccessfulMemoryLockAcquisition() throws Exception {
+        MemoryLocks memoryLocks = MemoryLocks.getInstance();
+        Entity feed = loadFeed();
+        try {
+            Assert.assertTrue(memoryLocks.acquireLock(feed));
+            // A second attempt must fail while the lock is held.
+            Assert.assertFalse(memoryLocks.acquireLock(feed));
+        } finally {
+            memoryLocks.releaseLock(feed);
+        }
+    }
+
+    @Test
+    public void testLockCanBeReacquiredAfterRelease() throws Exception {
+        MemoryLocks memoryLocks = MemoryLocks.getInstance();
+        Entity feed = loadFeed();
+        Assert.assertTrue(memoryLocks.acquireLock(feed));
+        memoryLocks.releaseLock(feed);
+        try {
+            Assert.assertTrue(memoryLocks.acquireLock(feed));
+        } finally {
+            memoryLocks.releaseLock(feed);
+        }
+    }
+
+    @Test
+    public void testDuplicateEntityNameLockAcquisition() throws Exception {
+        MemoryLocks memoryLocks = MemoryLocks.getInstance();
+        //In case both feed & process have identical names, they shouldn't clash during updates
+        Entity feed = loadFeed();
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller()
+                .unmarshal(getClass().getResource(PROCESS_XML));
+        process.setName(feed.getName());
+        try {
+            Assert.assertTrue(memoryLocks.acquireLock(feed));
+            Assert.assertTrue(memoryLocks.acquireLock(process));
+        } finally {
+            // Release both, otherwise the process lock leaks into the shared singleton.
+            memoryLocks.releaseLock(feed);
+            memoryLocks.releaseLock(process);
+        }
+    }
+
+    /** Unmarshals the feed fixture used by these tests. */
+    private Entity loadFeed() throws Exception {
+        return (Entity) EntityType.FEED.getUnmarshaller().unmarshal(getClass().getResource(FEED_XML));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index caa9a74..424bada 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -27,6 +27,7 @@ import org.apache.falcon.FalconWebException;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.lock.MemoryLocks;
 import org.apache.falcon.entity.parser.EntityParser;
 import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.parser.ValidationException;
@@ -72,6 +73,7 @@ import java.util.Set;
  */
 public abstract class AbstractEntityManager {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class);
+    private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
 
     protected static final int XML_DEBUG_LEN = 10 * 1024;
     protected static final String DEFAULT_NUM_RESULTS = "10";
@@ -265,10 +267,9 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    // Parallel update can get very clumsy if two feeds are updated which
-    // are referred by a single process. Sequencing them.
-    public synchronized APIResult update(HttpServletRequest request, String type, String
entityName, String colo) {
+    public APIResult update(HttpServletRequest request, String type, String entityName, String
colo) {
         checkColo(colo);
+        List<Entity> tokenList = null;
         try {
             EntityType entityType = EntityType.getEnum(type);
             Entity oldEntity = EntityUtil.getEntity(type, entityName);
@@ -280,6 +281,8 @@ public abstract class AbstractEntityManager {
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
 
+            tokenList = obtainUpdateEntityLocks(oldEntity);
+
             StringBuilder result = new StringBuilder("Updated successfully");
             //Update in workflow engine
             if (!DeploymentUtil.isPrism()) {
@@ -304,9 +307,48 @@ public abstract class AbstractEntityManager {
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
         } finally {
             ConfigurationStore.get().cleanupUpdateInit();
+            releaseUpdateEntityLocks(entityName, tokenList);
         }
     }
 
+    /**
+     * Obtain in-memory locks for the entity being updated and for every entity that depends on it.
+     *
+     * <p>All-or-nothing: if any lock cannot be obtained, or anything else fails while collecting the
+     * dependents, every lock already obtained by this call is released before the exception
+     * propagates. The caller never sees the partial list, so it could not release those locks itself.</p>
+     *
+     * @param entity the entity for which the update was issued.
+     * @return the entities whose locks are now held, the updated entity first.
+     * @throws FalconException if a lock for the entity or one of its dependents is already held.
+     */
+    private List<Entity> obtainUpdateEntityLocks(Entity entity)
+        throws FalconException {
+        List<Entity> tokenList = new ArrayList<Entity>();
+        boolean allLocksObtained = false;
+        try {
+            //first obtain lock for the entity for which update is issued.
+            if (!memoryLocks.acquireLock(entity)) {
+                throw new FalconException("Looks like an update command is already issued for "
+                        + entity.toShortString());
+            }
+            tokenList.add(entity);
+
+            //now obtain locks for all dependent entities, if any.
+            Set<Entity> affectedEntities = EntityGraph.get().getDependents(entity);
+            if (affectedEntities != null) {
+                for (Entity e : affectedEntities) {
+                    if (!memoryLocks.acquireLock(e)) {
+                        LOG.error("Error while trying to acquire lock for {}. Releasing already obtained locks",
+                                e.toShortString());
+                        throw new FalconException(
+                                "There are multiple update commands running for dependent entity "
+                                        + e.toShortString());
+                    }
+                    tokenList.add(e);
+                }
+            }
+            allLocksObtained = true;
+            return tokenList;
+        } finally {
+            if (!allLocksObtained) {
+                // Only locks this call obtained are in tokenList, so another command's lock is never freed.
+                for (Entity locked : tokenList) {
+                    memoryLocks.releaseLock(locked);
+                }
+            }
+        }
+    }
+
+    /**
+     * Release every lock held for an update.
+     *
+     * @param entityName name of the entity the update was issued for; used only for logging.
+     * @param tokenList entities whose locks are held; {@code null} or empty when no lock was obtained.
+     */
+    private void releaseUpdateEntityLocks(String entityName, List<Entity> tokenList) {
+        if (tokenList == null || tokenList.isEmpty()) {
+            LOG.info("No locks to release for {}", entityName);
+            return;
+        }
+        for (Entity entity : tokenList) {
+            memoryLocks.releaseLock(entity);
+        }
+        LOG.info("All update locks released for {}", entityName);
+    }
+
     private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException
{
         if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity))
{
             throw new FalconException(

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index adfef35..f7c2f61 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.lock.MemoryLocks;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -47,6 +48,7 @@ import java.util.*;
 public abstract class AbstractSchedulableEntityManager extends AbstractInstanceManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractSchedulableEntityManager.class);
+    private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
 
     /**
      * Schedules an submitted entity immediately.
@@ -73,8 +75,25 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         throws FalconException, AuthorizationException {
 
         checkSchedulableEntity(type);
-        Entity entityObj = EntityUtil.getEntity(type, entity);
-        getWorkflowEngine().schedule(entityObj);
+        Entity entityObj = null;
+        try {
+            entityObj = EntityUtil.getEntity(type, entity);
+            //first acquire lock on entity before scheduling
+            if (!memoryLocks.acquireLock(entityObj)) {
+                throw new FalconException("Looks like an schedule/update command is already
running for "
+                        + entityObj.toShortString());
+            }
+            LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
+            getWorkflowEngine().schedule(entityObj);
+        } catch (Exception e) {
+            throw new FalconException("Entity schedule failed for " + type + ": " + entity,
e);
+        } finally {
+            if (entityObj != null) {
+                memoryLocks.releaseLock(entityObj);
+                LOG.info("Memory lock released for {}", entityObj.toShortString());
+            }
+        }
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 7512302..d46f112 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -116,6 +116,7 @@ public class FalconCLIIT {
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE,
overlay);
         Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file
" + filePath), 0);
+        OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName")
                 + " -type process -file " + filePath), 0);

http://git-wip-us.apache.org/repos/asf/falcon/blob/1685541f/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index c6fd420..280253d 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -66,6 +66,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 /**
@@ -812,6 +816,49 @@ public class EntityManagerJerseyIT {
         }
     }
 
+    @Test
+    public void testDuplicateUpdateCommands() throws Exception {
+        TestContext context = newContext();
+        context.scheduleProcess();
+        OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
+        List<BundleJob> bundles = OozieTestUtils.getBundles(context);
+        Assert.assertEquals(bundles.size(), 1);
+
+        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+
+        String feed3 = "f3" + System.currentTimeMillis();
+        Map<String, String> overlay = new HashMap<String, String>();
+        overlay.put("inputFeedName", feed3);
+        overlay.put("cluster", context.clusterName);
+        overlay.put("user", System.getProperty("user.name"));
+        ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        Input input = new Input();
+        input.setFeed(feed3);
+        input.setName("inputData2");
+        input.setStart("today(20,0)");
+        input.setEnd("today(20,20)");
+        process.getInputs().getInputs().add(input);
+
+        updateEndtime(process);
+        Date endTime = getEndTime();
+
+        // Issue the same update twice at once: once from a worker thread, once from this thread.
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        ClientResponse duplicateUpdateThreadResponse;
+        try {
+            Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime));
+            response = update(context, process, endTime);
+            duplicateUpdateThreadResponse = future.get();
+        } finally {
+            // Default executor threads are non-daemon; don't let the worker outlive the test.
+            service.shutdownNow();
+        }
+
+        // since there are duplicate threads for updates, there is no guarantee which request will succeed
+        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+            context.assertSuccessful(response);
+            context.assertFailure(duplicateUpdateThreadResponse);
+        } else {
+            context.assertFailure(response);
+            context.assertSuccessful(duplicateUpdateThreadResponse);
+        }
+    }
+
     public Date getEndTime() {
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         cal.add(Calendar.DAY_OF_MONTH, 1);
@@ -821,4 +868,31 @@ public class EntityManagerJerseyIT {
         cal.set(Calendar.MILLISECOND, 0);
         return cal.getTime();
     }
+
+    /**
+     * Runs {@code update(context, process, endTime)} of the enclosing test when called, so the same
+     * update can be issued from another thread. Non-static because it calls that instance method.
+     */
+    class UpdateCommand implements Callable<ClientResponse> {
+        private final TestContext context;
+        private final Process process;
+        private final Date endTime;
+
+        public UpdateCommand(TestContext context, Process process, Date endTime) {
+            this.context = context;
+            this.process = process;
+            this.endTime = endTime;
+        }
+
+        public TestContext getContext() {
+            return context;
+        }
+
+        public Process getProcess() {
+            return process;
+        }
+
+        public Date getEndTime() {
+            return endTime;
+        }
+
+        @Override
+        public ClientResponse call() throws Exception {
+            return update(context, process, endTime);
+        }
+    }
 }


Mime
View raw message