falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [01/47] git commit: Fix for inmemory rerun queue comparator
Date Fri, 26 Apr 2013 15:50:17 GMT
Updated Branches:
  refs/heads/master dfaf33f20 -> f15ef92a8


Fix for inmemory rerun queue comparator


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/ce2589e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/ce2589e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/ce2589e2

Branch: refs/heads/master
Commit: ce2589e2b36786a4fe3e61163ad58b62ccbe0e37
Parents: dfaf33f
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Fri Apr 12 12:54:00 2013 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Fri Apr 12 12:54:00 2013 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/rerun/event/RerunEvent.java  |   16 +---
 .../falcon/rerun/queue/InMemoryQueueTest.java      |   66 +++++++++++++++
 2 files changed, 70 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ce2589e2/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 41cb488..9526e0a 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.rerun.event;
 
+import java.util.Date;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
@@ -80,18 +81,9 @@ public class RerunEvent implements Delayed {
 
 	@Override
 	public int compareTo(Delayed o) {
-		int ret = 0;
-		RerunEvent event = (RerunEvent) o;
-
-		if (this.delayInMilliSec < event.delayInMilliSec)
-			ret = -1;
-		else if (this.delayInMilliSec > event.delayInMilliSec)
-			ret = 1;
-		else if (this.msgInsertTime == event.msgInsertTime)
-			ret = 0;
-
-		return ret;
-
+        RerunEvent event = (RerunEvent) o;
+        return new Date(msgInsertTime + delayInMilliSec).
+                compareTo(new Date(event.msgInsertTime + event.delayInMilliSec));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ce2589e2/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
new file mode 100644
index 0000000..09779db
--- /dev/null
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/InMemoryQueueTest.java
@@ -0,0 +1,66 @@
+package org.apache.falcon.rerun.queue;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.rerun.event.RerunEvent;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.LinkedList;
+
+public class InMemoryQueueTest {
+
+    @Test (timeOut = 10000)
+    public void testDelayedQueue() throws Exception {
+        runTest();
+    }
+
+    private void runTest() throws InterruptedException, FalconException {
+        InMemoryQueue<MyEvent> queue = new InMemoryQueue<MyEvent>(new File("target"));
+
+        LinkedList<MyEvent> events = new LinkedList<MyEvent>();
+
+        for (int index = 0; index < 5; index++) {
+            Thread.sleep(30);
+            long time = System.currentTimeMillis();
+            int delay = ((5 - index) / 2) * 50;
+            MyEvent event = new MyEvent("someCluster", Integer.toString(index),
+                    time, delay, "someType", "someName", "someInstance", 0);
+            queue.offer(event);
+            boolean inserted = false;
+            for (int posn = 0; posn < events.size(); posn++) {
+                MyEvent thisEvent = events.get(posn);
+                if (thisEvent.getDelayInMilliSec() + thisEvent.getMsgInsertTime() >
+                        event.getDelayInMilliSec() + event.getMsgInsertTime()) {
+                    events.add(posn, event);
+                    inserted = true;
+                    break;
+                }
+            }
+            if (!inserted) {
+                events.add(event);
+            }
+        }
+
+        for (MyEvent event : events) {
+            MyEvent queueEvent = queue.take();
+            Assert.assertEquals(queueEvent.getWfId(), event.getWfId());
+        }
+    }
+
+    private class MyEvent extends RerunEvent {
+
+        public MyEvent(String clusterName, String wfId,
+                       long msgInsertTime, long delay, String entityType,
+                       String entityName, String instance, int runId) {
+            super(clusterName, wfId, msgInsertTime, delay,
+                    entityType, entityName, instance, runId);
+        }
+
+        @Override
+        public RerunType getType() {
+            RerunType type = super.getType();
+            return type == null ? RerunType.RETRY : type;
+        }
+    }
+}


Mime
View raw message