incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [28/28] git commit: Rename packages for the crunch-examples project and add license headers
Date Sat, 07 Jul 2012 21:49:08 GMT
Rename packages for the crunch-examples project and add license headers

Signed-off-by: Josh Wills <jwills@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2ff7247c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2ff7247c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2ff7247c

Branch: refs/heads/master
Commit: 2ff7247c081e69fe0fe2f04b7848876d25f802d2
Parents: dcbe378
Author: Josh Wills <jwills@cloudera.com>
Authored: Sat Jul 7 12:28:56 2012 -0700
Committer: Josh Wills <jwills@cloudera.com>
Committed: Sat Jul 7 14:21:30 2012 -0700

----------------------------------------------------------------------
 examples/pom.xml                                   |    8 +-
 .../cloudera/crunch/examples/AverageBytesByIP.java |  131 --------------
 .../cloudera/crunch/examples/TotalBytesByIP.java   |  103 -----------
 .../com/cloudera/crunch/examples/WordCount.java    |   73 --------
 .../apache/crunch/examples/AverageBytesByIP.java   |  134 +++++++++++++++
 .../org/apache/crunch/examples/TotalBytesByIP.java |  106 ++++++++++++
 .../java/org/apache/crunch/examples/WordCount.java |   76 ++++++++
 7 files changed, 320 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 21c27ee..df00ac4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -4,7 +4,7 @@
   <groupId>com.cloudera.crunch</groupId>
   <artifactId>crunch-examples</artifactId>
   <packaging>jar</packaging>
-  <version>0.2.0</version>
+  <version>0.3.0</version>
   <name>crunch-examples</name>
 
   <dependencies>
@@ -22,9 +22,9 @@
     </dependency>
 
     <dependency>
-      <groupId>com.cloudera.crunch</groupId>
+      <groupId>org.apache.crunch</groupId>
       <artifactId>crunch</artifactId>
-      <version>0.2.0</version>
+      <version>0.3.0</version>
     </dependency>
   </dependencies>
 
@@ -47,7 +47,7 @@
           </descriptors>
           <archive>
             <manifest>
-              <mainClass>com.cloudera.crunch.examples.WordCount</mainClass>
+              <mainClass>org.apache.crunch.examples.WordCount</mainClass>
             </manifest>
           </archive>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java b/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java
deleted file mode 100644
index daa5259..0000000
--- a/examples/src/main/java/com/cloudera/crunch/examples/AverageBytesByIP.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.examples;
-
-import java.io.Serializable;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.type.writable.Writables;
-
-/**
- * Computes the average response size for each client IP address found in an
- * access log. Reads text lines from args[0] and writes (ip, average) pairs to
- * args[1].
- */
-@SuppressWarnings("serial")
-public class AverageBytesByIP extends Configured implements Tool, Serializable {
-  // Counters for skipped input lines: NO_MATCH when a line does not match
-  // logRegex, CORRUPT_SIZE when the size field cannot be parsed as a long.
-  static enum COUNTERS  {
-    NO_MATCH,
-    CORRUPT_SIZE
-  }
-  // Access-log pattern; group 1 is the client IP address and group 7 the
-  // response size (the field after the three-digit status code).
-  static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]
\"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
-  // Builds and runs the pipeline. Prints usage and returns 1 unless exactly
-  // two arguments (input path, output path) are given; returns 0 otherwise.
-  public int run(String[] args) throws Exception {
-    if(args.length != 2) {
-      System.err.println();
-      System.err.println("Two and only two arguments are accepted.");
-      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input
output");
-      System.err.println();
-      GenericOptionsParser.printGenericCommandUsage(System.err);
-      return 1;
-    }
-    // Create an object to coordinate pipeline creation and execution.
-    Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf());
-    // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[0]);
-
-    // Combiner used for summing up response size and count
-    CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = 
-      CombineFn.pairAggregator(CombineFn.SUM_LONGS, CombineFn.SUM_LONGS);
-    
-    // Table of (ip, sum(response size), count)
-    PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines.parallelDo(extractResponseSize,

-        Writables.tableOf(Writables.strings(), 
-            Writables.pairs(Writables.longs(), Writables.longs())))
-            .groupByKey()
-            .combineValues(stringPairOfLongsSumCombiner);
-    
-    // Calculate average response size by ip address
-    PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage,

