asterixdb-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Pritom Ahmed (Code Review)" <do-not-re...@asterix-gerrit.ics.uci.edu>
Subject Change in asterixdb[master]: Periodically saving JSON data from admin console API to a temporary dataset.
Date Wed, 12 Aug 2015 22:30:43 GMT
Pritom Ahmed has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/350

Change subject: Periodically saving JSON data from admin console API to a temporary dataset.
......................................................................

Periodically saving JSON data from admin console API to a temporary dataset.

The following commits from your working branch will be included:

commit ccd6b40d58cc50dfad47a5f999bf20a5eebcb60d
Author: Pritom Ahmed <pritom.11@gmail.com>
Date:   Sun Aug 9 17:02:47 2015 -0700

    JSON data from the Hyracks admin console API is saved in a temporary dataset.

    We are now saving JSON data from the admin console API in a temporary dataset every 5 seconds.

Change-Id: I6cb186abda6a14d9eb866259d459ce5b5e855be8
---
M asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
M asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
A asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java
A asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java
4 files changed, 391 insertions(+), 12 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/50/350/1

diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index 8e633a9..74442e1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -14,13 +14,9 @@
  */
 package edu.uci.ics.asterix.hyracks.bootstrap;
 
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
 import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
@@ -31,16 +27,16 @@
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.entities.*;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
 
@@ -58,6 +54,8 @@
     public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds)
{
         state = AsterixClusterProperties.INSTANCE.getState();
         AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
+//        GatherJSONData gatherJSONData = new GatherJSONData(/*GatherJSONData.NODE, GatherJSONData.NODE_FAILURE,
deadNodeIds*/);
+//        (new Thread(gatherJSONData)).start();
         return null;
     }
 
@@ -191,6 +189,18 @@
             });
             state = newState;
             recoveryThread.start();
