pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1745386 - in /pig/branches/branch-0.16: ./ conf/ shims/src/hadoop20/org/apache/pig/backend/hadoop/ shims/src/hadoop23/org/apache/pig/backend/hadoop/ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/ap...
Date Tue, 24 May 2016 18:15:25 GMT
Author: daijy
Date: Tue May 24 18:15:25 2016
New Revision: 1745386

URL: http://svn.apache.org/viewvc?rev=1745386&view=rev
Log:
PIG-4714: Improve logging across multiple components with callerId

Added:
    pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
    pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/conf/pig.properties
    pig/branches/branch-0.16/src/org/apache/pig/PigConfiguration.java
    pig/branches/branch-0.16/src/org/apache/pig/PigServer.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java
    pig/branches/branch-0.16/src/pig-default.properties

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Tue May 24 18:15:25 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4714: Improve logging across multiple components with callerId (daijy)
+
 PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)
 
 PIG-4894: Add API for StoreFunc to specify if they are write safe from two different vertices (rohini)

Modified: pig/branches/branch-0.16/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/conf/pig.properties?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/conf/pig.properties (original)
+++ pig/branches/branch-0.16/conf/pig.properties Tue May 24 18:15:25 2016
@@ -557,6 +557,9 @@ pig.location.check.strict=false
 #
 hcat.bin=/usr/local/hcat/bin/hcat
 
+# Enable ATS hook to log the Pig specific ATS entry, disable only when ATS server is not deployed
+pig.ats.enabled=true
+
 ###########################################################################
 #
 # Overrides for extreme environments

Added: pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1745386&view=auto
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java (added)
+++ pig/branches/branch-0.16/shims/src/hadoop20/org/apache/pig/backend/hadoop/PigATSClient.java Tue May 24 18:15:25 2016
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import org.apache.pig.impl.PigContext;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    private static PigATSClient instance;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        return "";
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+    }
+}

Added: pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1745386&view=auto
==============================================================================
--- pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java (added)
+++ pig/branches/branch-0.16/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java Tue May 24 18:15:25 2016
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+    public static final String ENTITY_CALLERID = "callerId";
+    public static final String CALLER_CONTEXT = "PIG";
+    public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+    private static final Log log = LogFactory.getLog(PigATSClient.class.getName());
+    private static PigATSClient instance;
+    private static ExecutorService executor;
+    private TimelineClient timelineClient;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+        if (executor == null) {
+            executor = Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+            YarnConfiguration yarnConf = new YarnConfiguration();
+            timelineClient = TimelineClient.createTimelineClient();
+            timelineClient.init(yarnConf);
+            timelineClient.start();
+        }
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                timelineClient.stop();
+                executor.shutdownNow();
+                executor = null;
+            }
+        });
+        log.info("Created ATS Hook");
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        String auditId;
+        if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+            auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+        } else {
+            ScriptState ss = ScriptState.get();
+            String filename = ss.getFileName().isEmpty()?"default" : new File(ss.getFileName()).getName();
+            auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+        }
+        return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                TimelineEntity entity = new TimelineEntity();
+                entity.setEntityId(event.pigScriptId);
+                entity.setEntityType(ENTITY_TYPE);
+                entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId!=null?event.callerId : "default");
+                try {
+                    timelineClient.putEntities(entity);
+                } catch (Exception e) {
+                    log.info("Failed to submit plan to ATS: " + e.getMessage());
+                }
+            }
+        });
+    }
+}

Modified: pig/branches/branch-0.16/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/PigConfiguration.java?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/PigConfiguration.java Tue May 24 18:15:25 2016
@@ -418,6 +418,16 @@ public class PigConfiguration {
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size";
 
+    /**
+     * Log tracing id that can be used by upstream clients for tracking respective logs
+     */
+    public static final String CALLER_ID = "pig.log.trace.id";
+
+    /**
+     * Enable ATS for Pig
+     */
+    public static final String ENABLE_ATS = "pig.ats.enabled";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/branches/branch-0.16/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/PigServer.java?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/PigServer.java Tue May 24 18:15:25 2016
@@ -24,7 +24,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.io.StringReader;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,6 +57,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +246,54 @@ public class PigServer {
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
+        // log ATS event includes the caller context
+        String auditId = PigATSClient.getPigAuditId(pigContext);
+        String callerId = (String)pigContext.getProperties().get(PigConfiguration.CALLER_ID);
+        log.info("Pig Script ID for the session: " + auditId);
+        if (callerId != null) {
+            log.info("Caller ID for session: " + callerId);
+        }
+        if (Boolean.parseBoolean(pigContext.getProperties()
+                .getProperty(PigConfiguration.ENABLE_ATS))) {
+            if (Boolean.parseBoolean(pigContext.getProperties()
+                    .getProperty("yarn.timeline-service.enabled", "false"))) {
+                PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
+                try {
+                    PigATSClient.getInstance().logEvent(event);
+                } catch (Exception e) {
+                    log.warn("Error posting to ATS: ", e);
+                }
+            } else {
+                log.warn("ATS is disabled since"
+                        + " yarn.timeline-service.enabled set to false");
+            }
+            
+        }
+
+        // set hdfs caller context
+        Class callerContextClass = null;
+        try {
+            callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
+        } catch (ClassNotFoundException e) {
+            // If pre-Hadoop 2.8.0, skip setting CallerContext
+        }
+        if (callerContextClass != null) {
+            try {
+                // Reflection for the following code since it is only available since hadoop 2.8.0:
+                // CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
+                // CallerContext.setCurrent(hdfsContext);
+                Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+                Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
+                Object builder = callerContextBuilderConstruct.newInstance(auditId);
+                Method builderBuildMethod = builder.getClass().getMethod("build");
+                Object hdfsContext = builderBuildMethod.invoke(builder);
+                Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+                callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
+            } catch (Exception e) {
+                // Shall not happen unless API change in future Hadoop commons
+                throw new ExecException(e);
+            }
+        }
     }
 
     private void addHadoopProperties() throws ExecException {

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Tue May 24 18:15:25 2016
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -107,6 +109,26 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            // set Tez caller context
+            // Reflection for the following code since it is only available since tez 0.8.1:
+            // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext),
+            //     ATSService.EntityType, "");
+            // tezDag.setCallerContext(context);
+            Class callerContextClass = null;
+            try {
+                callerContextClass = Class.forName("org.apache.tez.client.CallerContext");
+            } catch (ClassNotFoundException e) {
+                // If pre-Tez 0.8.1, skip setting CallerContext
+            }
+            if (callerContextClass != null) {
+                Method builderBuildMethod = callerContextClass.getMethod("create", String.class,
+                        String.class, String.class, String.class);
+                Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT,
+                        PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, "");
+                Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext",
+                        context.getClass());
+                dagSetCallerContext.invoke(tezDag, context);
+            }
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
             return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {

Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/PigImplConstants.java Tue May 24 18:15:25 2016
@@ -79,4 +79,9 @@ public class PigImplConstants {
      * Pig log4j properties
      */
     public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
+
+    /**
+     * A unique id for a Pig session used as callerId for underlining component
+     */
+    public static final String PIG_AUDIT_ID = "pig.script.id";
 }

Modified: pig/branches/branch-0.16/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/pig-default.properties?rev=1745386&r1=1745385&r2=1745386&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/pig-default.properties (original)
+++ pig/branches/branch-0.16/src/pig-default.properties Tue May 24 18:15:25 2016
@@ -61,4 +61,6 @@ pig.stats.output.size.reader.unsupported
 
 pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
 
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
+pig.ats.enabled=true



Mime
View raw message