Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B1162110C2 for ; Fri, 2 May 2014 16:24:57 +0000 (UTC) Received: (qmail 23521 invoked by uid 500); 2 May 2014 16:24:54 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 23420 invoked by uid 500); 2 May 2014 16:24:50 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 23256 invoked by uid 99); 2 May 2014 16:24:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 May 2014 16:24:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B4B604588B; Fri, 2 May 2014 16:24:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tbeerbower@apache.org To: commits@ambari.apache.org Date: Fri, 02 May 2014 16:24:53 -0000 Message-Id: <710798bbe15b439fab0aee05592f7ab8@git.apache.org> In-Reply-To: <6f84574e37704061ad9ced6d9f1e0073@git.apache.org> References: <6f84574e37704061ad9ced6d9f1e0073@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/7] AMBARI-5616 - Ambari Views: Pig view (Roman Rader via tbeerbower) http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java new file mode 100644 index 
0000000..23705e9 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.jobs; + +import com.google.inject.Inject; +import org.apache.ambari.view.ViewResourceHandler; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.pig.resources.files.FileResource; +import org.apache.ambari.view.pig.resources.jobs.models.PigJob; +import org.apache.ambari.view.pig.services.BaseService; +import org.apache.ambari.view.pig.utils.FilePaginator; +import org.json.simple.JSONObject; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import javax.xml.ws.WebServiceException; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; + +/** + * Servlet for Pig Jobs + * API: + * GET /:id + * read job info + * POST / + * create new job + * Required: scriptId + * Optional: params + * GET / + * get all jobs of current user + * GET /:id/notify + * callback from 
Templeton + */ +public class JobService extends BaseService { + @Inject + ViewResourceHandler handler; + + protected JobResourceManager resourceManager = null; + + public synchronized JobResourceManager getResourceManager() { + if (resourceManager == null) { + resourceManager = new JobResourceManager(context); + } + return resourceManager; + } + + public synchronized void setResourceManager(JobResourceManager resourceManager) { + this.resourceManager = resourceManager; + } + + /** + * Get single item + */ + @GET + @Path("{jobId}") + @Produces(MediaType.APPLICATION_JSON) + public Response getJob(@PathParam("jobId") String jobId) { + PigJob job = null; + try { + job = getResourceManager().read(jobId); + } catch (ItemNotFound itemNotFound) { + return Response.status(404).build(); + } + getResourceManager().retrieveJobStatus(job); + JSONObject object = new JSONObject(); + object.put("job", job); + return Response.ok(object).build(); + } + + /** + * Get single item + */ + @DELETE + @Path("{jobId}") + public Response killJob(@PathParam("jobId") String jobId) throws IOException { + PigJob job = null; + try { + job = getResourceManager().read(jobId); + } catch (ItemNotFound itemNotFound) { + return Response.status(404).build(); + } + getResourceManager().killJob(job); + return Response.status(204).build(); + } + + /** + * Callback from templeton + */ + @GET + @Path("{jobId}/notify") + public Response jobCompletionNotification(@Context HttpHeaders headers, + @Context UriInfo ui, + @PathParam("jobId") final String jobId) { + PigJob job = null; + try { + job = getResourceManager().ignorePermissions(new Callable() { + public PigJob call() throws Exception { + PigJob job = null; + try { + job = getResourceManager().read(jobId); + } catch (ItemNotFound itemNotFound) { + return null; + } + return job; + } + }); + } catch (Exception e) { + return Response.status(500).build(); + } + if (job == null) + return Response.status(404).build(); + + 
getResourceManager().retrieveJobStatus(job); + return Response.ok().build(); + } + + @GET + @Path("{jobId}/results/{fileName}") + @Produces(MediaType.APPLICATION_JSON) + public Response jobExitCode(@Context HttpHeaders headers, + @Context UriInfo ui, + @PathParam("jobId") String jobId, + @PathParam("fileName") String fileName, + @QueryParam("page") Long page) { + PigJob job = null; + try { + job = getResourceManager().read(jobId); + } catch (ItemNotFound itemNotFound) { + return Response.ok("No such job").status(404).build(); + } + try { + String filePath = job.getStatusDir() + "/" + fileName; + LOG.debug("Reading file " + filePath); + FilePaginator paginator = new FilePaginator(filePath, context); + + if (page == null) + page = 0L; + + FileResource file = new FileResource(); + file.filePath = filePath; + file.fileContent = paginator.readPage(page); + file.hasNext = paginator.pageCount() > page + 1; + file.page = page; + file.pageCount = paginator.pageCount(); + + JSONObject object = new JSONObject(); + object.put("file", file); + return Response.ok(object).status(200).build(); + } catch (IOException e) { + return Response.ok(e.getMessage()).status(404).build(); + } catch (InterruptedException e) { + return Response.ok(e.getMessage()).status(404).build(); + } + } + + /** + * Get all jobs + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getJobList(@Context HttpHeaders headers, @Context UriInfo ui) { + List allJobs = getResourceManager().readAll( + new OnlyOwnersFilteringStrategy(this.context.getUsername())); + + JSONObject object = new JSONObject(); + object.put("jobs", allJobs); + return Response.ok(object).build(); + } + + /** + * Create job + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response runJob(PigJobRequest request, @Context HttpServletResponse response, + @Context UriInfo ui) { + if (!request.validatePOST()) { + return badRequestResponse(request.explainPOST()); + } + try { + 
getResourceManager().create(request.job); + } catch (IllegalArgumentException e) { + return badRequestResponse(e.getMessage()); + } catch (WebServiceException e) { + return serverErrorResponse(e.getMessage()); + } + + PigJob job = null; + + try { + job = getResourceManager().read(request.job.getId()); + } catch (ItemNotFound itemNotFound) { + return Response.status(404).build(); + } + + response.setHeader("Location", + String.format("%s/%s", ui.getAbsolutePath().toString(), request.job.getId())); + + JSONObject object = new JSONObject(); + object.put("job", job); + return Response.ok(object).status(201).build(); + } + + public static class PigJobRequest { + public PigJob job; + + public String explainPOST() { + StringBuilder result = new StringBuilder(); + if ((job.getPigScript() == null || job.getPigScript().isEmpty()) && + (job.getForcedContent() == null || job.getForcedContent().isEmpty())) + result.append("No pigScript file or forcedContent specifed;"); + if (job.getTitle() == null || job.getTitle().isEmpty()) + result.append("No title specifed;"); + if (job.getId() != null && !job.getTitle().isEmpty()) + result.append("ID should not exists in creation request;"); + return result.toString(); + } + + public boolean validatePOST() { + return explainPOST().isEmpty(); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java new file mode 100644 index 0000000..e49c267 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.jobs.models; + +import org.apache.ambari.view.pig.persistence.utils.PersonalResource; +import org.apache.commons.beanutils.BeanUtils; +import org.codehaus.jackson.annotate.JsonIgnore; + +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; + +/** + * Bean to represent Pig job + * + * Job lifecycle: + * SUBMITTING + * | + * [POST to Templeton] + * | | + * SUBMITTED SUBMIT_FAILED + * | + * | + * [GET result from job/:job_id] + * | | | + * COMPLETED KILLED FAILED + */ +public class PigJob implements Serializable, PersonalResource { + + public enum Status { + UNKNOWN, + SUBMITTING, SUBMITTED, RUNNING, // in progress + SUBMIT_FAILED, COMPLETED, FAILED, KILLED // finished + } + + public boolean isInProgress() { + return status == Status.SUBMITTED || status == Status.SUBMITTING || + status == Status.RUNNING; + } + + public static final int RUN_STATE_RUNNING = 1; + public static final int RUN_STATE_SUCCEEDED = 2; + public static final int RUN_STATE_FAILED = 3; + public static final int RUN_STATE_PREP = 4; + public static final int RUN_STATE_KILLED = 5; + + public PigJob() { + } + + public PigJob(Map stringObjectMap) throws InvocationTargetException, IllegalAccessException 
{ + BeanUtils.populate(this, stringObjectMap); + } + + String id = null; + String scriptId = null; + + // cloned script data + String pigScript = null; + String pythonScript = null; + String title = null; + String templetonArguments = null; + String owner; + + // job info + String forcedContent = null; + + /** + * jobType possible values: + * null - regular execute + * "explain" + * "syntax_check" + */ + String jobType = null; + + /** + * Additional file to use in Explain job + */ + String sourceFile = null; + String sourceFileContent = null; + + String statusDir; + Long dateStarted = 0L; + String jobId = null; + + // status fields (not reliable) + Status status = Status.UNKNOWN; + Integer percentComplete = null; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PigJob)) return false; + + PigJob pigScript = (PigJob) o; + + if (!id.equals(pigScript.id)) return false; + + return true; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getOwner() { + return owner; + } + + @Override + public void setOwner(String owner) { + this.owner = owner; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public String getScriptId() { + return scriptId; + } + + public void setScriptId(String scriptId) { + this.scriptId = scriptId; + } + + public String getTempletonArguments() { + return templetonArguments; + } + + public void setTempletonArguments(String templetonArguments) { + this.templetonArguments = templetonArguments; + } + + public String getPigScript() { + return pigScript; + } + + public void setPigScript(String pigScript) { + this.pigScript = pigScript; + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + 
} + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public void setStatusDir(String statusDir) { + this.statusDir = statusDir; + } + + public String getStatusDir() { + return statusDir; + } + + public Long getDateStarted() { + return dateStarted; + } + + public void setDateStarted(Long dateStarted) { + this.dateStarted = dateStarted; + } + + public Integer getPercentComplete() { + return percentComplete; + } + + public void setPercentComplete(Integer percentComplete) { + this.percentComplete = percentComplete; + } + + public String getPythonScript() { + return pythonScript; + } + + public void setPythonScript(String pythonScript) { + this.pythonScript = pythonScript; + } + + public String getForcedContent() { + return forcedContent; + } + + public void setForcedContent(String forcedContent) { + this.forcedContent = forcedContent; + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getSourceFileContent() { + return sourceFileContent; + } + + public void setSourceFileContent(String sourceFileContent) { + this.sourceFileContent = sourceFileContent; + } + + public String getSourceFile() { + return sourceFile; + } + + public void setSourceFile(String sourceFile) { + this.sourceFile = sourceFile; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java new file mode 100644 index 0000000..31eabae --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java @@ -0,0 +1,143 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.jobs.utils; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.resources.jobs.JobResourceManager; +import org.apache.ambari.view.pig.resources.jobs.models.PigJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Observable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Polling manager + * Makes scheduled repeated polling of templeton to + * be aware of happen events like job finished, + * killed, changed progress and so on. 
+ */ +public class JobPolling implements Runnable { + private final static Logger LOG = + LoggerFactory.getLogger(JobPolling.class); + + /** + * We should limit count of concurrent calls to templeton + * to avoid high load on component + */ + private static final int WORKER_COUNT = 2; + + private static final int POLLING_DELAY = 10*60; // 10 minutes + + /** + * In LONG_JOB_THRESHOLD seconds job reschedules polling from POLLING_DELAY to LONG_POLLING_DELAY + */ + private static final int LONG_POLLING_DELAY = 60*60; // 1 hour + private static final int LONG_JOB_THRESHOLD = 60*60; // 1 hour + + private static final ScheduledExecutorService pollWorkersPool = Executors.newScheduledThreadPool(WORKER_COUNT); + + private static final Map jobPollers = new HashMap(); + + private JobResourceManager resourceManager = null; + private final ViewContext context; + private PigJob job; + private volatile ScheduledFuture thisFuture; + + private JobPolling(ViewContext context, PigJob job) { + this.context = context; + this.job = job; + } + + protected synchronized JobResourceManager getResourceManager() { + if (resourceManager == null) { + resourceManager = new JobResourceManager(context); + } + return resourceManager; + } + + public void run() { + LOG.debug("Polling job status " + job.getJobId() + " #" + job.getId()); + try { + job = getResourceManager().read(job.getId()); + } catch (ItemNotFound itemNotFound) { + LOG.error("Job " + job.getJobId() + " does not exist! Polling canceled"); + thisFuture.cancel(false); + return; + } + getResourceManager().retrieveJobStatus(job); + + Long time = System.currentTimeMillis() / 1000L; + if (time - job.getDateStarted() > LONG_JOB_THRESHOLD) { + LOG.debug("Job becomes long.. 
Rescheduling polling to longer period"); + // If job running longer than LONG_JOB_THRESHOLD, reschedule + // it to poll every LONG_POLLING_DELAY instead of POLLING_DELAY + thisFuture.cancel(false); + scheduleJobPolling(true); + } + + switch (job.getStatus()) { + case SUBMIT_FAILED: + case COMPLETED: + case FAILED: + case KILLED: + LOG.debug("Job finished. Polling canceled"); + thisFuture.cancel(false); + break; + default: + } + } + + private void scheduleJobPolling(boolean longDelay) { + if (!longDelay) { + thisFuture = pollWorkersPool.scheduleWithFixedDelay(this, + POLLING_DELAY, POLLING_DELAY, TimeUnit.SECONDS); + } else { + thisFuture = pollWorkersPool.scheduleWithFixedDelay(this, + LONG_POLLING_DELAY, LONG_POLLING_DELAY, TimeUnit.SECONDS); + } + } + + private void scheduleJobPolling() { + scheduleJobPolling(false); + } + + /** + * Schedule job polling + * @param context ViewContext of web app + * @param job job instance + * @return returns false if already scheduled + */ + public static boolean pollJob(ViewContext context, PigJob job) { + if (jobPollers.get(job.getJobId()) == null) { + LOG.debug("Setting up polling for " + job.getJobId()); + JobPolling polling = new JobPolling(context, job); + polling.scheduleJobPolling(); + jobPollers.put(job.getJobId(), polling); + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java new file mode 100644 index 0000000..9714d27 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java @@ -0,0 +1,101 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.scripts; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.scripts.models.PigScript; +import org.apache.ambari.view.pig.services.BaseService; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.ws.WebServiceException; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class ScriptResourceManager extends PersonalCRUDResourceManager { + private final static Logger LOG = + LoggerFactory.getLogger(ScriptResourceManager.class); + + public ScriptResourceManager(ViewContext context) { + super(PigScript.class, context); + } + + @Override + public PigScript create(PigScript object) { + super.create(object); + if (object.getPigScript() == null || object.getPigScript().isEmpty()) { + createDefaultScriptFile(object); + } + return object; + } + + private void createDefaultScriptFile(PigScript 
object) { + String userScriptsPath = context.getProperties().get("dataworker.userScriptsPath"); + if (userScriptsPath == null) { + String msg = "dataworker.userScriptsPath is not configured!"; + LOG.error(msg); + throw new WebServiceException(msg); + } + int checkId = 0; + + boolean fileCreated; + String newFilePath; + do { + String normalizedName = object.getTitle().replaceAll("[^a-zA-Z0-9 ]+", "").replaceAll(" ", "_").toLowerCase(); + String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date()); + newFilePath = String.format(userScriptsPath + + "/%s/%s-%s%s.pig", context.getUsername(), + normalizedName, timestamp, (checkId == 0)?"":"_"+checkId); + LOG.debug("Trying to create new file " + newFilePath); + + try { + FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newFilePath, false); + stream.close(); + fileCreated = true; + LOG.debug("File created successfully!"); + } catch (FileAlreadyExistsException e) { + fileCreated = false; + LOG.debug("File already exists. 
Trying next id"); + } catch (IOException e) { + try { + delete(object.getId()); + } catch (ItemNotFound itemNotFound) { + throw new WebServiceException("Error in creation, during clean up: " + itemNotFound.toString(), itemNotFound); + } + throw new WebServiceException("Error in creation: " + e.toString(), e); + } catch (InterruptedException e) { + try { + delete(object.getId()); + } catch (ItemNotFound itemNotFound) { + throw new WebServiceException("Error in creation, during clean up: " + itemNotFound.toString(), itemNotFound); + } + throw new WebServiceException("Error in creation: " + e.toString(), e); + } + checkId += 1; + } while (!fileCreated); + + object.setPigScript(newFilePath); + getPigStorage().store(object); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java new file mode 100644 index 0000000..478a460 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.scripts; + +import com.google.inject.Inject; +import org.apache.ambari.view.*; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.scripts.models.PigScript; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +public class ScriptResourceProvider implements ResourceProvider { + @Inject + ViewContext context; + + protected ScriptResourceManager resourceManager = null; + protected final static Logger LOG = + LoggerFactory.getLogger(ScriptResourceProvider.class); + + protected synchronized PersonalCRUDResourceManager getResourceManager() { + if (resourceManager == null) { + resourceManager = new ScriptResourceManager(context); + } + return resourceManager; + } + + @Override + public PigScript getResource(String resourceId, Set properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + try { + return getResourceManager().read(resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + } + + @Override + public Set getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + return new HashSet(getResourceManager().readAll( + new 
OnlyOwnersFilteringStrategy(this.context.getUsername()))); + } + + @Override + public void createResource(String s, Map stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException { + PigScript script = null; + try { + script = new PigScript(stringObjectMap); + } catch (InvocationTargetException e) { + throw new SystemException("error on creating resource", e); + } catch (IllegalAccessException e) { + throw new SystemException("error on creating resource", e); + } + getResourceManager().create(script); + } + + @Override + public boolean updateResource(String resourceId, Map stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + PigScript script = null; + try { + script = new PigScript(stringObjectMap); + } catch (InvocationTargetException e) { + throw new SystemException("error on updating resource", e); + } catch (IllegalAccessException e) { + throw new SystemException("error on updating resource", e); + } + try { + getResourceManager().update(script, resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + return true; + } + + @Override + public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException { + try { + getResourceManager().delete(resourceId); + } catch (ItemNotFound itemNotFound) { + throw new NoSuchResourceException(resourceId); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java new file mode 100644 index 0000000..c07f985 --- 
/dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.scripts; + +import com.google.inject.Inject; +import org.apache.ambari.view.ViewResourceHandler; +import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.scripts.models.PigScript; +import org.apache.ambari.view.pig.services.BaseService; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import java.util.List; +
/**
 * Servlet for scripts
 * API:
 * GET /:id
 *      read script
 * PUT /:id
 *      update script
 * DELETE /:id
 *      delete script
 * POST /
 *      create new script
 *      Required: title, pigScript
 * GET /
 *      get all scripts of current user
 */
public class ScriptService extends BaseService {
  @Inject
  ViewResourceHandler handler;

  protected ScriptResourceManager resourceManager = null;
  protected final static Logger LOG =
      LoggerFactory.getLogger(ScriptService.class);

  /**
   * Returns the script resource manager, creating it from the view context on first use.
   *
   * @return the resource manager used for all script CRUD operations
   */
  protected synchronized PersonalCRUDResourceManager<PigScript> getResourceManager() {
    if (resourceManager == null) {
      resourceManager = new ScriptResourceManager(context);
    }
    return resourceManager;
  }

  /**
   * Get single item
   *
   * @param scriptId id of the script to read
   * @return 200 with {"script": ...}, or 404 when the manager reports ItemNotFound
   */
  @GET
  @Path("{scriptId}")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getScript(@PathParam("scriptId") String scriptId) {
    PigScript script;
    try {
      script = getResourceManager().read(scriptId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    JSONObject object = new JSONObject();
    object.put("script", script);
    return Response.ok(object).build();
  }

  /**
   * Delete single item
   *
   * @param scriptId id of the script to delete
   * @return 204 on success, or 404 when the manager reports ItemNotFound
   */
  @DELETE
  @Path("{scriptId}")
  public Response deleteScript(@PathParam("scriptId") String scriptId) {
    try {
      getResourceManager().delete(scriptId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    return Response.status(204).build();
  }

  /**
   * Get all scripts
   *
   * @return 200 with {"scripts": [...]}, filtered by the current user name
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public Response getScriptList() {
    LOG.debug("Getting all scripts");
    List<PigScript> allScripts = getResourceManager().readAll(
        new OnlyOwnersFilteringStrategy(this.context.getUsername()));

    JSONObject object = new JSONObject();
    object.put("scripts", allScripts);
    return Response.ok(object).build();
  }

  /**
   * Update item
   *
   * @param request  request body wrapping the new script state
   * @param scriptId id of the script to update
   * @return 204 on success, 400 when the body carries no script,
   *         404 when the manager reports ItemNotFound
   */
  @PUT
  @Path("{scriptId}")
  @Consumes(MediaType.APPLICATION_JSON)
  public Response updateScript(PigScriptRequest request,
                               @PathParam("scriptId") String scriptId) {
    // A missing or empty body would otherwise surface as a NullPointerException.
    if (request == null || request.script == null) {
      return badRequestResponse("Request body must contain a \"script\" object");
    }
    try {
      getResourceManager().update(request.script, scriptId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    return Response.status(204).build();
  }

  /**
   * Create script
   *
   * @param request  request body wrapping the script to create
   * @param response servlet response that receives the Location header
   * @param ui       URI info used to build the Location of the new script
   * @return 201 with {"script": ...}, 400 when the body carries no script,
   *         404 when the freshly created script cannot be read back
   */
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public Response saveScript(PigScriptRequest request, @Context HttpServletResponse response,
                             @Context UriInfo ui) {
    // A missing or empty body would otherwise surface as a NullPointerException.
    if (request == null || request.script == null) {
      return badRequestResponse("Request body must contain a \"script\" object");
    }
    getResourceManager().create(request.script);

    // Re-read the stored item so the response reflects what was persisted.
    PigScript script;
    try {
      script = getResourceManager().read(request.script.getId());
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }

    response.setHeader("Location",
        String.format("%s/%s", ui.getAbsolutePath().toString(), request.script.getId()));

    JSONObject object = new JSONObject();
    object.put("script", script);
    return Response.ok(object).status(201).build();
  }

  /**
   * Wrapper object for the JSON request body: {"script": {...}}
   */
  public static class PigScriptRequest {
    public PigScript script;
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java new file mode 100644 index 0000000..1c69adb --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.scripts.models; + +import org.apache.ambari.view.pig.persistence.utils.PersonalResource; +import org.apache.commons.beanutils.BeanUtils; + +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.Date; +import java.util.Map; +
/**
 * Bean to represent script
 */
public class PigScript implements Serializable, PersonalResource {
  String id;

  String title = "";
  String pigScript = "";
  String pythonScript = "";
  String templetonArguments = "";
  Date dateCreated;
  String owner = "";

  boolean opened = false;

  /**
   * Creates an empty script; {@code id} stays null until {@link #setId(String)} is called.
   */
  public PigScript() {
  }

  /**
   * Creates a script and fills its bean properties from the given map.
   *
   * @param stringObjectMap property name to value
   * @throws InvocationTargetException propagated from BeanUtils.populate
   * @throws IllegalAccessException propagated from BeanUtils.populate
   */
  public PigScript(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
    BeanUtils.populate(this, stringObjectMap);
  }

  /**
   * Two scripts are equal when their ids are equal. A null id (the state after
   * the no-arg constructor) is handled explicitly instead of throwing.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof PigScript)) return false;

    PigScript other = (PigScript) o;
    return id == null ? other.id == null : id.equals(other.id);
  }

  /**
   * Hash of the id, or 0 while the id is still null; consistent with {@link #equals(Object)}.
   */
  @Override
  public int hashCode() {
    return id == null ? 0 : id.hashCode();
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public void setId(String id) {
    this.id = id;
  }

  public String getTitle() {
    return title;
  }

  public void setTitle(String title) {
    this.title = title;
  }

  public String getPigScript() {
    return pigScript;
  }

  public void setPigScript(String pigScript) {
    this.pigScript = pigScript;
  }

  public String getTempletonArguments() {
    return templetonArguments;
  }

  public void setTempletonArguments(String templetonArguments) {
    this.templetonArguments = templetonArguments;
  }

  public Date getDateCreated() {
    return dateCreated;
  }

  public void setDateCreated(Date dateCreated) {
    this.dateCreated = dateCreated;
  }

  public boolean isOpened() {
    return opened;
  }

  public void setOpened(boolean opened) {
    this.opened = opened;
  }

  @Override
  public String getOwner() {
    return owner;
  }

  @Override
  public void setOwner(String owner) {
    this.owner = owner;
  }

  public String getPythonScript() {
    return pythonScript;
  }

  public void setPythonScript(String pythonScript) {
    this.pythonScript = pythonScript;
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java new file mode 100644 index 0000000..62e389e --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.pig.resources.udf; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.udf.models.UDF; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Resource manager for {@link UDF} items. All behavior comes from
 * PersonalCRUDResourceManager; this class only binds it to the UDF type.
 */
public class UDFResourceManager extends PersonalCRUDResourceManager<UDF> {
  private final static Logger LOG =
      LoggerFactory.getLogger(UDFResourceManager.class);

  /**
   * @param context view context handed to the base manager
   */
  public UDFResourceManager(ViewContext context) {
    super(UDF.class, context);
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java new file mode 100644 index 0000000..3069ddd --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.pig.resources.udf; + +import com.google.inject.Inject; +import org.apache.ambari.view.*; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.udf.models.UDF; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +
/**
 * Resource provider exposing {@link UDF} items, backed by {@link UDFResourceManager}.
 */
public class UDFResourceProvider implements ResourceProvider<UDF> {
  @Inject
  ViewContext context;

  protected UDFResourceManager resourceManager = null;
  protected final static Logger LOG =
      LoggerFactory.getLogger(UDFResourceProvider.class);

  /**
   * Returns the UDF resource manager, creating it from the view context on first use.
   */
  protected synchronized PersonalCRUDResourceManager<UDF> getResourceManager() {
    if (resourceManager == null) {
      resourceManager = new UDFResourceManager(context);
    }
    return resourceManager;
  }

  /**
   * Reads a single UDF.
   *
   * @param resourceId id of the UDF
   * @param properties requested property names (not used by this implementation)
   * @throws NoSuchResourceException when the manager reports ItemNotFound
   */
  @Override
  public UDF getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
    try {
      return getResourceManager().read(resourceId);
    } catch (ItemNotFound itemNotFound) {
      throw new NoSuchResourceException(resourceId);
    }
  }

  /**
   * Reads all UDFs, filtered by the current user name.
   *
   * @param readRequest read request (not used by this implementation)
   */
  @Override
  public Set<UDF> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
    return new HashSet<UDF>(getResourceManager().readAll(
        new OnlyOwnersFilteringStrategy(this.context.getUsername())));
  }

  /**
   * Creates a UDF from the given bean properties.
   *
   * @param s               resource id (not used by this implementation)
   * @param stringObjectMap property name to value
   * @throws SystemException when the properties cannot be applied to a UDF bean
   */
  @Override
  public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
    UDF udf = buildUdf(stringObjectMap, "creating");
    getResourceManager().create(udf);
  }

  /**
   * Replaces the UDF stored under {@code resourceId} with one built from the given properties.
   *
   * @return always true when no exception is thrown
   * @throws SystemException when the properties cannot be applied to a UDF bean
   * @throws NoSuchResourceException when the manager reports ItemNotFound
   */
  @Override
  public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
    UDF udf = buildUdf(stringObjectMap, "updating");
    try {
      getResourceManager().update(udf, resourceId);
    } catch (ItemNotFound itemNotFound) {
      throw new NoSuchResourceException(resourceId);
    }
    return true;
  }

  /**
   * Deletes the UDF stored under {@code resourceId}.
   *
   * @return always true when no exception is thrown
   * @throws NoSuchResourceException when the manager reports ItemNotFound
   */
  @Override
  public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
    try {
      getResourceManager().delete(resourceId);
    } catch (ItemNotFound itemNotFound) {
      throw new NoSuchResourceException(resourceId);
    }
    return true;
  }

  /**
   * Builds a UDF bean from a property map, wrapping reflection failures in a SystemException.
   *
   * @param properties property name to value
   * @param action     verb used in the error message ("creating" or "updating")
   */
  private static UDF buildUdf(Map<String, Object> properties, String action) throws SystemException {
    try {
      return new UDF(properties);
    } catch (InvocationTargetException e) {
      throw new SystemException("error on " + action + " resource", e);
    } catch (IllegalAccessException e) {
      throw new SystemException("error on " + action + " resource", e);
    }
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java new file mode 100644 index 0000000..d8b24bc --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.resources.udf; + +import com.google.inject.Inject; +import org.apache.ambari.view.ViewResourceHandler; +import org.apache.ambari.view.pig.persistence.utils.ItemNotFound; +import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy; +import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager; +import org.apache.ambari.view.pig.resources.udf.models.UDF; +import org.apache.ambari.view.pig.services.BaseService; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import java.util.List; +
/**
 * Servlet for UDFs
 * API:
 * GET /
 *      get all UDFs
 * GET /:id
 *      get one UDF
 * PUT /:id
 *      update UDF
 * DELETE /:id
 *      delete UDF
 * POST /
 *      create new UDF
 *      Required: path, name
 */
public class UDFService extends BaseService {
  @Inject
  ViewResourceHandler handler;

  protected UDFResourceManager resourceManager = null;
  protected final static Logger LOG =
      LoggerFactory.getLogger(UDFService.class);

  /**
   * Returns the UDF resource manager, creating it from the view context on first use.
   */
  protected synchronized PersonalCRUDResourceManager<UDF> getResourceManager() {
    if (resourceManager == null) {
      resourceManager = new UDFResourceManager(context);
    }
    return resourceManager;
  }

  /**
   * Get single item
   *
   * @param udfId id of the UDF to read
   * @return 200 with {"udf": ...}, or 404 when the manager reports ItemNotFound
   */
  @GET
  @Path("{udfId}")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getUDF(@PathParam("udfId") String udfId) {
    UDF udf;
    try {
      udf = getResourceManager().read(udfId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    JSONObject object = new JSONObject();
    object.put("udf", udf);
    return Response.ok(object).build();
  }

  /**
   * Delete single item
   *
   * @param udfId id of the UDF to delete
   * @return 204 on success, or 404 when the manager reports ItemNotFound
   */
  @DELETE
  @Path("{udfId}")
  public Response deleteUDF(@PathParam("udfId") String udfId) {
    try {
      getResourceManager().delete(udfId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    return Response.status(204).build();
  }

  /**
   * Get all UDFs
   *
   * @param ui URI info (not used by this implementation)
   * @return 200 with {"udfs": [...]}, filtered by the current user name
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public Response getUDFList(@Context UriInfo ui) {
    LOG.debug("Getting all UDFs");
    List<UDF> allUDFs = getResourceManager().readAll(
        new OnlyOwnersFilteringStrategy(this.context.getUsername()));

    JSONObject object = new JSONObject();
    object.put("udfs", allUDFs);
    return Response.ok(object).build();
  }

  /**
   * Update item
   *
   * @param request request body wrapping the new UDF state
   * @param udfId   id of the UDF to update
   * @return 204 on success, 400 when the body carries no UDF,
   *         404 when the manager reports ItemNotFound
   */
  @PUT
  @Path("{udfId}")
  @Consumes(MediaType.APPLICATION_JSON)
  public Response updateUDF(UDFRequest request,
                            @PathParam("udfId") String udfId) {
    // A missing or empty body would otherwise surface as a NullPointerException.
    if (request == null || request.udf == null) {
      return badRequestResponse("Request body must contain a \"udf\" object");
    }
    try {
      getResourceManager().update(request.udf, udfId);
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }
    return Response.status(204).build();
  }

  /**
   * Create UDF
   *
   * @param request  request body wrapping the UDF to create
   * @param response servlet response that receives the Location header
   * @param ui       URI info used to build the Location of the new UDF
   * @return 201 with {"udf": ...}, 400 when the body carries no UDF,
   *         404 when the freshly created UDF cannot be read back
   */
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public Response createUDF(UDFRequest request, @Context HttpServletResponse response,
                            @Context UriInfo ui) {
    // A missing or empty body would otherwise surface as a NullPointerException.
    if (request == null || request.udf == null) {
      return badRequestResponse("Request body must contain a \"udf\" object");
    }
    getResourceManager().create(request.udf);

    // Re-read the stored item so the response reflects what was persisted.
    UDF udf;
    try {
      udf = getResourceManager().read(request.udf.getId());
    } catch (ItemNotFound itemNotFound) {
      return Response.status(404).build();
    }

    response.setHeader("Location",
        String.format("%s/%s", ui.getAbsolutePath().toString(), request.udf.getId()));

    JSONObject object = new JSONObject();
    object.put("udf", udf);
    return Response.ok(object).status(201).build();
  }

  /**
   * Wrapper object for the JSON request body: {"udf": {...}}
   */
  public static class UDFRequest {
    public UDF udf;
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java new file mode 100644 index 0000000..0a18329 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.pig.resources.udf.models; + +import org.apache.ambari.view.pig.persistence.utils.PersonalResource; +import org.apache.commons.beanutils.BeanUtils; + +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +
/**
 * Bean to represent User Defined Functions
 */
public class UDF implements Serializable, PersonalResource {
  String id;
  String path;
  String name;
  String owner;

  /**
   * Creates an empty UDF; all fields stay null until set.
   */
  public UDF() {
  }

  /**
   * Creates a UDF and fills its bean properties from the given map.
   *
   * @param stringObjectMap property name to value
   * @throws InvocationTargetException propagated from BeanUtils.populate
   * @throws IllegalAccessException propagated from BeanUtils.populate
   */
  public UDF(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
    BeanUtils.populate(this, stringObjectMap);
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public void setId(String id) {
    this.id = id;
  }

  @Override
  public String getOwner() {
    return owner;
  }

  @Override
  public void setOwner(String owner) {
    this.owner = owner;
  }

  public String getPath() {
    return path;
  }

  public void setPath(String path) {
    this.path = path;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java new file mode 100644 index 0000000..b37c518 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.services; + +import com.google.inject.Inject; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.ViewResourceHandler; +import org.apache.ambari.view.pig.persistence.Storage; +import org.apache.ambari.view.pig.utils.HdfsApi; +import org.apache.ambari.view.pig.persistence.utils.StorageUtil; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.ws.WebServiceException; +import java.io.IOException; +import java.util.HashMap; + +
/**
 * Common base for the Pig view services: holds the view context and gives access
 * to storage, the shared HdfsApi instance and JSON error responses.
 */
public class BaseService {
  @Inject
  protected ViewContext context;

  protected final static Logger LOG =
      LoggerFactory.getLogger(BaseService.class);

  private Storage storage;

  /**
   * Returns the storage, obtaining it from StorageUtil on first use.
   */
  public Storage getStorage() {
    if (this.storage == null) {
      storage = StorageUtil.getStorage(context);
    }
    return storage;
  }

  /**
   * Overrides the storage returned by {@link #getStorage()}.
   */
  public void setStorage(Storage storage) {
    this.storage = storage;
  }

  private static HdfsApi hdfsApi = null;

  /**
   * Returns the shared HdfsApi, connecting on first use with the
   * "dataworker.defaultFs" view property and the context's user name.
   * Synchronized so that concurrent first calls do not both connect.
   *
   * @param context view context supplying the user name and properties
   * @return the shared HdfsApi instance
   * @throws WebServiceException when "dataworker.defaultFs" is not set or the connection fails
   */
  public static synchronized HdfsApi getHdfsApi(ViewContext context) {
    if (hdfsApi == null) {
      // NOTE(review): this clears the calling thread's context class loader and never
      // restores it; presumably a class-loading workaround for the HDFS client - confirm.
      Thread.currentThread().setContextClassLoader(null);

      // NOTE(review): the instance is cached statically but built with the user name of
      // whichever context triggers initialization - confirm this is intended.
      String userName = context.getUsername();

      String defaultFS = context.getProperties().get("dataworker.defaultFs");
      if (defaultFS == null) {
        String message = "dataworker.defaultFs is not configured!";
        LOG.error(message);
        throw new WebServiceException(message);
      }

      try {
        hdfsApi = new HdfsApi(defaultFS, userName);
        LOG.info("HdfsApi connected OK");
      } catch (IOException e) {
        String message = "HdfsApi IO error: " + e.getMessage();
        LOG.error(message, e);
        throw new WebServiceException(message, e);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        String message = "HdfsApi Interrupted error: " + e.getMessage();
        LOG.error(message, e);
        throw new WebServiceException(message, e);
      }
    }
    return hdfsApi;
  }

  /**
   * Returns the shared HdfsApi using this service's own context.
   */
  public HdfsApi getHdfsApi() {
    return getHdfsApi(context);
  }

  /**
   * Replaces the shared HdfsApi instance.
   *
   * @param api instance to use from now on; null makes the next getHdfsApi call reconnect
   * @return the instance that was passed in
   */
  public static synchronized HdfsApi setHdfsApi(HdfsApi api) {
    return hdfsApi = api;
  }

  /**
   * Builds a 400 response with a JSON body {"message": ..., "status": 400}.
   */
  public static Response badRequestResponse(String message) {
    return errorResponse(400, message);
  }

  /**
   * Builds a 500 response with a JSON body {"message": ..., "status": 500}.
   */
  public static Response serverErrorResponse(String message) {
    return errorResponse(500, message);
  }

  /**
   * Builds a 404 response with a JSON body {"message": ..., "status": 404}.
   */
  public static Response notFoundResponse(String message) {
    return errorResponse(404, message);
  }

  /**
   * Shared builder for the JSON error responses above.
   */
  private static Response errorResponse(int status, String message) {
    HashMap<String, Object> response = new HashMap<String, Object>();
    response.put("message", message);
    response.put("status", status);
    return Response.status(status).entity(new JSONObject(response)).type(MediaType.APPLICATION_JSON).build();
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java new file mode 100644 index 0000000..c5f1721 --- /dev/null +++ 
b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.services; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.ViewResourceHandler; +import org.json.simple.JSONObject; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.*; +
/**
 * Help service
 * API:
 * GET /config
 *      get the configured "dataworker.defaultFs" value
 * GET /version
 *      get the view version string
 */
public class HelpService extends BaseService {
  private ViewResourceHandler handler;

  /**
   * Creates the service with an explicit context.
   *
   * The context is stored in the field inherited from BaseService rather than in a
   * shadowing private field, so inherited helpers that read the context
   * (getStorage, getHdfsApi) see the same value.
   *
   * @param context view context
   * @param handler view resource handler
   */
  public HelpService(ViewContext context, ViewResourceHandler handler) {
    super();
    this.context = context;
    this.handler = handler;
  }

  /**
   * @return 200 with {"dataworker.defaultFs": value of that view property}
   */
  @GET
  @Path("/config")
  @Produces(MediaType.APPLICATION_JSON)
  public Response config(){
    JSONObject object = new JSONObject();
    String fs = context.getProperties().get("dataworker.defaultFs");
    object.put("dataworker.defaultFs", fs);
    return Response.ok(object).build();
  }

  /**
   * @return 200 with the hard-coded version string as plain text
   */
  @GET
  @Path("/version")
  @Produces(MediaType.TEXT_PLAIN)
  public Response version(){
    return Response.ok("0.0.1-SNAPSHOT").build();
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java new file mode 100644 index 0000000..de9142f --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.pig.templeton.client; + +import com.google.gson.Gson; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.ambari.view.URLStreamProvider; +import org.apache.ambari.view.ViewContext; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +
/**
 * Request handler, supports GET, POST, PUT, DELETE methods
 * @param <RESPONSE> data type to deserialize response from JSON
 */
public class Request<RESPONSE> {
  protected final Class<RESPONSE> responseClass;
  protected final ViewContext context;
  protected final WebResource resource;

  protected final Gson gson = new Gson();

  protected final static Logger LOG =
      LoggerFactory.getLogger(Request.class);

  /**
   * @param resource      default resource used by the overloads that take no resource
   * @param responseClass class the JSON response is deserialized into
   * @param context       view context supplying the URL stream provider
   */
  public Request(WebResource resource, Class<RESPONSE> responseClass, ViewContext context) {
    this.resource = resource;
    this.responseClass = responseClass;
    this.context = context;
  }

  /**
   * Main implementation of GET request
   * @param resource resource
   * @return unmarshalled response data
   * @throws IOException when the request or reading the response fails
   */
  public RESPONSE get(WebResource resource) throws IOException {
    LOG.debug("GET " + resource.toString());

    InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(), "GET",
        null, new HashMap<String, String>());
    return readResponse(inputStream);
  }

  public RESPONSE get() throws IOException {
    return get(this.resource);
  }

  public RESPONSE get(MultivaluedMapImpl params) throws IOException {
    return get(this.resource.queryParams(params));
  }

  /**
   * Main implementation of POST request
   * @param resource resource
   * @param data post body; null is treated as an empty body
   * @return unmarshalled response data
   * @throws IOException when the request or reading the response fails
   */
  public RESPONSE post(WebResource resource, MultivaluedMapImpl data) throws IOException {
    LOG.debug("POST " + resource.toString());
    LOG.debug("data: " + data);
    return sendForm("POST", resource, data);
  }

  public RESPONSE post(MultivaluedMapImpl data) throws IOException {
    return post(resource, data);
  }

  public RESPONSE post() throws IOException {
    return post(resource, new MultivaluedMapImpl());
  }

  public RESPONSE post(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
    return post(resource.queryParams(params), data);
  }

  /**
   * Manual check of the form encoding used for request bodies: prints the raw
   * query string produced for one sample parameter.
   */
  public static void main(String[] args) {
    UriBuilder builder = UriBuilder.fromPath("host/");
    builder.queryParam("aa", "/tmp/.pigjobs/hue/test111_17-03-2014-16-50-37");
    System.out.println(builder.build().getRawQuery());
  }

  /**
   * Main implementation of PUT request
   * @param resource resource
   * @param data put body; null is treated as an empty body
   * @return unmarshalled response data
   * @throws IOException when the request or reading the response fails
   */
  public RESPONSE put(WebResource resource, MultivaluedMapImpl data) throws IOException {
    LOG.debug("PUT " + resource.toString());
    return sendForm("PUT", resource, data);
  }

  public RESPONSE put(MultivaluedMapImpl data) throws IOException {
    return put(resource, data);
  }

  public RESPONSE put() throws IOException {
    return put(resource, new MultivaluedMapImpl());
  }

  public RESPONSE put(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
    return put(resource.queryParams(params), data);
  }

  /**
   * Main implementation of DELETE request
   * @param resource resource
   * @param data delete body; null is treated as an empty body
   * @return unmarshalled response data
   * @throws IOException when the request or reading the response fails
   */
  public RESPONSE delete(WebResource resource, MultivaluedMapImpl data) throws IOException {
    LOG.debug("DELETE " + resource.toString());
    return sendForm("DELETE", resource, data);
  }

  public RESPONSE delete(MultivaluedMapImpl data) throws IOException {
    return delete(resource, data);
  }

  public RESPONSE delete() throws IOException {
    return delete(resource, new MultivaluedMapImpl());
  }

  public RESPONSE delete(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
    return delete(resource.queryParams(params), data);
  }

  /**
   * Sends a form-encoded body with the given HTTP method and unmarshals the response.
   * Shared by POST, PUT and DELETE.
   */
  private RESPONSE sendForm(String method, WebResource resource, MultivaluedMapImpl data) throws IOException {
    // UriBuilder is used only to URL-encode the key/value pairs; "host/" is a
    // placeholder path and just the raw query part of the built URI is sent.
    UriBuilder builder = UriBuilder.fromPath("host/");
    if (data != null) {
      for (String key : data.keySet()) {
        for (String value : data.get(key)) {
          builder.queryParam(key, value);
        }
      }
    }
    String body = builder.build().getRawQuery();
    LOG.debug("... data: " + body);

    Map<String, String> headers = new HashMap<String, String>();
    headers.put("Content-Type", "application/x-www-form-urlencoded");

    InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(),
        method, body, headers);
    return readResponse(inputStream);
  }

  /**
   * Reads the whole stream as JSON text, closes the stream, and unmarshals the text
   * into the response class.
   */
  private RESPONSE readResponse(InputStream inputStream) throws IOException {
    String responseJson;
    try {
      // NOTE(review): decodes with the platform default charset; the service
      // presumably answers in UTF-8 - confirm before pinning an encoding.
      responseJson = IOUtils.toString(inputStream);
    } finally {
      IOUtils.closeQuietly(inputStream);
    }
    LOG.debug(String.format("RESPONSE => %s", responseJson));
    return gson.fromJson(responseJson, responseClass);
  }
}
 http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java new file mode 100644 index 0000000..9675a1e --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.templeton.client; + +import com.google.gson.Gson; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.ambari.view.ViewContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import java.io.File; +import java.io.IOException; +import java.util.Map; + +//TODO: extract to separate JAR outside ambari-views scope +/** + * Templeton Business Delegate + */ +public class TempletonApi { + private final Gson gson = new Gson(); + + protected final static Logger LOG = + LoggerFactory.getLogger(TempletonApi.class); + + protected WebResource service; + private String username; + private String doAs; + private ViewContext context; + + /** + * TempletonApi constructor + * @param api dataworker.templeton_url + * @param username templeton username + * @param doAs doAs argument + * @param context context with URLStreamProvider + */ + public TempletonApi(String api, String username, String doAs, ViewContext context) { + this.username = username; + this.doAs = doAs; + this.context = context; + ClientConfig config = new DefaultClientConfig(); + config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE); + Client client = 
Client.create(config); + this.service = client.resource(api); + } + + public TempletonApi(String api, String username, ViewContext context) { + this(api, username, username, context); + } + + /** + * Create and queue a Pig job. + * @param execute String containing an entire, short pig program to run. (e.g. pwd) + * @param pigFile HDFS file name of a pig program to run. (One of either "execute" or "file" is required ) + * @param statusDir A directory where Templeton will write the status of the Pig job. If + * provided, it is the caller's responsibility to remove this directory when done. + * @param arg Set a program argument. Optional None + * @return id A string containing the job ID similar to "job_201110132141_0001". + * info A JSON object containing the information returned when the job was queued. + */ + public JobData runPigQuery(String execute, File pigFile, String statusDir, String arg) throws IOException { + MultivaluedMapImpl data = new MultivaluedMapImpl(); + if (execute != null) + data.add("execute", execute); + if (pigFile != null) + data.add("file", pigFile.toString()); + if (statusDir != null) + data.add("statusdir", statusDir); + if (arg != null && !arg.isEmpty()) { + for(String arg1 : arg.split("\t")) { + data.add("arg", arg1); + } + } + + TempletonRequest request = + new TempletonRequest(service.path("pig"), JobData.class, username, doAs, context); + + return request.post(data); + } + + public JobData runPigQuery(File pigFile, String statusDir, String arg) throws IOException { + return runPigQuery(null, pigFile, statusDir, arg); + } + + public JobData runPigQuery(String execute, String statusDir, String arg) throws IOException { + return runPigQuery(execute, null, statusDir, arg); + } + + public JobData runPigQuery(String execute) throws IOException { + return runPigQuery(execute, null, null, null); + } + + public JobInfo checkJob(String jobId) throws IOException { + TempletonRequest request = + new 
TempletonRequest(service.path("jobs").path(jobId), JobInfo.class, username, context); + + return request.get(); + } + + public void killJob(String jobId) throws IOException { + TempletonRequest request = + new TempletonRequest(service.path("jobs").path(jobId), JobInfo.class, username, context); + + try { + request.delete(); + } catch (IOException e) { + //TODO: remove this after HIVE-5835 resolved + LOG.debug("Ignoring 500 response from webhcat (see HIVE-5835)"); + } + } + + public Status status() throws IOException { + TempletonRequest request = + new TempletonRequest(service.path("status"), Status.class, + username, doAs, context); + return request.get(); + } + + public class Status { + public String status; + public String version; + } + + public class JobData { + public String id; + } + + public class JobInfo { + public Map status; + public Map profile; + public Map userargs; + + public String id; + public String parentId; + public String percentComplete; + public Integer exitValue; + public String user; + public String callback; + public String completed; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java new file mode 100644 index 0000000..38ec211 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.templeton.client; + +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.ambari.view.ViewContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Request handler that adds user.name and doAs + * GET parameters to every request + * @param data type to deserialize response from JSON + */ +public class TempletonRequest extends Request { + private String username; + private String doAs; + + protected final static Logger LOG = + LoggerFactory.getLogger(TempletonRequest.class); + + public TempletonRequest(WebResource resource, Class responseClass, + String username, ViewContext context) { + this(resource, responseClass, username, username, context); + } + + public TempletonRequest(WebResource resource, Class responseClass, + String username, String doAs, ViewContext context) { + super(resource, responseClass, context); + this.username = username; + this.doAs = doAs; + } + + public RESPONSE get(WebResource resource) throws IOException { + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("user.name", username); + params.add("doAs", doAs); + return super.get(resource.queryParams(params)); + } + + public RESPONSE put(WebResource resource, MultivaluedMapImpl data) throws IOException { + MultivaluedMapImpl params = new 
MultivaluedMapImpl(); + params.add("user.name", username); + params.add("doAs", doAs); + return super.put(resource.queryParams(params), data); + } + + public RESPONSE delete(WebResource resource, MultivaluedMapImpl data) throws IOException { + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("user.name", username); + params.add("doAs", doAs); + return super.delete(resource.queryParams(params), data); + } + + public RESPONSE post(WebResource resource, MultivaluedMapImpl data) throws IOException { + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("user.name", username); + params.add("doAs", doAs); + return super.post(resource.queryParams(params), data); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java ---------------------------------------------------------------------- diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java new file mode 100644 index 0000000..9312204 --- /dev/null +++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.pig.utils; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.pig.services.BaseService; +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Arrays; + +import static java.lang.Math.ceil; + +public class FilePaginator { + private static int PAGE_SIZE = 1*1024*1024; // 1MB + + private String filePath; + private HdfsApi hdfsApi; + + public FilePaginator(String filePath, ViewContext context) { + this.filePath = filePath; + hdfsApi = BaseService.getHdfsApi(context); + } + + public static void setPageSize(int PAGE_SIZE) { + FilePaginator.PAGE_SIZE = PAGE_SIZE; + } + + public long pageCount() throws IOException, InterruptedException { + return (long) + ceil( hdfsApi.getFileStatus(filePath).getLen() / ((double)PAGE_SIZE) ); + } + + public String readPage(long page) throws IOException, InterruptedException { + FSDataInputStream stream = hdfsApi.open(filePath); + try { + stream.seek(page * PAGE_SIZE); + } catch (IOException e) { + throw new IllegalArgumentException("Page " + page + " does not exists"); + } + + byte[] buffer = new byte[PAGE_SIZE]; + int readCount = 0; + int read = 0; + while(read < PAGE_SIZE) { + try { + readCount = stream.read(buffer, read, PAGE_SIZE-read); + } catch (IOException e) { + stream.close(); + throw e; + } + if (readCount == -1) + break; + read += readCount; + } + if (read != 0) { + byte[] readData = Arrays.copyOfRange(buffer, 0, read); + return new String(readData, Charset.forName("UTF-8")); + } else { + if (page == 0) { + return ""; + } + throw new IllegalArgumentException("Page " + page + " does not exists"); + } + } +}