From: hyunsik@apache.org
To: commits@tajo.incubator.apache.org
Reply-To: dev@tajo.incubator.apache.org
Date: Tue, 02 Jul 2013 14:16:15 -0000
In-Reply-To: <88bae4b5a7914caa925b4e3f2a583ac8@git.apache.org>
References: <88bae4b5a7914caa925b4e3f2a583ac8@git.apache.org>
Subject: [21/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java new file mode 100644 index 0000000..26991a0 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.worker; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.worker.dataserver.retriever.FileChunk; +import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * + * It retrieves the file chunk ranged between start and end keys. + * The start key is inclusive, but the end key is exclusive. + * + * Internally, there are four cases: + *
+ * <ul>
+ *   <li>out of scope: the index range does not overlap with the query range.</li>
+ *   <li>overlapped: the index range partially overlaps with the query range.</li>
+ *   <li>included: the index range is included within the start and end keys.</li>
+ *   <li>covered: the index range covers the query range (i.e., the start and end keys).</li>
+ * </ul>
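+ * <p>A hypothetical illustration of the four cases (all key values are
+ * illustrative only), assuming the index spans keys [10, 100):
+ * <pre>
+ *   query [200, 300) : out of scope - no chunk is returned
+ *   query [ 50, 150) : overlapped   - clamped to the indexed range [50, 100)
+ *   query [  0, 200) : included     - the whole index lies inside the query
+ *   query [ 20,  30) : covered      - the index covers the query
+ * </pre>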
+ */ +public class RangeRetrieverHandler implements RetrieverHandler { + private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class); + private final File file; + private final BSTIndex.BSTIndexReader idxReader; + private final Schema schema; + private final TupleComparator comp; + + public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException { + this.file = outDir; + BSTIndex index = new BSTIndex(new TajoConf()); + this.schema = schema; + this.comp = comp; + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index")); + this.idxReader = + index.getIndexReader(indexPath, this.schema, this.comp); + this.idxReader.open(); + LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + + idxReader.getLastKey()); + } + + @Override + public FileChunk get(Map> kvs) throws IOException { + // nothing to verify the file because AdvancedDataRetriever checks + // its validity of the file. + File data = new File(this.file, "data/data"); + byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0)); + Tuple start = RowStoreUtil.RowStoreDecoder.toTuple(schema, startBytes); + byte [] endBytes; + Tuple end; + endBytes = Base64.decodeBase64(kvs.get("end").get(0)); + end = RowStoreUtil.RowStoreDecoder.toTuple(schema, endBytes); + boolean last = kvs.containsKey("final"); + + if(!comp.isAscendingFirstKey()) { + Tuple tmpKey = start; + start = end; + end = tmpKey; + } + + LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end + + (last ? ", last=true" : "") + ")"); + + if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero + LOG.info("There is no contents"); + return null; + } + + if (comp.compare(end, idxReader.getFirstKey()) < 0 || + comp.compare(idxReader.getLastKey(), start) < 0) { + LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + + "], but request start:" + start + ", end: " + end); + return null; + } + + long startOffset; + long endOffset; + try { + startOffset = idxReader.find(start); + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + try { + endOffset = idxReader.find(end); + if (endOffset == -1) { + endOffset = idxReader.find(end, true); + } + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + + // if startOffset == -1 then case 2-1 or case 3 + if (startOffset == -1) { // this is a hack + // if case 2-1 or case 3 + try { + startOffset = idxReader.find(start, true); + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + } + + if (startOffset == -1) { + throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + + "State Dump (the requested range: " + + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + } + + // if greater than indexed values + if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) { + 
endOffset = data.length(); + } + + FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); + LOG.info("Retrieve File Chunk: " + chunk); + return chunk; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java new file mode 100644 index 0000000..13cd98a --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.TaskAttemptContext; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.logical.LogicalNode; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.exception.InternalException; +import org.apache.tajo.storage.StorageManager; + +import java.io.IOException; + +public class TajoQueryEngine { + private final static Log LOG = LogFactory.getLog(TajoQueryEngine.class); + private final StorageManager storageManager; + private final PhysicalPlanner phyPlanner; + + public TajoQueryEngine(TajoConf conf) throws IOException { + this.storageManager = new StorageManager(conf); + this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager); + } + + public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan) + throws InternalException { + return phyPlanner.createPlan(ctx, plan); + } + + public void stop() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java new file mode 100644 index 0000000..c8a11c6 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java @@ -0,0 +1,640 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tajo.QueryConf; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptContext; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.engine.MasterWorkerProtos.*; +import org.apache.tajo.engine.exception.UnfinishedTaskException; +import org.apache.tajo.engine.json.GsonCreator; +import org.apache.tajo.engine.planner.PlannerUtil; +import org.apache.tajo.engine.planner.logical.LogicalNode; +import org.apache.tajo.engine.planner.logical.SortNode; +import org.apache.tajo.engine.planner.logical.StoreTableNode; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface; +import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest; +import org.apache.tajo.master.ExecutionBlock.PartitionType; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.Fragment; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.worker.TaskRunner.WorkerContext; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +public class Task { + private static final Log LOG = LogFactory.getLog(Task.class); + + private final QueryConf conf; + private final FileSystem localFS; + private final WorkerContext workerContext; + private final Interface masterProxy; + private final LocalDirAllocator lDirAllocator; + private final QueryUnitAttemptId taskId; + + private final Path taskDir; + private final QueryUnitRequest request; + private final TaskAttemptContext context; + private List fetcherRunners; + private final LogicalNode plan; + private PhysicalExec executor; + private boolean interQuery; + private boolean killed = false; + private boolean aborted = false; + private boolean stopped = false; + private float progress = 0; + private final Reporter reporter; + private Path inputTableBaseDir; + + private static int completed = 0; + private static int failed = 0; + private 
static int succeeded = 0; + + /** + * flag that indicates whether progress update needs to be sent to parent. + * If true, it has been set. If false, it has been reset. + * Using AtomicBoolean since we need an atomic read & reset method. + */ + private AtomicBoolean progressFlag = new AtomicBoolean(false); + + // TODO - to be refactored + private PartitionType partitionType = null; + private Schema finalSchema = null; + private TupleComparator sortComp = null; + + static final String OUTPUT_FILE_PREFIX="part-"; + static final ThreadLocal OUTPUT_FILE_FORMAT_SUBQUERY = + new ThreadLocal() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(2); + return fmt; + } + }; + static final ThreadLocal OUTPUT_FILE_FORMAT_TASK = + new ThreadLocal() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(6); + return fmt; + } + }; + + public Task(QueryUnitAttemptId taskId, + final WorkerContext worker, final Interface masterProxy, + final QueryUnitRequest request) throws IOException { + this.request = request; + this.reporter = new Reporter(masterProxy); + this.reporter.startCommunicationThread(); + + this.taskId = request.getId(); + this.conf = worker.getConf(); + this.workerContext = worker; + this.masterProxy = masterProxy; + this.localFS = worker.getLocalFS(); + this.lDirAllocator = worker.getLocalDirAllocator(); + this.taskDir = StorageUtil.concatPath(workerContext.getBaseDir(), + taskId.getQueryUnitId().getId() + "_" + taskId.getId()); + + this.context = new TaskAttemptContext(conf, taskId, + request.getFragments().toArray(new Fragment[request.getFragments().size()]), + taskDir); + plan = GsonCreator.getInstance().fromJson(request.getSerializedData(), + LogicalNode.class); + interQuery = request.getProto().getInterQuery(); + if (interQuery) { + context.setInterQuery(); + StoreTableNode store = (StoreTableNode) plan; + this.partitionType = store.getPartitionType(); + if (partitionType == PartitionType.RANGE) { + SortNode sortNode = (SortNode) store.getSubNode(); + this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); + this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys()); + } + } else { + // The final result of a task will be written in a file named part-ss-nnnnnnn, + // where ss is the subquery id associated with this task, and nnnnnn is the task id. + Path outFilePath = new Path(conf.getOutputPath(), + OUTPUT_FILE_PREFIX + + OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId())); + LOG.info("Output File Path: " + outFilePath); + context.setOutputPath(outFilePath); + } + + context.setState(TaskAttemptState.TA_PENDING); + LOG.info("=================================="); + LOG.info("* Subquery " + request.getId() + " is initialized"); + LOG.info("* InterQuery: " + interQuery + + (interQuery ? 
", Use " + this.partitionType + " partitioning":"")); + + LOG.info("* Fragments (num: " + request.getFragments().size() + ")"); + for (Fragment f: request.getFragments()) { + LOG.info("==> Table Id:" + f.getId() + ", path:" + f.getPath() + "(" + f.getMeta().getStoreType() + "), " + + "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")"); + } + LOG.info("* Fetches (total:" + request.getFetches().size() + ") :"); + for (Fetch f : request.getFetches()) { + LOG.info("==> Table Id: " + f.getName() + ", url: " + f.getUrls()); + } + LOG.info("* Local task dir: " + taskDir); + LOG.info("* plan:\n"); + LOG.info(plan.toString()); + LOG.info("=================================="); + } + + public void init() throws IOException { + // initialize a temporary dir for the task + localFS.mkdirs(taskDir); + + if (request.getFetches().size() > 0) { + inputTableBaseDir = localFS.makeQualified( + lDirAllocator.getLocalPathForWrite( + getTaskAttemptDir(context.getTaskId()).toString() + "/in", conf)); + localFS.mkdirs(inputTableBaseDir); + Path tableDir; + for (String inputTable : context.getInputTables()) { + tableDir = new Path(inputTableBaseDir, inputTable); + if (!localFS.exists(tableDir)) { + LOG.info("The directory is created: " + tableDir.toUri()); + localFS.mkdirs(tableDir); + } + } + } + + // for localizing the intermediate data + localize(request); + } + + public QueryUnitAttemptId getTaskId() { + return taskId; + } + + // getters and setters for flag + void setProgressFlag() { + progressFlag.set(true); + } + boolean resetProgressFlag() { + return progressFlag.getAndSet(false); + } + boolean getProgressFlag() { + return progressFlag.get(); + } + + public void localize(QueryUnitRequest request) throws IOException { + fetcherRunners = getFetchRunners(context, request.getFetches()); + + List cached = Lists.newArrayList(); + for (Fragment frag : request.getFragments()) { + if (frag.isDistCached()) { + cached.add(frag); + } + } + + if (cached.size() > 0) { + Path inFile; + + int i = fetcherRunners.size(); + for (Fragment cache : cached) { + inFile = new Path(inputTableBaseDir, "in_" + i); + workerContext.getDefaultFS().copyToLocalFile(cache.getPath(), inFile); + cache.setPath(inFile); + i++; + } + } + } + + public QueryUnitAttemptId getId() { + return context.getTaskId(); + } + + public TaskAttemptState getStatus() { + return context.getState(); + } + + public String toString() { + return "queryId: " + this.getId() + " status: " + this.getStatus(); + } + + public void setState(TaskAttemptState status) { + context.setState(status); + setProgressFlag(); + } + + public boolean hasFetchPhase() { + return fetcherRunners.size() > 0; + } + + public void fetch() { + for (Fetcher f : fetcherRunners) { + workerContext.getFetchLauncher().submit(new FetchRunner(context, f)); + } + } + + public void kill() { + killed = true; + context.stop(); + context.setState(TaskAttemptState.TA_KILLED); + setProgressFlag(); + } + + public void abort() { + aborted = true; + context.stop(); + context.setState(TaskAttemptState.TA_FAILED); + } + + public void cleanUp() { + // remove itself from worker + // check whether the task has finished + if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { + try { + // delete context.getWorkDir() + localFS.delete(context.getWorkDir(), true); + // remove itself from the task map + synchronized (workerContext.getTasks()) { + workerContext.getTasks().remove(this.getId()); + } + } catch (IOException e) { + e.printStackTrace(); + } + } else { + LOG.error(new UnfinishedTaskException("QueryUnitAttemptId: " + + context.getTaskId() + " status: 
" + context.getState())); + } + } + + public TaskStatusProto getReport() { + TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); + builder.setWorkerName(workerContext.getNodeId()); + builder.setId(context.getTaskId().getProto()) + .setProgress(context.getProgress()).setState(context.getState()); + + return builder.build(); + } + + private TaskCompletionReport getTaskCompletionReport() { + TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); + builder.setId(context.getTaskId().getProto()); + + if (context.hasResultStats()) { + builder.setResultStats(context.getResultStats().getProto()); + } else { + builder.setResultStats(new TableStat().getProto()); + } + + Iterator> it = context.getRepartitions(); + if (it.hasNext()) { + do { + Entry entry = it.next(); + Partition.Builder part = Partition.newBuilder(); + part.setPartitionKey(entry.getKey()); + builder.addPartitions(part.build()); + } while (it.hasNext()); + } + + return builder.build(); + } + + private void waitForFetch() throws InterruptedException, IOException { + context.getFetchLatch().await(); + LOG.info(context.getTaskId() + " All fetches are done!"); + Collection inputs = Lists.newArrayList(context.getInputTables()); + for (String inputTable: inputs) { + File tableDir = new File(context.getFetchIn(), inputTable); + Fragment [] frags = localizeFetchedData(tableDir, inputTable, + context.getTable(inputTable).getMeta()); + context.changeFragment(inputTable, frags); + } + } + + public void run() { + + String errorMessage = null; + try { + context.setState(TaskAttemptState.TA_RUNNING); + setProgressFlag(); + + if (context.hasFetchPhase()) { + // If the fetch is still in progress, the query unit must wait for + // complete. + waitForFetch(); + } + + if (context.getFragmentSize() > 0) { + this.executor = workerContext.getTQueryEngine(). + createPlan(context, plan); + this.executor.init(); + while(executor.next() != null && !killed) { + ++progress; + } + this.executor.close(); + } + } catch (Exception e) { + // errorMessage will be sent to master. 
+ errorMessage = ExceptionUtils.getStackTrace(e); + LOG.error(errorMessage); + aborted = true; + + } finally { + setProgressFlag(); + stopped = true; + completed++; + + if (killed || aborted) { + context.setProgress(0.0f); + + TaskFatalErrorReport.Builder errorBuilder = + TaskFatalErrorReport.newBuilder() + .setId(getId().getProto()); + if (errorMessage != null) { + errorBuilder.setErrorMessage(errorMessage); + } + + // stopping the status report + try { + reporter.stopCommunicationThread(); + } catch (InterruptedException e) { + LOG.warn(e); + } + + masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get()); + failed++; + + } else { + // if successful + context.setProgress(1.0f); + + // stopping the status report + try { + reporter.stopCommunicationThread(); + } catch (InterruptedException e) { + LOG.warn(e); + } + + TaskCompletionReport report = getTaskCompletionReport(); + masterProxy.done(null, report, NullCallback.get()); + succeeded++; + } + + cleanupTask(); + LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded + + ", failed: " + failed); + } + } + + public void cleanupTask() { + workerContext.getTasks().remove(getId()); + } + + public int hashCode() { + return context.hashCode(); + } + + public boolean equals(Object obj) { + if (obj instanceof Task) { + Task other = (Task) obj; + return this.context.equals(other.context); + } + return false; + } + + private Fragment[] localizeFetchedData(File file, String name, TableMeta meta) + throws IOException { + Configuration c = new Configuration(conf); + c.set("fs.default.name", "file:///"); + FileSystem fs = FileSystem.get(c); + Path tablePath = new Path(file.getAbsolutePath()); + + List listTablets = new ArrayList(); + Fragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus f : fileLists) { + if (f.getLen() == 0) { + continue; + } + tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen(), null); + listTablets.add(tablet); + } + + Fragment[] tablets = new Fragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + private class FetchRunner implements Runnable { + private final TaskAttemptContext ctx; + private final Fetcher fetcher; + + public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { + this.ctx = ctx; + this.fetcher = fetcher; + } + + @Override + public void run() { + int retryNum = 0; + int maxRetryNum = 5; + int retryWaitTime = 1000; + + try { // for releasing fetch latch + while(retryNum < maxRetryNum) { + if (retryNum > 0) { + try { + Thread.sleep(retryWaitTime); + } catch (InterruptedException e) { + LOG.error(e); + } + LOG.info("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); + } + try { + File fetched = fetcher.get(); + if (fetched != null) { + break; + } + } catch (IOException e) { + LOG.error("Fetch failed: " + fetcher.getURI(), e); + } + retryNum++; + } + } finally { + ctx.getFetchLatch().countDown(); + } + + if (retryNum == maxRetryNum) { + LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); + } + } + } + + private List getFetchRunners(TaskAttemptContext ctx, + List fetches) throws IOException { + + if (fetches.size() > 0) { + Path inputDir = lDirAllocator. 
+ getLocalPathToRead( + getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", conf); + File storeDir; + + int i = 0; + File storeFile; + List runnerList = Lists.newArrayList(); + for (Fetch f : fetches) { + storeDir = new File(inputDir.toString(), f.getName()); + if (!storeDir.exists()) { + storeDir.mkdirs(); + } + storeFile = new File(storeDir, "in_" + i); + Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile); + runnerList.add(fetcher); + i++; + } + ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); + return runnerList; + } else { + return Lists.newArrayList(); + } + } + + protected class Reporter implements Runnable { + private Interface masterStub; + private Thread pingThread; + private Object lock = new Object(); + private static final int PROGRESS_INTERVAL = 3000; + + public Reporter(Interface masterStub) { + this.masterStub = masterStub; + } + + @Override + public void run() { + final int MAX_RETRIES = 3; + int remainingRetries = MAX_RETRIES; + + while (!stopped) { + try { + synchronized (lock) { + if (stopped) { + break; + } + lock.wait(PROGRESS_INTERVAL); + } + if (stopped) { + break; + } + + // atomically read and clear the progress flag; send a status update + // if progress was made since the last round, otherwise just ping + if (resetProgressFlag()) { + masterStub.statusUpdate(null, getReport(), NullCallback.get()); + } else { + masterStub.ping(null, taskId.getProto(), NullCallback.get()); + } + + } catch (Throwable t) { + + LOG.info("Communication exception: " + StringUtils + .stringifyException(t)); + remainingRetries -= 1; + if (remainingRetries == 0) { + ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); + LOG.warn("Last retry, killing"); + System.exit(65); + } + } + } + } + + public void startCommunicationThread() { + if (pingThread == null) { + pingThread = new Thread(this, "communication thread"); + pingThread.setDaemon(true); + pingThread.start(); + } + } + + public void stopCommunicationThread() throws InterruptedException { + if (pingThread != null) { + // Intent of the lock is to not send an interrupt in the middle of an + // umbilical.ping or umbilical.statusUpdate + synchronized(lock) { + // Interrupt if sleeping. Otherwise wait for the RPC call to return. 
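+ // (notify() wakes the reporter if it is waiting on the lock; the
+ // interrupt below covers the case where it is blocked elsewhere, and
+ // join() then waits for the thread to exit.)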
+ lock.notify(); + } + + pingThread.interrupt(); + pingThread.join(); + } + } + } + + public static final String FILECACHE = "filecache"; + public static final String APPCACHE = "appcache"; + public static final String USERCACHE = "usercache"; + + String fileCache; + public String getFileCacheDir() { + fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" + + ConverterUtils.toString(taskId.getQueryId().getApplicationId()) + + "/" + "output"; + return fileCache; + } + + public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { + Path workDir = + StorageUtil.concatPath( + quid.getSubQueryId().toString(), + String.valueOf(quid.getQueryUnitId().getId()), + String.valueOf(quid.getId())); + return workDir; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java new file mode 100644 index 0000000..05c5165 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tajo.QueryConf; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.SubQueryId; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto; +import org.apache.tajo.engine.query.QueryUnitRequestImpl; +import org.apache.tajo.ipc.MasterWorkerProtocol; +import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService; +import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface; +import org.apache.tajo.rpc.CallFuture2; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.ProtoAsyncRpcClient; +import org.apache.tajo.util.TajoIdUtils; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport; + +/** + * The driver class for Tajo QueryUnit processing. + */ +public class TaskRunner extends AbstractService { + /** class logger */ + private static final Log LOG = LogFactory.getLog(TaskRunner.class); + + private QueryConf conf; + + private volatile boolean stopped = false; + + private final SubQueryId subQueryId; + private ApplicationId appId; + private final NodeId nodeId; + private final ContainerId containerId; + + // Cluster Management + private MasterWorkerProtocolService.Interface master; + + // for temporal or intermediate files + private FileSystem localFS; + // for input files + private FileSystem defaultFS; + + private TajoQueryEngine queryEngine; + + // TODO - this should be configurable + private final int coreNum = 4; + + // for Fetcher + private final ExecutorService fetchLauncher = + Executors.newFixedThreadPool(coreNum * 4); + // It keeps all of the query unit attempts while a TaskRunner is running. 
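+ // (The map is keyed by QueryUnitAttemptId and is exposed to tasks via
+ // WorkerContext.getTasks(), so a task removes its own entry on cleanup.)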
+ private final Map tasks = + new ConcurrentHashMap(); + private LocalDirAllocator lDirAllocator; + + // A thread to receive each assigned query unit and execute the query unit + private Thread taskLauncher; + + // Contains the object references related for TaskRunner + private WorkerContext workerContext; + // for the doAs block + private UserGroupInformation taskOwner; + + // for the local temporal dir + private String baseDir; + private Path baseDirPath; + + public TaskRunner( + final SubQueryId subQueryId, + final NodeId nodeId, + UserGroupInformation taskOwner, + Interface master, ContainerId containerId) { + super(TaskRunner.class.getName()); + this.subQueryId = subQueryId; + this.appId = subQueryId.getQueryId().getApplicationId(); + this.nodeId = nodeId; + this.taskOwner = taskOwner; + this.master = master; + this.containerId = containerId; + } + + @Override + public void init(Configuration _conf) { + this.conf = (QueryConf) _conf; + + try { + this.workerContext = new WorkerContext(); + + // initialize DFS and LocalFileSystems + defaultFS = FileSystem.get(URI.create(conf.getVar(ConfVars.ROOT_DIR)),conf); + localFS = FileSystem.getLocal(conf); + + // the base dir for an output dir + baseDir = ConverterUtils.toString(appId) + + "/output" + "/" + subQueryId.getId(); + + // initialize LocalDirAllocator + lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname); + + baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf)); + LOG.info("TaskRunner basedir is created (" + baseDir +")"); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(conf); + + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); + } catch (Throwable t) { + LOG.error(t); + } + + super.init(conf); + } + + @Override + public void start() { + run(); + } + + @Override + public void stop() { + if (!isStopped()) { + // If TaskRunner is stopped, all running or pending tasks will be marked as failed. + for (Task task : tasks.values()) { + if (task.getStatus() == TaskAttemptState.TA_PENDING || + task.getStatus() == TaskAttemptState.TA_RUNNING) { + task.setState(TaskAttemptState.TA_FAILED); + } + } + + // If this flag become true, taskLauncher will be terminated. 
+ this.stopped = true; + + LOG.info("STOPPED: " + nodeId); + synchronized (this) { + notifyAll(); + } + } + } + + class WorkerContext { + public QueryConf getConf() { + return conf; + } + + public String getNodeId() { + return nodeId.toString(); + } + + public MasterWorkerProtocolService.Interface getMaster() { + return master; + } + + public FileSystem getLocalFS() { + return localFS; + } + + public FileSystem getDefaultFS() { + return defaultFS; + } + + public LocalDirAllocator getLocalDirAllocator() { + return lDirAllocator; + } + + public TajoQueryEngine getTQueryEngine() { + return queryEngine; + } + + public Map getTasks() { + return tasks; + } + + public Task getTask(QueryUnitAttemptId taskId) { + return tasks.get(taskId); + } + + public ExecutorService getFetchLauncher() { + return fetchLauncher; + } + + public Path getBaseDir() { + return baseDirPath; + } + } + + static void fatalError(MasterWorkerProtocolService.Interface proxy, + QueryUnitAttemptId taskAttemptId, String message) { + TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() + .setId(taskAttemptId.getProto()) + .setErrorMessage(message); + proxy.fatalError(null, builder.build(), NullCallback.get()); + } + + public void run() { + LOG.info("TaskRunner startup"); + + try { + + taskLauncher = new Thread(new Runnable() { + @Override + public void run() { + int receivedNum = 0; + CallFuture2 callFuture = null; + QueryUnitRequestProto taskRequest = null; + + while(!stopped) { + try { + if (callFuture == null) { + callFuture = new CallFuture2(); + master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(), + callFuture); + } + try { + // wait for an assigning task for 3 seconds + taskRequest = callFuture.get(3, TimeUnit.SECONDS); + } catch (TimeoutException te) { + // if there has been no assigning task for a given period, + // TaskRunner will retry to request an assigning task. + LOG.error(te); + + continue; + } + + if (taskRequest != null) { + // QueryMaster can send the terminal signal to TaskRunner. + // If TaskRunner receives the terminal signal, TaskRunner will be terminated + // immediately. + if (taskRequest.getShouldDie()) { + LOG.info("received ShouldDie flag"); + stop(); + + } else { + + LOG.info("Accumulated Received Task: " + (++receivedNum)); + + QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId()); + if (tasks.containsKey(taskAttemptId)) { + fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId); + continue; + } + + LOG.info("Initializing: " + taskAttemptId); + Task task = new Task(taskAttemptId, workerContext, master, + new QueryUnitRequestImpl(taskRequest)); + tasks.put(taskAttemptId, task); + + task.init(); + if (task.hasFetchPhase()) { + task.fetch(); // The fetch is performed in an asynchronous way. + } + // task.run() is a blocking call. + task.run(); + + callFuture = null; + taskRequest = null; + } + } + } catch (Throwable t) { + LOG.error(t); + } + } + } + }); + taskLauncher.start(); + taskLauncher.join(); + + } catch (Throwable t) { + LOG.fatal("Unhandled exception. Starting shutdown.", t); + } finally { + for (Task t : tasks.values()) { + if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) { + t.abort(); + } + } + } + } + + private class ShutdownHook implements Runnable { + @Override + public void run() { + LOG.info("received SIGINT Signal"); + stop(); + } + } + + /** + * @return true if a stop has been requested. 
+ */ + public boolean isStopped() { + return this.stopped; + } + + /** + * TaskRunner takes 5 arguments as follows: + *
+ * <ol>
+ *   <li>1st: TaskRunnerListener hostname</li>
+ *   <li>2nd: TaskRunnerListener port</li>
+ *   <li>3rd: SubQueryId</li>
+ *   <li>4th: NodeId</li>
+ *   <li>5th: ContainerId</li>
+ * </ol>
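+ * <p>A hypothetical invocation (all argument values, including the id
+ * formats, are illustrative only):
+ * <pre>
+ *   TaskRunner 10.0.0.1 21001 sq_1372774560000_0001_001 host1:21000 \
+ *       container_1372774560000_0001_01_000002
+ * </pre>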
+ */ + public static void main(String[] args) throws Exception { + // Restore QueryConf + final QueryConf conf = new QueryConf(); + conf.addResource(new Path(QueryConf.FILENAME)); + + LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname)); + LOG.info("OUTPUT DIR: " + conf.getOutputPath()); + LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR)); + + UserGroupInformation.setConfiguration(conf); + + // TaskRunnerListener's address + String host = args[0]; + int port = Integer.parseInt(args[1]); + final InetSocketAddress masterAddr = + NetUtils.createSocketAddrForHost(host, port); + + // SubQueryId from String + final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]); + // NodeId has a form of hostname:port. + NodeId nodeId = ConverterUtils.toNodeId(args[3]); + ContainerId containerId = ConverterUtils.toContainerId(args[4]); + + // TODO - 'load credential' should be implemented + // Getting taskOwner + UserGroupInformation taskOwner = + UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME)); + //taskOwner.addToken(token); + + // TaskRunnerListener RPC + ProtoAsyncRpcClient client; + MasterWorkerProtocolService.Interface master; + + // initialize MasterWorkerProtocol as an actual task owner. + client = + taskOwner.doAs(new PrivilegedExceptionAction() { + @Override + public ProtoAsyncRpcClient run() throws Exception { + return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr); + } + }); + master = client.getStub(); + + + TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId); + taskRunner.init(conf); + taskRunner.start(); + client.close(); + LOG.info("TaskRunner (" + nodeId + ") main thread exiting"); + System.exit(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java new file mode 100644 index 0000000..6c93e4f --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker.dataserver; + +import java.io.IOException; + +public class FileAccessForbiddenException extends IOException { + private static final long serialVersionUID = -3383272565826389213L; + + public FileAccessForbiddenException() { + } + + public FileAccessForbiddenException(String message) { + super(message); + } + + public FileAccessForbiddenException(Throwable cause) { + super(cause); + } + + public FileAccessForbiddenException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java new file mode 100644 index 0000000..523d6e1 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.dataserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.apache.tajo.worker.dataserver.retriever.DataRetriever; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +public class HttpDataServer { + private final static Log LOG = LogFactory.getLog(HttpDataServer.class); + + private final InetSocketAddress addr; + private InetSocketAddress bindAddr; + private ServerBootstrap bootstrap = null; + private ChannelFactory factory = null; + private ChannelGroup channelGroup = null; + + public HttpDataServer(final InetSocketAddress addr, + final DataRetriever retriever) { + this.addr = addr; + this.factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), + Runtime.getRuntime().availableProcessors() * 2); + + // Configure the server. + this.bootstrap = new ServerBootstrap(factory); + // Set up the event pipeline factory. 
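+ // (HttpDataServerPipelineFactory, defined later in this commit, wires an
+ // HttpRequestDecoder, an HttpResponseEncoder, a ChunkedWriteHandler, and
+ // the HttpDataServerHandler into each channel's pipeline.)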
+ this.bootstrap.setPipelineFactory( + new HttpDataServerPipelineFactory(retriever)); + this.channelGroup = new DefaultChannelGroup(); + } + + public HttpDataServer(String bindaddr, DataRetriever retriever) { + this(NetUtils.createSocketAddr(bindaddr), retriever); + } + + public void start() { + // Bind and start to accept incoming connections. + Channel channel = bootstrap.bind(addr); + channelGroup.add(channel); + this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); + LOG.info("HttpDataServer starts up (" + + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() + + ")"); + } + + public InetSocketAddress getBindAddress() { + return this.bindAddr; + } + + public void stop() { + ChannelGroupFuture future = channelGroup.close(); + future.awaitUninterruptibly(); + factory.releaseExternalResources(); + + LOG.info("HttpDataServer shutdown (" + + this.bindAddr.getAddress().getHostAddress() + ":" + + this.bindAddr.getPort() + ")"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java new file mode 100644 index 0000000..6b9eea8 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker.dataserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.frame.TooLongFrameException; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; +import org.jboss.netty.util.CharsetUtil; +import org.apache.tajo.worker.dataserver.retriever.DataRetriever; +import org.apache.tajo.worker.dataserver.retriever.FileChunk; + +import java.io.*; +import java.net.URLDecoder; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; +import static org.jboss.netty.handler.codec.http.HttpMethod.GET; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { + private final static Log LOG = LogFactory.getLog(HttpDataServer.class); + private final DataRetriever retriever; + + public HttpDataServerHandler(DataRetriever retriever) { + this.retriever = retriever; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + HttpRequest request = (HttpRequest) e.getMessage(); + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + FileChunk [] file; + try { + file = retriever.handle(ctx, request); + } catch (FileNotFoundException fnf) { + LOG.error(fnf); + sendError(ctx, NOT_FOUND); + return; + } catch (IllegalArgumentException iae) { + LOG.error(iae); + sendError(ctx, BAD_REQUEST); + return; + } catch (FileAccessForbiddenException fafe) { + LOG.error(fafe); + sendError(ctx, FORBIDDEN); + return; + } catch (IOException ioe) { + LOG.error(ioe); + sendError(ctx, INTERNAL_SERVER_ERROR); + return; + } + + // Write the content. + Channel ch = e.getChannel(); + if (file == null) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); + ch.write(response); + if (!isKeepAlive(request)) { + ch.close(); + } + } else { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + long totalSize = 0; + for (FileChunk chunk : file) { + totalSize += chunk.length(); + } + setContentLength(response, totalSize); + + // Write the initial line and the header. + ch.write(response); + + ChannelFuture writeFuture = null; + + for (FileChunk chunk : file) { + writeFuture = sendFile(ctx, ch, chunk); + if (writeFuture == null) { + sendError(ctx, NOT_FOUND); + return; + } + } + + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. 
+ writeFuture.addListener(ChannelFutureListener.CLOSE); + } + } + } + + private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException { + RandomAccessFile raf; + try { + raf = new RandomAccessFile(file.getFile(), "r"); + } catch (FileNotFoundException fnfe) { + return null; + } + + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) != null) { + // Cannot use zero-copy with HTTPS. + writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192)); + } else { + // No encryption - use zero-copy. + final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length()); + writeFuture = ch.write(region); + writeFuture.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) { + region.releaseExternalResources(); + } + }); + } + + return writeFuture; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + Channel ch = e.getChannel(); + Throwable cause = e.getCause(); + if (cause instanceof TooLongFrameException) { + sendError(ctx, BAD_REQUEST); + return; + } + + cause.printStackTrace(); + if (ch.isConnected()) { + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + public static String sanitizeUri(String uri) { + // Decode the path. + try { + uri = URLDecoder.decode(uri, "UTF-8"); + } catch (UnsupportedEncodingException e) { + try { + uri = URLDecoder.decode(uri, "ISO-8859-1"); + } catch (UnsupportedEncodingException e1) { + throw new Error(); + } + } + + // Convert file separators. + uri = uri.replace('/', File.separatorChar); + + // Simplistic dumb security check. + // You will have to do something serious in the production environment. + if (uri.contains(File.separator + ".") + || uri.contains("." + File.separator) || uri.startsWith(".") + || uri.endsWith(".")) { + return null; + } + + // Convert to absolute path. + return uri; + } + + private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent(ChannelBuffers.copiedBuffer( + "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. + ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java new file mode 100644 index 0000000..0a47f6b --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.dataserver; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; +import org.jboss.netty.handler.stream.ChunkedWriteHandler; +import org.apache.tajo.worker.dataserver.retriever.DataRetriever; + +import static org.jboss.netty.channel.Channels.pipeline; + +public class HttpDataServerPipelineFactory implements ChannelPipelineFactory { + private final DataRetriever ret; + + public HttpDataServerPipelineFactory(DataRetriever ret) { + this.ret = ret; + } + + public ChannelPipeline getPipeline() throws Exception { + // Create a default pipeline implementation. + ChannelPipeline pipeline = pipeline(); + + // Uncomment the following line if you want HTTPS + // SSLEngine engine = + // SecureChatSslContextFactory.getServerContext().createSSLEngine(); + // engine.setUseClientMode(false); + // pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("decoder", new HttpRequestDecoder()); + //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + //pipeline.addLast("deflater", new HttpContentCompressor()); + pipeline.addLast("handler", new HttpDataServerHandler(ret)); + return pipeline; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java new file mode 100644 index 0000000..e688c39 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.Map;
+
+public class HttpUtil {
+  public static Map<String, String> getParams(URI uri) throws UnsupportedEncodingException {
+    return getParamsFromQuery(uri.getQuery());
+  }
+
+  /**
+   * It parses a query string into key/value pairs.
+   *
+   * @param queryString decoded query string
+   * @return key/value pairs parsed from the given query string
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+    String [] queries = queryString.split("&");
+
+    Map<String, String> params = Maps.newHashMap();
+    String [] param;
+    for (String q : queries) {
+      param = q.split("=");
+      params.put(param[0], param[1]);
+    }
+
+    return params;
+  }
+
+  public static String buildQuery(Map<String, String> params) throws UnsupportedEncodingException {
+    StringBuilder sb = new StringBuilder();
+
+    boolean first = true;
+    for (Map.Entry<String, String> param : params.entrySet()) {
+      if (!first) {
+        sb.append("&");
+      }
+      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+          append("=").
+          append(URLEncoder.encode(param.getValue(), "UTF-8"));
+      first = false;
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..d602d57
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AdvancedDataRetriever implements DataRetriever {
+  private static final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+  public AdvancedDataRetriever() {
+  }
+
+  public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
+    synchronized (handlerMap) {
+      if (!handlerMap.containsKey(id.toString())) {
+        handlerMap.put(id.toString(), handler);
+      }
+    }
+  }
+
+  public void unregister(QueryUnitAttemptId id) {
+    synchronized (handlerMap) {
+      if (handlerMap.containsKey(id.toString())) {
+        handlerMap.remove(id.toString());
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    final Map<String, List<String>> params =
+        new QueryStringDecoder(request.getUri()).getParameters();
+
+    if (!params.containsKey("qid")) {
+      throw new FileNotFoundException("Missing required parameter: qid");
+    }
+
+    if (params.containsKey("sid")) {
+      List<FileChunk> chunks = Lists.newArrayList();
+      List<String> qids = splitMaps(params.get("qid"));
+      for (String qid : qids) {
+        String[] ids = qid.split("_");
+        SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
+        QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
+        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
+            Integer.parseInt(ids[1]));
+        RetrieverHandler handler = handlerMap.get(attemptId.toString());
+        if (handler == null) {
+          throw new FileNotFoundException("No retriever is registered for " + attemptId);
+        }
+        FileChunk chunk = handler.get(params);
+        chunks.add(chunk);
+      }
+      return chunks.toArray(new FileChunk[chunks.size()]);
+    } else {
+      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
+      FileChunk chunk = handler.get(params);
+      if (chunk == null) { // there is no content corresponding to the query
+        return null;
+      }
+
+      File file = chunk.getFile();
+      if (file.isHidden() || !file.exists()) {
+        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+      }
+      if (!file.isFile()) {
+        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not a regular file");
+      }
+
+      return new FileChunk[] {chunk};
+    }
+  }
+
+  private List<String> splitMaps(List<String> qids) {
+    if (null == qids) {
+      LOG.error("QueryUnitId is EMPTY");
+      return null;
+    }
+
+    final List<String> ret = new ArrayList<String>();
+    for (String qid : qids) {
+      Collections.addAll(ret, qid.split(","));
+    }
+    return ret;
+  }
+}
\ No newline at end of file
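The retriever above is driven entirely by URL parameters: "qid" carries one or more comma-separated "<taskId>_<attemptId>" pairs, and "sid" names the sub-query they belong to. As a rough orientation, the following sketch (not part of the patch; the package, class name, and id strings are invented for illustration) shows how a fetcher could assemble such a request URI with HttpUtil:

package org.apache.tajo.worker.dataserver.example; // hypothetical package

import java.util.Map;

import com.google.common.collect.Maps;

import org.apache.tajo.worker.dataserver.HttpUtil;

public class FetchUriSketch {
  public static void main(String[] args) throws Exception {
    // Build the query string a fetcher would send for tasks 3 and 4, attempt 0.
    Map<String, String> params = Maps.newHashMap();
    params.put("sid", "sq_1373");   // hypothetical sub-query id string
    params.put("qid", "3_0,4_0");   // "<taskId>_<attemptId>", comma-separated
    System.out.println("GET /?" + HttpUtil.buildQuery(params));
    // Prints something like: GET /?sid=sq_1373&qid=3_0%2C4_0
    // (parameter order depends on the map; the comma is URL-encoded and is
    // decoded again by QueryStringDecoder before splitMaps() splits on it)
  }
}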
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..b26ba74
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+
+public interface DataRetriever {
+  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..62dabbd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class DirectoryRetriever implements DataRetriever {
+  public String baseDir;
+
+  public DirectoryRetriever(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+    if (path == null) {
+      throw new IllegalArgumentException("Wrong path: " + request.getUri());
+    }
+
+    File file = new File(baseDir, path);
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException(baseDir + "/" + path
+          + " is not a regular file");
+    }
+
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
new file mode 100644
index 0000000..4f11168
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+public class FileChunk {
+  private final File file;
+  private final long startOffset;
+  private final long length;
+
+  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+    this.file = file;
+    this.startOffset = startOffset;
+    this.length = length;
+  }
+
+  public File getFile() {
+    return this.file;
+  }
+
+  public long startOffset() {
+    return this.startOffset;
+  }
+
+  public long length() {
+    return this.length;
+  }
+
+  @Override
+  public String toString() {
+    return " (start=" + startOffset() + ", length=" + length + ") "
+        + file.getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..e5479cc
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface RetrieverHandler {
+  /**
+   * @param kvs url-decoded key/value pairs
+   * @return a desired part of a file
+   * @throws IOException
+   */
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
+}
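RetrieverHandler is the extension point of the data server: AdvancedDataRetriever looks a handler up by task attempt id and hands it the decoded request parameters. A minimal, hypothetical implementation that ignores the parameters and always serves one whole local file might look like this (illustrative sketch only; the package and class name are invented):

package org.apache.tajo.worker.dataserver.retriever.example; // hypothetical package

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.tajo.worker.dataserver.retriever.FileChunk;
import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;

public class WholeFileRetrieverHandler implements RetrieverHandler {
  private final File file;

  public WholeFileRetrieverHandler(File file) {
    this.file = file;
  }

  @Override
  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
    // Ignore the parameters and return the entire file as a single chunk.
    // A real handler, such as the range retriever in this patch, would
    // interpret keys like "start"/"end" to pick a byte range instead.
    return new FileChunk(file, 0, file.length());
  }
}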
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java b/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
deleted file mode 100644
index a1e5179..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import tajo.conf.TajoConf;
-
-public class QueryConf extends TajoConf {
-  public static String FILENAME = "queryconf.xml";
-
-  public QueryConf() {
-    super();
-  }
-
-  public QueryConf(Configuration conf) {
-    super(conf);
-    if (!(conf instanceof QueryConf)) {
-      this.reloadConfiguration();
-    }
-  }
-
-  public void setUser(String username) {
-    setVar(ConfVars.QUERY_USERNAME, username);
-  }
-
-  public String getUser() {
-    return getVar(ConfVars.QUERY_USERNAME);
-  }
-
-  public void setOutputTable(String tableName) {
-    // it is determined in GlobalEngine.executeQuery().
-    setVar(ConfVars.QUERY_OUTPUT_TABLE, tableName);
-  }
-
-  public String getOutputTable() {
-    return getVar(ConfVars.QUERY_OUTPUT_TABLE);
-  }
-
-  public void setOutputPath(Path path) {
-    // it is determined in QueryMaster.initStagingDir().
-    setVar(ConfVars.QUERY_OUTPUT_DIR, path.toUri().toString());
-  }
-
-  public Path getOutputPath() {
-    return new Path(getVar(ConfVars.QUERY_OUTPUT_DIR));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
deleted file mode 100644
index 1e87341..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import tajo.TajoProtos.TaskAttemptState;
-import tajo.catalog.statistics.TableStat;
-import tajo.conf.TajoConf;
-import tajo.storage.Fragment;
-
-import java.io.File;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * Contains the information about an executing subquery.
- */
-public class TaskAttemptContext {
-  private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
-  private final TajoConf conf;
-  private final Map<String, List<Fragment>> fragmentMap = new HashMap<String, List<Fragment>>();
-
-  private TaskAttemptState state;
-  private TableStat resultStats;
-  private QueryUnitAttemptId queryId;
-  private final Path workDir;
-  private boolean needFetch = false;
-  private CountDownLatch doneFetchPhaseSignal;
-  private float progress = 0;
-  private Map<Integer, String> repartitions;
-  private File fetchIn;
-  private boolean stopped = false;
-  private boolean interQuery = false;
-  private Path outputPath;
-
-  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
-                            final Fragment[] fragments,
-                            final Path workDir) {
-    this.conf = conf;
-    this.queryId = queryId;
-
-    for (Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getId())) {
-        fragmentMap.get(t.getId()).add(t);
-      } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
-        frags.add(t);
-        fragmentMap.put(t.getId(), frags);
-      }
-    }
-
-    this.workDir = workDir;
-    this.repartitions = Maps.newHashMap();
-
-    state = TaskAttemptState.TA_PENDING;
-  }
-
-  public TajoConf getConf() {
-    return this.conf;
-  }
-
-  public TaskAttemptState getState() {
-    return this.state;
-  }
-
-  public void setState(TaskAttemptState state) {
-    this.state = state;
-    LOG.info("Query status of " + getTaskId() + " is changed to " + state);
-  }
-
-  public boolean hasResultStats() {
-    return resultStats != null;
-  }
-
-  public void setResultStats(TableStat stats) {
-    this.resultStats = stats;
-  }
-
-  public TableStat getResultStats() {
-    return this.resultStats;
-  }
-
-  public boolean isStopped() {
-    return this.stopped;
-  }
-
-  public void setInterQuery() {
-    this.interQuery = true;
-  }
-
-  public void setOutputPath(Path outputPath) {
-    this.outputPath = outputPath;
-  }
-
-  public Path getOutputPath() {
-    return this.outputPath;
-  }
-
-  public boolean isInterQuery() {
-    return this.interQuery;
-  }
-
-  public void stop() {
-    this.stopped = true;
-  }
-
-  public void addFetchPhase(int count, File fetchIn) {
-    this.needFetch = true;
-    this.doneFetchPhaseSignal = new CountDownLatch(count);
-    this.fetchIn = fetchIn;
-  }
-
-  public File getFetchIn() {
-    return this.fetchIn;
-  }
-
-  public boolean hasFetchPhase() {
-    return this.needFetch;
-  }
-
-  public CountDownLatch getFetchLatch() {
-    return doneFetchPhaseSignal;
-  }
-
-  public void addRepartition(int partKey, String path) {
-    repartitions.put(partKey, path);
-  }
-
-  public Iterator<Entry<Integer, String>> getRepartitions() {
-    return repartitions.entrySet().iterator();
-  }
-
-  public void changeFragment(String tableId, Fragment [] fragments) {
-    fragmentMap.remove(tableId);
-    for (Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getId())) {
-        fragmentMap.get(t.getId()).add(t);
-      } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
-        frags.add(t);
-        fragmentMap.put(t.getId(), frags);
-      }
-    }
-  }
-
-  public Path getWorkDir() {
-    return this.workDir;
-  }
-
-  public QueryUnitAttemptId getTaskId() {
-    return this.queryId;
-  }
-
-  public float getProgress() {
-    return this.progress;
-  }
-
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public Fragment getTable(String id) {
-    return fragmentMap.get(id).get(0);
-  }
-
-  public int getFragmentSize() {
-    return fragmentMap.size();
-  }
-
-  public Collection<String> getInputTables() {
-    return fragmentMap.keySet();
-  }
-
-  public Fragment [] getTables(String id) {
-    return fragmentMap.get(id).toArray(new Fragment[fragmentMap.get(id).size()]);
-  }
-
-  public int hashCode() {
-    return Objects.hashCode(queryId);
-  }
-
-  public boolean equals(Object obj) {
-    if (obj instanceof TaskAttemptContext) {
-      TaskAttemptContext other = (TaskAttemptContext) obj;
-      return queryId.equals(other.getTaskId());
-    } else {
-      return false;
-    }
-  }
-}
\ No newline at end of file
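Taken together, HttpDataServerPipelineFactory plus a DataRetriever form a complete Netty 3 HTTP file server. A hypothetical standalone bootstrap, assuming only the classes added in this patch plus the Netty 3.x API they already use (the port and directory below are made up):

package org.apache.tajo.worker.dataserver.example; // hypothetical package

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import org.apache.tajo.worker.dataserver.HttpDataServerPipelineFactory;
import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;

public class DataServerBootstrapSketch {
  public static void main(String[] args) {
    ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
    // Every regular file under /tmp/tajo-out becomes retrievable by its
    // sanitized relative path; hidden files and directories are rejected.
    bootstrap.setPipelineFactory(
        new HttpDataServerPipelineFactory(new DirectoryRetriever("/tmp/tajo-out")));
    bootstrap.bind(new InetSocketAddress(8080));
  }
}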