falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2226 Submit , Schedule and submitAndSchedule API for extension
Date Mon, 26 Dec 2016 11:27:54 GMT
Repository: falcon
Updated Branches:
  refs/heads/master b48f2df48 -> 9946758ea


FALCON-2226 Submit ,Schedule and submitAndSchedule API for extension

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: @pallavi-rao

Closes #327 from PraveenAdlakha/2226 and squashes the following commits:

59a43ef [Praveen Adlakha] minor changes in falcon client and localextensionmanager
473f04a [Praveen Adlakha] comments addressed
6a346aa [Praveen Adlakha] comments addressed
8733f53 [Praveen Adlakha] checkstyle issue resolved
9ba005e [Praveen Adlakha] FALCON-2226 Submit ,Schedule and submitAndSchedule API for extension
in distributed mode
699b06f [Praveen Adlakha] WIP
29da911 [Praveen Adlakha] WIP
0b7d02a [Praveen Adlakha] merge conflicts resolved
6b77cc1 [Praveen Adlakha] FALCON-2223 Distributed mode support for User Extension


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9946758e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9946758e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9946758e

Branch: refs/heads/master
Commit: 9946758eac38d985a0c50fddee4267e8e320f7a2
Parents: b48f2df
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Mon Dec 26 16:57:43 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Dec 26 16:57:43 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/client/FalconClient.java  |   1 +
 .../resource/AbstractExtensionManager.java      |  63 -------
 .../falcon/resource/proxy/EntityProxy.java      |  90 ++++++++++
 .../resource/proxy/ExtensionManagerProxy.java   | 176 ++++++++++++++-----
 .../HttpServletRequestInputStreamWrapper.java   |  42 +++++
 .../proxy/SchedulableEntityManagerProxy.java    |  56 ------
 .../apache/falcon/unit/FalconUnitClient.java    |  17 +-
 .../falcon/unit/LocalExtensionManager.java      |  48 ++++-
 8 files changed, 315 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 1614b24..d5b8342 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1137,6 +1137,7 @@ public class FalconClient extends AbstractFalconClient {
         FormDataMultiPart entitiesForm = getEntitiesForm(extensionName, jobName, configPath);
         ClientResponse clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName)
