Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CD2BE196C9 for ; Fri, 15 Apr 2016 18:46:46 +0000 (UTC) Received: (qmail 17667 invoked by uid 500); 15 Apr 2016 18:46:46 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 17630 invoked by uid 500); 15 Apr 2016 18:46:46 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 17621 invoked by uid 99); 15 Apr 2016 18:46:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Apr 2016 18:46:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7FD15DFBA0; Fri, 15 Apr 2016 18:46:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: balu@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1789 Extension Job Management: REST API Date: Fri, 15 Apr 2016 18:46:46 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master 20bc33187 -> dc3e729a8 FALCON-1789 Extension Job Management: REST API Tested REST APIs. Will add IT tests (FALCON-1905) and documentation (FALCON-1904) in separate pull requests. 
Author: yzheng-hortonworks Reviewers: "Balu , Sowmya " Closes #98 from yzheng-hortonworks/FALCON-1789 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/dc3e729a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/dc3e729a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/dc3e729a Branch: refs/heads/master Commit: dc3e729a8b73a15b923ae6925af89b5a5644e289 Parents: 20bc331 Author: Ying Zheng Authored: Fri Apr 15 11:46:33 2016 -0700 Committer: bvellanki Committed: Fri Apr 15 11:46:33 2016 -0700 ---------------------------------------------------------------------- .../falcon/resource/ExtensionInstanceList.java | 95 +++++ .../falcon/resource/ExtensionJobList.java | 97 +++++ .../falcon/resource/AbstractEntityManager.java | 98 +++-- .../resource/extensions/ExtensionManager.java | 396 +++++++++++++++++++ prism/src/main/webapp/WEB-INF/web.xml | 4 +- .../src/main/webapp/WEB-INF/distributed/web.xml | 8 +- webapp/src/main/webapp/WEB-INF/embedded/web.xml | 4 +- webapp/src/main/webapp/WEB-INF/web.xml | 4 +- 8 files changed, 665 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java b/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java new file mode 100644 index 0000000..65ca4d4 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Instance list of an extension job used for marshalling / unmarshalling with REST calls. + */ +@XmlRootElement +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class ExtensionInstanceList { + @XmlElement + private int numEntities; + + @XmlElementWrapper(name = "entitiesSummary") + private List entitySummary; + + public ExtensionInstanceList() { + numEntities = 0; + entitySummary = null; + } + + public ExtensionInstanceList(int numEntities) { + this.numEntities = numEntities; + entitySummary = new ArrayList<>(); + } + + public ExtensionInstanceList(int numEntities, List entitySummary) { + this.numEntities = numEntities; + this.entitySummary = entitySummary; + } + + public void addEntitySummary(EntitySummary summary) { + entitySummary.add(summary); + } + + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(numEntities + "\n\n"); + for (EntitySummary summary : entitySummary) { + buffer.append(summary.toString()); + } + return buffer.toString(); + } + + /** + * Summary of an entity (including entity properties and instances. 
+ */ + public static class EntitySummary { + @XmlElement + private EntityList.EntityElement entityProfile; + + @XmlElement + private InstancesResult.Instance[] instances; + + public EntitySummary() { + entityProfile = null; + instances = null; + } + + public EntitySummary(EntityList.EntityElement entityProfile, InstancesResult.Instance[] instances) { + this.entityProfile = entityProfile; + this.instances = instances; + } + + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(entityProfile.toString() + "\n"); + buffer.append(Arrays.toString(instances) + "\n"); + return buffer.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java b/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java new file mode 100644 index 0000000..71f92b8 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * Extension job list used for marshalling / unmarshalling with REST calls. + */ +@XmlRootElement +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class ExtensionJobList { + + @XmlElement + private int numJobs; + + @XmlElementWrapper(name = "jobs") + private List job; + + public ExtensionJobList() { + numJobs = 0; + job = null; + } + + public ExtensionJobList(int numJobs) { + this.numJobs = numJobs; + job = new ArrayList(); + } + + public ExtensionJobList(int numJobs, List elements) { + this.numJobs = numJobs; + this.job = elements; + } + + public void addJob(JobElement element) { + job.add(element); + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(numJobs + "\n\n"); + for (JobElement element : job) { + buffer.append(element.toString()); + } + return buffer.toString(); + } + + /** + * Element for a job. + */ + public static class JobElement { + @XmlElement + private String jobName; + + @XmlElement + private EntityList jobEntities; + + public JobElement() { + jobName = null; + jobEntities = null; + } + + public JobElement(String name, EntityList entities) { + jobName = name; + jobEntities = entities; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("Job: " + jobName + ", #. 
entities: "); + buffer.append(jobEntities.toString() + "\n"); + return buffer.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 3ebe612..fde0cd7 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -296,11 +296,21 @@ public abstract class AbstractEntityManager { protected APIResult update(InputStream inputStream, String type, String entityName, String colo, Boolean skipDryRun) { checkColo(colo); + try { + EntityType entityType = EntityType.getEnum(type); + Entity entity = deserializeEntity(inputStream, entityType); + return update(entity, type, entityName, skipDryRun); + } catch (IOException | FalconException e) { + LOG.error("Update failed", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + protected APIResult update(Entity newEntity, String type, String entityName, Boolean skipDryRun) { List tokenList = new ArrayList<>(); try { EntityType entityType = EntityType.getEnum(type); Entity oldEntity = EntityUtil.getEntity(type, entityName); - Entity newEntity = deserializeEntity(inputStream, entityType); // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass decorateEntityWithACL(newEntity); validate(newEntity); @@ -400,7 +410,7 @@ public abstract class AbstractEntityManager { } } - private void canRemove(Entity entity) throws FalconException { + protected void canRemove(Entity entity) throws FalconException { Pair[] referencedBy = EntityIntegrityChecker.referencedBy(entity); if (referencedBy != null && referencedBy.length > 
0) { StringBuilder messages = new StringBuilder(); @@ -415,9 +425,14 @@ public abstract class AbstractEntityManager { protected Entity submitInternal(InputStream inputStream, String type, String doAsUser) throws IOException, FalconException { - EntityType entityType = EntityType.getEnum(type); Entity entity = deserializeEntity(inputStream, entityType); + submitInternal(entity, doAsUser); + return entity; + } + + protected synchronized void submitInternal(Entity entity, String doAsUser) throws IOException, FalconException { + EntityType entityType = entity.getEntityType(); List tokenList = new ArrayList<>(); // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass decorateEntityWithACL(entity); @@ -431,7 +446,7 @@ public abstract class AbstractEntityManager { Entity existingEntity = configStore.get(entityType, entity.getName()); if (existingEntity != null) { if (EntityUtil.equals(existingEntity, entity)) { - return existingEntity; + return; } throw new EntityAlreadyExistsException( @@ -442,8 +457,7 @@ public abstract class AbstractEntityManager { SecurityUtil.tryProxy(entity, doAsUser); // proxy before validating since FS/Oozie needs to be proxied validate(entity); configStore.publish(entityType, entity); - LOG.info("Submit successful: ({}): {}", type, entity.getName()); - return entity; + LOG.info("Submit successful: ({}): {}", entityType, entity.getName()); } /** @@ -514,7 +528,7 @@ public abstract class AbstractEntityManager { } @SuppressWarnings({"unchecked", "rawtypes"}) - private void validate(Entity entity) throws FalconException { + protected void validate(Entity entity) throws FalconException { EntityParser entityParser = EntityParserFactory.getParser(entity.getEntityType()); entityParser.validate(entity); } @@ -615,14 +629,37 @@ public abstract class AbstractEntityManager { String filterType, String filterTags, String filterBy, String orderBy, String sortOrder, Integer offset, Integer resultsPerPage, final String 
doAsUser) { - HashSet fields = new HashSet(Arrays.asList(fieldStr.toUpperCase().split(","))); Map> filterByFieldsValues = getFilterByFieldsValues(filterBy); - for (String key : filterByFieldsValues.keySet()) { + for (String key : filterByFieldsValues.keySet()) { if (!key.toUpperCase().equals("NAME") && !key.toUpperCase().equals("CLUSTER")) { fields.add(key.toUpperCase()); } } + try { + // get filtered entities + List entities = getEntityList( + nameSubsequence, tagKeywords, filterType, filterTags, filterBy, doAsUser); + + // sort entities and pagination + List entitiesReturn = sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage); + + // add total number of results + EntityList entityList = entitiesReturn.size() == 0 + ? new EntityList(new Entity[]{}, 0) + : new EntityList(buildEntityElements(new HashSet(fields), entitiesReturn), entities.size()); + return entityList; + } catch (Exception e) { + LOG.error("Failed to get entity list", e); + throw FalconWebException.newAPIException(e); + } + } + + public List getEntityList(String nameSubsequence, String tagKeywords, + String filterType, String filterTags, String filterBy, final String doAsUser) + throws FalconException, IOException { + + Map> filterByFieldsValues = getFilterByFieldsValues(filterBy); validateEntityFilterByClause(filterByFieldsValues); if (StringUtils.isNotEmpty(filterTags)) { filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), Arrays.asList(filterTags)); @@ -630,34 +667,21 @@ public abstract class AbstractEntityManager { // get filtered entities List entities = new ArrayList(); - try { - if (StringUtils.isEmpty(filterType)) { - // return entities of all types if no entity type specified - for (EntityType entityType : EntityType.values()) { - entities.addAll(getFilteredEntities( - entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser)); - } - } else { - String[] types = filterType.split(","); - for (String type : types) { - 
EntityType entityType = EntityType.getEnum(type); - entities.addAll(getFilteredEntities( - entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser)); - } + if (StringUtils.isEmpty(filterType)) { + // return entities of all types if no entity type specified + for (EntityType entityType : EntityType.values()) { + entities.addAll(getFilteredEntities( + entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser)); + } + } else { + String[] types = filterType.split(","); + for (String type : types) { + EntityType entityType = EntityType.getEnum(type); + entities.addAll(getFilteredEntities( + entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser)); } - } catch (Exception e) { - LOG.error("Failed to get entity list", e); - throw FalconWebException.newAPIException(e); } - - // sort entities and pagination - List entitiesReturn = sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage); - - // add total number of results - EntityList entityList = entitiesReturn.size() == 0 - ? 
new EntityList(new Entity[]{}, 0) - : new EntityList(buildEntityElements(new HashSet(fields), entitiesReturn), entities.size()); - return entityList; + return entities; } protected List sortEntitiesPagination(List entities, String orderBy, String sortOrder, @@ -1032,7 +1056,7 @@ public abstract class AbstractEntityManager { return retLen; } - private EntityElement[] buildEntityElements(HashSet fields, List entities) { + protected EntityElement[] buildEntityElements(HashSet fields, List entities) { EntityElement[] elements = new EntityElement[entities.size()]; int elementIndex = 0; for (Entity entity : entities) { @@ -1041,7 +1065,7 @@ public abstract class AbstractEntityManager { return elements; } - private EntityElement getEntityElement(Entity entity, HashSet fields) { + protected EntityElement getEntityElement(Entity entity, HashSet fields) { EntityElement elem = new EntityElement(); elem.type = entity.getEntityType().toString(); elem.name = entity.getName(); http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java new file mode 100644 index 0000000..07d4217 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource.extensions; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.POST; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.extensions.Extension; +import org.apache.falcon.extensions.ExtensionProperties; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.AbstractSchedulableEntityManager; +import org.apache.falcon.resource.EntityList; +import org.apache.falcon.resource.ExtensionJobList; +import org.apache.falcon.resource.ExtensionInstanceList; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.util.DeploymentUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * 
Jersey Resource for extension job operations. + */ +@Path("extension") +public class ExtensionManager extends AbstractSchedulableEntityManager { + public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class); + + public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; + public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; + public static final String TAG_SEPARATOR = ","; + public static final String ASCENDING_SORT_ORDER = "asc"; + public static final String DESCENDING_SORT_ORDER = "desc"; + + private Extension extension = new Extension(); + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + @GET + @Path("list/{extension-name}") + @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON}) + public ExtensionJobList getExtensionJobs( + @PathParam("extension-name") String extensionName, + @DefaultValue("") @QueryParam("fields") String fields, + @DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") String sortOrder, + @DefaultValue("0") @QueryParam("offset") Integer offset, + @QueryParam("numResults") Integer resultsPerPage, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + resultsPerPage = resultsPerPage == null ? 
getDefaultResultsPerPage() : resultsPerPage; + try { + // get filtered entities + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_NAME + extensionName, "", doAsUser); + + // group entities by extension job name + Map> groupedEntities = groupEntitiesByJob(entities); + + // sort by extension job name + List jobNames = new ArrayList<>(groupedEntities.keySet()); + switch (sortOrder.toLowerCase()) { + case DESCENDING_SORT_ORDER : + Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER)); + break; + default: + Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER); + } + + // pagination and format output + int pageCount = getRequiredNumberOfResults(jobNames.size(), offset, resultsPerPage); + HashSet fieldSet = new HashSet<>(Arrays.asList(fields.toUpperCase().split(","))); + ExtensionJobList jobList = new ExtensionJobList(pageCount); + for (int i = offset; i < offset + pageCount; i++) { + String jobName = jobNames.get(i); + List jobEntities = groupedEntities.get(jobName); + EntityList entityList = new EntityList(buildEntityElements(fieldSet, jobEntities), jobEntities.size()); + jobList.addJob(new ExtensionJobList.JobElement(jobName, entityList)); + } + return jobList; + } catch (FalconException | IOException e) { + LOG.error("Failed to get extension job list of " + extensionName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @GET + @Path("instances/{job-name}") + @Produces(MediaType.APPLICATION_JSON) + public ExtensionInstanceList getInstances( + @PathParam("job-name") final String jobName, + @QueryParam("start") final String nominalStart, + @QueryParam("end") final String nominalEnd, + @DefaultValue("") @QueryParam("instanceStatus") String instanceStatus, + @DefaultValue("") @QueryParam("fields") String fields, + @DefaultValue("") @QueryParam("orderBy") String orderBy, + @DefaultValue("") @QueryParam("sortOrder") String sortOrder, + @DefaultValue("0") 
@QueryParam("offset") final Integer offset, + @QueryParam("numResults") Integer resultsPerPage, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : resultsPerPage; + try { + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); + HashSet fieldSet = new HashSet<>(Arrays.asList(fields.toUpperCase().split(","))); + ExtensionInstanceList instances = new ExtensionInstanceList(entities.size()); + for (Entity entity : entities) { + InstancesResult entityInstances = super.getStatus( + entity.getEntityType().name(), entity.getName(), nominalStart, nominalEnd, + null, null, "STATUS:" + instanceStatus, orderBy, sortOrder, offset, resultsPerPage, null); + instances.addEntitySummary(new ExtensionInstanceList.EntitySummary( + getEntityElement(entity, fieldSet), entityInstances.getInstances())); + } + return instances; + } catch (FalconException | IOException e) { + LOG.error("Error when listing instances of extension job: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @POST + @Path("schedule/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult schedule(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); + for (Entity entity : entities) { + scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); + } + } catch (FalconException | IOException e) { + LOG.error("Error when scheduling extension job: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled 
successfully"); + } + + @POST + @Path("suspend/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult suspend(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); + for (Entity entity : entities) { + if (entity.getEntityType().isSchedulable()) { + if (getWorkflowEngine(entity).isActive(entity)) { + getWorkflowEngine(entity).suspend(entity); + } + } + } + } catch (FalconException | IOException e) { + LOG.error("Error when scheduling extension job: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully"); + } + + @POST + @Path("resume/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult resume(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); + for (Entity entity : entities) { + if (entity.getEntityType().isSchedulable()) { + if (getWorkflowEngine(entity).isSuspended(entity)) { + getWorkflowEngine(entity).resume(entity); + } + } + } + } catch (FalconException | IOException e) { + LOG.error("Error when resuming extension job " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully"); + } + + @POST + @Path("delete/{job-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, 
MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult delete(@PathParam("job-name") String jobName, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser); + for (Entity entity : entities) { + // TODO(yzheng): need to remember the entity dependency graph for clean ordered removal + canRemove(entity); + if (entity.getEntityType().isSchedulable() && !DeploymentUtil.isPrism()) { + getWorkflowEngine(entity).delete(entity); + } + configStore.remove(entity.getEntityType(), entity.getName()); + } + } catch (FalconException | IOException e) { + LOG.error("Error when deleting extension job: " + jobName + ": ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully"); + } + + @POST + @Path("submit/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult submit( + @PathParam("extension-name") String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = generateEntities(extensionName, request); + for (Entity entity : entities) { + submitInternal(entity, doAsUser); + } + } catch (FalconException | IOException e) { + LOG.error("Error when submitting extension job: ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully"); + } + + @POST + @Path("submitAndSchedule/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult submitAndSchedule( + @PathParam("extension-name") 
String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = generateEntities(extensionName, request); + for (Entity entity : entities) { + submitInternal(entity, doAsUser); + } + for (Entity entity : entities) { + scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); + } + } catch (FalconException | IOException e) { + LOG.error("Error when submitting extension job: ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); + } + + @POST + @Path("update/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult update( + @PathParam("extension-name") String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = generateEntities(extensionName, request); + for (Entity entity : entities) { + super.update(entity, entity.getEntityType().name(), entity.getName(), null); + } + } catch (FalconException | IOException e) { + LOG.error("Error when updating extension job: ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully"); + } + + @POST + @Path("validate/{extension-name}") + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) + public APIResult validate( + @PathParam("extension-name") String extensionName, + @Context HttpServletRequest request, + @DefaultValue("") @QueryParam("doAs") String doAsUser) { + try { + List entities = generateEntities(extensionName, request); + for (Entity entity : entities) { + 
super.validate(entity); + } + } catch (FalconException | IOException e) { + LOG.error("Error when validating extension job: ", e); + throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully"); + } + + private List generateEntities(String extensionName, HttpServletRequest request) + throws FalconException, IOException { + // get entities for extension job + Properties properties = new Properties(); + properties.load(request.getInputStream()); + List entities = extension.getEntities(extensionName, properties); + + // add tags on extension name and job + for (Entity entity : entities) { + String tags = entity.getTags(); + if (StringUtils.isEmpty(tags)) { + setEntityTags(entity, TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR + + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName())); + } else { + if (tags.indexOf(TAG_PREFIX_EXTENSION_NAME) != -1) { + throw new FalconException("Generated extention entity " + entity.getName() + + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_NAME); + } + if (tags.indexOf(TAG_PREFIX_EXTENSION_JOB) != -1) { + throw new FalconException("Generated extention entity " + entity.getName() + + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_JOB); + } + setEntityTags(entity, tags + TAG_SEPARATOR + TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR + + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName())); + } + } + + return entities; + } + + private void setEntityTags(Entity entity, String tags) { + switch (entity.getEntityType()) { + case PROCESS: + ((Process) entity).setTags(tags); + break; + case FEED: + ((Feed) entity).setTags(tags); + break; + case CLUSTER: + ((Cluster) entity).setTags(tags); + break; + default: + LOG.error("Unknown entity type: {}", entity.getEntityType().name()); + } + } + + private Map> groupEntitiesByJob(List 
entities) { + Map> groupedEntities = new HashMap<>(); + for (Entity entity : entities) { + String jobName = getJobNameFromTag(entity.getTags()); + if (!groupedEntities.containsKey(jobName)) { + groupedEntities.put(jobName, new ArrayList()); + } + groupedEntities.get(jobName).add(entity); + } + return groupedEntities; + } + + private String getJobNameFromTag(String tags) { + int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB); + if (nameStart == -1) { + return null; + } + + nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length(); + int nameEnd = tags.indexOf(',', nameStart); + if (nameEnd == -1) { + nameEnd = tags.length(); + } + return tags.substring(nameStart, nameEnd); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml index 7c1a7ad..dcc114c 100644 --- a/prism/src/main/webapp/WEB-INF/web.xml +++ b/prism/src/main/webapp/WEB-INF/web.xml @@ -79,7 +79,9 @@ com.sun.jersey.config.property.packages - org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata + org.apache.falcon.resource.admin,org.apache.falcon.resource.provider, + org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata, + org.apache.falcon.resource.extensions 1 http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/distributed/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/distributed/web.xml b/webapp/src/main/webapp/WEB-INF/distributed/web.xml index 4741897..e67191f 100644 --- a/webapp/src/main/webapp/WEB-INF/distributed/web.xml +++ b/webapp/src/main/webapp/WEB-INF/distributed/web.xml @@ -99,7 +99,13 @@ com.sun.jersey.config.property.classnames - 
org.apache.falcon.resource.admin.AdminResource,org.apache.falcon.resource.provider.JAXBContextResolver,org.apache.falcon.resource.SchedulableEntityManager,org.apache.falcon.resource.InstanceManager,org.apache.falcon.resource.metadata.MetadataDiscoveryResource,org.apache.falcon.resource.metadata.LineageMetadataResource + org.apache.falcon.resource.admin.AdminResource, + org.apache.falcon.resource.provider.JAXBContextResolver, + org.apache.falcon.resource.SchedulableEntityManager, + org.apache.falcon.resource.InstanceManager, + org.apache.falcon.resource.metadata.MetadataDiscoveryResource, + org.apache.falcon.resource.metadata.LineageMetadataResource, + org.apache.falcon.resource.extensions.ExtensionManager 1 http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/embedded/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/embedded/web.xml b/webapp/src/main/webapp/WEB-INF/embedded/web.xml index 5ecfe77..084f9ab 100644 --- a/webapp/src/main/webapp/WEB-INF/embedded/web.xml +++ b/webapp/src/main/webapp/WEB-INF/embedded/web.xml @@ -79,7 +79,9 @@ com.sun.jersey.config.property.packages - org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata + org.apache.falcon.resource.admin,org.apache.falcon.resource.provider, + org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata, + org.apache.falcon.resource.extensions 1 http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml index acfa938..048d085 100644 --- a/webapp/src/main/webapp/WEB-INF/web.xml +++ b/webapp/src/main/webapp/WEB-INF/web.xml @@ -79,7 +79,9 @@ com.sun.jersey.config.property.packages - 
org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider,org.apache.falcon.resource.metadata + org.apache.falcon.resource.admin,org.apache.falcon.resource.provider, + org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata, + org.apache.falcon.resource.extensions 1