incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1365722 [10/11] - in /incubator/hcatalog/trunk: ./ ant/ conf/ hcatalog-pig-adapter/ ivy/ src/docs/src/documentation/content/xdocs/ src/docs/src/documentation/content/xdocs/images/ src/java/org/apache/hcatalog/mapreduce/ src/test/e2e/temple...
Date Wed, 25 Jul 2012 20:29:49 GMT
Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobState.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The persistent state of a job.  The state is stored in one of the
+ * supported storage systems.
+ */
+public class JobState {
+
+    private static final Log LOG = LogFactory.getLog(JobState.class);
+
+    private String id;
+
+    // Storage is instantiated in the constructor
+    private TempletonStorage storage = null;
+
+    private static TempletonStorage.Type type = TempletonStorage.Type.JOB;
+
+    private Configuration config = null;
+
+    public JobState(String id, Configuration conf)
+        throws IOException
+    {
+        this.id = id;
+        config = conf;
+        storage = getStorage(conf);
+    }
+
+    public void delete()
+        throws IOException
+    {
+        try {
+            storage.delete(type, id);
+        } catch (Exception e) {
+            // Error getting children of node -- probably node has been deleted
+            LOG.info("Couldn't delete " + id);
+        }
+    }
+
+    /**
+     * Get an instance of the selected storage class.  Defaults to
+     * HDFS storage if none is specified.
+     */
+    public static TempletonStorage getStorageInstance(Configuration conf) {
+        TempletonStorage storage = null;
+        try {
+            storage = (TempletonStorage)
+                Class.forName(conf.get(TempletonStorage.STORAGE_CLASS))
+                    .newInstance();
+        } catch (Exception e) {
+            LOG.warn("No storage method found: " + e.getMessage());
+            try {
+                storage = new HDFSStorage();
+            } catch (Exception ex) {
+                LOG.error("Couldn't create storage.");
+            }
+        }
+        return storage;
+    }
+
+    /**
+     * Get an open instance of the selected storage class.  Defaults
+     * to HDFS storage if none is specified.
+     */
+    public static TempletonStorage getStorage(Configuration conf) throws IOException {
+        TempletonStorage storage = getStorageInstance(conf);
+        storage.openStorage(conf);
+        return storage;
+    }
+
+    /**
+     * For storage methods that require a connection, this is a hint
+     * that it's time to close the connection.
+     */
+    public void close() throws IOException {
+        storage.closeStorage();
+    }
+
+    //
+    // Properties
+    //
+
+    /**
+     * This job id.
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * The percent complete of a job
+     */
+    public String  getPercentComplete()
+        throws IOException
+    {
+        return getField("percentComplete");
+    }
+    public void setPercentComplete(String percent)
+        throws IOException
+    {
+        setField("percentComplete", percent);
+    }
+
+    /**
+     * The child id of TempletonControllerJob
+     */
+    public String  getChildId()
+        throws IOException
+    {
+        return getField("childid");
+    }
+    public void setChildId(String childid)
+        throws IOException
+    {
+        setField("childid", childid);
+    }
+
+    /**
+     * Add a jobid to the list of children of this job.
+     *
+     * @param jobid
+     * @throws IOException
+     */
+    public void addChild(String jobid) throws IOException {
+        String jobids = "";
+        try {
+            jobids = getField("children");
+        } catch (Exception e) {
+            // There are none or they're not readable.
+        }
+        if (!jobids.equals("")) {
+            jobids += ",";
+        }
+        jobids += jobid;
+        setField("children", jobids);
+    }
+
+    /**
+     * Get a list of jobstates for jobs that are children of this job.
+     * @throws IOException
+     */
+    public List<JobState> getChildren() throws IOException {
+        ArrayList<JobState> children = new ArrayList<JobState>();
+        for (String jobid : getField("children").split(",")) {
+            children.add(new JobState(jobid, config));
+        }
+        return children;
+    }
+
+    /**
+     * Save a comma-separated list of jobids that are children
+     * of this job.
+     * @param jobids
+     * @throws IOException
+     */
+    public void setChildren(String jobids) throws IOException {
+        setField("children", jobids);
+    }
+
+    /**
+     * Set the list of child jobs of this job
+     * @param children
+     */
+    public void setChildren(List<JobState> children) throws IOException {
+        String val = "";
+        for (JobState jobstate : children) {
+            if (!val.equals("")) {
+                val += ",";
+            }
+            val += jobstate.getId();
+        }
+        setField("children", val);
+    }
+
+    /**
+     * The system exit value of the job.
+     */
+    public Long getExitValue()
+        throws IOException
+    {
+        return getLongField("exitValue");
+    }
+    public void setExitValue(long exitValue)
+        throws IOException
+    {
+        setLongField("exitValue", exitValue);
+    }
+
+    /**
+     * When this job was created.
+     */
+    public Long getCreated()
+        throws IOException
+    {
+        return getLongField("created");
+    }
+    public void setCreated(long created)
+        throws IOException
+    {
+        setLongField("created", created);
+    }
+
+    /**
+     * The user who started this job.
+     */
+    public String getUser()
+        throws IOException
+    {
+        return getField("user");
+    }
+    public void setUser(String user)
+        throws IOException
+    {
+        setField("user", user);
+    }
+
+    /**
+     * The url callback
+     */
+    public String getCallback()
+        throws IOException
+    {
+        return getField("callback");
+    }
+    public void setCallback(String callback)
+        throws IOException
+    {
+        setField("callback", callback);
+    }
+
+    /**
+     * The status of a job once it is completed.
+     */
+    public String getCompleteStatus()
+        throws IOException
+    {
+        return getField("completed");
+    }
+    public void setCompleteStatus(String complete)
+        throws IOException
+    {
+        setField("completed", complete);
+    }
+
+    /**
+     * The time when the callback was sent.
+     */
+    public Long getNotifiedTime()
+        throws IOException
+    {
+        return getLongField("notified");
+    }
+    public void setNotifiedTime(long notified)
+        throws IOException
+    {
+        setLongField("notified", notified);
+    }
+
+    //
+    // Helpers
+    //
+
+    /**
+     * Fetch an integer field from the store.
+     */
+    public Long getLongField(String name)
+        throws IOException
+    {
+        String s = storage.getField(type, id, name);
+        if (s == null)
+            return null;
+        else {
+            try {
+                return new Long(s);
+            } catch (NumberFormatException e) {
+                LOG.error("templeton: bug " + name + " " + s + " : "+ e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Store a String field from the store.
+     */
+    public void setField(String name, String val)
+        throws IOException
+    {
+        try {
+            storage.saveField(type, id, name, val);
+        } catch (NotFoundException ne) {
+            throw new IOException(ne.getMessage());
+        }
+    }
+
+    public String getField(String name)
+        throws IOException
+    {
+        return storage.getField(type, id, name);
+    }
+
+    /**
+     * Store a long field.
+     *
+     * @param name
+     * @param val
+     * @throws IOException
+     */
+    public void setLongField(String name, long val)
+        throws IOException
+    {
+        try {
+            storage.saveField(type, id, name, String.valueOf(val));
+        } catch (NotFoundException ne) {
+            throw new IOException("Job " + id + " was not found: " +
+                                  ne.getMessage());
+        }
+    }
+
+    /**
+     * Get an id for each currently existing job, which can be used to create
+     * a JobState object.
+     *
+     * @param conf
+     * @throws IOException
+     */
+    public static List<String> getJobs(Configuration conf) throws IOException {
+        try {
+            return getStorage(conf).getAllForType(type);
+        } catch (Exception e) {
+            throw new IOException("Can't get jobs", e);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/JobStateTracker.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A tracking node in ZooKeeper that records the id of a job.  Tracking
+ * nodes are created as sequential children of the tracking root.
+ */
+public class JobStateTracker {
+    // The path to the tracking root
+    private final String job_trackingroot;
+
+    // The zookeeper connection to use
+    private final ZooKeeper zk;
+
+    // The id of the tracking node -- must be a SEQUENTIAL node
+    private String trackingnode;
+
+    // The id of the job this tracking node represents
+    private String jobid;
+
+    // The logger
+    private static final Log LOG = LogFactory.getLog(JobStateTracker.class);
+
+    /**
+     * Constructor for either an existing tracking node or a job that
+     * still needs one.
+     *
+     * @param node             the tracking node name if nodeIsTracker is
+     *                         true, otherwise the jobid of an existing job
+     * @param zk               the zookeeper connection to use
+     * @param nodeIsTracker    how to interpret node
+     * @param job_trackingpath the path to the tracking root
+     */
+    public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+            String job_trackingpath) {
+        this.zk = zk;
+        if (nodeIsTracker) {
+            trackingnode = node;
+        } else {
+            jobid = node;
+        }
+        job_trackingroot = job_trackingpath;
+    }
+
+    /**
+     * Create the parent znodes if needed, then a new sequential
+     * tracking node holding the job id.
+     *
+     * @throws IOException if this tracker has no job id or a node
+     *         can't be created
+     */
+    public void create()
+        throws IOException
+    {
+        if (jobid == null) {
+            // Built from an existing tracking node: there is nothing to store.
+            throw new IOException("Unable to create " + makeTrackingZnode()
+                                  + ": no job id");
+        }
+        String[] paths = ZooKeeperStorage.getPaths(job_trackingroot);
+        for (String znode : paths) {
+            try {
+                zk.create(znode, new byte[0],
+                          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (KeeperException.NodeExistsException ignored) {
+                // The parent is already there; nothing to do.
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Unable to create parent nodes", e);
+            } catch (Exception e) {
+                throw new IOException("Unable to create parent nodes", e);
+            }
+        }
+        try {
+            // Fixed charset so the id reads back the same on any JVM.
+            trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes("UTF-8"),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Unable to create " + makeTrackingZnode(), e);
+        } catch (Exception e) {
+            throw new IOException("Unable to create " + makeTrackingZnode(), e);
+        }
+    }
+
+    /**
+     * Delete this tracking node, whatever its version.  This is best
+     * effort: a failure is logged and not reported to the caller.
+     */
+    public void delete()
+        throws IOException
+    {
+        try {
+            zk.delete(makeTrackingJobZnode(trackingnode), -1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode)
+                     + ": " + e);
+        } catch (Exception e) {
+            // Might have been deleted already
+            LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode)
+                     + ": " + e);
+        }
+    }
+
+    /**
+     * Get the jobid for this tracking node
+     *
+     * @return the job id stored in the tracking node
+     * @throws IOException if the node can't be read
+     */
+    public String getJobID() throws IOException {
+        try {
+            return new String(zk.getData(makeTrackingJobZnode(trackingnode),
+                    false, new Stat()), "UTF-8");
+        } catch (KeeperException e) {
+            // It was deleted during the transaction
+            throw new IOException("Node already deleted " + trackingnode, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Couldn't read node " + trackingnode, e);
+        }
+    }
+
+    /**
+     * Make a ZK path to a new tracking node
+     */
+    public String makeTrackingZnode() {
+        return job_trackingroot + "/";
+    }
+
+    /**
+     * Make a ZK path to an existing tracking node
+     */
+    public String makeTrackingJobZnode(String nodename) {
+        return job_trackingroot + "/" + nodename;
+    }
+
+    /**
+     * Get the list of tracking jobs.  These can be used to determine which jobs have
+     * expired.
+     *
+     * @param conf supplies the storage root under which tracking nodes live
+     * @param zk   the zookeeper connection to use
+     * @return the names of the children of the tracking directory
+     * @throws IOException if the children can't be listed
+     */
+    public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk)
+            throws IOException {
+        try {
+            return new ArrayList<String>(zk.getChildren(
+                    conf.get(TempletonStorage.STORAGE_ROOT)
+                    + ZooKeeperStorage.TRACKINGDIR, false));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Can't get tracking children", e);
+        } catch (Exception e) {
+            throw new IOException("Can't get tracking children", e);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NotFoundException.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+/**
+ * Simple not found exception.
+ */
+public class NotFoundException extends Exception {
+
+    /** Serialization version; bump when the serialized form changes. */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Create the exception with a description of what was not found.
+     *
+     * @param msg the detail message, later available from getMessage()
+     */
+    public NotFoundException(final String msg) {
+        super(msg);
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullRecordReader.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An empty record reader.
+ */
+public class NullRecordReader
+    extends RecordReader<NullWritable, NullWritable>
+{
+    /** Nothing to set up: this reader never reads from the split. */
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+        throws IOException
+    {
+    }
+
+    /** Always reports that there is no further record. */
+    @Override
+    public boolean nextKeyValue() throws IOException {
+        return false;
+    }
+
+    /** The key is always the NullWritable singleton. */
+    @Override
+    public NullWritable getCurrentKey() {
+        return NullWritable.get();
+    }
+
+    /** The value is always the NullWritable singleton. */
+    @Override
+    public NullWritable getCurrentValue() {
+        return NullWritable.get();
+    }
+
+    /** Progress is always reported as complete. */
+    @Override
+    public float getProgress() {
+        return 1.0f;
+    }
+
+    /** Nothing was opened, so there is nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+}
+

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/NullSplit.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * An empty splitter.
+ */
+public class NullSplit extends InputSplit implements Writable {
+    /** The split covers no data, so its length is zero. */
+    @Override
+    public long getLength() { return 0; }
+
+    /** The split has no data, so it reports no locations. */
+    @Override
+    public String[] getLocations() throws IOException {
+        return new String[]{};
+    }
+
+    /** There is no state to serialize. */
+    @Override
+    public void write(DataOutput out) throws IOException {}
+
+    /** There is no state to deserialize. */
+    @Override
+    public void readFields(DataInput in) throws IOException {}
+}
+

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/SingleInputFormat.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An empty InputFormat.
+ */
+public class SingleInputFormat
+    extends InputFormat<NullWritable, NullWritable>
+{
+    /**
+     * Produce exactly one split, which carries no data.
+     *
+     * @param job the job context; not used
+     * @return a new mutable list holding a single NullSplit
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext job)
+        throws IOException
+    {
+        List<InputSplit> res = new ArrayList<InputSplit>();
+        res.add(new NullSplit());
+        return res;
+    }
+
+    /**
+     * Create a reader that yields no records.
+     *
+     * @param split   the split to read; not used
+     * @param context the task context; not used
+     * @return a new NullRecordReader
+     */
+    @Override
+    public RecordReader<NullWritable, NullWritable>
+        createRecordReader(InputSplit split,
+                           TaskAttemptContext context)
+        throws IOException
+    {
+        return new NullRecordReader();
+    }
+}
+

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.templeton.SecureProxySupport;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+
+/**
+ * A Map Reduce job that will start another job.
+ *
+ * We have a single Mapper job that starts a child MR job.  The parent
+ * monitors the child job and ends when the child job exits.  In
+ * addition, we
+ *
+ * - write out the parent job id so the caller can record it.
+ * - run a keep alive thread so the job doesn't end.
+ * - Optionally, store the stdout, stderr, and exit value of the child
+ *   in hdfs files.
+ */
+public class TempletonControllerJob extends Configured implements Tool {
+    static enum ControllerCounters { SIMPLE_COUNTER };
+
+    public static final String COPY_NAME      = "templeton.copy";
+    public static final String STATUSDIR_NAME = "templeton.statusdir";
+    public static final String JAR_ARGS_NAME  = "templeton.args";
+    public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
+
+    public static final String STDOUT_FNAME  = "stdout";
+    public static final String STDERR_FNAME  = "stderr";
+    public static final String EXIT_FNAME    = "exit";
+
+    public static final int WATCHER_TIMEOUT_SECS = 10;
+    public static final int KEEP_ALIVE_MSEC      = 60 * 1000;
+
+    private static TrivialExecService execService = TrivialExecService.getInstance();
+
+    private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
+    
+    
+    public static class LaunchMapper
+        extends Mapper<NullWritable, NullWritable, Text, Text>
+    {
+        /**
+         * Copy the configured files to the local working directory and
+         * launch the child command described by the job configuration.
+         *
+         * @param context the mapper context supplying the configuration
+         * @param user the user name placed in the child's environment
+         * @param overrideClasspath optional classpath passed through to
+         *        TempletonUtils.hadoopUserEnv; may be null
+         * @return the started child process
+         */
+        protected Process startJob(Context context, String user,
+                                   String overrideClasspath)
+            throws IOException, InterruptedException
+        {
+            Configuration conf = context.getConfiguration();
+            copyLocal(COPY_NAME, conf);
+
+            // The command line was stored in the configuration as an
+            // encoded array; a LinkedList lets us splice in an option.
+            String[] decodedArgs
+                = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
+            Map<String, String> childEnv
+                = TempletonUtils.hadoopUserEnv(user, overrideClasspath);
+            List<String> command
+                = new LinkedList<String>(Arrays.asList(decodedArgs));
+
+            // When a token file is advertised in our own environment,
+            // pass it on as the second element of the command line.
+            String credentialsFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+            if (credentialsFile != null) {
+                command.add(1, "-Dmapreduce.job.credentials.binary="
+                            + credentialsFile);
+            }
+
+            // Variables that must not leak into the child's environment.
+            List<String> dropFromEnv
+                = new ArrayList<String>(Arrays.asList("HADOOP_ROOT_LOGGER"));
+
+            return execService.run(command, dropFromEnv, childEnv);
+        }
+
+        /**
+         * Copy every file listed under the given configuration key into
+         * the current local directory, keeping only the file's base name.
+         * Does nothing when the key decodes to null.
+         *
+         * @param var the configuration key holding the encoded file list
+         * @param conf the configuration to read from
+         */
+        private void copyLocal(String var, Configuration conf)
+            throws IOException
+        {
+            String[] filenames = TempletonUtils.decodeArray(conf.get(var));
+            if (filenames == null) {
+                return;
+            }
+
+            for (int i = 0; i < filenames.length; ++i) {
+                Path remote = new Path(filenames[i]);
+                Path local = new Path(remote.getName());
+                FileSystem remoteFs = remote.getFileSystem(conf);
+                System.err.println("templeton: copy " + remote + " => " + local);
+                remoteFs.copyToLocalFile(remote, local);
+            }
+        }
+
+        /**
+         * Launch the child process, stream its stdout and stderr through
+         * watcher threads, keep a counter ticking while it runs, and
+         * record its exit value once it finishes.
+         *
+         * @param context the mapper context
+         * @throws IOException if the child cannot be started or its
+         *         status cannot be recorded
+         * @throws InterruptedException if interrupted while waiting for
+         *         the child or for the helper threads
+         */
+        @Override
+        public void run(Context context)
+            throws IOException, InterruptedException
+        {
+
+            Configuration conf = context.getConfiguration();
+
+            Process proc = startJob(context,
+                                    conf.get("user.name"),
+                                    conf.get(OVERRIDE_CLASSPATH));
+
+            String statusdir = conf.get(STATUSDIR_NAME);
+            Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER);
+
+            ExecutorService pool = Executors.newCachedThreadPool();
+            KeepAlive keepAlive = null;
+            try {
+                executeWatcher(pool, conf, context.getJobID(),
+                               proc.getInputStream(), statusdir, STDOUT_FNAME);
+                executeWatcher(pool, conf, context.getJobID(),
+                               proc.getErrorStream(), statusdir, STDERR_FNAME);
+                keepAlive = startCounterKeepAlive(pool, cnt);
+
+                proc.waitFor();
+                keepAlive.sendReport = false;
+                pool.shutdown();
+                // Give the watchers a bounded time to drain the streams.
+                pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS);
+            } finally {
+                // Always stop the helper threads, including when we are
+                // interrupted or a watcher could not be created; the
+                // pool's threads would otherwise outlive this method.
+                // shutdownNow() is harmless on an already terminated pool.
+                if (keepAlive != null)
+                    keepAlive.sendReport = false;
+                pool.shutdownNow();
+            }
+
+            int exitValue = proc.exitValue();
+            writeExitValue(conf, exitValue, statusdir);
+            JobState state = new JobState(context.getJobID().toString(), conf);
+            try {
+                state.setExitValue(exitValue);
+                state.setCompleteStatus("done");
+            } finally {
+                // Release the state handle even if an update failed.
+                state.close();
+            }
+
+            if (exitValue != 0)
+                System.err.println("templeton: job failed with exit code "
+                                   + exitValue);
+            else
+                System.err.println("templeton: job completed with exit code 0");
+        }
+
+        /**
+         * Create a Watcher for one of the child's output streams and
+         * hand it to the pool.
+         *
+         * @param pool the executor that runs the watcher
+         * @param conf the job configuration
+         * @param jobid the id of this controller job
+         * @param in the child stream to read
+         * @param statusdir the status directory, possibly unset
+         * @param name the stream name (STDOUT_FNAME or STDERR_FNAME)
+         * @throws IOException if the Watcher cannot be constructed
+         */
+        private void executeWatcher(ExecutorService pool, Configuration conf,
+                                    JobID jobid, InputStream in, String statusdir,
+                                    String name)
+            throws IOException
+        {
+            pool.execute(new Watcher(conf, jobid, in, statusdir, name));
+        }
+
+        /**
+         * Start a KeepAlive task on the pool for the given counter.
+         *
+         * @param pool the executor that runs the task
+         * @param cnt the counter the task increments
+         * @return the running task, so the caller can ask it to stop
+         */
+        private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt)
+            throws IOException
+        {
+            KeepAlive reporter = new KeepAlive(cnt);
+            pool.execute(reporter);
+            return reporter;
+        }
+
+        /**
+         * Write the child's exit value, followed by a line break, to the
+         * EXIT_FNAME file inside the status directory.  Does nothing
+         * when no status directory is set.
+         *
+         * @param conf the configuration used to resolve the file system
+         * @param exitValue the exit value to record
+         * @param statusdir the status directory, possibly null or empty
+         */
+        private void writeExitValue(Configuration conf, int exitValue, String statusdir)
+            throws IOException
+        {
+            if (! TempletonUtils.isset(statusdir))
+                return;
+
+            Path exitFile = new Path(statusdir, EXIT_FNAME);
+            OutputStream out = exitFile.getFileSystem(conf).create(exitFile);
+            System.err.println("templeton: Writing exit value "
+                               + exitValue + " to " + exitFile);
+            PrintWriter writer = new PrintWriter(out);
+            writer.println(exitValue);
+            // Closing the writer also closes the underlying stream.
+            writer.close();
+        }
+    }
+
+    /**
+     * Copies one output stream of the child process, line by line, to
+     * either a file in the status directory or this process's own
+     * stdout/stderr, and updates the JobState whenever a line carries a
+     * percent-complete value or a child job id.
+     */
+    public static class Watcher implements Runnable {
+        private final InputStream in;
+        private OutputStream out;
+        private final JobID jobid;
+        private final Configuration conf;
+
+        /**
+         * @param conf the job configuration
+         * @param jobid the id of the controller job whose state is updated
+         * @param in the child stream to read
+         * @param statusdir the status directory; when unset the output
+         *        goes to System.err (for STDERR_FNAME) or System.out
+         * @param name the file name to create inside statusdir
+         * @throws IOException if the status file cannot be created
+         */
+        public Watcher(Configuration conf, JobID jobid, InputStream in,
+                       String statusdir, String name)
+            throws IOException
+        {
+            this.conf = conf;
+            this.jobid = jobid;
+            this.in = in;
+
+            if (name.equals(STDERR_FNAME))
+                out = System.err;
+            else
+                out = System.out;
+
+            if (TempletonUtils.isset(statusdir)) {
+                Path p = new Path(statusdir, name);
+                FileSystem fs = p.getFileSystem(conf);
+                out = fs.create(p);
+                System.err.println("templeton: Writing status to " + p);
+            }
+        }
+
+        /**
+         * Pump the input stream to the output until end of stream.
+         * Read errors are reported on System.err and end the loop.
+         */
+        @Override
+        public void run() {
+            PrintWriter writer = new PrintWriter(out);
+            try {
+                InputStreamReader isr = new InputStreamReader(in);
+                BufferedReader reader = new BufferedReader(isr);
+
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    writer.println(line);
+                    updateJobState(line);
+                }
+            } catch (IOException e) {
+                System.err.println("templeton: execute error: " + e);
+            } finally {
+                // Flush on every path so buffered lines are not lost
+                // when reading fails.
+                writer.flush();
+                // A file we created must be closed to be finalized;
+                // the process-wide stdout/stderr must stay open.
+                if (out != System.err && out != System.out)
+                    writer.close();
+            }
+        }
+
+        /**
+         * Record the percent-complete value and child job id found in
+         * the line, if either is present.  Both setters are called even
+         * when only one value was found, so the other receives null;
+         * how JobState treats null is not visible here.
+         */
+        private void updateJobState(String line) {
+            JobState state = null;
+            try {
+                String percent = TempletonUtils.extractPercentComplete(line);
+                String childid = TempletonUtils.extractChildJobId(line);
+
+                if (percent != null || childid != null) {
+                    state = new JobState(jobid.toString(), conf);
+                    state.setPercentComplete(percent);
+                    state.setChildId(childid);
+                }
+            } catch (IOException e) {
+                System.err.println("templeton: state error: " + e);
+            } finally {
+                if (state != null) {
+                    try {
+                        state.close();
+                    } catch (IOException e) {
+                        // Best effort: keep copying output, but do not
+                        // hide the failure.
+                        System.err.println("templeton: state close error: " + e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Increments a counter every KEEP_ALIVE_MSEC milliseconds until
+     * asked to stop or interrupted.
+     */
+    public static class KeepAlive implements Runnable {
+        private final Counter cnt;
+
+        /**
+         * Set to false to make the loop exit after its current sleep.
+         * Volatile because it is written by the thread that owns the
+         * task and read by the thread that runs it; without it the
+         * running thread is not guaranteed to see the update.
+         */
+        public volatile boolean sendReport;
+
+        /**
+         * @param cnt the counter to increment
+         */
+        public KeepAlive(Counter cnt)
+        {
+            this.cnt = cnt;
+            this.sendReport = true;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (sendReport) {
+                    cnt.increment(1);
+                    Thread.sleep(KEEP_ALIVE_MSEC);
+                }
+            } catch (InterruptedException e) {
+                // Ok to be interrupted
+            }
+        }
+    }
+
+    private JobID submittedJobId;
+    public String getSubmittedId() {
+        if (submittedJobId == null)
+            return null;
+        else
+            return submittedJobId.toString();
+    }
+
+    /**
+     * Configure and submit the controller job, then remember its job
+     * id so it can be read back through getSubmittedId().  The method
+     * returns right after submission; it does not wait for the job.
+     *
+     * @param args the command line of the child job; stored, encoded,
+     *        in the job configuration
+     * @return always 0
+     */
+    @Override
+    public int run(String[] args)
+        throws IOException, InterruptedException, ClassNotFoundException
+    {
+        Configuration conf = getConf();
+        String currentUser
+            = UserGroupInformation.getCurrentUser().getShortUserName();
+        conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
+        conf.set("user.name", currentUser);
+
+        Job job = new Job(conf);
+        job.setJarByClass(TempletonControllerJob.class);
+        job.setJobName("TempletonControllerJob");
+
+        // Map-only job: one mapper, no reducers, no output.
+        job.setInputFormatClass(SingleInputFormat.class);
+        job.setMapperClass(LaunchMapper.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setNumReduceTasks(0);
+
+        // Fetch a delegation token and attach it to the job's credentials.
+        JobClient client = new JobClient(new JobConf(job.getConfiguration()));
+        Token<DelegationTokenIdentifier> delegationToken
+            = client.getDelegationToken(new Text("mr token"));
+        job.getCredentials().addToken(new Text("mr token"), delegationToken);
+
+        job.submit();
+        submittedJobId = job.getJobID();
+
+        return 0;
+    }
+
+    
+    /**
+     * Command line entry point: run the tool through ToolRunner and
+     * exit with its return code.
+     */
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new TempletonControllerJob(), args);
+        if (exitCode != 0) {
+            System.err.println("TempletonControllerJob failed!");
+        }
+        System.exit(exitCode);
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonStorage.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Contract for the different ways Templeton can persist its state,
+ * including ZooKeeper and HDFS.  Any storage scheme must be able to
+ * handle being run in an HDFS environment, where specific file systems
+ * and virtual machines may not be available.
+ *
+ * Data is organised as a three-level hierarchy: the type (one of the
+ * Type values below), then the id (a given jobid, jobtrackingid,
+ * etc.), then the key/value pairs.  So an entry might look like:
+ *
+ * JOB
+ *   jobid00035
+ *     user -&gt; rachel
+ *     datecreated -&gt; 2/5/12
+ *     etc.
+ *
+ * Each field must be available to be fetched/changed individually.
+ */
+public interface TempletonStorage {
+    /** The possible values of the 'type' arguments below. */
+    enum Type {
+        UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD
+    }
+
+    /** Configuration key: templeton.storage.class. */
+    String STORAGE_CLASS    = "templeton.storage.class";
+    /** Configuration key: templeton.storage.root. */
+    String STORAGE_ROOT     = "templeton.storage.root";
+
+    /**
+     * Start the cleanup process for this storage type.
+     *
+     * @param config The configuration to use
+     */
+    void startCleanup(Configuration config);
+
+    /**
+     * Save a single key/value pair for a specific job id.
+     *
+     * @param type The data type (one of the Type values)
+     * @param id The String id of this data grouping (jobid, etc.)
+     * @param key The name of the field to save
+     * @param val The value of the field to save
+     * @throws NotFoundException condition is implementation defined;
+     *         see the implementing class
+     */
+    void saveField(Type type, String id, String key, String val)
+        throws NotFoundException;
+
+    /**
+     * Get the value of one field for a given data type.  If the type
+     * is UNKNOWN, search for the id in all types.
+     *
+     * @param type The data type (one of the Type values)
+     * @param id The String id of this data grouping (jobid, etc.)
+     * @param key The name of the field to retrieve
+     * @return The value of the field requested, or null if not
+     * found.
+     */
+    String getField(Type type, String id, String key);
+
+    /**
+     * Get all the name/value pairs stored for this id.
+     * Be careful using getFields() -- optimistic locking will mean that
+     * your odds of a conflict are decreased if you read/write one field
+     * at a time.  getFields() is intended for read-only usage.
+     *
+     * If the type is UNKNOWN, search for the id in all types.
+     *
+     * @param type The data type (one of the Type values)
+     * @param id The String id of this data grouping (jobid, etc.)
+     * @return A Map of key/value pairs found for this type/id.
+     */
+    Map<String, String> getFields(Type type, String id);
+
+    /**
+     * Delete a data grouping (all data for a jobid, all tracking data
+     * for a job, etc.).  If the type is UNKNOWN, search for the id
+     * in all types.
+     *
+     * @param type The data type (one of the Type values)
+     * @param id The String id of this data grouping (jobid, etc.)
+     * @return True if successful, false if not
+     * @throws NotFoundException if the id wasn't found
+     */
+    boolean delete(Type type, String id) throws NotFoundException;
+
+    /**
+     * Get the id of each data grouping in the storage system.
+     *
+     * @return A list of ids.
+     */
+    List<String> getAll();
+
+    /**
+     * Get the id of each data grouping of a given type in the storage
+     * system.
+     *
+     * @param type The data type (one of the Type values)
+     * @return A list of ids.
+     */
+    List<String> getAllForType(Type type);
+
+    /**
+     * Get the id of each data grouping that has the specific key/value
+     * pair.
+     *
+     * @param key The name of the field to search for
+     * @param value The value of the field to search for
+     * @return A list of ids.
+     */
+    List<String> getAllForKey(String key, String value);
+
+    /**
+     * Get the id of each data grouping of a given type that has the
+     * specific key/value pair.
+     *
+     * @param type The data type (one of the Type values)
+     * @param key The name of the field to search for
+     * @param value The value of the field to search for
+     * @return A list of ids.
+     */
+    List<String> getAllForTypeAndKey(Type type, String key,
+                                     String value);
+
+    /**
+     * For storage methods that require a connection, this is a hint
+     * that it's time to open a connection.
+     *
+     * @param config The configuration to use
+     */
+    void openStorage(Configuration config) throws IOException;
+
+    /**
+     * For storage methods that require a connection, this is a hint
+     * that it's time to close the connection.
+     */
+    void closeStorage() throws IOException;
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * General utility methods.
+ */
+public class TempletonUtils {
+    /**
+     * Tell whether a string holds at least one character.
+     *
+     * @param s the string to test; may be null
+     * @return false for null or the empty string, true otherwise
+     */
+    public static boolean isset(String s) {
+        if (s == null)
+            return false;
+        return s.length() != 0;
+    }
+
+    /**
+     * Tell whether a character is anything other than the NUL
+     * character.
+     *
+     * @param ch the character to test
+     * @return false only for the character with code 0
+     */
+    public static boolean isset(char ch) {
+        return ch != '\0';
+    }
+
+    /**
+     * Tell whether an array holds at least one element.
+     *
+     * @param a the array to test; may be null
+     * @return false for null or a zero-length array, true otherwise
+     */
+    public static <T> boolean isset(T[] a) {
+        if (a == null)
+            return false;
+        return a.length != 0;
+    }
+
+
+    /**
+     * Tell whether a collection holds at least one element.
+     *
+     * @param col the collection to test; may be null
+     * @return false for null or an empty collection, true otherwise
+     */
+    public static <T> boolean isset(Collection<T> col) {
+        return !(col == null || col.isEmpty());
+    }
+
+    /**
+     * Tell whether a map holds at least one entry.
+     *
+     * @param col the map to test; may be null
+     * @return false for null or an empty map, true otherwise
+     */
+    public static <K, V> boolean isset(Map<K, V> col) {
+        return !(col == null || col.isEmpty());
+    }
+
+
+    public static final Pattern JAR_COMPLETE
+        = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
+    public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$");
+
+    /**
+     * Extract the percent complete line from Pig or Jar jobs.
+     */
+    public static String extractPercentComplete(String line) {
+        Matcher jar = JAR_COMPLETE.matcher(line);
+        if (jar.find())
+            return jar.group().trim();
+
+        Matcher pig = PIG_COMPLETE.matcher(line);
+        if (pig.find())
+            return pig.group().trim();
+
+        return null;
+    }
+
+    /** Matches a line ending in " Running job: ID". */
+    public static final Pattern JAR_ID = Pattern.compile(" Running job: (\\S+)$");
+    /** Matches a line ending in " HadoopJobId: ID". */
+    public static final Pattern PIG_ID = Pattern.compile(" HadoopJobId: (\\S+)$");
+    /** The id patterns, in the order they are tried. */
+    public static final Pattern[] ID_PATTERNS = {JAR_ID, PIG_ID};
+
+    /**
+     * Pull the child job id off the end of a line of job output,
+     * trying each of ID_PATTERNS in turn.
+     *
+     * @param line one line of output; must not be null
+     * @return the id captured by the first matching pattern, or null
+     *         if none matches
+     */
+    public static String extractChildJobId(String line) {
+        for (int i = 0; i < ID_PATTERNS.length; ++i) {
+            Matcher m = ID_PATTERNS[i].matcher(line);
+            if (m.find()) {
+                return m.group(1);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Take an array of strings and encode it into one string.
+     */
+    public static String encodeArray(String[] plain) {
+        if (plain == null)
+            return null;
+
+        String[] escaped = new String[plain.length];
+
+        for (int i = 0; i < plain.length; ++i) {
+            if (plain[i] == null) {
+                plain[i] = "";
+            }
+            escaped[i] = StringUtils.escapeString(plain[i]);
+        }
+
+        return StringUtils.arrayToString(escaped);
+    }
+
+    /**
+     * Encode a list of strings into one string by copying it into an
+     * array and delegating to encodeArray(String[]).
+     *
+     * @param list the strings to encode; may be null
+     * @return the encoded string, or null if list is null
+     */
+    public static String encodeArray(List<String> list) {
+        if (list == null) {
+            return null;
+        }
+        String[] copy = list.toArray(new String[list.size()]);
+        return encodeArray(copy);
+    }
+
+    /**
+     * Take an encoded string and decode it into an array of strings.
+     * The string is split with StringUtils.split and every piece is
+     * passed through StringUtils.unEscapeString.
+     *
+     * @param s the encoded string; may be null
+     * @return the decoded strings, or null if s is null
+     */
+    public static String[] decodeArray(String s) {
+        if (s == null) {
+            return null;
+        }
+
+        String[] pieces = StringUtils.split(s);
+        String[] decoded = new String[pieces.length];
+        int next = 0;
+        for (String piece : pieces) {
+            decoded[next++] = StringUtils.unEscapeString(piece);
+        }
+        return decoded;
+    }
+
+    /**
+     * Resolve each name in a comma separated list through
+     * hadoopFsFilename.
+     *
+     * @param files comma separated file names; may be null
+     * @param conf the configuration; may be null
+     * @param user the user passed on to hadoopFsFilename
+     * @return the resolved names in input order, or null if files or
+     *         conf is null
+     */
+    public static String[] hadoopFsListAsArray(String files, Configuration conf,
+                                               String user)
+        throws URISyntaxException, FileNotFoundException, IOException,
+        InterruptedException
+    {
+        if (files == null || conf == null) {
+            return null;
+        }
+
+        String[] names = files.split(",");
+        String[] resolved = new String[names.length];
+        int next = 0;
+        for (String name : names) {
+            resolved[next++] = hadoopFsFilename(name, conf, user);
+        }
+        return resolved;
+    }
+
+    public static String hadoopFsListAsString(String files, Configuration conf,
+                                              String user)
+        throws URISyntaxException, FileNotFoundException, IOException,
+        InterruptedException
+    {
+        if (files == null || conf == null) {
+            return null;
+        }
+        return StringUtils.arrayToString(hadoopFsListAsArray(files, conf, user));
+    }
+
+    public static String hadoopFsFilename(String fname, Configuration conf, String user)
+        throws URISyntaxException, FileNotFoundException, IOException,
+        InterruptedException
+    {
+        Path p = hadoopFsPath(fname, conf, user);
+        if (p == null)
+            return null;
+        else
+            return p.toString();
+    }
+
+    /**
+     * Check whether a path is definitely absent.
+     *
+     * @param fs the file system to ask
+     * @param p the path to look for
+     * @return true iff we are sure the file is not there; false if it
+     *         exists or if the check itself failed
+     */
+    public static boolean hadoopFsIsMissing(FileSystem fs, Path p) {
+        boolean present;
+        try {
+            present = fs.exists(p);
+        } catch(Throwable t) {
+            // Got an error, might be there anyway due to a
+            // permissions problem.
+            return false;
+        }
+        return ! present;
+    }
+
+    /**
+     * Turn a file name into a Path qualified against the file system
+     * obtained for that name's URI and the given user, and check that
+     * it is not known to be missing.
+     *
+     * @param fname the file name, parsed as a URI; may be null
+     * @param conf the configuration; may be null
+     * @param user the user to obtain the file system as
+     * @return the qualified path, or null if fname or conf is null
+     * @throws URISyntaxException if fname is not a valid URI
+     * @throws FileNotFoundException if hadoopFsIsMissing reports the
+     *         path as absent
+     */
+    public static Path hadoopFsPath(String fname, Configuration conf, String user)
+        throws URISyntaxException, FileNotFoundException, IOException,
+        InterruptedException
+    {
+        if (fname == null || conf == null) {
+            return null;
+        }
+
+        URI location = new URI(fname);
+        FileSystem userFs = FileSystem.get(location, conf, user);
+        Path qualified = new Path(location).makeQualified(userFs);
+
+        if (hadoopFsIsMissing(qualified.getFileSystem(conf), qualified)) {
+            throw new FileNotFoundException("File " + fname + " does not exist.");
+        }
+        return qualified;
+    }
+
+    /**
+     * GET the given url and discard the content.
+     *
+     * @param url the url to read
+     * @return the number of bytes received
+     * @throws IOException if the connection cannot be opened, read or
+     *         closed
+     */
+    public static int fetchUrl(URL url)
+        throws IOException
+    {
+        URLConnection cnx = url.openConnection();
+        InputStream in = cnx.getInputStream();
+        try {
+            byte[] buf = new byte[8192];
+            int total = 0;
+            int len;
+            // read() returns -1 at end of stream
+            while ((len = in.read(buf)) >= 0)
+                total += len;
+            return total;
+        } finally {
+            // Release the connection's stream on every path.
+            in.close();
+        }
+    }
+
+    /**
+     * Build the environment variables that specify the hadoop user
+     * and, optionally, a classpath that takes precedence.
+     *
+     * @param user the value for HADOOP_USER_NAME
+     * @param overrideClasspath when non-null, stored as
+     *        HADOOP_CLASSPATH (with this process's own HADOOP_CLASSPATH
+     *        appended after a ':' if that is set) together with
+     *        HADOOP_USER_CLASSPATH_FIRST=true
+     * @return a new map holding the variables
+     */
+    public static Map<String, String> hadoopUserEnv(String user,
+                                                    String overrideClasspath)
+    {
+        Map<String, String> env = new HashMap<String, String>();
+        env.put("HADOOP_USER_NAME", user);
+        if (overrideClasspath == null) {
+            return env;
+        }
+
+        String inherited = System.getenv("HADOOP_CLASSPATH");
+        String classpath = TempletonUtils.isset(inherited)
+            ? overrideClasspath + ":" + inherited
+            : overrideClasspath;
+        env.put("HADOOP_CLASSPATH", classpath);
+        env.put("HADOOP_USER_CLASSPATH_FIRST", "true");
+        return env;
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Execute a local program.  This is a singleton service that will
+ * execute programs on the local box.
+ */
+public class TrivialExecService {
+    // Created on first use by getInstance().
+    private static volatile TrivialExecService theSingleton;
+
+    /**
+     * Retrieve the singleton, creating it on the first call.
+     *
+     * @return the shared instance
+     */
+    public static synchronized TrivialExecService getInstance() {
+        if (theSingleton != null) {
+            return theSingleton;
+        }
+        theSingleton = new TrivialExecService();
+        return theSingleton;
+    }
+
+    /**
+     * Start a process, logging the command and the supplied variables
+     * to System.err first.
+     *
+     * @param cmd the program and its arguments
+     * @param removeEnv names of variables to drop from the environment
+     *        the child would otherwise start with
+     * @param environmentVariables variables to add or overwrite; applied
+     *        after the removals
+     * @return the started process
+     * @throws IOException if the process cannot be started
+     */
+    public Process run(List<String> cmd, List<String> removeEnv,
+                       Map<String, String> environmentVariables)
+        throws IOException
+    {
+        System.err.println("templeton: starting " + cmd);
+        System.err.print("With environment variables: " );
+        for (Map.Entry<String, String> variable : environmentVariables.entrySet()) {
+            String rendered = variable.getKey() + "=" + variable.getValue();
+            System.err.println(rendered);
+        }
+
+        ProcessBuilder builder = new ProcessBuilder(cmd);
+        Map<String, String> childEnv = builder.environment();
+        for (String name : removeEnv) {
+            childEnv.remove(name);
+        }
+        childEnv.putAll(environmentVariables);
+        return builder.start();
+    }
+
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperCleanup.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This does periodic cleanup
+ */
+public class ZooKeeperCleanup extends Thread {
+    protected Configuration appConf;
+
+    // The interval to wake up and check the queue
+    public static final String ZK_CLEANUP_INTERVAL =
+        "templeton.zookeeper.cleanup.interval"; // 12 hours
+
+    // The max age of a task allowed
+    public static final String ZK_CLEANUP_MAX_AGE =
+        "templeton.zookeeper.cleanup.maxage"; // ~ 1 week
+
+    protected static long interval = 1000L * 60L * 60L * 12L;
+    protected static long maxage = 1000L * 60L * 60L * 24L * 7L;
+
+    // The logger
+    private static final Log LOG = LogFactory.getLog(ZooKeeperCleanup.class);
+
+    // Handle to cancel loop
+    private boolean stop = false;
+
+    // The instance
+    private static ZooKeeperCleanup thisclass = null;
+
+    // Whether the cycle is running
+    private static boolean isRunning = false;
+
+    /**
+     * Create a cleanup object.  We use the appConfig to configure JobState.
+     * @param appConf
+     */
+    private ZooKeeperCleanup(Configuration appConf) {
+        this.appConf = appConf;
+        interval = appConf.getLong(ZK_CLEANUP_INTERVAL, interval);
+        maxage = appConf.getLong(ZK_CLEANUP_MAX_AGE, maxage);
+    }
+
+    public static ZooKeeperCleanup getInstance(Configuration appConf) {
+        if (thisclass != null) {
+            return thisclass;
+        }
+        thisclass = new ZooKeeperCleanup(appConf);
+        return thisclass;
+    }
+
+    public static void startInstance(Configuration appConf) throws IOException {
+        if (! isRunning) {
+            getInstance(appConf).start();
+        }
+    }
+
+    /**
+     * Run the cleanup loop.
+     *
+     * @throws IOException
+     */
+    public void run() {
+        ZooKeeper zk = null;
+        List<String> nodes = null;
+        isRunning = true;
+        while (!stop) {
+            try {
+                // Put each check in a separate try/catch, so if that particular
+                // cycle fails, it'll try again on the next cycle.
+                try {
+                    zk = ZooKeeperStorage.zkOpen(appConf);
+
+                    nodes = getChildList(zk);
+
+                    for (String node : nodes) {
+                        boolean deleted = checkAndDelete(node, zk);
+                        if (!deleted) {
+                            break;
+                        }
+                    }
+
+                    zk.close();
+                } catch (Exception e) {
+                    LOG.error("Cleanup cycle failed: " + e.getMessage());
+                } finally {
+                    if (zk != null) {
+                        try {
+                            zk.close();
+                        } catch (InterruptedException e) {
+                            // We're trying to exit anyway, just ignore.
+                        }
+                    }
+                }
+
+                long sleepMillis = (long) (Math.random() * interval);
+                LOG.info("Next execution: " + new Date(new Date().getTime()
+                                                       + sleepMillis));
+                Thread.sleep(sleepMillis);
+
+            } catch (Exception e) {
+                // If sleep fails, we should exit now before things get worse.
+                isRunning = false;
+                LOG.error("Cleanup failed: " + e.getMessage(), e);
+            }
+        }
+        isRunning = false;
+    }
+
+    /**
+     * Get the list of jobs from JobState
+     *
+     * @throws IOException
+     */
+    public List<String> getChildList(ZooKeeper zk) {
+        try {
+            List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk);
+            Collections.sort(jobs);
+            return jobs;
+        } catch (IOException e) {
+            LOG.info("No jobs to check.");
+        }
+        return new ArrayList<String>();
+    }
+
+    /**
+     * Check to see if a job is more than maxage old, and delete it if so.
+     */
+    public boolean checkAndDelete(String node, ZooKeeper zk) {
+        JobState state = null;
+        try {
+            JobStateTracker tracker = new JobStateTracker(node, zk, true,
+                    appConf.get(TempletonStorage.STORAGE_ROOT +
+                            ZooKeeperStorage.TRACKINGDIR));
+            long now = new Date().getTime();
+            state = new JobState(tracker.getJobID(), appConf);
+
+            // Set the default to 0 -- if the created date is null, there was
+            // an error in creation, and we want to delete it anyway.
+            long then = 0;
+            if (state.getCreated() != null) {
+                then = state.getCreated();
+            }
+            if (now - then > maxage) {
+                LOG.info("Deleting " + tracker.getJobID());
+                state.delete();
+                tracker.delete();
+                return true;
+            }
+            return false;
+        } catch (Exception e) {
+            LOG.info("checkAndDelete failed for " + node);
+            // We don't throw a new exception for this -- just keep going with the
+            // next one.
+            return true;
+        } finally {
+            if (state != null) {
+                try {
+                    state.close();
+                } catch (IOException e) {
+                    LOG.info("Couldn't close job state.");
+                }
+            }
+        }
+    }
+
+    // Handle to stop this process from the outside if needed.
+    public void exit() {
+        stop = true;
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/ZooKeeperStorage.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.tool;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * A storage implementation based on storing everything in ZooKeeper.
+ * This keeps everything in a central location that is guaranteed
+ * to be available and accessible.
+ *
+ * Data is stored with each key/value pair being a node in ZooKeeper:
+ * an entry lives at {@code <type path>/<id>} and each of its fields at
+ * {@code <type path>/<id>/<field name>}, with the value UTF-8 encoded
+ * as the node data.
+ */
+public class ZooKeeperStorage implements TempletonStorage {
+
+    public static final String TRACKINGDIR = "/created";
+
+    // Locations for each of the storage types; set by openStorage().
+    public String storage_root = null;
+    public String job_path = null;
+    public String job_trackingpath = null;
+    public String overhead_path = null;
+
+    public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
+    public static final String ZK_SESSION_TIMEOUT
+        = "templeton.zookeeper.session-timeout";
+
+    public static final String ENCODING = "UTF-8";
+
+    private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
+
+    private ZooKeeper zk;
+
+    /**
+     * Open a ZooKeeper connection for the JobState.
+     *
+     * @param zkHosts the ZooKeeper connect string
+     * @param zkSessionTimeout the session timeout in milliseconds
+     * @return a new connection with a watcher that ignores all events
+     */
+    public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+        throws IOException
+    {
+        return new ZooKeeper(zkHosts,
+                             zkSessionTimeout,
+                             new Watcher() {
+                                 @Override
+                                 synchronized public void process(WatchedEvent event) {
+                                 }
+                             });
+    }
+
+    /**
+     * Open a ZooKeeper connection for the JobState, reading the hosts
+     * from {@link #ZK_HOSTS} and the session timeout from
+     * {@link #ZK_SESSION_TIMEOUT} (default 30000).
+     */
+    public static ZooKeeper zkOpen(Configuration conf)
+        throws IOException
+    {
+        return zkOpen(conf.get(ZK_HOSTS),
+                      conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+    }
+
+    public ZooKeeperStorage() {
+        // No-op -- this is needed to be able to instantiate the
+        // class from the name.
+    }
+
+    /**
+     * Close this ZK connection.  Does nothing if no connection is open.
+     *
+     * @throws IOException if the thread is interrupted while closing
+     */
+    public void close()
+        throws IOException
+    {
+        if (zk != null) {
+            try {
+                zk.close();
+                zk = null;
+            } catch (InterruptedException e) {
+                // Restore the flag so code further up can still see the
+                // interrupt after it has been converted to an IOException.
+                Thread.currentThread().interrupt();
+                throw new IOException("Closing ZooKeeper connection", e);
+            }
+        }
+    }
+
+    /**
+     * Start the shared ZooKeeperCleanup thread.  Failure to start is
+     * logged and otherwise ignored.
+     */
+    public void startCleanup(Configuration config) {
+        try {
+            ZooKeeperCleanup.startInstance(config);
+        } catch (Exception e) {
+            LOG.warn("Cleanup instance didn't start.", e);
+        }
+    }
+
+    /**
+     * Create a node in ZooKeeper, along with any missing parent nodes.
+     * When something was actually created, a tracking node is added for
+     * JOB entries and a "created" field with the current time is saved.
+     *
+     * @throws IOException if the node does not exist afterwards or a
+     *         ZooKeeper operation fails
+     */
+    public void create(Type type, String id)
+        throws IOException
+    {
+        try {
+            String[] paths = getPaths(makeZnode(type, id));
+            boolean wasCreated = false;
+            for (String znode : paths) {
+                try {
+                    zk.create(znode, new byte[0],
+                              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    wasCreated = true;
+                } catch (KeeperException.NodeExistsException ignored) {
+                    // This path component already exists, which is fine;
+                    // carry on with the next one.
+                }
+            }
+            if (wasCreated) {
+                try {
+                    // Really not sure if this should go here.  Will have
+                    // to see how the storage mechanism evolves.
+                    if (type.equals(Type.JOB)) {
+                        JobStateTracker jt = new JobStateTracker(id, zk, false,
+                                job_trackingpath);
+                        jt.create();
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error tracking: " + e.getMessage());
+                    // If we couldn't create the tracker node, don't
+                    // create the main node.
+                    zk.delete(makeZnode(type, id), -1);
+                }
+            }
+            if (zk.exists(makeZnode(type, id), false) == null)
+                throw new IOException("Unable to create " + makeZnode(type, id));
+            if (wasCreated) {
+                try {
+                    // saveField() calls back into create(); by then every
+                    // node exists, so that nested call creates nothing.
+                    saveField(type, id, "created",
+                              Long.toString(System.currentTimeMillis()));
+                } catch (NotFoundException nfe) {
+                    // Wow, something's really wrong.
+                    throw new IOException("Couldn't write to node " + id, nfe);
+                }
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Creating " + id, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Creating " + id, e);
+        }
+    }
+
+    /**
+     * Get the path based on the job type.
+     *
+     * @param type the storage type
+     * @return the job path for JOB, the tracking path for JOBTRACKING,
+     *         and the overhead path for every other type
+     */
+    public String getPath(Type type) {
+        String typepath = overhead_path;
+        switch (type) {
+        case JOB:
+            typepath = job_path;
+            break;
+        case JOBTRACKING:
+            typepath = job_trackingpath;
+            break;
+        }
+        return typepath;
+    }
+
+    /**
+     * Expand a path into itself preceded by all of its ancestors, e.g.
+     * "/a/b/c" becomes { "/a", "/a/b", "/a/b/c" }.  Paths shorter than
+     * two characters are returned as the only element.
+     */
+    public static String[] getPaths(String fullpath) {
+        List<String> paths = new ArrayList<String>();
+        if (fullpath.length() < 2) {
+            paths.add(fullpath);
+        } else {
+            // Start searching at index 1 so the leading "/" is skipped.
+            int location = 0;
+            while ((location = fullpath.indexOf("/", location + 1)) > 0) {
+                paths.add(fullpath.substring(0, location));
+            }
+            paths.add(fullpath);
+        }
+        return paths.toArray(new String[paths.size()]);
+    }
+
+    /**
+     * A helper method that sets a field value, creating the field node
+     * if it does not exist and overwriting its data if it does.
+     *
+     * @param type the storage type
+     * @param id the entry id
+     * @param name the field name
+     * @param val the value to store
+     * @throws KeeperException
+     * @throws UnsupportedEncodingException
+     * @throws InterruptedException
+     */
+    private void setFieldData(Type type, String id, String name, String val)
+        throws KeeperException, UnsupportedEncodingException, InterruptedException
+    {
+        try {
+            zk.create(makeFieldZnode(type, id, name),
+                      val.getBytes(ENCODING),
+                      Ids.OPEN_ACL_UNSAFE,
+                      CreateMode.PERSISTENT);
+        } catch(KeeperException.NodeExistsException e) {
+            zk.setData(makeFieldZnode(type, id, name),
+                       val.getBytes(ENCODING),
+                       -1);
+        }
+    }
+
+    /**
+     * Make a ZK path to the named field.
+     */
+    public String makeFieldZnode(Type type, String id, String name) {
+        return makeZnode(type, id) + "/" + name;
+    }
+
+    /**
+     * Make a ZK path to job
+     */
+    public String makeZnode(Type type, String id) {
+        return getPath(type) + "/" + id;
+    }
+
+    /**
+     * Store a field value, creating the entry first if necessary.
+     * A null value is silently ignored.
+     *
+     * @throws NotFoundException if creating the entry or writing the
+     *         field fails for any reason
+     */
+    @Override
+    public void saveField(Type type, String id, String key, String val)
+        throws NotFoundException {
+        try {
+            if (val != null) {
+                create(type, id);
+                setFieldData(type, id, key, val);
+            }
+        } catch(Exception e) {
+            throw new NotFoundException("Writing " + key + ": " + val + ", "
+                                        + e.getMessage());
+        }
+    }
+
+    /**
+     * Read a single field.
+     *
+     * @return the field value, or null if it cannot be read
+     */
+    @Override
+    public String getField(Type type, String id, String key) {
+        try {
+            byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
+            return new String(b, ENCODING);
+        } catch(Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * Read all fields of an entry.
+     *
+     * @return a map of field name to value; on failure the fields read
+     *         so far (possibly none) are returned
+     */
+    @Override
+    public Map<String, String> getFields(Type type, String id) {
+        HashMap<String, String> map = new HashMap<String, String>();
+        try {
+            for (String node: zk.getChildren(makeZnode(type, id), false)) {
+                byte[] b = zk.getData(makeFieldZnode(type, id, node),
+                                      false, null);
+                map.put(node, new String(b, ENCODING));
+            }
+        } catch(Exception e) {
+            return map;
+        }
+        return map;
+    }
+
+    /**
+     * Delete an entry together with all of its field nodes.
+     *
+     * @return true when everything was deleted
+     * @throws NotFoundException if the children cannot be listed or any
+     *         node cannot be deleted; the message names the step that
+     *         failed
+     */
+    @Override
+    public boolean delete(Type type, String id) throws NotFoundException {
+        String znode = makeZnode(type, id);
+
+        // Each step has its own try block so that the message of the
+        // exception thrown matches the step that actually failed.
+        List<String> children;
+        try {
+            children = zk.getChildren(znode, false);
+        } catch (Exception e) {
+            // Error getting children of node -- probably node has been deleted
+            throw new NotFoundException("Couldn't get children of " + znode);
+        }
+
+        for (String child : children) {
+            String fieldZnode = makeFieldZnode(type, id, child);
+            try {
+                zk.delete(fieldZnode, -1);
+            } catch (Exception e) {
+                // Other nodes may be trying to delete this at the same time.
+                throw new NotFoundException("Couldn't delete " + fieldZnode);
+            }
+        }
+
+        try {
+            zk.delete(znode, -1);
+        } catch (Exception e) {
+            // Same thing -- might be deleted by other nodes.
+            throw new NotFoundException("Couldn't delete " + znode);
+        }
+        return true;
+    }
+
+    /**
+     * List the ids of all entries of every type.
+     */
+    @Override
+    public List<String> getAll() {
+        ArrayList<String> allNodes = new ArrayList<String>();
+        for (Type type: Type.values()) {
+            allNodes.addAll(getAllForType(type));
+        }
+        return allNodes;
+    }
+
+    /**
+     * List the ids of all entries of one type.
+     *
+     * @return the ids, or an empty list if they cannot be read
+     */
+    @Override
+    public List<String> getAllForType(Type type) {
+        try {
+            return zk.getChildren(getPath(type), false);
+        } catch (Exception e) {
+            return new ArrayList<String>();
+        }
+    }
+
+    /**
+     * List the ids of all entries, of any type, whose field {@code key}
+     * has the value {@code value}.
+     */
+    @Override
+    public List<String> getAllForKey(String key, String value) {
+        ArrayList<String> allNodes = new ArrayList<String>();
+        try {
+            for (Type type: Type.values()) {
+                allNodes.addAll(getAllForTypeAndKey(type, key, value));
+            }
+        } catch (Exception e) {
+            LOG.info("Couldn't find children.");
+        }
+        return allNodes;
+    }
+
+    /**
+     * List the ids of all entries of one type whose field {@code key}
+     * has the value {@code value}.
+     *
+     * @return the matching ids; empty if the type path cannot be listed
+     */
+    @Override
+    public List<String> getAllForTypeAndKey(Type type, String key, String value) {
+        ArrayList<String> allNodes = new ArrayList<String>();
+        try {
+            // getChildren() returns bare child names, so the field has to
+            // be addressed through its full path; getField() builds that
+            // path and returns null when the entry has no such field.
+            for (String id : zk.getChildren(getPath(type), false)) {
+                String stored = getField(type, id, key);
+                if (stored != null && stored.equals(value)) {
+                    allNodes.add(id);
+                }
+            }
+        } catch (Exception e) {
+            // Log and go to the next type -- this one might not exist
+            LOG.info("Couldn't find children of " + getPath(type));
+        }
+        return allNodes;
+    }
+
+    /**
+     * Derive the storage paths from the configured storage root and
+     * open a ZooKeeper connection if none is open yet.
+     */
+    @Override
+    public void openStorage(Configuration config) throws IOException {
+        storage_root = config.get(STORAGE_ROOT);
+        job_path = storage_root + "/jobs";
+        job_trackingpath = storage_root + TRACKINGDIR;
+        overhead_path = storage_root + "/overhead";
+
+        if (zk == null) {
+            zk = zkOpen(config);
+        }
+    }
+
+    /**
+     * Close the ZooKeeper connection; same as {@link #close()}.
+     */
+    @Override
+    public void closeStorage() throws IOException {
+        close();
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestDesc.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.hcatalog.templeton.ColumnDesc;
+import org.apache.hcatalog.templeton.TableDesc;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * TestDesc - Test that the desc objects are correctly converted to
+ * and from json.  This also sets every field of the TableDesc object.
+ */
+public class TestDesc extends TestCase {
+    /**
+     * Round-trip a fully populated TableDesc through json and check
+     * that the copy equals the original.
+     */
+    public void testTableDesc()
+        throws Exception
+    {
+        TableDesc td = buildTableDesc();
+        assertNotNull(td);
+
+        String json = toJson(td);
+        assertTrue(json.length() > 100);
+
+        TableDesc tdCopy = fromJson(json, TableDesc.class);
+        assertEquals(td, tdCopy);
+    }
+
+    // Build a TableDesc with every field set.
+    private TableDesc buildTableDesc() {
+        TableDesc x = new TableDesc();
+        x.group = "staff";
+        x.permissions = "755";
+        x.external = true;
+        x.ifNotExists = true;
+        x.table = "a_table";
+        x.comment = "a comment";
+        x.columns = buildColumns();
+        x.partitionedBy = buildPartitionedBy();
+        x.clusteredBy = buildClusterBy();
+        x.format = buildStorageFormat();
+        x.location = "hdfs://localhost:9000/user/me/a_table";
+        x.tableProperties = buildGenericProperties();
+        return x;
+    }
+
+    public List<ColumnDesc> buildColumns() {
+        List<ColumnDesc> x = new ArrayList<ColumnDesc>();
+        x.add(new ColumnDesc("id", "bigint", null));
+        x.add(new ColumnDesc("price", "float", "The unit price"));
+        x.add(new ColumnDesc("name", "string", "The item name"));
+        return x;
+    }
+
+    public List<ColumnDesc> buildPartitionedBy() {
+        List<ColumnDesc> x = new ArrayList<ColumnDesc>();
+        x.add(new ColumnDesc("country", "string", "The country of origin"));
+        return x;
+    }
+
+    public TableDesc.ClusteredByDesc buildClusterBy() {
+        TableDesc.ClusteredByDesc x = new TableDesc.ClusteredByDesc();
+        x.columnNames = new ArrayList<String>();
+        x.columnNames.add("id");
+        x.sortedBy = buildSortedBy();
+        x.numberOfBuckets = 16;
+        return x;
+    }
+
+    public List<TableDesc.ClusterSortOrderDesc> buildSortedBy() {
+        List<TableDesc.ClusterSortOrderDesc> x
+            = new ArrayList<TableDesc.ClusterSortOrderDesc>();
+        x.add(new TableDesc.ClusterSortOrderDesc("id", TableDesc.SortDirectionDesc.ASC));
+        return x;
+    }
+
+    public TableDesc.StorageFormatDesc buildStorageFormat() {
+        TableDesc.StorageFormatDesc x = new TableDesc.StorageFormatDesc();
+        x.rowFormat = buildRowFormat();
+        x.storedAs = "rcfile";
+        x.storedBy = buildStoredBy();
+        return x;
+    }
+
+    public TableDesc.RowFormatDesc buildRowFormat() {
+        TableDesc.RowFormatDesc x = new TableDesc.RowFormatDesc();
+        x.fieldsTerminatedBy = "\u0001";
+        x.collectionItemsTerminatedBy = "\u0002";
+        x.mapKeysTerminatedBy = "\u0003";
+        x.linesTerminatedBy = "\u0004";
+        x.serde = buildSerde();
+        return x;
+    }
+
+    public TableDesc.SerdeDesc buildSerde() {
+        TableDesc.SerdeDesc x = new TableDesc.SerdeDesc();
+        x.name = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
+        x.properties = new HashMap<String, String>();
+        x.properties.put("field.delim", ",");
+        return x;
+    }
+
+    public TableDesc.StoredByDesc buildStoredBy() {
+        TableDesc.StoredByDesc x = new TableDesc.StoredByDesc();
+        x.className = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
+        x.properties = new HashMap<String, String>();
+        x.properties.put("hbase.columns.mapping", "cf:string");
+        x.properties.put("hbase.table.name", "hbase_table_0");
+        return x;
+    }
+
+    public Map<String, String> buildGenericProperties() {
+        Map<String, String> x = new HashMap<String, String>();
+        x.put("carmas", "evil");
+        x.put("rachel", "better");
+        x.put("ctdean", "angelic");
+        x.put("paul", "dangerously unbalanced");
+        x.put("dra", "organic");
+        return x;
+    }
+
+    // Serialize an object to a json string.
+    private String toJson(Object obj)
+        throws Exception
+    {
+        ObjectMapper mapper = new ObjectMapper();
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        mapper.writeValue(out, obj);
+        // The mapper writes UTF-8 to an OutputStream, so decode with the
+        // same charset rather than the platform default.
+        return out.toString("UTF-8");
+    }
+
+    // Parse a json string into an instance of the given class.
+    private <T> T fromJson(String json, Class<T> klass)
+        throws Exception
+    {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(json, klass);
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/TestServer.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton;
+
+import junit.framework.TestCase;
+
+import org.apache.hcatalog.templeton.Main;
+import org.apache.hcatalog.templeton.mock.MockServer;
+import java.util.List;
+
+/*
+ * Test that the server code exists, and responds to basic requests.
+ */
+public class TestServer extends TestCase {
+
+    MockServer server;
+
+    public void setUp() {
+        new Main(null);         // Initialize the config
+        server = new MockServer();
+    }
+
+    public void testServer() {
+        assertNotNull(server);
+    }
+
+    public void testStatus() {
+        assertEquals(server.status().get("status"), "ok");
+    }
+    
+    public void testVersions() {
+        assertEquals(server.version().get("version"), "v1");
+    }
+    
+    public void testFormats() {
+        assertEquals(1, server.requestFormats().size());
+        assertEquals( ((List)server.requestFormats().get("responseTypes")).get(0), "application/json");
+    }
+}

Added: incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java?rev=1365722&view=auto
==============================================================================
--- incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java (added)
+++ incubator/hcatalog/trunk/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/mock/MockExecService.java Wed Jul 25 20:29:44 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.templeton.mock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.exec.ExecuteException;
+import org.apache.hcatalog.templeton.ExecBean;
+import org.apache.hcatalog.templeton.ExecService;
+import org.apache.hcatalog.templeton.NotAuthorizedException;
+
+/**
+ * An ExecService for tests that runs nothing.  Both methods echo the
+ * request back in the returned bean: the program name in stdout and
+ * the argument list in stderr.
+ */
+public class MockExecService implements ExecService {
+
+    public ExecBean run(String program, List<String> args,
+                        Map<String, String> env) {
+        return echo(program, args);
+    }
+
+    @Override
+    public ExecBean runUnlimited(String program,
+                                 List<String> args, Map<String, String> env)
+        throws NotAuthorizedException, ExecuteException, IOException {
+        // Return the bean that was built, just like run() does, rather
+        // than null.
+        return echo(program, args);
+    }
+
+    // Build the bean describing the requested program and arguments.
+    private ExecBean echo(String program, List<String> args) {
+        ExecBean bean = new ExecBean();
+        bean.stdout = program;
+        bean.stderr = args.toString();
+        return bean;
+    }
+}



Mime
View raw message