-        Writables.tableOf(Writables.strings(), Writables.doubles()));
-    
-    // write the result to a text file
-    pipeline.writeTextFile(avgs, args[1]);
-    // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
-  }
-  
-  // Function to calculate the average response size for a given ip address
-  //
-  // Input: (ip, sum(response size), count)
-  // Output: (ip, average response size)
-  // An IP whose count is not positive gets an average of 0.
-  MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>> calulateAverage
= 
-    new MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>>()
{
-    @Override
-    public Pair<String, Double> map(Pair<String, Pair<Long, Long>> arg)
{
-      Pair<Long, Long> sumCount = arg.second();
-      double avg = 0;
-      if(sumCount.second() > 0) {
-        avg = (double)sumCount.first() / (double)sumCount.second();
-      }
-      return Pair.of(arg.first(), avg);
-    }
-  };
-
-  // Function to parse apache log records
-  // Given a standard apache log line, extract the ip address and 
-  // response size. Outputs ip and the response size and a count (1) so that
-  // a combiner can be used.
-  //    
-  // Input: 55.1.3.2  ...... 200 512 ....
-  // Output: (55.1.3.2, (512, 1))  
-  DoFn<String, Pair<String, Pair<Long, Long>>> extractResponseSize = new
DoFn<String, Pair<String, Pair<Long, Long>>>() {
-    // Compiled in initialize(); transient so it is not part of the serialized DoFn.
-    transient Pattern pattern;
-    public void initialize() {
-      pattern = Pattern.compile(logRegex);
-    }
-    public void process(String line, Emitter<Pair<String, Pair<Long, Long>>>
emitter) {
-      Matcher matcher = pattern.matcher(line);
-      if(matcher.matches()) {
-        try {
-          Long responseSize = Long.parseLong(matcher.group(7));
-          Pair<Long, Long> sumCount = Pair.of(responseSize, 1L);
-          String remoteAddr = matcher.group(1);
-          emitter.emit(Pair.of(remoteAddr, sumCount));
-        } catch (NumberFormatException e) {
-          this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1);
-        }
-      } else {
-        this.getCounter(COUNTERS.NO_MATCH).increment(1);          
-      }
-    }
-  };
-
-  
-  // NOTE(review): the status returned by ToolRunner.run is discarded, so the
-  // process exit status will not reflect a usage error reported by run().
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java b/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java
deleted file mode 100644
index 3de1f73..0000000
--- a/examples/src/main/java/com/cloudera/crunch/examples/TotalBytesByIP.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.examples;
-
-import java.io.Serializable;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.type.writable.Writables;
-
-/**
- * Computes the total response size for each client IP address found in an
- * access log. Reads text lines from args[0] and writes (ip, total) pairs to
- * args[1].
- */
-@SuppressWarnings("serial")
-public class TotalBytesByIP extends Configured implements Tool, Serializable {
-  // NOTE(review): declared but never incremented in this class (see the
-  // catch block in extractIPResponseSize below).
-  static enum COUNTERS  {
-    NO_MATCH,
-    CORRUPT_SIZE
-  }
-  // Access-log pattern; group 1 is the client IP address and group 7 the
-  // response size (the field after the three-digit status code).
-  static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]
\"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
-  // Builds and runs the pipeline. Prints usage and returns 1 unless exactly
-  // two arguments (input path, output path) are given; returns 0 otherwise.
-  public int run(String[] args) throws Exception {
-    if(args.length != 2) {
-      System.err.println();
-      System.err.println("Two and only two arguments are accepted.");
-      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input
output");
-      System.err.println();
-      GenericOptionsParser.printGenericCommandUsage(System.err);
-      return 1;
-    }
-    // Create an object to coordinate pipeline creation and execution.
-    Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());
-    // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[0]);
-
-    // Combiner used for summing up response size
-    CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();
-    
-    // Table of (ip, sum(response size))
-    PTable<String, Long> ipAddrResponseSize = 
-      lines.parallelDo(extractIPResponseSize, 
-        Writables.tableOf(Writables.strings(),Writables.longs()))
-            .groupByKey()
-            .combineValues(longSumCombiner);
-    
-    pipeline.writeTextFile(ipAddrResponseSize, args[1]);
-    // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
-  }
-  
-  // Function to parse apache log records
-  // Given a standard apache log line, extract the ip address and 
-  // response size (regex group 7). Outputs the ip and response size.
-  //    
-  // Input: 55.1.3.2  ...... 200 512 ....
-  // Output: (55.1.3.2, 512)  
-  DoFn<String, Pair<String, Long>> extractIPResponseSize = new DoFn<String,
Pair<String, Long>>() {
-    // Compiled in initialize(); transient so it is not part of the serialized DoFn.
-    transient Pattern pattern;
-    public void initialize() {
-      pattern = Pattern.compile(logRegex);
-    }
-    public void process(String line, Emitter<Pair<String, Long>> emitter) {
-      Matcher matcher = pattern.matcher(line);
-      if(matcher.matches()) {
-        try {
-          Long requestSize = Long.parseLong(matcher.group(7));
-          String remoteAddr = matcher.group(1);
-          emitter.emit(Pair.of(remoteAddr, requestSize));
-        } catch (NumberFormatException e) {
-          // NOTE(review): the line is dropped silently; COUNTERS.CORRUPT_SIZE is
-          // not incremented. Lines that fail to match are also dropped uncounted.
-        }
-      }
-    }
-  };
-
-  
-  // NOTE(review): the status returned by ToolRunner.run is discarded, so the
-  // process exit status will not reflect a usage error reported by run().
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java b/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java
deleted file mode 100644
index a87b5a2..0000000
--- a/examples/src/main/java/com/cloudera/crunch/examples/WordCount.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.examples;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.type.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.Serializable;
-
-/**
- * Word count example: splits each input line on whitespace and writes
- * (word, count) pairs to a text file.
- */
-public class WordCount extends Configured implements Tool, Serializable {
-  // NOTE(review): requires exactly three arguments and ignores args[0];
-  // args[1] is the input path and args[2] the output path, although the
-  // usage message printed below mentions only "input output".
-  public int run(String[] args) throws Exception {
-    if(args.length != 3) {
-      System.err.println();
-      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input
output");
-      System.err.println();
-      GenericOptionsParser.printGenericCommandUsage(System.err);
-      return 1;
-    }
-    // Create an object to coordinate pipeline creation and execution.
-    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
-    // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[1]);
-
-    // Define a function that splits each line in a PCollection of Strings into a
-    // PCollection made up of the individual words in the file.
-    // NOTE(review): split("\\s+") yields "" for an empty line and for leading
-    // whitespace, so an empty string is emitted and counted as a word.
-    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
-      public void process(String line, Emitter<String> emitter) {
-        for (String word : line.split("\\s+")) {
-          emitter.emit(word);
-        }
-      }
-    }, Writables.strings()); // Indicates the serialization format
-
-    // The count method applies a series of Crunch primitives and returns
-    // a map of the unique words in the input PCollection to their counts.
-    // Best of all, the count() function doesn't need to know anything about
-    // the kind of data stored in the input PCollection.
-    PTable<String, Long> counts = words.count();
-
-    // Instruct the pipeline to write the resulting counts to a text file.
-    pipeline.writeTextFile(counts, args[2]);
-    // Execute the pipeline as a MapReduce.
-    pipeline.done();
-    return 0;
-  }
-
-  // NOTE(review): the status returned by ToolRunner.run is discarded.
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordCount(), args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
new file mode 100644
index 0000000..930721f
--- /dev/null
+++ b/examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Example Crunch pipeline that computes the average response size per client
+ * IP address from web-server access-log lines.
+ *
+ * <p>Usage: {@code AverageBytesByIP [generic options] input output}. Skipped lines
+ * are counted as follows:
+ * <ul>
+ *   <li>Lines that do not match {@link #logRegex} increment {@code COUNTERS.NO_MATCH}.</li>
+ *   <li>Lines whose size field cannot be parsed as a {@code long} increment
+ *       {@code COUNTERS.CORRUPT_SIZE}.</li>
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class AverageBytesByIP extends Configured implements Tool, Serializable {
+  /** Counters incremented for input lines that are skipped. */
+  static enum COUNTERS  {
+    NO_MATCH,
+    CORRUPT_SIZE
+  }
+
+  /**
+   * Access-log pattern. Group 1 is the client IP address and group 7 is the
+   * response size (the field after the three-digit status code). A size field
+   * that is not all digits (e.g. "-") makes the whole line a NO_MATCH.
+   */
+  static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
+      + "\"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
+
+  /**
+   * Builds and runs the pipeline.
+   *
+   * @param args exactly two arguments: the input path and the output path
+   * @return 0 on success, 1 (after printing usage) if the argument count is wrong
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println();
+      System.err.println("Two and only two arguments are accepted.");
+      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Sums (response size, count) pairs component-wise. Summing is associative,
+    // which is why the per-line count of 1 lets this act as a combiner.
+    CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner =
+        CombineFn.pairAggregator(CombineFn.SUM_LONGS, CombineFn.SUM_LONGS);
+
+    // Table of (ip, (sum(response size), count))
+    PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines
+        .parallelDo(extractResponseSize,
+            Writables.tableOf(Writables.strings(),
+                Writables.pairs(Writables.longs(), Writables.longs())))
+        .groupByKey()
+        .combineValues(stringPairOfLongsSumCombiner);
+
+    // Calculate average response size by ip address.
+    PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage,
+        Writables.tableOf(Writables.strings(), Writables.doubles()));
+
+    // Write the result to a text file.
+    pipeline.writeTextFile(avgs, args[1]);
+    // Execute the pipeline as a MapReduce.
+    pipeline.done();
+    return 0;
+  }
+
+  /**
+   * Converts (ip, (sum(response size), count)) into (ip, average response size).
+   * An IP whose count is not positive maps to an average of 0.
+   * (The field name's misspelling is kept because the field is package-visible.)
+   */
+  MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>> calulateAverage =
+      new MapFn<Pair<String, Pair<Long, Long>>, Pair<String, Double>>() {
+        @Override
+        public Pair<String, Double> map(Pair<String, Pair<Long, Long>> arg) {
+          Pair<Long, Long> sumCount = arg.second();
+          double avg = 0;
+          if (sumCount.second() > 0) {
+            avg = (double) sumCount.first() / (double) sumCount.second();
+          }
+          return Pair.of(arg.first(), avg);
+        }
+      };
+
+  /**
+   * Parses one access-log line into (ip, (response size, 1)). The count of 1
+   * lets the sum combiner also compute the number of requests per IP.
+   * Input: 55.1.3.2 ...... 200 512 ....  Output: (55.1.3.2, (512, 1))
+   */
+  DoFn<String, Pair<String, Pair<Long, Long>>> extractResponseSize =
+      new DoFn<String, Pair<String, Pair<Long, Long>>>() {
+        // Compiled in initialize(); transient so it is not part of the serialized DoFn.
+        transient Pattern pattern;
+
+        @Override
+        public void initialize() {
+          pattern = Pattern.compile(logRegex);
+        }
+
+        @Override
+        public void process(String line, Emitter<Pair<String, Pair<Long, Long>>> emitter) {
+          Matcher matcher = pattern.matcher(line);
+          if (!matcher.matches()) {
+            this.getCounter(COUNTERS.NO_MATCH).increment(1);
+            return;
+          }
+          try {
+            long responseSize = Long.parseLong(matcher.group(7));
+            Pair<Long, Long> sumCount = Pair.of(responseSize, 1L);
+            String remoteAddr = matcher.group(1);
+            emitter.emit(Pair.of(remoteAddr, sumCount));
+          } catch (NumberFormatException e) {
+            // Group 7 is all digits, so this only happens when the value overflows a long.
+            this.getCounter(COUNTERS.CORRUPT_SIZE).increment(1);
+          }
+        }
+      };
+
+  /** Command-line entry point; exits with the status returned by {@link #run}. */
+  public static void main(String[] args) throws Exception {
+    int exitCode = ToolRunner.run(new Configuration(), new AverageBytesByIP(), args);
+    // Propagate run()'s status so a usage error (1) is visible to the caller's shell.
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
new file mode 100644
index 0000000..1b168fa
--- /dev/null
+++ b/examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.types.writable.Writables;
+
+@SuppressWarnings("serial")
+public class TotalBytesByIP extends Configured implements Tool, Serializable {
+  static enum COUNTERS  {
+    NO_MATCH,
+    CORRUPT_SIZE
+  }
+  static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]
\"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
+  public int run(String[] args) throws Exception {
+    if(args.length != 2) {
+      System.err.println();
+      System.err.println("Two and only two arguments are accepted.");
+      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input
output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Combiner used for summing up response size
+    CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();
+    
+    // Table of (ip, sum(response size))
+    PTable<String, Long> ipAddrResponseSize = 
+      lines.parallelDo(extractIPResponseSize, 
+        Writables.tableOf(Writables.strings(),Writables.longs()))
+            .groupByKey()
+            .combineValues(longSumCombiner);
+    
+    pipeline.writeTextFile(ipAddrResponseSize, args[1]);
+    // Execute the pipeline as a MapReduce.
+    pipeline.done();
+    return 0;
+  }
+  
+  // Function to parse apache log records
+  // Given a standard apache log line, extract the ip address and 
+  // request size. Outputs the ip and response size.
+  //    
+  // Input: 55.1.3.2  ...... 200 512 ....
+  // Output: (55.1.3.2, 512)  
+  DoFn<String, Pair<String, Long>> extractIPResponseSize = new DoFn<String,
Pair<String, Long>>() {
+    transient Pattern pattern;
+    public void initialize() {
+      pattern = Pattern.compile(logRegex);
+    }
+    public void process(String line, Emitter<Pair<String, Long>> emitter) {
+      Matcher matcher = pattern.matcher(line);
+      if(matcher.matches()) {
+        try {
+          Long requestSize = Long.parseLong(matcher.group(7));
+          String remoteAddr = matcher.group(1);
+          emitter.emit(Pair.of(remoteAddr, requestSize));
+        } catch (NumberFormatException e) {
+          // corrupt line, we should increment counter
+        }
+      }
+    }
+  };
+
+  
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2ff7247c/examples/src/main/java/org/apache/crunch/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/crunch/examples/WordCount.java b/examples/src/main/java/org/apache/crunch/examples/WordCount.java
new file mode 100644
index 0000000..53db14e
--- /dev/null
+++ b/examples/src/main/java/org/apache/crunch/examples/WordCount.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.Serializable;
+
+public class WordCount extends Configured implements Tool, Serializable {
+  public int run(String[] args) throws Exception {
+    if(args.length != 3) {
+      System.err.println();
+      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input
output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[1]);
+
+    // Define a function that splits each line in a PCollection of Strings into a
+    // PCollection made up of the individual words in the file.
+    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
+      public void process(String line, Emitter<String> emitter) {
+        for (String word : line.split("\\s+")) {
+          emitter.emit(word);
+        }
+      }
+    }, Writables.strings()); // Indicates the serialization format
+
+    // The count method applies a series of Crunch primitives and returns
+    // a map of the unique words in the input PCollection to their counts.
+    // Best of all, the count() function doesn't need to know anything about
+    // the kind of data stored in the input PCollection.
+    PTable<String, Long> counts = words.count();
+
+    // Instruct the pipeline to write the resulting counts to a text file.
+    pipeline.writeTextFile(counts, args[2]);
+    // Execute the pipeline as a MapReduce.
+    pipeline.done();
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordCount(), args);
+  }
+}


Mime
View raw message