To: user@hadoop.apache.org
From: xeonmailinglist
Subject: catch MapReduce calls with AOP?
Date: Tue, 8 Sep 2015 00:36:18 +0100

Hi,

In [1] I show a wordcount example. I am trying to use AOP to pointcut the invocations of output.collect and the cleanup method, but I am finding it very difficult to do.
I have tried declaring org.apache.hadoop.mapred.JobConf and org.apache.hadoop.mapreduce.Job as beans so that I could intercept these calls, but I can't get the interception to work.
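
To make this concrete, the aspect I have been trying looks roughly like the sketch below. This is annotation-style AspectJ; the aspect and advice names are placeholders of mine, and it assumes the aspectjrt/aspectjweaver jars are available where the tasks run:

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect
public class MapReduceCallAspect {

    // Runs before every call to OutputCollector.collect(...) made from woven code.
    @Before("call(* org.apache.hadoop.mapred.OutputCollector.collect(..))")
    public void beforeCollect(JoinPoint jp) {
        System.out.println("collect: " + java.util.Arrays.toString(jp.getArgs()));
    }

    // Runs before cleanup(..) executes on any Mapper or Reducer subclass.
    @Before("execution(* org.apache.hadoop.mapreduce.Mapper+.cleanup(..))"
            + " || execution(* org.apache.hadoop.mapreduce.Reducer+.cleanup(..))")
    public void beforeCleanup(JoinPoint jp) {
        System.out.println("cleanup on: " + jp.getThis());
    }
}

I suspect the bean approach fails because the Mapper and Reducer instances are created by the Hadoop framework rather than by the Spring container, so proxy-based interception never sees them; that is why I am now looking at weaving instead.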

Is it possible to do this, or am I trying to do something that is impossible?

[1] My wordcount example

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;

public class MyWordCount {

    public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests(); // my own digest helper (not shown)

        // Old-API signature (OutputCollector/Reporter): output.collect below is
        // the call I am trying to pointcut.
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
                parser.updateDigest(word.getBytes(), ByteBuffer.allocate(4).putInt(one.get()).array());
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        // cleanup is the other call I would like to intercept.
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            parser.cleanup(context);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if (iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            parser.cleanup(context);
        }
    }
}
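
In case it is relevant: my assumption is that the aspect would have to be woven into the map and reduce task JVMs via load-time weaving, by passing the AspectJ agent in the task JVM options, roughly like this (property names assume Hadoop 2.x, and the weaver jar path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitWithWeaving {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Attach the AspectJ load-time weaver to every map and reduce task JVM.
        conf.set("mapreduce.map.java.opts", "-javaagent:/opt/aspectj/aspectjweaver.jar");
        conf.set("mapreduce.reduce.java.opts", "-javaagent:/opt/aspectj/aspectjweaver.jar");

        Job job = Job.getInstance(conf, "my wordcount");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(MyWordCount.MyMap.class);
        job.setReducerClass(MyWordCount.MyReducer.class);
        // ... input/output formats, paths and key/value classes as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Does that sound like the right direction, or is there a simpler way to catch these calls?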