+                .addQueryParam(JOB_NAME_OPT, jobName)
                 .addQueryParam(DO_AS_OPT, doAsUser)
                 .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entitiesForm);
         return getResponse(APIResult.class, clientResponse);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index f0f21a9..a50b535 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -17,32 +17,20 @@
  */
 package org.apache.falcon.resource;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.parser.ValidationException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.persistence.ExtensionJobsBean;
 import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 
 /**
  * A base class for managing Extension Operations.
@@ -113,57 +101,6 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager
{
         return detailsObject;
     }
 
-
-    protected void submitEntities(String extensionName, String doAsUser, String jobName,
-                                  Map<EntityType, List<Entity>> entityMap, InputStream
configStream)
-        throws FalconException, IOException {
-        List<Entity> feeds = entityMap.get(EntityType.FEED);
-        List<Entity> processes = entityMap.get(EntityType.PROCESS);
-        validateFeeds(feeds);
-        validateProcesses(processes);
-        List<String> feedNames = new ArrayList<>();
-        List<String> processNames = new ArrayList<>();
-        for (Entity feed : feeds) {
-            submitInternal(feed, doAsUser);
-            feedNames.add(feed.getName());
-        }
-        for (Entity process: processes) {
-            submitInternal(process, doAsUser);
-            processNames.add(process.getName());
-        }
-
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        byte[] configBytes = null;
-        if (configStream != null) {
-            configBytes = IOUtils.toByteArray(configStream);
-        }
-        metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
-    }
-
-
-    private void validateFeeds(List<Entity> feeds) throws FalconException {
-        for (Entity feed : feeds) {
-            super.validate(feed);
-        }
-    }
-
-    private void validateProcesses(List<Entity> processes) throws FalconException {
-        ProcessEntityParser processEntityParser = new ProcessEntityParser();
-        for (Entity process : processes) {
-            processEntityParser.validate((Process)process, false);
-        }
-    }
-
-    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap)
throws FalconException,
-            AuthorizationException {
-        for (Object feed: entityMap.get(EntityType.FEED)) {
-            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null);
-        }
-        for (Object process: entityMap.get(EntityType.PROCESS)) {
-            scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null,
null);
-        }
-    }
-
     public static String getJobNameFromTag(String tags) {
         int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
         if (nameStart == -1) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java
new file mode 100644
index 0000000..e1240e3
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxy.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.proxy;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.AbstractEntityManager;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Entity Proxy class to talk to channels.
+ */
+public abstract class EntityProxy<T extends APIResult> extends AbstractEntityManager{
+    private final Class<T> clazz;
+    private String type;
+    private String name;
+
+    public EntityProxy(String type, String name, Class<T> resultClazz) {
+        this.clazz = resultClazz;
+        this.type = type;
+        this.name = name;
+    }
+
+
+    private T getResultInstance(APIResult.Status status, String message) {
+        try {
+            Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class,
String.class);
+            return constructor.newInstance(status, message);
+        } catch (Exception e) {
+            throw new FalconRuntimException("Unable to consolidate result.", e);
+        }
+    }
+
+    public EntityProxy(String type, String name) {
+        this(type, name, (Class<T>) APIResult.class);
+    }
+
+    public T execute() {
+        Set<String> colos = getColosToApply();
+
+        Map<String, T> results = new HashMap();
+
+        for (String colo : colos) {
+            try {
+                results.put(colo, doExecute(colo));
+            } catch (FalconWebException e) {
+                String message = ((APIResult) e.getResponse().getEntity()).getMessage();
+                results.put(colo, getResultInstance(APIResult.Status.FAILED, message));
+            } catch (Throwable throwable) {
+                results.put(colo, getResultInstance(APIResult.Status.FAILED, throwable.getClass().getName()
+ "::"
+                        + throwable.getMessage()));
+            }
+        }
+
+        T finalResult = consolidateResult(results, clazz);
+        if (finalResult.getStatus() == APIResult.Status.FAILED) {
+            throw FalconWebException.newAPIException(finalResult.getMessage());
+        } else {
+            return finalResult;
+        }
+    }
+
+    protected Set<String> getColosToApply() {
+        return getApplicableColos(type, name);
+    }
+
+    protected abstract T doExecute(String colo) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 5e556a1..9ddebe8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -44,16 +44,18 @@ import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.ExtensionInstanceList;
 import org.apache.falcon.resource.ExtensionJobList;
+import org.apache.falcon.resource.channel.Channel;
+import org.apache.falcon.resource.channel.ChannelFactory;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.DeploymentUtil;
-import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
@@ -66,16 +68,22 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
+import javax.xml.bind.JAXBException;
 import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Properties;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.TreeMap;