+
+            /* TODO: 1. check if all the node clusters have joined/started */
+            //Cluster status is changed to ACTIVE so notify to gather JSON data of adminconsole
+            while(!recoveryThread.isAlive()) {
+                try {
+                    wait(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+            GatherJSONData gatherJSONData = new GatherJSONData();
+            (new Thread(gatherJSONData)).start();
         }
         return null;
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index a93461d..39c5ffb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -92,7 +92,7 @@
 
         setupFeedServer(externalProperties);
         feedServer.start();
-        centralFeedManager = CentralFeedManager.getInstance(); 
+        centralFeedManager = CentralFeedManager.getInstance();
         centralFeedManager.start();
 
         waitUntilServerStart(webServer);
@@ -104,6 +104,7 @@
         ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
+        ccAppCtx.addJobLifecycleListener(JobLifeCycleListener.INSTANCE);
     }
 
     private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java
new file mode 100644
index 0000000..823fd37
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java
@@ -0,0 +1,330 @@
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.json.JSONObject;
+
+import java.io.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+
+/**
+ * Created by pritom on 7/14/15.
+ */
+
+public class GatherJSONData implements Runnable {
+
+    public static final String JOB_RUN = "job-run";
+    public static final String JOB_ACTIVITY_GRAPH = "job-activity-graph";
+    public static final String NODE = "node";
+    public static final String JOBS = "jobs";
+    public static final String NODES = "nodes";
+    public static final String NODE_FAILURE = "failure";
+    public static final String NODE_JOIN = "join";
+
+    private static final String DATATYPE = "TempJSONDataType";
+    private static final String DATASET_NAME = "TempJSONDataSet";
+    private static final String DATAVERSE_NAME = "TempJSONData";
+
+    private static boolean ddlCreated = false;
+
+    public GatherJSONData() {
+    }
+
+    @Override
+    public void run() {
+//        running = true;
+        /*If its running for the first time then we need to run ddls*/
+        if (!ddlCreated) {
+            try {
+                String ddlString = "drop dataverse " + DATAVERSE_NAME + " if exists;\n" +
+                        "create dataverse "+  DATAVERSE_NAME + ";\n" +
+                        "use dataverse " + DATAVERSE_NAME + ";\n" +
+                        "\n" +
+                        "create type " + DATATYPE + " as closed {\n" +
+                        "        id:uuid,\n" +
+                        "        typ: string,\n" +
+                        "        tpid: string,\n" +
+                        "        timestamp: datetime,\n" +
+                        "        data: string\n" +
+                        "}\n" +
+                        "\n" +
+                        "create temporary dataset " + DATASET_NAME + "(" + DATATYPE + ")\n"
+
+                        "primary key id autogenerated;";
+
+                executeDDL(ddlString);
+                ddlCreated = true;
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        while(true) {
+            fetchJSONData();
+            try {
+                Thread.sleep(10000);  /* Waiting for 10 seconds */
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            testQuery();
+        }
+    }
+
+    private static void testQuery() {
+        /*Query*/
+        String queryString = "use dataverse " + DATAVERSE_NAME + ";\n" +
+                "\n" +
+                "    for $tdata in dataset " + DATASET_NAME + "\n" +
+                "    return $tdata;";
+        try {
+            writeOutputToFile(new File("/home/pritom/query-result-" + System.currentTimeMillis()
+ ".txt"), executeQuery(queryString));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static void writeOutputToFile(File actualFile, InputStream resultStream) throws
Exception {
+        byte[] buffer = new byte[10240];
+        int len;
+        try (FileOutputStream out = new FileOutputStream(actualFile)) {
+            while ((len = resultStream.read(buffer)) != -1) {
+                out.write(buffer, 0, len);
+            }
+        }
+    }
+
+    private void executeDDL(String ddlString) throws Exception {
+        final String url = "http://localhost:19002/ddl";
+
+        // Create a method instance.
+        PostMethod method = new PostMethod(url);
+        method.setRequestEntity(new StringRequestEntity(ddlString));
+
+        // Execute the method.
+        executeHttpMethod(method, ddlString);
+    }
+
+    private void fetchJSONData() {
+        DefaultHttpClient httpclient = new DefaultHttpClient();
+        HttpHost target = new HttpHost("127.0.0.1", 8888, "http");
+        String data = "";
+        try {
+            HttpGet nodeInfoRequest = new HttpGet("/rest/nodes/");
+            HttpResponse nodeInfoResponse = httpclient.execute(target, nodeInfoRequest);
+            HttpEntity nodeEntity = nodeInfoResponse.getEntity();
+
+            if (nodeEntity != null) {
+                data = EntityUtils.toString(nodeEntity);
+                insertIntoTemporaryDatabase(data, NODES, "");
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            httpclient.getConnectionManager().shutdown();
+        }
+
+        try {
+            JSONObject jsonObject = new JSONObject(data);
+            org.json.JSONArray nodes = jsonObject.getJSONArray("result");
+            for (int i = 0; i < nodes.length(); i++) {
+                String nodeId = nodes.getJSONObject(i).getString("node-id");
+                fetchNodeData(nodeId, NODE_JOIN);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        httpclient = new DefaultHttpClient();
+        data = "";
+        try {
+            HttpGet jobRunRequest = new HttpGet("/rest/jobs/");
+            HttpResponse jobRunResponse = httpclient.execute(target, jobRunRequest);
+            HttpEntity jobRunEntity = jobRunResponse.getEntity();
+
+            if (jobRunEntity != null) {
+                data = EntityUtils.toString(jobRunEntity);
+                insertIntoTemporaryDatabase(data, JOBS, "");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            httpclient.getConnectionManager().shutdown();
+        }
+
+        try {
+            JSONObject jsonObject = new JSONObject(data);
+            org.json.JSONArray nodes = jsonObject.getJSONArray("result");
+            for (int i = 0; i < nodes.length(); i++) {
+                String nodeId = nodes.getJSONObject(i).getString("job-id");
+                fetchJobData(nodeId);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void fetchNodeData(String nodeId, String eventType) {
+        DefaultHttpClient httpclient = new DefaultHttpClient();
+        try {
+            HttpHost target = new HttpHost("127.0.0.1", 8888, "http");
+            HttpGet nodeInfoRequest = new HttpGet("/rest/nodes/" + nodeId);
+            HttpResponse nodeInfoResponse = httpclient.execute(target, nodeInfoRequest);
+            HttpEntity nodeEntity = nodeInfoResponse.getEntity();
+
+            if (nodeEntity != null) {
+                String data = EntityUtils.toString(nodeEntity);
+                insertIntoTemporaryDatabase(data, NODE, nodeId);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            httpclient.getConnectionManager().shutdown();
+        }
+    }
+
+    /* TODO: Decide whether activity graph and job run should be separated so that they can
be called at different stage of
+     the job. For example, job run called after job termination and job activity called after
job creation or job start*/
+    public void fetchJobData(String jobID) {
+        DefaultHttpClient httpclient = new DefaultHttpClient();
+        try {
+            HttpHost target = new HttpHost("127.0.0.1", 8888, "http");
+            HttpGet jobRunRequest = new HttpGet("/rest/jobs/" + jobID + "/job-run");
+            HttpResponse jobRunResponse = httpclient.execute(target, jobRunRequest);
+            HttpEntity jobRunEntity = jobRunResponse.getEntity();
+
+            if (jobRunEntity != null) {
+                insertIntoTemporaryDatabase(EntityUtils.toString(jobRunEntity), JOB_RUN,
jobID);
+            }
+
+            HttpGet jobActivityGraphRequest = new HttpGet("/rest/jobs/" + jobID + "/job-activity-graph");
+            HttpResponse jobActivityGraphResponse = httpclient.execute(target, jobActivityGraphRequest);
+            HttpEntity jobActivityGraphEntity = jobActivityGraphResponse.getEntity();
+
+            if (jobActivityGraphEntity != null) {
+                insertIntoTemporaryDatabase(EntityUtils.toString(jobActivityGraphEntity),
JOB_ACTIVITY_GRAPH, jobID);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            httpclient.getConnectionManager().shutdown();
+        }
+    }
+
+    public void insertIntoTemporaryDatabase(String data, String type, String typeId) throws
Exception {
+        /* Check if data is empty */
+        JSONObject object = new JSONObject(data);
+        if ("{}".equals(object.get("result").toString())) {
+            return;
+        }
+
+        Map<String, String> functions = new HashMap<>();
+        functions.put("timestamp" , "datetime(\"" + getCurrentDateTime() + "\")");
+
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("typ", type);
+        jsonObject.put("tpid", typeId);
+        jsonObject.put("data", data);
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("use dataverse " + DATAVERSE_NAME + ";" + "\n");
+        builder.append("insert into dataset " + DATASET_NAME + " ");
+        builder.append(" (" + processJSON(jsonObject, functions) + ")");
+        builder.append(";");
+
+        executeUpdate(builder.toString());
+    }
+
+    private String processJSON(JSONObject jsonObject, Map<String, String> functions)
{
+        String str = jsonObject.toString();
+        int indexOfClosingBrace = str.lastIndexOf("}");
+        str = (String) str.subSequence(0, indexOfClosingBrace);
+        str += ",";
+        for (String key : functions.keySet()) {
+            str += "\"" + key + "\":" + functions.get(key);
+            str += ",";
+        }
+        str = str.substring(0, str.length()-1);
+        str += "}";
+        return str;
+    }
+
+    private String getCurrentDateTime() {
+        DateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
+        DateFormat dateFormat2 = new SimpleDateFormat("HH:mm:ss");
+        Date date = new Date();
+        return dateFormat1.format(date) + "T" + dateFormat2.format(date);
+    }
+
+    /*use dataverse TempJSONData;
+
+    for $tdata in dataset TempJSONDataSet
+    return $tdata;*/
+
+
+    // To execute Update statements
+    // Insert and Delete statements are executed here
+    public static void executeUpdate(String str) throws Exception {
+        final String url = "http://localhost:19002/update";
+        // Create a method instance.
+        PostMethod method = new PostMethod(url);
+        method.setRequestEntity(new StringRequestEntity(str));
+        // Execute the method.
+        executeHttpMethod(method, str);
+    }
+
+    // Executes Query and returns results as JSONArray
+    public static InputStream executeQuery(String str) throws Exception {
+        final String url = "http://localhost:19002/query";
+
+        // Create a method instance.
+        GetMethod method = new GetMethod(url);
+        method.setQueryString(new NameValuePair[] { new NameValuePair("query", str) });
+        method.setRequestHeader("Accept", "application/json");
+
+        // Provide custom retry handler is necessary
+        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3,
false));
+        executeHttpMethod(method, str);
+        return method.getResponseBodyAsStream();
+    }
+
+    private static int executeHttpMethod(HttpMethod method, String ddlString) throws Exception
{
+        HttpClient client = new HttpClient();
+        int statusCode;
+        try {
+            statusCode = client.executeMethod(method);
+        } catch (Exception e) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+            e.printStackTrace();
+            throw e;
+        }
+        if (statusCode != HttpStatus.SC_OK) {
+            String errorBody = method.getResponseBodyAsString();
+            JSONObject result = new JSONObject(errorBody);
+            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+                    result.getString("stacktrace") };
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: "
+ method.getStatusLine()
+                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+        }
+        return statusCode;
+    }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java
new file mode 100644
index 0000000..624f0ec
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.JobId;
+import org.apache.http.util.EntityUtils;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Created by pritom on 7/20/15.
+ */
+
+/* TODO: 2. Then periodically gather various JSON data */
+
+public class JobLifeCycleListener implements IJobLifecycleListener {
+
+    public static JobLifeCycleListener INSTANCE = new JobLifeCycleListener();
+
+    @Override
+    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory iActivityClusterGraphGeneratorFactory)
throws HyracksException {
+//        GatherJSONData.fetchJobData(jobId);
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId) throws HyracksException {
+//        GatherJSONData.fetchJobData(jobId);
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId) throws HyracksException {
+//        GatherJSONData gatherJSONData = new GatherJSONData("job", jobId);
+//        (new Thread(gatherJSONData)).start();
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/350
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6cb186abda6a14d9eb866259d459ce5b5e855be8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Pritom Ahmed <pritom.11@gmail.com>

Mime
View raw message