falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [1/6] falcon git commit: FALCON-1213 Base framework of the native scheduler
Date Tue, 20 Oct 2015 12:09:58 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 9e6d5a6c5 -> a0911bd82


http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
new file mode 100644
index 0000000..d27ac7e
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the state changes of an instance.
+ */
+public class InstanceStateServiceTest {
+
+    private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class);
+    private ProcessExecutionInstance mockInstance;
+
+    @BeforeMethod
+    public void setup() {
+        Process testProcess = new Process();
+        testProcess.setName("test");
+        // Setup new mocks so we can verify the no. of invocations
+        mockInstance = Mockito.mock(ProcessExecutionInstance.class);
+        Mockito.when(mockInstance.getEntity()).thenReturn(testProcess);
+        Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now());
+        Mockito.when(mockInstance.getCluster()).thenReturn("testCluster");
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    }
+
+    // Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running
+    // -> suspendAll -> resumeAll -> success
+    @Test
+    public void testLifeCycle() throws FalconException {
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener);
+        InstanceState instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new ID(mockInstance));
+        Mockito.verify(listener).onTrigger(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET,
listener);
+        Mockito.verify(listener).onConditionsMet(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY));
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE,
listener);
+        Mockito.verify(listener).onSchedule(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener);
+        Mockito.verify(listener).onSuspend(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED));
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING,
listener);
+        Mockito.verify(listener).onResume(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener);
+        Mockito.verify(listener).onSuccess(mockInstance);
+        Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED));
+        Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0);
+    }
+
+    @Test
+    public void testInvalidTransitions() throws FalconException {
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener);
+        try {
+            StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE,
listener);
+            Assert.fail("Exception expected");
+        } catch (InvalidStateTransitionException e) {
+            // Do nothing
+        }
+
+        // Resume an instance that is not suspended
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET,
listener);
+        try {
+            StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_READY,
listener);
+            Assert.fail("Exception expected");
+        } catch (InvalidStateTransitionException e) {
+            // Do nothing
+        }
+
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE,
listener);
+        StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.FAIL, listener);
+
+        // Attempt killing a completed instance
+        try {
+            StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.KILL,
listener);
+            Assert.fail("Exception expected");
+        } catch (InvalidStateTransitionException e) {
+            // Do nothing
+        }
+    }
+
+    @Test(dataProvider = "state_and_events")
+    public void testIdempotency(InstanceState.STATE state, InstanceState.EVENT event)
+        throws InvalidStateTransitionException, StateStoreException {
+        InstanceState instanceState = new InstanceState(mockInstance).setCurrentState(state);
+        instanceState.nextTransition(event);
+        Assert.assertEquals(instanceState.getCurrentState(), state);
+    }
+
+    @DataProvider(name = "state_and_events")
+    public Object[][] stateAndEvents() {
+        return new Object[][] {
+            {InstanceState.STATE.WAITING, InstanceState.EVENT.TRIGGER},
+            {InstanceState.STATE.READY, InstanceState.EVENT.CONDITIONS_MET},
+            {InstanceState.STATE.TIMED_OUT, InstanceState.EVENT.TIME_OUT},
+            {InstanceState.STATE.RUNNING, InstanceState.EVENT.SCHEDULE},
+            {InstanceState.STATE.SUSPENDED, InstanceState.EVENT.SUSPEND},
+            {InstanceState.STATE.KILLED, InstanceState.EVENT.KILL},
+            {InstanceState.STATE.SUCCEEDED, InstanceState.EVENT.SUCCEED},
+            {InstanceState.STATE.FAILED, InstanceState.EVENT.FAIL},
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/cluster/cluster-0.1.xml b/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
new file mode 100644
index 0000000..223cbc6
--- /dev/null
+++ b/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010"
+                   version="0.20.2"/>
+        <interface type="write" endpoint="jail://testCluster:00"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie" version="4.0"/>
+        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="http://localhost:48080/templeton/v1"
+                   version="0.11.0"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+    </properties>
+</cluster>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/feed/feed-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/feed/feed-0.1.xml b/scheduler/src/test/resources/config/feed/feed-0.1.xml
new file mode 100644
index 0000000..25daf7d
--- /dev/null
+++ b/scheduler/src/test/resources/config/feed/feed-0.1.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/process/process-0.1.xml b/scheduler/src/test/resources/config/process/process-0.1.xml
new file mode 100644
index 0000000..deeb554
--- /dev/null
+++ b/scheduler/src/test/resources/config/process/process-0.1.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="sample" xmlns="uri:falcon:process:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+    <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
+    <clusters>
+        <cluster name="testCluster">
+            <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>minutes(1)</frequency>
+    <sla shouldStartIn="minutes(2)" shouldEndIn="minutes(4)"/>
+
+    <!-- what -->
+    <inputs>
+        <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksOutput" feed="clicksSummary" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="name1" value="value1"/>
+        <property name="name2" value="value2"/>
+    </properties>
+
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+    <late-process policy="exp-backoff" delay="minutes(1)">
+        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+    </late-process>
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index da12d3a..8891e5f 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -119,6 +119,12 @@
 
         <dependency>
             <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-scheduler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-retention</artifactId>
         </dependency>
 


Mime
View raw message