+import java.util.SortedMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
 
 /**
  * Jersey Resource for extension job operations.
@@ -97,6 +105,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     private static final String EXTENSION_TYPE = "type";
     private static final String EXTENSION_DESC = "description";
     private static final String EXTENSION_LOCATION = "location";
+    private boolean embeddedMode = DeploymentUtil.isEmbeddedMode();
+    private String currentColo = DeploymentUtil.getCurrentColo();
+    private final Map<String, Channel> configSyncChannels = new HashMap<String,
Channel>();
+    private static final String PRISM_TAG = "prism";
+    private final Map<String, Channel> entityManagerChannels = new HashMap<String,
Channel>();
 
 
 
@@ -313,19 +326,19 @@ public class ExtensionManagerProxy extends AbstractExtensionManager
{
             @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
-        Map<EntityType, List<Entity>> entityMap;
+        SortedMap<EntityType, List<Entity>> entityMap;
 
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
-            submitEntities(extensionName, doAsUser, jobName, entityMap, config);
-        } catch (FalconException | IOException e) {
+            submitEntities(extensionName, jobName, entityMap, config, request);
+        } catch (FalconException | IOException | JAXBException e) {
             LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully:"
+ jobName);
     }
 
-    private Map<EntityType, List<Entity>> getEntityList(String extensionName,
String jobName,
+    private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName,
String jobName,
                                                         List<FormDataBodyPart> feedForms,
                                                         List<FormDataBodyPart> processForms,
InputStream config)
         throws FalconException, IOException{
@@ -333,7 +346,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         List<Entity> feeds = getFeeds(feedForms);
         ExtensionType extensionType = getExtensionType(extensionName);
         List<Entity> entities;
-        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
+        TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
         if (ExtensionType.TRUSTED.equals(extensionType)) {
             entities = generateEntities(extensionName, config);
             List<Entity> trustedFeeds = new ArrayList<>();
@@ -364,6 +377,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         return extensionDetails.getExtensionType();
     }
 
+    private Channel getEntityManager(String colo) throws FalconException {
+        if (!entityManagerChannels.containsKey(colo)) {
+            initializeFor(colo);
+        }
+        return entityManagerChannels.get(colo);
+    }
+
     @POST
     @Path("submitAndSchedule/{extension-name}")
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
@@ -377,54 +397,85 @@ public class ExtensionManagerProxy extends AbstractExtensionManager
{
             @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
-        Map<EntityType, List<Entity>> entityMap;
+        SortedMap<EntityType, List<Entity>> entityMap;
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
-            submitEntities(extensionName, doAsUser, jobName, entityMap, config);
-            scheduleEntities(entityMap);
-        } catch (FalconException | IOException e) {
+            submitEntities(extensionName, jobName, entityMap, config, request);
+            scheduleEntities(entityMap, request);
+        } catch (FalconException | IOException | JAXBException e) {
             LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled
successfully");
     }
 
-    private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) {
-        List<Entity> feeds = new ArrayList<>();
-        if (feedForms != null && !feedForms.isEmpty()) {
-            for (FormDataBodyPart formDataBodyPart : feedForms) {
-                feeds.add(formDataBodyPart.getValueAs(Feed.class));
+    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap,
HttpServletRequest request)
+        throws FalconException, JAXBException, IOException {
+
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(final Entity entity : entry.getValue()){
+                final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(),
request);
+                final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
+                final Set<String> colos = getApplicableColos(entity.getEntityType().toString(),
entity);
+
+                new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
+                    @Override
+                    protected Set<String> getColosToApply() {
+                        return colos;
+                    }
+
+                    @Override
+                    protected APIResult doExecute(String colo) throws FalconException {
+                        return getEntityManager(colo).invoke("schedule", bufferedRequest,
entity.getEntityType().toString(),
+                            entity.getName(), colo, Boolean.FALSE, "");
+                    }
+                }.execute();
             }
         }
-        return feeds;
     }
 
-    private List<Entity> getProcesses(List<FormDataBodyPart> processForms) {
-        List<Entity> processes = new ArrayList<>();
-        if (processForms != null && !processForms.isEmpty()) {
-            for (FormDataBodyPart formDataBodyPart : processForms) {
-                processes.add(formDataBodyPart.getValueAs(Process.class));
-            }
+    private BufferedRequest getBufferedRequest(HttpServletRequest request) {
+        if (request instanceof BufferedRequest) {
+            return (BufferedRequest) request;
         }
-        return processes;
+        return new BufferedRequest(request);
     }
 
-    protected void submitEntities(String extensionName, String doAsUser, String jobName,
-                                  Map<EntityType, List<Entity>> entityMap, InputStream
configStream)
-        throws FalconException, IOException {
+    protected void submitEntities(String extensionName, String jobName,
+                                  SortedMap<EntityType, List<Entity>> entityMap,
InputStream configStream,
+                                  HttpServletRequest request) throws FalconException, IOException,
JAXBException {
         List<Entity> feeds = entityMap.get(EntityType.FEED);
         List<Entity> processes = entityMap.get(EntityType.PROCESS);
         validateFeeds(feeds);
         validateProcesses(processes);
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
-        for (Entity feed : feeds) {
-            submitInternal(feed, doAsUser);
-            feedNames.add(feed.getName());
-        }
-        for (Entity process: processes) {
-            submitInternal(process, doAsUser);
-            processNames.add(process.getName());
+
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(final Entity entity : entry.getValue()){
+                final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(),
request);
+                final Set<String> colos = getApplicableColos(entity.getEntityType().toString(),
entity);
+                new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
+                    @Override
+                    protected Set<String> getColosToApply() {
+                        return colos;
+                    }
+
+                    @Override
+                    protected APIResult doExecute(String colo) throws FalconException {
+                        return getConfigSyncChannel(colo).invoke("submit", bufferedRequest,
+                                entity.getEntityType().toString(), colo);
+                    }
+                }.execute();
+                if (!embeddedMode) {
+                    super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
+                }
+                if (entity.getEntityType().equals(EntityType.FEED)){
+                    feedNames.add(entity.getName());
+                }else{
+                    processNames.add(entity.getName());
+                }
+            }
         }
 
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
@@ -435,14 +486,29 @@ public class ExtensionManagerProxy extends AbstractExtensionManager
{
         metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
     }
 
-    protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap)
throws FalconException,
-            AuthorizationException {
-        for (Object feed: entityMap.get(EntityType.FEED)) {
-            scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null);
-        }
-        for (Object process: entityMap.get(EntityType.PROCESS)) {
-            scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null,
null);
+    private void initializeFor(String colo) throws FalconException {
+        entityManagerChannels.put(colo, ChannelFactory.get("SchedulableEntityManager", colo));
+        configSyncChannels.put(colo, ChannelFactory.get("ConfigSyncService", colo));
+    }
+
+    private Channel getConfigSyncChannel(String colo) throws FalconException {
+        if (!configSyncChannels.containsKey(colo)) {
+            initializeFor(colo);
         }
+        return configSyncChannels.get(colo);
+    }
+
+    private HttpServletRequest getEntityStream(Entity entity, EntityType type, HttpServletRequest
request) throws IOException, JAXBException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        type.getMarshaller().marshal(entity, baos);
+        final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(baos.toByteArray());
+        ServletInputStream servletInputStream=new ServletInputStream(){
+            public int read() throws IOException {
+                return byteArrayInputStream.read();
+            }
+        };
+        return getBufferedRequest(new HttpServletRequestInputStreamWrapper(request, servletInputStream));
     }
 
 
@@ -459,6 +525,26 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         }
     }
 
+    private List<Entity> getFeeds(List<FormDataBodyPart> feedForms) {
+        List<Entity> feeds = new ArrayList<>();
+        if (feedForms != null && !feedForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : feedForms) {
+                feeds.add(formDataBodyPart.getValueAs(Feed.class));
+            }
+        }
+        return feeds;
+    }
+
+    private List<Entity> getProcesses(List<FormDataBodyPart> processForms) {
+        List<Entity> processes = new ArrayList<>();
+        if (processForms != null && !processForms.isEmpty()) {
+            for (FormDataBodyPart formDataBodyPart : processForms) {
+                processes.add(formDataBodyPart.getValueAs(Process.class));
+            }
+        }
+        return processes;
+    }
+
     @POST
     @Path("update/{extension-name}")
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java
new file mode 100644
index 0000000..5492c0b
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/HttpServletRequestInputStreamWrapper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource.proxy;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+
+/**
+ * Wrapper class to wrap new ServletInputStream.
+ */
+
+public class HttpServletRequestInputStreamWrapper extends HttpServletRequestWrapper {
+
+    private ServletInputStream inputStream;
+
+
+    public HttpServletRequestInputStreamWrapper(HttpServletRequest request , ServletInputStream
stream){
+        super(request);
+        this.inputStream = stream;
+    }
+
+    @Override
+    public ServletInputStream getInputStream(){
+        return inputStream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 26de20e..ed1054c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -54,7 +54,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -852,60 +851,5 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    private abstract class EntityProxy<T extends APIResult> {
-        private final Class<T> clazz;
-        private String type;
-        private String name;
 
-        public EntityProxy(String type, String name, Class<T> resultClazz) {
-            this.clazz = resultClazz;
-            this.type = type;
-            this.name = name;
-        }
-
-
-        private T getResultInstance(APIResult.Status status, String message) {
-            try {
-                Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class,
String.class);
-                return constructor.newInstance(status, message);
-            } catch (Exception e) {
-                throw new FalconRuntimException("Unable to consolidate result.", e);
-            }
-        }
-
-        public EntityProxy(String type, String name) {
-            this(type, name, (Class<T>) APIResult.class);
-        }
-
-        public T execute() {
-            Set<String> colos = getColosToApply();
-
-            Map<String, T> results = new HashMap();
-
-            for (String colo : colos) {
-                try {
-                    results.put(colo, doExecute(colo));
-                } catch (FalconWebException e) {
-                    String message = ((APIResult) e.getResponse().getEntity()).getMessage();
-                    results.put(colo, getResultInstance(APIResult.Status.FAILED, message));
-                } catch (Throwable throwable) {
-                    results.put(colo, getResultInstance(APIResult.Status.FAILED, throwable.getClass().getName()
+ "::"
-                        + throwable.getMessage()));
-                }
-            }
-
-            T finalResult = consolidateResult(results, clazz);
-            if (finalResult.getStatus() == APIResult.Status.FAILED) {
-                throw FalconWebException.newAPIException(finalResult.getMessage());
-            } else {
-                return finalResult;
-            }
-        }
-
-        protected Set<String> getColosToApply() {
-            return getApplicableColos(type, name);
-        }
-
-        protected abstract T doExecute(String colo) throws FalconException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index c9e1d4c..7036dc7 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -56,14 +56,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.util.TreeMap;
 import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.Properties;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Date;
+import java.util.LinkedHashMap;
 import java.util.TimeZone;
+import java.util.SortedMap;
 
 /**
  * Client for Falcon Unit.
@@ -285,14 +286,14 @@ public class FalconUnitClient extends AbstractFalconClient {
 
         InputStream configStream = getServletInputStream(configPath);
         try {
-            Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName,
jobName, configStream);
+            SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName,
jobName, configStream);
             return localExtensionManager.submitExtensionJob(extensionName, jobName, configStream,
entityMap);
         } catch (FalconException | IOException e) {
             throw new FalconCLIException("Failed in submitting extension job " + jobName);
         }
     }
 
-    private Map<EntityType, List<Entity>> getEntityTypeListMap(String extensionName,
String jobName, InputStream configStream) {
+    private SortedMap<EntityType, List<Entity>> getEntityTypeListMap(String extensionName,
String jobName, InputStream configStream) {
         List<Entity> entities = getEntities(extensionName, jobName, configStream);
         List<Entity> feeds = new ArrayList<>();
         List<Entity> processes = new ArrayList<>();
@@ -303,7 +304,7 @@ public class FalconUnitClient extends AbstractFalconClient {
                 processes.add(entity);
             }
         }
-        Map<EntityType, List<Entity>> entityMap = new HashMap<>();
+        SortedMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
         entityMap.put(EntityType.PROCESS, processes);
         entityMap.put(EntityType.FEED, feeds);
         return entityMap;
@@ -326,7 +327,7 @@ public class FalconUnitClient extends AbstractFalconClient {
                                                    String doAsUser) {
         InputStream configStream = getServletInputStream(configPath);
         try {
-            Map<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName,
jobName, configStream);
+            SortedMap<EntityType, List<Entity>> entityMap = getEntityTypeListMap(extensionName,
jobName, configStream);
             return localExtensionManager.submitAndSchedulableExtensionJob(extensionName,
jobName, configStream,
                     entityMap);
         } catch (FalconException | IOException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9946758e/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 7f55f9a..57c339e 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -18,17 +18,21 @@
 
 package org.apache.falcon.unit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.security.CurrentUser;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 
 /**
  * A proxy implementation of the extension operations in local mode.
@@ -37,16 +41,48 @@ public class LocalExtensionManager extends AbstractExtensionManager {
     public LocalExtensionManager() {}
 
     public APIResult submitExtensionJob(String extensionName, String jobName, InputStream
config,
-                                        Map<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
-        submitEntities(extensionName, null, jobName, entityMap, config);
+                                        SortedMap<EntityType, List<Entity>> entityMap)
+        throws FalconException, IOException {
+
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(Entity entity : entry.getValue()){
+                submitInternal(entity, "falconUser");
+            }
+        }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully"
+ jobName);
     }
 
-    public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName,
InputStream config,
-                                                      Map<EntityType, List<Entity>>
entityMap)
+    public APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName,
InputStream configStream,
+                                                      SortedMap<EntityType, List<Entity>>
entityMap)
         throws FalconException, IOException {
-        submitEntities(extensionName, null, jobName, entityMap, config);
-        scheduleEntities(entityMap);
+        List<String> feedNames = new ArrayList<>();
+        List<String> processNames = new ArrayList<>();
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(Entity entity : entry.getValue()){
+                submitInternal(entity, "falconUser");
+            }
+        }
+
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(Entity entity : entry.getValue()){
+                scheduleInternal(entry.getKey().name(), entity.getName(), null, null);
+            }
+        }
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(final Entity entity : entry.getValue()){
+                if (entity.getEntityType().equals(EntityType.FEED)){
+                    feedNames.add(entity.getName());
+                }else{
+                    processNames.add(entity.getName());
+                }
+            }
+        }
+        ExtensionStore.getMetaStore().storeExtensionJob(jobName, extensionName, feedNames,
processNames, configBytes);
+
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully"
+ jobName);
     }
 


Mime
View raw message