incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [4/50] [abbrv] git commit: Fixed and cleaned up the original counter example.
Date Tue, 03 Jan 2012 14:03:28 GMT
Fixed and cleaned up the original counter example.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/1804c476
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/1804c476
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/1804c476

Branch: refs/heads/piper
Commit: 1804c476e079bf156607503249d80ebc20b84cfd
Parents: f617192
Author: Leo Neumeyer <leo@s4.io>
Authored: Sun Dec 18 17:00:05 2011 -0800
Committer: Leo Neumeyer <leo@s4.io>
Committed: Sun Dec 18 17:00:05 2011 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/App.java      |    5 +-
 .../java/org/apache/s4/core/ProcessingElement.java |    2 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |   35 ++++-----
 .../main/java/org/apache/s4/core/SingletonPE.java  |   55 --------------
 subprojects/s4-core/src/main/resources/logback.xml |    2 +-
 .../org/apache/s4/example/counter/CounterPE.java   |   28 +------
 .../s4/example/counter/GenerateUserEventPE.java    |   28 ++++---
 .../java/org/apache/s4/example/counter/MyApp.java  |   59 ++++++++-------
 .../org/apache/s4/example/fluent/counter/Main.java |    2 +-
 9 files changed, 73 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index f65c35b..6468ac4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.core.App.ClockType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +140,8 @@ public abstract class App {
 
         for (ProcessingElement pe : getPePrototypes()) {
 
+            logger.trace("Removing PE proto [{}].", pe.getClass().getName());
+
             /* Remove all instances. */
             pe.removeAll();
 
@@ -148,10 +149,12 @@ public abstract class App {
 
         /* Get the set of streams and close them. */
         for (Streamable<?> stream : getStreams()) {
+            logger.trace("Closing stream [{}].", stream.getName());
             stream.close();
         }
 
         /* Finally remove the entire app graph. */
+        logger.trace("Clear app graph.");
         pe2stream.clear();
         stream2pe.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index b3df641..43d5a7b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -125,7 +125,7 @@ abstract public class ProcessingElement implements Cloneable {
     private boolean isPrototype = true;
     private boolean isThreadSafe = false;
     private boolean isFirst = true;
-    private boolean isSingleton = true;
+    private boolean isSingleton = false;
 
     private transient OverloadDispatcher overloadDispatcher;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 8ce9d8a..7b8f364 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -5,7 +5,6 @@ import java.util.Map;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.core.Sender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -13,27 +12,22 @@ import com.google.common.collect.MapMaker;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-
 /**
- * The {@link Receiver} and its counterpart {@link Sender} are the top level
- * classes of the communication layer.
+ * The {@link Receiver} and its counterpart {@link Sender} are the top level classes of the communication layer.
  * <p>
- * {@link Receiver} is responsible for receiving an event to a
- * {@link ProcessingElement} instance using a hashKey.
+ * {@link Receiver} is responsible for receiving an event to a {@link ProcessingElement} instance using a hashKey.
  * <p>
- * A Listener implementation receives data from the network and passes an event
- * as a byte array to the {@link Receiver}. The byte array is de-serialized and
- * converted into an {@link Event}. Finally the event is passed to the matching
+ * A Listener implementation receives data from the network and passes an event as a byte array to the {@link Receiver}.
+ * The byte array is de-serialized and converted into an {@link Event}. Finally the event is passed to the matching
  * streams. There is a single {@link Receiver} instance per node.
  * 
- * Details on how the cluster is partitioned and how events are serialized and
- * transmitted to its destination are hidden from the application developer.
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer.
  */
 @Singleton
 public class Receiver implements Runnable {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(Receiver.class);
+    private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
 
     final private Listener listener;
     final private SerializerDeserializer serDeser;
@@ -51,7 +45,7 @@ public class Receiver implements Runnable {
     int getPartition() {
         return listener.getPartitionId();
     }
-    
+
     /** Save stream keyed by app id and stream id. */
     void addStream(Stream<? extends Event> stream) {
         int appId = stream.getApp().getId();
@@ -83,17 +77,18 @@ public class Receiver implements Runnable {
             int streamId = event.getStreamId();
 
             /*
-             * Match appId and streamId in event to the target stream and pass
-             * the event to the target stream. TODO: make this more efficient
-             * for the case in which we send the same event to multiple PEs.
+             * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
+             * make this more efficient for the case in which we send the same event to multiple PEs.
              */
             try {
                 streams.get(appId).get(streamId).receiveEvent(event);
             } catch (NullPointerException e) {
-                logger.error(
-                        "Could not find target stream for event with appId={} and streamId={}",
-                        appId, streamId);
+                logger.error("Could not find target stream for event with appId={} and streamId={}", appId, streamId);
             }
         }
     }
+
+    public void close() {
+        Thread.currentThread().interrupt();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-core/src/main/java/org/apache/s4/core/SingletonPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SingletonPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SingletonPE.java
deleted file mode 100644
index ec1c420..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SingletonPE.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.core;
-
-/*
- * This is provided for cases where we want to process all events in 
- * a given node without a key. If the application class extends this class, 
- * then you will not be able to use it with keys limiting reusability.
- */
-public abstract class SingletonPE extends ProcessingElement {
-
-    public SingletonPE(App app) {
-        super(app);
-    }
-
-    /* Return the only PE instance . */
-    public ProcessingElement getInstanceForKey() {
-
-        return this;
-    }
-
-    /* Ignore key, there is only one instance. */
-    @Override
-    public ProcessingElement getInstanceForKey(String id) {
-
-        return this;
-    }
-
-    // abstract protected void onEvent(Event event);
-    //
-    // abstract public void onTrigger(Event event);
-
-    /*
-     * Don't let subclasses override this method. It is not needed. All initialization should be done by the concrete PE
-     * constructor.
-     */
-    @Override
-    final public void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/logback.xml b/subprojects/s4-core/src/main/resources/logback.xml
index 6b246ee..ea8c85a 100644
--- a/subprojects/s4-core/src/main/resources/logback.xml
+++ b/subprojects/s4-core/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
     </encoder>
   </appender>
 
-  <root level="info">
+  <root level="debug">
     <appender-ref ref="STDOUT" />
   </root>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/CounterPE.java
index 60a8097..35c0583 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/CounterPE.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/CounterPE.java
@@ -20,27 +20,19 @@ import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 
-
 public class CounterPE extends ProcessingElement {
 
-    private Stream<CountEvent> countStream = null;
+    private Stream<CountEvent>[] countStream;
 
     public CounterPE(App app) {
         super(app);
     }
-    
-    /**
-     * @return the countStream
-     */
-    public Stream<CountEvent> getCountStream() {
+
+    public Stream<CountEvent>[] getCountStream() {
         return countStream;
     }
 
-    /**
-     * @param countStream
-     *            the countStream to set
-     */
-    public void setCountStream(Stream<CountEvent> countStream) {
+    public void setCountStream(Stream<CountEvent>... countStream) {
         this.countStream = countStream;
     }
 
@@ -51,22 +43,12 @@ public class CounterPE extends ProcessingElement {
         counter += 1;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see io.s4.ProcessingElement#sendOutputEvent()
-     */
     public void onTrigger(Event event) {
 
         CountEvent countEvent = new CountEvent(getId(), counter);
-        countStream.put(countEvent);
+        emit(countEvent, countStream);
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see io.s4.ProcessingElement#init()
-     */
     @Override
     protected void onCreate() {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/GenerateUserEventPE.java
index 5664d1b..6016433 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/GenerateUserEventPE.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/GenerateUserEventPE.java
@@ -19,13 +19,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
-import org.apache.s4.core.SingletonPE;
+import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 
-
-public class GenerateUserEventPE extends SingletonPE {
+public class GenerateUserEventPE extends ProcessingElement {
 
     static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
     static int[] ages = { 25, 2, 33, 6, 67 };
@@ -38,13 +36,14 @@ public class GenerateUserEventPE extends SingletonPE {
     }
 
     /**
-     * @param targetStreams the {@link UserEvent} streams.
+     * @param targetStreams
+     *            the {@link UserEvent} streams.
      */
-    public void setStreams(Stream<UserEvent>... targetStreams){
+    public void setStreams(Stream<UserEvent>... targetStreams) {
         this.targetStreams = targetStreams;
     }
-    
-    public void onTrigger(Event event) {
+
+    public void onTime() {
         List<String> favorites = new ArrayList<String>();
         favorites.add("dulce de leche");
         favorites.add("strawberry");
@@ -53,12 +52,9 @@ public class GenerateUserEventPE extends SingletonPE {
         int indexAge = generator.nextInt(ages.length);
         int indexGender = generator.nextInt(2);
 
-        UserEvent userEvent = new UserEvent(userIds[indexUserID],
-                ages[indexAge], favorites, genders[indexGender]);
+        UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
 
-        for (int i = 0; i < targetStreams.length; i++) {
-            targetStreams[i].put(userEvent);
-        }
+        emit(userEvent, targetStreams);
     }
 
     @Override
@@ -68,4 +64,10 @@ public class GenerateUserEventPE extends SingletonPE {
     static int pickRandom(int numElements) {
         return 0;
     }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index 42d8e20..baf7c85 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -60,42 +60,56 @@ final public class MyApp extends App {
 
         /* PE that prints counts to console. */
         PrintPE printPE = createPE(PrintPE.class);
-        printPE.setSingleton(true);
 
-        Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
-                .setKey(new CountKeyFinder()).setPE(printPE);
+        Stream<CountEvent> userCountStream = createStream(CountEvent.class);
+        userCountStream.setName("User Count Stream");
+        userCountStream.setKey(new CountKeyFinder());
+        userCountStream.setPE(printPE);
 
-        Stream<CountEvent> genderCountStream = createStream(CountEvent.class).setName("Gender Count Stream")
-                .setKey(new CountKeyFinder()).setPE(printPE);
+        Stream<CountEvent> genderCountStream = createStream(CountEvent.class);
+        genderCountStream.setName("Gender Count Stream");
+        genderCountStream.setKey(new CountKeyFinder());
+        genderCountStream.setPE(printPE);
 
-        Stream<CountEvent> ageCountStream = createStream(CountEvent.class).setName("Age Count Stream")
-                .setKey(new CountKeyFinder()).setPE(printPE);
+        Stream<CountEvent> ageCountStream = createStream(CountEvent.class);
+        ageCountStream.setName("Age Count Stream");
+        ageCountStream.setKey(new CountKeyFinder());
+        ageCountStream.setPE(printPE);
 
         /* PEs that count events by user, gender, and age. */
         CounterPE userCountPE = createPE(CounterPE.class);
-        userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.MILLISECONDS);
         userCountPE.setCountStream(userCountStream);
 
         CounterPE genderCountPE = createPE(CounterPE.class);
-        genderCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        genderCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.MILLISECONDS);
         genderCountPE.setCountStream(genderCountStream);
 
         CounterPE ageCountPE = createPE(CounterPE.class);
-        ageCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        ageCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.MILLISECONDS);
         ageCountPE.setCountStream(ageCountStream);
 
         /* Streams that output user events keyed on user, gender, and age. */
-        Stream<UserEvent> userStream = createStream(UserEvent.class).setName("User Stream")
-                .setKey(new UserIDKeyFinder()).setPE(userCountPE);
+        Stream<UserEvent> userStream = createStream(UserEvent.class);
+        userStream.setName("User Stream");
+        userStream.setKey(new UserIDKeyFinder());
+        userStream.setPE(userCountPE);
 
-        Stream<UserEvent> genderStream = createStream(UserEvent.class).setName("Gender Stream")
-                .setKey(new GenderKeyFinder()).setPE(genderCountPE);
+        Stream<UserEvent> genderStream = createStream(UserEvent.class);
+        genderStream.setName("Gender Stream");
+        genderStream.setKey(new GenderKeyFinder());
+        genderStream.setPE(genderCountPE);
 
-        Stream<UserEvent> ageStream = createStream(UserEvent.class).setName("Age Stream").setKey(new AgeKeyFinder())
-                .setPE(ageCountPE);
+        Stream<UserEvent> ageStream = createStream(UserEvent.class);
+        ageStream.setName("Age Stream");
+        ageStream.setKey(new AgeKeyFinder());
+        ageStream.setPE(ageCountPE);
 
         generateUserEventPE = createPE(GenerateUserEventPE.class);
         generateUserEventPE.setStreams(userStream, genderStream, ageStream);
+        generateUserEventPE.setSingleton(true);
+        generateUserEventPE.setTimerInterval(1, TimeUnit.MILLISECONDS);
+
     }
 
     /*
@@ -106,17 +120,6 @@ final public class MyApp extends App {
     @Override
     protected void onStart() {
 
-        for (int i = 0; i < 200; i++) {
-            generateUserEventPE.onTrigger(null);
-        }
-
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-
         System.out.println("Done. Wait until the main app closes.");
         // close();
     }
@@ -138,7 +141,7 @@ final public class MyApp extends App {
         myApp.start();
 
         try {
-            Thread.sleep(10000);
+            Thread.sleep(1000);
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/1804c476/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
index cb9c098..b7b61e6 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
@@ -63,7 +63,7 @@ final public class Main extends AppMaker {
     public void configure() {
 
         /* PE that prints counts to console. */
-        PEMaker printPE = addPE(PrintPE.class).asSingleton();
+        PEMaker printPE = addPE(PrintPE.class);
 
         /* PEs that count events by user, gender, and age. */
         PEMaker userCountPE = addPE(CounterPE.class);


Mime
View raw message