tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [21/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
new file mode 100644
index 0000000..26991a0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retrieves the file chunk ranged between start and end keys.
+ * The start key is inclusive, but the end key is exclusive.
+ *
+ * The handler reads a BST index stored under {@code <outDir>/index} and serves
+ * chunks of the data file {@code <outDir>/data/data}.
+ *
+ * Internally, there are four cases:
+ * <ul>
+ *   <li>out of scope: the index range does not overlap the query range.</li>
+ *   <li>overlapped: the index range partially overlaps the query range.</li>
+ *   <li>included: the index range lies within the start and end keys.</li>
+ *   <li>covered: the index range covers the query range (i.e., start and end keys).</li>
+ * </ul>
+ */
+public class RangeRetrieverHandler implements RetrieverHandler {
+  private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class);
+  private final File file;
+  private final BSTIndex.BSTIndexReader idxReader;
+  private final Schema schema;
+  private final TupleComparator comp;
+
+  /**
+   * Opens the BST index located in the {@code index} entry of the given directory.
+   *
+   * @param outDir directory that holds the {@code index} and {@code data/data} entries
+   * @param schema schema used to decode the requested keys
+   * @param comp comparator used to order keys
+   * @throws IOException if the index cannot be located or opened
+   */
+  public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException {
+    this.file = outDir;
+    BSTIndex index = new BSTIndex(new TajoConf());
+    this.schema = schema;
+    this.comp = comp;
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index"));
+    this.idxReader =
+        index.getIndexReader(indexPath, this.schema, this.comp);
+    this.idxReader.open();
+    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+        + idxReader.getLastKey() + ")");
+  }
+
+  /**
+   * Resolves the requested key range to a chunk of the data file.
+   *
+   * @param kvs request parameters; {@code start} and {@code end} must each carry a
+   *            Base64-encoded key as their first value, and the presence of
+   *            {@code final} extends the chunk to the end of the data file
+   * @return the matching chunk, or {@code null} if the index is empty or the
+   *         requested range lies outside the indexed range
+   * @throws IOException if an index lookup fails
+   * @throws IllegalArgumentException if {@code start} or {@code end} is missing
+   * @throws IllegalStateException if no start offset can be found
+   */
+  @Override
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException {
+    // nothing to verify the file because AdvancedDataRetriever checks
+    // its validity of the file.
+    File data = new File(this.file, "data/data");
+    Tuple start = decodeKey(kvs, "start");
+    Tuple end = decodeKey(kvs, "end");
+    boolean last = kvs.containsKey("final");
+
+    // With a descending first sort key the request bounds arrive reversed.
+    if(!comp.isAscendingFirstKey()) {
+      Tuple tmpKey = start;
+      start = end;
+      end = tmpKey;
+    }
+
+    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+        (last ? ", last=true" : "") + ")");
+
+    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+      LOG.info("There is no contents");
+      return null;
+    }
+
+    if (comp.compare(end, idxReader.getFirstKey()) < 0 ||
+        comp.compare(idxReader.getLastKey(), start) < 0) {
+      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+          "], but request start:" + start + ", end: " + end);
+      return null;
+    }
+
+    // The lookups are issued in the same order as before: start, end, then the
+    // relaxed retries.
+    long startOffset = findOffset(start, false, start, end);
+    long endOffset = findOffset(end, false, start, end);
+    if (endOffset == -1) {
+      endOffset = findOffset(end, true, start, end);
+    }
+
+    // if startOffset == -1 then case 2-1 or case 3
+    if (startOffset == -1) { // this is a hack
+      startOffset = findOffset(start, true, start, end);
+    }
+
+    if (startOffset == -1) {
+      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+          stateDump(start, end));
+    }
+
+    // if greater than indexed values
+    if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) {
+      endOffset = data.length();
+    }
+    // NOTE(review): if endOffset is still -1 here the chunk length below is
+    // negative; confirm whether the index can actually produce that state.
+
+    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+    LOG.info("Retrieve File Chunk: " + chunk);
+    return chunk;
+  }
+
+  /** Decodes the first value of a required, Base64-encoded key parameter. */
+  private Tuple decodeKey(Map<String, List<String>> kvs, String name) throws IOException {
+    List<String> values = kvs.get(name);
+    if (values == null || values.isEmpty()) {
+      throw new IllegalArgumentException("Missing required parameter: " + name);
+    }
+    byte [] keyBytes = Base64.decodeBase64(values.get(0));
+    return RowStoreUtil.RowStoreDecoder.toTuple(schema, keyBytes);
+  }
+
+  /**
+   * Looks up the offset of a key, logging the request state if the lookup fails.
+   * When {@code relaxed} is set, the two-argument {@code find(key, true)} is used.
+   */
+  private long findOffset(Tuple key, boolean relaxed, Tuple start, Tuple end) throws IOException {
+    try {
+      return relaxed ? idxReader.find(key, true) : idxReader.find(key);
+    } catch (IOException ioe) {
+      LOG.error(stateDump(start, end));
+      throw ioe;
+    }
+  }
+
+  /** Describes the requested range and the indexed key range for diagnostics. */
+  private String stateDump(Tuple start, Tuple end) throws IOException {
+    return "State Dump (the requested range: "
+        + new TupleRange(schema, start, end) + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+        + idxReader.getLastKey() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
new file mode 100644
index 0000000..13cd98a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.StorageManager;
+
+import java.io.IOException;
+
+/**
+ * Builds physical execution plans from logical plans, using a
+ * {@link PhysicalPlanner} backed by a {@link StorageManager}.
+ */
+public class TajoQueryEngine {
+  private static final Log LOG = LogFactory.getLog(TajoQueryEngine.class);
+  private final StorageManager storageManager;
+  private final PhysicalPlanner phyPlanner;
+
+  /**
+   * @param conf configuration handed to the storage manager and the planner
+   * @throws IOException if the storage manager or the planner cannot be created
+   */
+  public TajoQueryEngine(TajoConf conf) throws IOException {
+    StorageManager manager = new StorageManager(conf);
+    this.storageManager = manager;
+    this.phyPlanner = new PhysicalPlannerImpl(conf, manager);
+  }
+
+  /**
+   * Creates the physical plan for a logical plan.
+   *
+   * @param ctx context of the task attempt that will run the plan
+   * @param plan root of the logical plan
+   * @return the physical executor produced by the planner
+   * @throws InternalException if the planner fails
+   */
+  public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
+      throws InternalException {
+    PhysicalExec physicalPlan = phyPlanner.createPlan(ctx, plan);
+    return physicalPlan;
+  }
+
+  /** Currently a no-op. */
+  public void stop() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
new file mode 100644
index 0000000..c8a11c6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -0,0 +1,640 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.engine.MasterWorkerProtos.*;
+import org.apache.tajo.engine.exception.UnfinishedTaskException;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.worker.TaskRunner.WorkerContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Task {
+  private static final Log LOG = LogFactory.getLog(Task.class);
+
+  private final QueryConf conf;
+  private final FileSystem localFS;
+  private final WorkerContext workerContext;
+  private final Interface masterProxy;
+  private final LocalDirAllocator lDirAllocator;
+  private final QueryUnitAttemptId taskId;
+
+  private final Path taskDir;
+  private final QueryUnitRequest request;
+  private final TaskAttemptContext context;
+  private List<Fetcher> fetcherRunners;
+  private final LogicalNode plan;
+  private PhysicalExec executor;
+  private boolean interQuery;
+  // These flags are written by kill()/abort()/run() and read by the execution
+  // loop and the Reporter thread, so they are volatile to make updates visible
+  // across threads.
+  private volatile boolean killed = false;
+  private volatile boolean aborted = false;
+  private volatile boolean stopped = false;
+  private float progress = 0;
+  private final Reporter reporter;
+  private Path inputTableBaseDir;
+
+  private static int completed = 0;
+  private static int failed = 0;
+  private static int succeeded = 0;
+
+  /**
+   * flag that indicates whether progress update needs to be sent to parent.
+   * If true, it has been set. If false, it has been reset.
+   * Using AtomicBoolean since we need an atomic read & reset method.
+   */
+  private AtomicBoolean progressFlag = new AtomicBoolean(false);
+
+  // TODO - to be refactored
+  private PartitionType partitionType = null;
+  private Schema finalSchema = null;
+  private TupleComparator sortComp = null;
+
+  static final String OUTPUT_FILE_PREFIX="part-";
+  /** Per-thread formatter for the subquery id: at least two digits, no grouping. */
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          final NumberFormat subQueryFormat = NumberFormat.getInstance();
+          subQueryFormat.setMinimumIntegerDigits(2);
+          subQueryFormat.setGroupingUsed(false);
+          return subQueryFormat;
+        }
+      };
+  /** Per-thread formatter for the task id: at least six digits, no grouping. */
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          final NumberFormat taskFormat = NumberFormat.getInstance();
+          taskFormat.setMinimumIntegerDigits(6);
+          taskFormat.setGroupingUsed(false);
+          return taskFormat;
+        }
+      };
+
+  /**
+   * Creates a task for a query unit request, deserializes its logical plan and
+   * prepares the task attempt context.
+   *
+   * The status reporter thread is started as the last step, so that it never
+   * observes a partially initialized task and is not left running when the
+   * constructor fails.
+   *
+   * @param taskId id used for the task directory, the attempt context and the output file name
+   * @param worker worker context providing configuration, file systems and directories
+   * @param masterProxy stub used to report to the master
+   * @param request the query unit request to execute
+   * @throws IOException declared for initialization failures
+   */
+  public Task(QueryUnitAttemptId taskId,
+              final WorkerContext worker, final Interface masterProxy,
+              final QueryUnitRequest request) throws IOException {
+    this.request = request;
+    this.reporter = new Reporter(masterProxy);
+
+    // NOTE(review): the field is taken from the request while the rest of the
+    // constructor uses the taskId parameter; presumably both are the same id.
+    this.taskId = request.getId();
+    this.conf = worker.getConf();
+    this.workerContext = worker;
+    this.masterProxy = masterProxy;
+    this.localFS = worker.getLocalFS();
+    this.lDirAllocator = worker.getLocalDirAllocator();
+    this.taskDir = StorageUtil.concatPath(workerContext.getBaseDir(),
+        taskId.getQueryUnitId().getId() + "_" + taskId.getId());
+
+    this.context = new TaskAttemptContext(conf, taskId,
+        request.getFragments().toArray(new Fragment[request.getFragments().size()]),
+        taskDir);
+    plan = GsonCreator.getInstance().fromJson(request.getSerializedData(),
+        LogicalNode.class);
+    interQuery = request.getProto().getInterQuery();
+    if (interQuery) {
+      context.setInterQuery();
+      StoreTableNode store = (StoreTableNode) plan;
+      this.partitionType = store.getPartitionType();
+      if (partitionType == PartitionType.RANGE) {
+        SortNode sortNode = (SortNode) store.getSubNode();
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+        this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+      }
+    } else {
+      // The final result of a task will be written in a file named part-ss-nnnnnnn,
+      // where ss is the subquery id associated with this task, and nnnnnn is the task id.
+      Path outFilePath = new Path(conf.getOutputPath(),
+          OUTPUT_FILE_PREFIX +
+          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) + "-" +
+          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    }
+
+    context.setState(TaskAttemptState.TA_PENDING);
+    LOG.info("==================================");
+    LOG.info("* Subquery " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery
+        + (interQuery ? ", Use " + this.partitionType  + " partitioning":""));
+
+    LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
+    for (Fragment f: request.getFragments()) {
+      LOG.info("==> Table Id:" + f.getId() + ", path:" + f.getPath() + "(" + f.getMeta().getStoreType() + "), " +
+          "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")");
+    }
+    LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
+    for (Fetch f : request.getFetches()) {
+      LOG.info("==> Table Id: " + f.getName() + ", url: " + f.getUrls());
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    LOG.info("* plan:\n");
+    LOG.info(plan.toString());
+    LOG.info("==================================");
+
+    // Start reporting only once every field the reporter reads is assigned.
+    this.reporter.startCommunicationThread();
+  }
+
+  /**
+   * Creates the task's local directories and localizes its input.
+   *
+   * When the request carries fetches, an {@code in} directory is allocated under
+   * the task attempt directory, with one sub-directory per input table.
+   *
+   * @throws IOException if a directory cannot be created or localization fails
+   */
+  public void init() throws IOException {
+    // the task's temporary directory
+    localFS.mkdirs(taskDir);
+
+    if (!request.getFetches().isEmpty()) {
+      String fetchInDir = getTaskAttemptDir(context.getTaskId()).toString() + "/in";
+      inputTableBaseDir = localFS.makeQualified(
+          lDirAllocator.getLocalPathForWrite(fetchInDir, conf));
+      localFS.mkdirs(inputTableBaseDir);
+
+      for (String inputTable : context.getInputTables()) {
+        Path tableDir = new Path(inputTableBaseDir, inputTable);
+        if (localFS.exists(tableDir)) {
+          continue;
+        }
+        LOG.info("the directory is created  " + tableDir.toUri());
+        localFS.mkdirs(tableDir);
+      }
+    }
+
+    // localize the intermediate data
+    localize(request);
+  }
+
+  /** Returns the attempt id stored in this task's {@code taskId} field. */
+  public QueryUnitAttemptId getTaskId() {
+    return taskId;
+  }
+
+  // getters and setters for flag
+  /** Raises the progress flag. */
+  void setProgressFlag() {
+    progressFlag.set(true);
+  }
+  /** Atomically clears the progress flag and returns the value it had before. */
+  boolean resetProgressFlag() {
+    return progressFlag.getAndSet(false);
+  }
+  /** Returns the current value of the progress flag without changing it. */
+  boolean getProgressFlag() {
+    return progressFlag.get();
+  }
+
+  /**
+   * Prepares the fetchers for the request and copies every dist-cached fragment
+   * to a local file under the input table base directory.
+   *
+   * Local file names continue the {@code in_<n>} numbering after the fetchers.
+   *
+   * @param request the query unit request whose fetches and fragments are localized
+   * @throws IOException if the fetchers cannot be prepared or a copy fails
+   */
+  public void localize(QueryUnitRequest request) throws IOException {
+    fetcherRunners = getFetchRunners(context, request.getFetches());
+
+    List<Fragment> distCached = Lists.newArrayList();
+    for (Fragment fragment : request.getFragments()) {
+      if (fragment.isDistCached()) {
+        distCached.add(fragment);
+      }
+    }
+
+    if (distCached.isEmpty()) {
+      return;
+    }
+
+    int seq = fetcherRunners.size();
+    for (Fragment fragment : distCached) {
+      Path localFile = new Path(inputTableBaseDir, "in_" + seq);
+      workerContext.getDefaultFS().copyToLocalFile(fragment.getPath(), localFile);
+      fragment.setPath(localFile);
+      seq++;
+    }
+  }
+
+  /** Returns the task attempt id held by the attempt context. */
+  public QueryUnitAttemptId getId() {
+    return context.getTaskId();
+  }
+
+  /** Returns the current state held by the attempt context. */
+  public TaskAttemptState getStatus() {
+    return context.getState();
+  }
+
+  public String toString() {
+    return "queryId: " + this.getId() + " status: " + this.getStatus();
+  }
+
+  /**
+   * Sets the state on the attempt context and raises the progress flag.
+   *
+   * @param status the new state
+   */
+  public void setState(TaskAttemptState status) {
+    context.setState(status);
+    setProgressFlag();
+  }
+
+  /** Returns whether at least one fetcher has been prepared for this task. */
+  public boolean hasFetchPhase() {
+    return !fetcherRunners.isEmpty();
+  }
+
+  public void fetch() {
+    for (Fetcher f : fetcherRunners) {
+      workerContext.getFetchLauncher().submit(new FetchRunner(context, f));
+    }
+  }
+
+  /**
+   * Marks the task as killed: sets the {@code killed} flag that the execution
+   * loop in {@code run()} checks, stops the attempt context, sets the state to
+   * {@code TA_KILLED} and raises the progress flag.
+   */
+  public void kill() {
+    killed = true;
+    context.stop();
+    context.setState(TaskAttemptState.TA_KILLED);
+    setProgressFlag();
+  }
+
+  /**
+   * Marks the task as aborted: sets the {@code aborted} flag, stops the attempt
+   * context and sets the state to {@code TA_FAILED}.
+   */
+  // NOTE(review): unlike kill() and setState(), this does not raise the
+  // progress flag after changing the state - confirm whether that is intended.
+  public void abort() {
+    aborted = true;
+    context.stop();
+    context.setState(TaskAttemptState.TA_FAILED);
+  }
+
+  /**
+   * Cleans up after a succeeded task: deletes its work directory and removes
+   * the task from the worker's task table.
+   *
+   * For a task in any other state nothing is deleted and an
+   * {@link UnfinishedTaskException} is logged.
+   */
+  public void cleanUp() {
+    // remove itself from worker
+    // check whether the task has finished
+    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
+      try {
+        // delete context.getWorkDir()
+        localFS.delete(context.getWorkDir(), true);
+        // remove this task from the worker's tasks
+        synchronized (workerContext.getTasks()) {
+          workerContext.getTasks().remove(this.getId());
+        }
+      } catch (IOException e) {
+        // report through the logger instead of printing to stderr
+        LOG.error("Failed to clean up the task " + context.getTaskId(), e);
+      }
+    } else {
+      LOG.error(new UnfinishedTaskException("QueryUnitAttemptId: "
+          + context.getTaskId() + " status: " + context.getState()));
+    }
+  }
+
+  /**
+   * Builds a status report carrying the worker name, the attempt id, the
+   * progress and the state of this task.
+   */
+  public TaskStatusProto getReport() {
+    TaskStatusProto.Builder status = TaskStatusProto.newBuilder();
+    status.setWorkerName(workerContext.getNodeId());
+    status.setId(context.getTaskId().getProto());
+    status.setProgress(context.getProgress());
+    status.setState(context.getState());
+    return status.build();
+  }
+
+  /**
+   * Builds the completion report: the attempt id, the result statistics (an
+   * empty {@link TableStat} when the context has none) and one partition entry
+   * per repartition key.
+   */
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder report = TaskCompletionReport.newBuilder();
+    report.setId(context.getTaskId().getProto());
+
+    report.setResultStats(context.hasResultStats()
+        ? context.getResultStats().getProto()
+        : new TableStat().getProto());
+
+    Iterator<Entry<Integer,String>> repartitions = context.getRepartitions();
+    while (repartitions.hasNext()) {
+      Entry<Integer,String> repartition = repartitions.next();
+      // only the partition key is reported
+      Partition.Builder partition = Partition.newBuilder();
+      partition.setPartitionKey(repartition.getKey());
+      report.addPartitions(partition.build());
+    }
+
+    return report.build();
+  }
+
+  /**
+   * Blocks until the fetch latch is released, then replaces the fragments of
+   * every input table with the files found in its fetch directory.
+   *
+   * @throws InterruptedException if interrupted while waiting on the latch
+   * @throws IOException if the fetched files cannot be listed
+   */
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+
+    // iterate over a copy of the input table names
+    for (String inputTable : Lists.newArrayList(context.getInputTables())) {
+      File fetchedDir = new File(context.getFetchIn(), inputTable);
+      TableMeta meta = context.getTable(inputTable).getMeta();
+      Fragment [] fetched = localizeFetchedData(fetchedDir, inputTable, meta);
+      context.changeFragment(inputTable, fetched);
+    }
+  }
+
+  /**
+   * Executes the task: waits for the fetch phase if there is one, runs the
+   * physical plan to completion (or until killed), and reports the outcome to
+   * the master.
+   *
+   * A killed or failed task is reported through {@code fatalError}, a
+   * successful one through {@code done}. In both cases the status reporter is
+   * stopped first and the task is removed from the worker afterwards.
+   */
+  public void run() {
+
+    String errorMessage = null;
+    try {
+      context.setState(TaskAttemptState.TA_RUNNING);
+      setProgressFlag();
+
+      if (context.hasFetchPhase()) {
+        // If the fetch is still in progress, the query unit must wait for
+        // complete.
+        waitForFetch();
+      }
+
+      if (context.getFragmentSize() > 0) {
+        this.executor = workerContext.getTQueryEngine().
+            createPlan(context, plan);
+        try {
+          this.executor.init();
+          while(executor.next() != null && !killed) {
+            ++progress;
+          }
+        } finally {
+          // release the executor even when init() or next() fails
+          this.executor.close();
+        }
+      }
+    } catch (Exception e) {
+      // errorMessage will be sent to master.
+      errorMessage = ExceptionUtils.getStackTrace(e);
+      LOG.error(errorMessage);
+      aborted = true;
+
+    } finally {
+      setProgressFlag();
+      stopped = true;
+      completed++;
+
+      try {
+        if (killed || aborted) {
+          context.setProgress(0.0f);
+
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+              .setId(getId().getProto());
+          if (errorMessage != null) {
+              errorBuilder.setErrorMessage(errorMessage);
+          }
+
+          stopReporter();
+
+          masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+          failed++;
+
+        } else {
+          // if successful
+          context.setProgress(1.0f);
+
+          stopReporter();
+
+          TaskCompletionReport report = getTaskCompletionReport();
+          masterProxy.done(null, report, NullCallback.get());
+          succeeded++;
+        }
+      } finally {
+        // unregister the task even if reporting to the master fails
+        cleanupTask();
+        LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded
+            + ", failed: " + failed);
+      }
+    }
+  }
+
+  /** Stops the status reporter thread; an interruption is logged, not propagated. */
+  private void stopReporter() {
+    try {
+      reporter.stopCommunicationThread();
+    } catch (InterruptedException e) {
+      LOG.warn(e);
+    }
+  }
+
+  /** Removes this task from the worker's task table. */
+  // NOTE(review): cleanUp() performs the same removal while synchronizing on
+  // workerContext.getTasks(); this method does not - confirm which is intended.
+  public void cleanupTask() {
+    workerContext.getTasks().remove(getId());
+  }
+
+  /** Returns the hash code of the attempt context, matching {@code equals}. */
+  public int hashCode() {
+    return context.hashCode();
+  }
+
+  /** Two tasks are equal when their attempt contexts are equal. */
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Task)) {
+      return false;
+    }
+    return this.context.equals(((Task) obj).context);
+  }
+
+  /**
+   * Lists the given directory on the local file system and wraps every
+   * non-empty file in a fragment covering the whole file.
+   *
+   * @param file directory holding the fetched files of one table
+   * @param name table name given to every fragment
+   * @param meta table meta given to every fragment
+   * @return one fragment per non-empty file
+   * @throws IOException if the directory cannot be listed
+   */
+  private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    // force the local file system regardless of the configured default
+    Configuration localConf = new Configuration(conf);
+    localConf.set("fs.default.name", "file:///");
+    FileSystem localFileSystem = FileSystem.get(localConf);
+    Path fetchedDir = new Path(file.getAbsolutePath());
+
+    List<Fragment> fragments = new ArrayList<Fragment>();
+    for (FileStatus status : localFileSystem.listStatus(fetchedDir)) {
+      if (status.getLen() != 0) {
+        fragments.add(new Fragment(name, status.getPath(), meta, 0L, status.getLen(), null));
+      }
+    }
+
+    return fragments.toArray(new Fragment[fragments.size()]);
+  }
+
+  /**
+   * Runs one fetcher, retrying a failed fetch up to five times with a one
+   * second pause between attempts. The fetch latch of the attempt context is
+   * counted down when the runner finishes, whether or not the fetch succeeded.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int maxRetryNum = 5;
+      int retryWaitTime = 1000;
+
+      try { // for releasing fetch latch
+        while(retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTime);
+            } catch (InterruptedException e) {
+              // Restore the interrupt status and give up instead of retrying
+              // on a thread that has been asked to stop.
+              LOG.error("Interrupted while waiting to retry the fetch: " + fetcher.getURI(), e);
+              Thread.currentThread().interrupt();
+              break;
+            }
+            LOG.info("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            File fetched = fetcher.get();
+            if (fetched != null) {
+              break;
+            }
+          } catch (IOException e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        ctx.getFetchLatch().countDown();
+      }
+
+      if (retryNum == maxRetryNum) {
+        LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+      }
+    }
+  }
+
+  /**
+   * Creates one fetcher per fetch. Each fetcher stores into
+   * {@code <attempt dir>/in/<fetch name>/in_<n>}, and the fetch phase is
+   * registered on the context with the number of fetchers.
+   *
+   * @param ctx the attempt context on which the fetch phase is registered
+   * @param fetches the fetches of the request
+   * @return the created fetchers; an empty list when there is nothing to fetch
+   * @throws IOException if the local input directory cannot be resolved
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<Fetch> fetches) throws IOException {
+
+    if (fetches.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Path inputDir = lDirAllocator.
+        getLocalPathToRead(
+            getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", conf);
+
+    List<Fetcher> fetchers = Lists.newArrayList();
+    int seq = 0;
+    for (Fetch fetch : fetches) {
+      File tableStoreDir = new File(inputDir.toString(), fetch.getName());
+      if (!tableStoreDir.exists()) {
+        tableStoreDir.mkdirs();
+      }
+      File target = new File(tableStoreDir, "in_" + seq);
+      fetchers.add(new Fetcher(URI.create(fetch.getUrls()), target));
+      seq++;
+    }
+    ctx.addFetchPhase(fetchers.size(), new File(inputDir.toString()));
+    return fetchers;
+  }
+
+  /**
+   * Periodically reports to the master from a daemon thread: a status update
+   * when the progress flag has been raised since the last round, a ping
+   * otherwise. The loop ends once the task's {@code stopped} flag is set.
+   */
+  protected class Reporter implements Runnable {
+    private Interface masterStub;
+    private Thread pingThread;
+    private Object lock = new Object();
+    private static final int PROGRESS_INTERVAL = 3000;
+
+    public Reporter(Interface masterStub) {
+      this.masterStub = masterStub;
+    }
+
+    @Override
+    public void run() {
+      final int MAX_RETRIES = 3;
+      int remainingRetries = MAX_RETRIES;
+
+      while (!stopped) {
+        try {
+          synchronized (lock) {
+            if (stopped) {
+              break;
+            }
+            lock.wait(PROGRESS_INTERVAL);
+          }
+          if (stopped) {
+            break;
+          }
+
+          // Read and clear the flag in one atomic step. Clearing it before
+          // testing it would lose every pending progress update.
+          if (resetProgressFlag()) {
+            masterStub.statusUpdate(null, getReport(), NullCallback.get());
+          } else {
+            masterStub.ping(null, taskId.getProto(), NullCallback.get());
+          }
+
+        } catch (Throwable t) {
+          if (stopped && t instanceof InterruptedException) {
+            // This is the interrupt sent by stopCommunicationThread(); it is
+            // a normal shutdown, not a communication failure.
+            break;
+          }
+
+          LOG.info("Communication exception: " + StringUtils
+              .stringifyException(t));
+          remainingRetries -=1;
+          if (remainingRetries == 0) {
+            ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+            LOG.warn("Last retry, killing ");
+            System.exit(65);
+          }
+        }
+      }
+    }
+
+    /** Starts the daemon reporting thread; does nothing if it was already started. */
+    public void startCommunicationThread() {
+      if (pingThread == null) {
+        pingThread = new Thread(this, "communication thread");
+        pingThread.setDaemon(true);
+        pingThread.start();
+      }
+    }
+
+    /**
+     * Wakes up and interrupts the reporting thread, then waits for it to end.
+     *
+     * @throws InterruptedException if interrupted while joining the thread
+     */
+    public void stopCommunicationThread() throws InterruptedException {
+      if (pingThread != null) {
+        // Intent of the lock is to not send an interrupt in the middle of an
+        // umbilical.ping or umbilical.statusUpdate
+        synchronized(lock) {
+          //Interrupt if sleeping. Otherwise wait for the RPC call to return.
+          lock.notify();
+        }
+
+        pingThread.interrupt();
+        pingThread.join();
+      }
+    }
+  }
+
+  public static final String FILECACHE = "filecache";
+  public static final String APPCACHE = "appcache";
+  public static final String USERCACHE = "usercache";
+
+  String fileCache;
+  public String getFileCacheDir() {
+    fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
+        ConverterUtils.toString(taskId.getQueryId().getApplicationId()) +
+        "/" + "output";
+    return fileCache;
+  }
+
+  /**
+   * Returns the relative directory of a task attempt:
+   * {@code <subquery id>/<query unit id>/<attempt id>}.
+   *
+   * @param quid the task attempt id
+   * @return the path built from the three id components
+   */
+  public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
+    String subQueryPart = quid.getSubQueryId().toString();
+    String queryUnitPart = String.valueOf(quid.getQueryUnitId().getId());
+    String attemptPart = String.valueOf(quid.getId());
+    return StorageUtil.concatPath(subQueryPart, queryUnitPart, attemptPart);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
new file mode 100644
index 0000000..05c5165
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.MasterWorkerProtocol;
+import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
+import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+
+/**
+ * The driver class for Tajo QueryUnit processing.
+ *
+ * A single launcher thread repeatedly asks the master for a query unit and
+ * then initializes, fetches for and runs the received task, until the runner
+ * is stopped or the master sends the "should die" flag.
+ */
+public class TaskRunner extends AbstractService {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
+  private QueryConf conf;
+
+  // Set by stop(); read by the launcher thread to leave its loop.
+  private volatile boolean stopped = false;
+
+  private final SubQueryId subQueryId;
+  private ApplicationId appId;
+  private final NodeId nodeId;
+  private final ContainerId containerId;
+
+  // Cluster Management
+  private MasterWorkerProtocolService.Interface master;
+
+  // for temporal or intermediate files
+  private FileSystem localFS;
+  // for input files
+  private FileSystem defaultFS;
+
+  private TajoQueryEngine queryEngine;
+
+  // TODO - this should be configurable
+  private final int coreNum = 4;
+
+  // for Fetcher
+  private final ExecutorService fetchLauncher =
+      Executors.newFixedThreadPool(coreNum * 4);
+  // It keeps all of the query unit attempts while a TaskRunner is running.
+  private final Map<QueryUnitAttemptId, Task> tasks =
+      new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+  private LocalDirAllocator lDirAllocator;
+
+  // A thread to receive each assigned query unit and execute the query unit
+  private Thread taskLauncher;
+
+  // Contains the object references related for TaskRunner
+  private WorkerContext workerContext;
+  // for the doAs block
+  private UserGroupInformation taskOwner;
+
+  // for the local temporal dir
+  private String baseDir;
+  private Path baseDirPath;
+
+  /**
+   * @param subQueryId  the sub query whose query units this runner executes
+   * @param nodeId      the node this runner is running on
+   * @param taskOwner   the user on whose behalf the tasks are executed
+   * @param master      the RPC stub used to talk to the master
+   * @param containerId the container id sent to the master when asking for tasks
+   */
+  public TaskRunner(
+      final SubQueryId subQueryId,
+      final NodeId nodeId,
+      UserGroupInformation taskOwner,
+      Interface master, ContainerId containerId) {
+    super(TaskRunner.class.getName());
+    this.subQueryId = subQueryId;
+    this.appId = subQueryId.getQueryId().getApplicationId();
+    this.nodeId = nodeId;
+    this.taskOwner = taskOwner;
+    this.master = master;
+    this.containerId = containerId;
+  }
+
+  /**
+   * Prepares the file systems, the local base directory, the query engine and
+   * a JVM shutdown hook. A failure during these steps is logged (with its
+   * stack trace) and does not prevent the service from being initialized.
+   *
+   * @param _conf the configuration; it must be a {@link QueryConf}
+   */
+  @Override
+  public void init(Configuration _conf) {
+    this.conf = (QueryConf) _conf;
+
+    try {
+      this.workerContext = new WorkerContext();
+
+      // initialize DFS and LocalFileSystems
+      defaultFS = FileSystem.get(URI.create(conf.getVar(ConfVars.ROOT_DIR)),conf);
+      localFS = FileSystem.getLocal(conf);
+
+      // the base dir for an output dir
+      baseDir = ConverterUtils.toString(appId)
+          + "/output" + "/" + subQueryId.getId();
+
+      // initialize LocalDirAllocator
+      lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
+
+      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
+      LOG.info("TaskRunner basedir is created (" + baseDir +")");
+
+      // Setup QueryEngine according to the query plan
+      // Here, we can setup row-based query engine or columnar query engine.
+      this.queryEngine = new TajoQueryEngine(conf);
+
+      Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+    } catch (Throwable t) {
+      // Pass the throwable as the second argument so the stack trace is kept.
+      LOG.error("TaskRunner initialization failed", t);
+    }
+
+    super.init(conf);
+  }
+
+  /** Runs the task launcher; this call blocks until the launcher thread ends. */
+  @Override
+  public void start() {
+    run();
+  }
+
+  /**
+   * Marks every pending or running task as failed, raises the stop flag and
+   * wakes up any thread waiting on this object. Calling it again after the
+   * runner has been stopped has no effect.
+   */
+  @Override
+  public void stop() {
+    if (!isStopped()) {
+      // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
+      for (Task task : tasks.values()) {
+        if (task.getStatus() == TaskAttemptState.TA_PENDING ||
+            task.getStatus() == TaskAttemptState.TA_RUNNING) {
+          task.setState(TaskAttemptState.TA_FAILED);
+        }
+      }
+
+      // If this flag become true, taskLauncher will be terminated.
+      this.stopped = true;
+
+      LOG.info("STOPPED: " + nodeId);
+      synchronized (this) {
+        notifyAll();
+      }
+    }
+  }
+
+  /** Gives tasks access to the resources owned by the enclosing TaskRunner. */
+  class WorkerContext {
+    public QueryConf getConf() {
+      return conf;
+    }
+
+    public String getNodeId() {
+      return nodeId.toString();
+    }
+
+    public MasterWorkerProtocolService.Interface getMaster() {
+      return master;
+    }
+
+    public FileSystem getLocalFS() {
+      return localFS;
+    }
+
+    public FileSystem getDefaultFS() {
+      return defaultFS;
+    }
+
+    public LocalDirAllocator getLocalDirAllocator() {
+      return lDirAllocator;
+    }
+
+    public TajoQueryEngine getTQueryEngine() {
+      return queryEngine;
+    }
+
+    public Map<QueryUnitAttemptId, Task> getTasks() {
+      return tasks;
+    }
+
+    public Task getTask(QueryUnitAttemptId taskId) {
+      return tasks.get(taskId);
+    }
+
+    public ExecutorService getFetchLauncher() {
+      return fetchLauncher;
+    }
+
+    public Path getBaseDir() {
+      return baseDirPath;
+    }
+  }
+
+  /**
+   * Reports a fatal error of a task attempt to the master.
+   *
+   * @param proxy         the RPC stub of the master
+   * @param taskAttemptId the attempt the error belongs to
+   * @param message       the error message to report
+   */
+  static void fatalError(MasterWorkerProtocolService.Interface proxy,
+                         QueryUnitAttemptId taskAttemptId, String message) {
+    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(message);
+    proxy.fatalError(null, builder.build(), NullCallback.get());
+  }
+
+  /**
+   * Starts the launcher thread and waits for it to end. Every task that has
+   * not succeeded by then is aborted.
+   */
+  public void run() {
+    LOG.info("TaskRunner startup");
+
+    try {
+
+      taskLauncher = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          int receivedNum = 0;
+          CallFuture2<QueryUnitRequestProto> callFuture = null;
+
+          while (!stopped) {
+            try {
+              if (callFuture == null) {
+                callFuture = new CallFuture2<QueryUnitRequestProto>();
+                master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
+                    callFuture);
+              }
+
+              QueryUnitRequestProto taskRequest;
+              try {
+                // wait for an assigning task for 3 seconds
+                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+              } catch (TimeoutException te) {
+                // Nothing has been assigned yet; keep waiting on the same
+                // outstanding request. This is the normal idle case, so it
+                // is not reported as an error.
+                LOG.debug("No task has been assigned for 3 seconds; still waiting");
+                continue;
+              }
+
+              if (taskRequest == null) {
+                continue;
+              }
+
+              // The outstanding request has been answered. Forget it right
+              // away so that every path below (duplicate attempt, failing
+              // task, successful task) makes the next iteration ask the
+              // master for a new task instead of waiting on this consumed
+              // future again.
+              callFuture = null;
+
+              // QueryMaster can send the terminal signal to TaskRunner.
+              // If TaskRunner receives the terminal signal, TaskRunner will be terminated
+              // immediately.
+              if (taskRequest.getShouldDie()) {
+                LOG.info("received ShouldDie flag");
+                stop();
+                continue;
+              }
+
+              LOG.info("Accumulated Received Task: " + (++receivedNum));
+
+              QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+              if (tasks.containsKey(taskAttemptId)) {
+                fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+                continue;
+              }
+
+              LOG.info("Initializing: " + taskAttemptId);
+              Task task = new Task(taskAttemptId, workerContext, master,
+                  new QueryUnitRequestImpl(taskRequest));
+              tasks.put(taskAttemptId, task);
+
+              task.init();
+              if (task.hasFetchPhase()) {
+                task.fetch(); // The fetch is performed in an asynchronous way.
+              }
+              // task.run() is a blocking call.
+              task.run();
+            } catch (Throwable t) {
+              LOG.error("Error in the task launcher loop", t);
+            }
+          }
+        }
+      });
+      taskLauncher.start();
+      taskLauncher.join();
+
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception. Starting shutdown.", t);
+    } finally {
+      for (Task t : tasks.values()) {
+        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
+          t.abort();
+        }
+      }
+    }
+  }
+
+  /** Stops the runner when the JVM shuts down. */
+  private class ShutdownHook implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("received SIGINT Signal");
+      stop();
+    }
+  }
+
+  /**
+   * @return true if a stop has been requested.
+   */
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * TaskRunner takes 5 arguments as follows:
+   * <ol>
+   * <li>1st: TaskRunnerListener hostname</li>
+   * <li>2nd: TaskRunnerListener port</li>
+   * <li>3rd: SubQueryId</li>
+   * <li>4th: NodeId</li>
+   * <li>5th: ContainerId</li>
+   * </ol>
+   *
+   * @throws IllegalArgumentException if fewer than 5 arguments are given
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 5) {
+      throw new IllegalArgumentException("TaskRunner requires 5 arguments: "
+          + "<listener host> <listener port> <SubQueryId> <NodeId> <ContainerId>");
+    }
+
+    // Restore QueryConf
+    final QueryConf conf = new QueryConf();
+    conf.addResource(new Path(QueryConf.FILENAME));
+
+    LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
+    LOG.info("OUTPUT DIR: " + conf.getOutputPath());
+    LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+
+    UserGroupInformation.setConfiguration(conf);
+
+    // TaskRunnerListener's address
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress masterAddr =
+        NetUtils.createSocketAddrForHost(host, port);
+
+    // SubQueryId from String
+    final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
+    // NodeId has a form of hostname:port.
+    NodeId nodeId = ConverterUtils.toNodeId(args[3]);
+    ContainerId containerId = ConverterUtils.toContainerId(args[4]);
+
+    // TODO - 'load credential' should be implemented
+    // Getting taskOwner
+    UserGroupInformation taskOwner =
+        UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
+    //taskOwner.addToken(token);
+
+    // TaskRunnerListener RPC
+    ProtoAsyncRpcClient client;
+    MasterWorkerProtocolService.Interface master;
+
+    // initialize MasterWorkerProtocol as an actual task owner.
+    client =
+        taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
+          @Override
+          public ProtoAsyncRpcClient run() throws Exception {
+            return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr);
+          }
+        });
+
+    try {
+      master = client.getStub();
+
+      TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
+      taskRunner.init(conf);
+      taskRunner.start();
+    } finally {
+      // Release the RPC client even when the runner fails unexpectedly.
+      client.close();
+    }
+    LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
new file mode 100644
index 0000000..6c93e4f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} raised when access to a requested file is forbidden.
+ */
+public class FileAccessForbiddenException extends IOException {
+  private static final long serialVersionUID = -3383272565826389213L;
+
+  /** Creates an exception with neither a detail message nor a cause. */
+  public FileAccessForbiddenException() {
+    super();
+  }
+
+  /**
+   * @param message the detail message
+   */
+  public FileAccessForbiddenException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause the underlying cause
+   */
+  public FileAccessForbiddenException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the detail message
+   * @param cause   the underlying cause
+   */
+  public FileAccessForbiddenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
new file mode 100644
index 0000000..523d6e1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+/**
+ * A Netty based HTTP server that serves the data located by a
+ * {@link DataRetriever}.
+ */
+public class HttpDataServer {
+  private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
+
+  // The address requested by the caller.
+  private final InetSocketAddress addr;
+  // The address the channel is actually bound to; null until start() is called.
+  private InetSocketAddress bindAddr;
+  private ServerBootstrap bootstrap = null;
+  private ChannelFactory factory = null;
+  private ChannelGroup channelGroup = null;
+
+  /**
+   * Configures the server without binding it; call {@link #start()} to accept
+   * connections.
+   *
+   * @param addr      the address to bind to
+   * @param retriever the retriever handed to the pipeline factory
+   */
+  public HttpDataServer(final InetSocketAddress addr, 
+      final DataRetriever retriever) {
+    this.addr = addr;
+    this.factory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+        Runtime.getRuntime().availableProcessors() * 2);
+
+    // Configure the server.
+    this.bootstrap = new ServerBootstrap(factory);
+    // Set up the event pipeline factory.
+    this.bootstrap.setPipelineFactory(
+        new HttpDataServerPipelineFactory(retriever));    
+    this.channelGroup = new DefaultChannelGroup();
+  }
+
+  /**
+   * @param bindaddr  the address to bind to, parsed by NetUtils.createSocketAddr
+   * @param retriever the retriever handed to the pipeline factory
+   */
+  public HttpDataServer(String bindaddr, DataRetriever retriever) {
+    this(NetUtils.createSocketAddr(bindaddr), retriever);
+  }
+
+  /** Binds the server and records the address it is actually bound to. */
+  public void start() {
+    // Bind and start to accept incoming connections.
+    Channel channel = bootstrap.bind(addr);
+    channelGroup.add(channel);    
+    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+    LOG.info("HttpDataServer starts up (" + hostAndPort(this.bindAddr) + ")");
+  }
+  
+  /**
+   * @return the bound address, or null when the server has not been started
+   */
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddr;
+  }
+
+  /**
+   * Closes all channels and releases the resources of the channel factory.
+   * It can be called on a server that was never started.
+   */
+  public void stop() {
+    ChannelGroupFuture future = channelGroup.close();
+    future.awaitUninterruptibly();
+    factory.releaseExternalResources();
+
+    // bindAddr is only set by start(). Fall back to the requested address so
+    // that stopping a never-started server does not end in a
+    // NullPointerException while building the log message.
+    InetSocketAddress shown = (this.bindAddr != null) ? this.bindAddr : this.addr;
+    LOG.info("HttpDataServer shutdown (" + hostAndPort(shown) + ")");
+  }
+
+  /**
+   * Formats an address as "ip:port". The host name is used in place of the IP
+   * when the address is unresolved.
+   */
+  private static String hostAndPort(InetSocketAddress address) {
+    String host = (address.getAddress() != null)
+        ? address.getAddress().getHostAddress()
+        : address.getHostName();
+    return host + ":" + address.getPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
new file mode 100644
index 0000000..6b9eea8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.*;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * The upstream handler of the HTTP data server. It asks a
+ * {@link DataRetriever} which file chunks answer a GET request and streams
+ * them back to the client.
+ */
+public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+  private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
+  private final DataRetriever retriever;
+
+  /**
+   * @param retriever resolves a request into the file chunks to send
+   */
+  public HttpDataServerHandler(DataRetriever retriever) {
+    this.retriever = retriever;
+  }
+
+  /**
+   * Serves one HTTP request. Non-GET requests and retriever failures are
+   * answered with an error status; a null result yields 204 (no content);
+   * otherwise the chunks are written after a 200 header whose content length
+   * is the sum of the chunk lengths.
+   */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    HttpRequest request = (HttpRequest) e.getMessage();
+    if (!GET.equals(request.getMethod())) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+
+    FileChunk [] file;
+    try {
+      file = retriever.handle(ctx, request);
+    } catch (FileNotFoundException fnf) {
+      LOG.error(fnf);
+      sendError(ctx, NOT_FOUND);
+      return;
+    } catch (IllegalArgumentException iae) {
+      LOG.error(iae);
+      sendError(ctx, BAD_REQUEST);
+      return;
+    } catch (FileAccessForbiddenException fafe) {
+      LOG.error(fafe);
+      sendError(ctx, FORBIDDEN);
+      return;
+    } catch (IOException ioe) {
+      LOG.error(ioe);
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    // Write the content.
+    Channel ch = e.getChannel();
+    if (file == null) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+      ch.write(response);
+      if (!isKeepAlive(request)) {
+        ch.close();
+      }
+      return;
+    }
+
+    // Open every chunk before anything is written: once the 200 header is on
+    // the wire a missing file can no longer be reported as 404.
+    RandomAccessFile [] opened = openChunks(file);
+    if (opened == null) {
+      sendError(ctx, NOT_FOUND);
+      return;
+    }
+
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    long totalSize = 0;
+    for (FileChunk chunk : file) {
+      totalSize += chunk.length();
+    }
+    setContentLength(response, totalSize);
+
+    // Write the initial line and the header. Its future is also the last
+    // write when there is no chunk at all, so it is never null below.
+    ChannelFuture writeFuture = ch.write(response);
+
+    int next = 0;
+    try {
+      while (next < file.length) {
+        writeFuture = sendFile(ch, file[next], opened[next]);
+        next++;
+      }
+    } finally {
+      // Only has work to do when sendFile threw: close what was not handed over.
+      closeFrom(opened, next);
+    }
+
+    // Decide whether to close the connection or not.
+    if (!isKeepAlive(request)) {
+      // Close the connection when the whole content is written out.
+      writeFuture.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  /**
+   * Opens all chunk files for reading.
+   *
+   * @return the opened files in chunk order, or null when one of the files
+   *         does not exist (everything opened so far is closed again)
+   */
+  private static RandomAccessFile [] openChunks(FileChunk [] chunks) {
+    RandomAccessFile [] opened = new RandomAccessFile[chunks.length];
+    for (int i = 0; i < chunks.length; i++) {
+      try {
+        opened[i] = new RandomAccessFile(chunks[i].getFile(), "r");
+      } catch (FileNotFoundException fnfe) {
+        closeFrom(opened, 0);
+        return null;
+      }
+    }
+    return opened;
+  }
+
+  /** Closes the non-null entries of files starting at index from, ignoring errors. */
+  private static void closeFrom(RandomAccessFile [] files, int from) {
+    for (int i = from; i < files.length; i++) {
+      if (files[i] == null) {
+        continue;
+      }
+      try {
+        files[i].close();
+      } catch (IOException ignored) {
+        // best-effort cleanup; there is nothing more to do with this handle
+      }
+    }
+  }
+
+  /**
+   * Writes the region of an already opened file described by a chunk.
+   *
+   * @return the future of the write
+   */
+  private ChannelFuture sendFile(Channel ch, FileChunk file, RandomAccessFile raf) throws IOException {
+    ChannelFuture writeFuture;
+    if (ch.getPipeline().get(SslHandler.class) != null) {
+      // Cannot use zero-copy with HTTPS.
+      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
+    } else {
+      // No encryption - use zero-copy.
+      final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
+      writeFuture = ch.write(region);
+      writeFuture.addListener(new ChannelFutureListener() {
+        public void operationComplete(ChannelFuture future) {
+          region.releaseExternalResources();
+        }
+      });
+    }
+
+    return writeFuture;
+  }
+
+  /**
+   * Answers a too long frame with 400 and any other failure with 500 (the
+   * latter only while the channel is still connected).
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+      throws Exception {
+    Channel ch = e.getChannel();
+    Throwable cause = e.getCause();
+    if (cause instanceof TooLongFrameException) {
+      sendError(ctx, BAD_REQUEST);
+      return;
+    }
+
+    LOG.error(cause.getMessage(), cause);
+    if (ch.isConnected()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Decodes a URI, converts '/' into the platform file separator and rejects
+   * paths that contain "." next to a separator or at either end.
+   *
+   * @param uri the URI to sanitize
+   * @return the converted path, or null when the path is rejected
+   */
+  public static String sanitizeUri(String uri) {
+    // Decode the path.
+    try {
+      uri = URLDecoder.decode(uri, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      try {
+        uri = URLDecoder.decode(uri, "ISO-8859-1");
+      } catch (UnsupportedEncodingException e1) {
+        // keep the cause so that the failure can be diagnosed
+        throw new Error(e1);
+      }
+    }
+
+    // Convert file separators.
+    uri = uri.replace('/', File.separatorChar);
+
+    // Simplistic dumb security check.
+    // You will have to do something serious in the production environment.
+    if (uri.contains(File.separator + ".")
+        || uri.contains("." + File.separator) || uri.startsWith(".")
+        || uri.endsWith(".")) {
+      return null;
+    }
+
+    // Convert to absolute path.
+    return uri;
+  }
+
+  /** Sends a plain text error response and closes the connection afterwards. */
+  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.setContent(ChannelBuffers.copiedBuffer(
+        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+
+    // Close the connection as soon as the error message is sent.
+    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
new file mode 100644
index 0000000..0a47f6b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+/**
+ * Creates the channel pipeline of the HTTP data server: an HTTP request
+ * decoder, an HTTP response encoder, a chunked writer and the handler that
+ * serves the data.
+ */
+public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
+  private final DataRetriever ret;
+
+  /**
+   * @param ret the retriever passed to every handler created by this factory
+   */
+  public HttpDataServerPipelineFactory(DataRetriever ret) {
+    this.ret = ret;
+  }
+
+  /**
+   * @return a new pipeline holding the "decoder", "encoder", "chunkedWriter"
+   *         and "handler" stages, in that order
+   */
+  @Override
+  public ChannelPipeline getPipeline() throws Exception {
+    // Start from the default pipeline implementation.
+    ChannelPipeline stages = pipeline();
+
+    // For HTTPS, an "ssl" stage would have to come first:
+    // SSLEngine engine =
+    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    // engine.setUseClientMode(false);
+    // stages.addLast("ssl", new SslHandler(engine));
+
+    stages.addLast("decoder", new HttpRequestDecoder());
+    // Optional stage, currently disabled:
+    // stages.addLast("aggregator", new HttpChunkAggregator(65536));
+    stages.addLast("encoder", new HttpResponseEncoder());
+    stages.addLast("chunkedWriter", new ChunkedWriteHandler());
+    // Optional stage, currently disabled:
+    // stages.addLast("deflater", new HttpContentCompressor());
+    stages.addLast("handler", new HttpDataServerHandler(ret));
+    return stages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
new file mode 100644
index 0000000..e688c39
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helpers for converting between HTTP query strings and key/value maps.
+ */
+public class HttpUtil {
+  /**
+   * Parses the (decoded) query component of a URI into key/value pairs.
+   *
+   * @param uri the URI whose query is parsed
+   * @return the parsed pairs; empty when the URI has no query
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
+    return getParamsFromQuery(uri.getQuery());
+  }
+
+  /**
+   * It parses a query string into key/value pairs. Each pair is split at its
+   * first '=', so a value may itself contain '='. A key without '=' or with
+   * nothing after it maps to the empty string, and empty segments (as in
+   * "a=1&&b=2") are skipped.
+   *
+   * @param queryString decoded query string; may be null or empty
+   * @return key/value pairs parsed from a given query string; an empty map
+   *         when the query string is null or empty
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+    Map<String,String> params = new HashMap<String, String>();
+    if (queryString == null || queryString.isEmpty()) {
+      return params;
+    }
+
+    for (String pair : queryString.split("&")) {
+      if (pair.isEmpty()) {
+        continue;
+      }
+
+      int eq = pair.indexOf('=');
+      if (eq < 0) {
+        // a bare key such as "flag"
+        params.put(pair, "");
+      } else {
+        params.put(pair.substring(0, eq), pair.substring(eq + 1));
+      }
+    }
+
+    return params;
+  }
+
+  /**
+   * Builds a query string of the form k1=v1&amp;k2=v2 from the given pairs,
+   * in the iteration order of the map. Keys and values are encoded with
+   * {@link URLEncoder} using UTF-8.
+   *
+   * @param params the pairs to encode
+   * @return the query string; empty when there is no pair
+   * @throws UnsupportedEncodingException
+   */
+  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
+    StringBuilder sb = new StringBuilder();
+
+    boolean first = true;
+    for (Map.Entry<String,String> param : params.entrySet()) {
+      if (!first) {
+        sb.append("&");
+      }
+      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+          append("=").
+          append(URLEncoder.encode(param.getValue(), "UTF-8"));
+      first = false;
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..d602d57
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link DataRetriever} that dispatches each request to the
+ * {@link RetrieverHandler} registered under the query unit attempt id named
+ * by the request's query string.
+ */
+public class AdvancedDataRetriever implements DataRetriever {
+  private static final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+  public AdvancedDataRetriever() {
+  }
+
+  /**
+   * Registers a handler for the given attempt. An already registered handler
+   * for the same attempt is kept and the new one is ignored.
+   *
+   * @param id attempt whose string form is used as the lookup key
+   * @param handler handler that serves requests for the attempt
+   */
+  public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
+    final String key = id.toString();
+    synchronized (handlerMap) {
+      if (!handlerMap.containsKey(key)) {
+        handlerMap.put(key, handler);
+      }
+    }
+  }
+
+  /**
+   * Removes the handler registered for the given attempt, if any.
+   *
+   * @param id attempt whose handler is removed
+   */
+  public void unregister(QueryUnitAttemptId id) {
+    synchronized (handlerMap) {
+      handlerMap.remove(id.toString());
+    }
+  }
+
+  /**
+   * Resolves a request to file chunks.
+   *
+   * If the request carries a "sid" parameter, every entry of the
+   * comma-separated "qid" values is read as {@code <queryUnitId>_<attemptId>}
+   * and one chunk per entry is returned. Otherwise the first "qid" value is
+   * used directly as the handler key and a single chunk is returned.
+   *
+   * @return the resolved chunks, or null if the handler of a request without
+   *         "sid" has no content for it
+   * @throws FileNotFoundException if "qid" is missing, no handler is
+   *         registered for the requested id, or the chunk's file does not exist
+   * @throws IllegalArgumentException if a "qid" entry of a "sid" request is
+   *         not of the form {@code <queryUnitId>_<attemptId>}
+   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    final Map<String, List<String>> params =
+      new QueryStringDecoder(request.getUri()).getParameters();
+
+    if (!params.containsKey("qid")) {
+      throw new FileNotFoundException("No qid is given in " + request.getUri());
+    }
+
+    if (params.containsKey("sid")) {
+      // The sub query id is the same for every qid of the request.
+      final SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
+      List<FileChunk> chunks = Lists.newArrayList();
+      for (String qid : splitMaps(params.get("qid"))) {
+        String[] ids = qid.split("_");
+        if (ids.length < 2) {
+          throw new IllegalArgumentException("Wrong qid: " + qid);
+        }
+        QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
+        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
+            Integer.parseInt(ids[1]));
+        // NOTE(review): a null chunk returned by the handler is passed on to the caller as is.
+        chunks.add(findHandler(attemptId.toString()).get(params));
+      }
+      return chunks.toArray(new FileChunk[chunks.size()]);
+    }
+
+    FileChunk chunk = findHandler(params.get("qid").get(0)).get(params);
+    if (chunk == null) { // there is no content corresponding to the query
+      return null;
+    }
+
+    File file = chunk.getFile();
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
+    }
+
+    return new FileChunk[] {chunk};
+  }
+
+  /** Looks up the handler registered under a key, failing if there is none. */
+  private RetrieverHandler findHandler(String key) throws FileNotFoundException {
+    RetrieverHandler handler = handlerMap.get(key);
+    if (handler == null) {
+      throw new FileNotFoundException("No handler is registered for " + key);
+    }
+    return handler;
+  }
+
+  /** Flattens comma-separated qid values into a single list of ids. */
+  private List<String> splitMaps(List<String> qids) {
+    if (null == qids) {
+      LOG.error("QueryUnitId is EMPTY");
+      return Collections.emptyList();
+    }
+
+    final List<String> ret = new ArrayList<String>();
+    for (String qid : qids) {
+      Collections.addAll(ret, qid.split(","));
+    }
+    return ret;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..b26ba74
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+
+/**
+ * Resolves an HTTP request to the file chunks that answer it.
+ */
+public interface DataRetriever {
+  /**
+   * Handles a single request.
+   *
+   * @param ctx channel handler context the request arrived on
+   * @param request HTTP request to resolve
+   * @return the file chunks selected by the request
+   * @throws IOException if the request cannot be resolved to file chunks
+   */
+  FileChunk[] handle(ChannelHandlerContext ctx, HttpRequest request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..62dabbd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * A {@link DataRetriever} that serves whole files located under a base
+ * directory. The request URI is sanitized into a relative path, which is
+ * then resolved against the base directory.
+ */
+public class DirectoryRetriever implements DataRetriever {
+  /** Directory against which request paths are resolved. */
+  public String baseDir;
+
+  /**
+   * @param baseDir directory against which request paths are resolved
+   */
+  public DirectoryRetriever(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * Resolves the request URI to a file under the base directory.
+   *
+   * @return a single chunk that covers the whole file
+   * @throws IllegalArgumentException if the request URI cannot be sanitized
+   * @throws FileNotFoundException if the file is hidden or does not exist
+   * @throws FileAccessForbiddenException if the path is not a regular file
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+    if (path == null) {
+      // path is null here, so report the URI that could not be sanitized
+      throw new IllegalArgumentException("Wrong path: " + request.getUri());
+    }
+
+    File file = new File(baseDir, path);
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+    }
+    if (!file.isFile()) {
+      // The path exists but is not a regular file (e.g. a directory).
+      throw new FileAccessForbiddenException(baseDir + "/" + path + " is not file");
+    }
+
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
new file mode 100644
index 0000000..4f11168
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+public class FileChunk {
+  private final File file;
+  private final long startOffset;
+  private final long length;
+
+  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+    this.file = file;
+    this.startOffset = startOffset;
+    this.length = length;
+  }
+
+  public File getFile() {
+    return this.file;
+  }
+
+  public long startOffset() {
+    return this.startOffset;
+  }
+
+  public long length() {
+    return this.length;
+  }
+
+  public String toString() {
+    return " (start=" + startOffset() + ", length=" + length + ") "
+        + file.getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..e5479cc
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Selects the part of a file that answers a set of request parameters.
+ */
+public interface RetrieverHandler {
+  /**
+   * Looks up the file chunk selected by the given parameters.
+   *
+   * @param kvs url-decoded key/value pairs; each key maps to the list of its values
+   * @return a desired part of a file
+   * @throws IOException if the chunk cannot be obtained
+   */
+  FileChunk get(Map<String, List<String>> kvs) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java b/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
deleted file mode 100644
index a1e5179..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/QueryConf.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import tajo.conf.TajoConf;
-
-public class QueryConf extends TajoConf {
-  public static String FILENAME = "queryconf.xml";
-
-  public QueryConf() {
-    super();
-  }
-
-  public QueryConf(Configuration conf) {
-    super(conf);
-    if (! (conf instanceof QueryConf)) {
-      this.reloadConfiguration();
-    }
-  }
-
-  public void setUser(String username) {
-    setVar(ConfVars.QUERY_USERNAME, username);
-  }
-
-  public String getUser() {
-    return getVar(ConfVars.QUERY_USERNAME);
-  }
-
-  public void setOutputTable(String tableName) {
-    // it is determined in GlobalEngine.executeQuery().
-    setVar(ConfVars.QUERY_OUTPUT_TABLE, tableName);
-  }
-
-  public String getOutputTable() {
-    return getVar(ConfVars.QUERY_OUTPUT_TABLE);
-  }
-
-  public void setOutputPath(Path path) {
-    // it is determined in QueryMaster.initStagingDir().
-    setVar(ConfVars.QUERY_OUTPUT_DIR, path.toUri().toString());
-  }
-
-  public Path getOutputPath() {
-    return new Path(getVar(ConfVars.QUERY_OUTPUT_DIR));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
deleted file mode 100644
index 1e87341..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/TaskAttemptContext.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import tajo.TajoProtos.TaskAttemptState;
-import tajo.catalog.statistics.TableStat;
-import tajo.conf.TajoConf;
-import tajo.storage.Fragment;
-
-import java.io.File;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * Contains the information about executing subquery.
- */
-public class TaskAttemptContext {
-  private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
-  private final TajoConf conf;
-  private final Map<String, List<Fragment>> fragmentMap = new HashMap<String, List<Fragment>>();
-
-  private TaskAttemptState state;
-  private TableStat resultStats;
-  private QueryUnitAttemptId queryId;
-  private final Path workDir;
-  private boolean needFetch = false;
-  private CountDownLatch doneFetchPhaseSignal;
-  private float progress = 0;
-  private Map<Integer, String> repartitions;
-  private File fetchIn;
-  private boolean stopped = false;
-  private boolean interQuery = false;
-  private Path outputPath;
-
-  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
-                            final Fragment[] fragments,
-                            final Path workDir) {
-    this.conf = conf;
-    this.queryId = queryId;
-    
-    for(Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getId())) {
-        fragmentMap.get(t.getId()).add(t);
-      } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
-        frags.add(t);
-        fragmentMap.put(t.getId(), frags);
-      }
-    }
-
-    this.workDir = workDir;
-    this.repartitions = Maps.newHashMap();
-    
-    state = TaskAttemptState.TA_PENDING;
-  }
-
-  public TajoConf getConf() {
-    return this.conf;
-  }
-  
-  public TaskAttemptState getState() {
-    return this.state;
-  }
-  
-  public void setState(TaskAttemptState state) {
-    this.state = state;
-    LOG.info("Query status of " + getTaskId() + " is changed to " + state);
-  }
-
-  public boolean hasResultStats() {
-    return resultStats != null;
-  }
-
-  public void setResultStats(TableStat stats) {
-    this.resultStats = stats;
-  }
-
-  public TableStat getResultStats() {
-    return this.resultStats;
-  }
-  
-  public boolean isStopped() {
-    return this.stopped;
-  }
-
-  public void setInterQuery() {
-    this.interQuery = true;
-  }
-
-  public void setOutputPath(Path outputPath) {
-    this.outputPath = outputPath;
-  }
-
-  public Path getOutputPath() {
-    return this.outputPath;
-  }
-
-  public boolean isInterQuery() {
-    return this.interQuery;
-  }
-  
-  public void stop() {
-    this.stopped = true;
-  }
-  
-  public void addFetchPhase(int count, File fetchIn) {
-    this.needFetch = true;
-    this.doneFetchPhaseSignal = new CountDownLatch(count);
-    this.fetchIn = fetchIn;
-  }
-  
-  public File getFetchIn() {
-    return this.fetchIn;
-  }
-  
-  public boolean hasFetchPhase() {
-    return this.needFetch;
-  }
-  
-  public CountDownLatch getFetchLatch() {
-    return doneFetchPhaseSignal;
-  }
-  
-  public void addRepartition(int partKey, String path) {
-    repartitions.put(partKey, path);
-  }
-  
-  public Iterator<Entry<Integer,String>> getRepartitions() {
-    return repartitions.entrySet().iterator();
-  }
-  
-  public void changeFragment(String tableId, Fragment [] fragments) {
-    fragmentMap.remove(tableId);
-    for(Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getId())) {
-        fragmentMap.get(t.getId()).add(t);
-      } else {
-        List<Fragment> frags = new ArrayList<Fragment>();
-        frags.add(t);
-        fragmentMap.put(t.getId(), frags);
-      }
-    }
-  }
-  
-  public Path getWorkDir() {
-    return this.workDir;
-  }
-  
-  public QueryUnitAttemptId getTaskId() {
-    return this.queryId;
-  }
-  
-  public float getProgress() {
-    return this.progress;
-  }
-  
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public Fragment getTable(String id) {
-    return fragmentMap.get(id).get(0);
-  }
-
-  public int getFragmentSize() {
-    return fragmentMap.size();
-  }
-
-  public Collection<String> getInputTables() {
-    return fragmentMap.keySet();
-  }
-  
-  public Fragment [] getTables(String id) {
-    return fragmentMap.get(id).toArray(new Fragment[fragmentMap.get(id).size()]);
-  }
-  
-  public int hashCode() {
-    return Objects.hashCode(queryId);
-  }
-  
-  public boolean equals(Object obj) {
-    if (obj instanceof TaskAttemptContext) {
-      TaskAttemptContext other = (TaskAttemptContext) obj;
-      return queryId.equals(other.getTaskId());
-    } else {
-      return false;
-    }
-  }
-}
\ No newline at end of file


Mime
View raw message